mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-09 20:11:09 +00:00
grandpa: reannounce voted blocks periodically (#3602)
* grandpa: reannounce latest voted blocks periodically * grandpa: add test for background block announcement * grandpa: configurable delay for background block announcer * grandpa: nits
This commit is contained in:
@@ -233,6 +233,7 @@ pub(crate) struct NetworkBridge<B: BlockT, N: Network<B>> {
|
||||
service: N,
|
||||
validator: Arc<GossipValidator<B>>,
|
||||
neighbor_sender: periodic::NeighborPacketSender<B>,
|
||||
announce_sender: periodic::BlockAnnounceSender<B>,
|
||||
}
|
||||
|
||||
impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
|
||||
@@ -299,9 +300,10 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
|
||||
}
|
||||
|
||||
let (rebroadcast_job, neighbor_sender) = periodic::neighbor_packet_worker(service.clone());
|
||||
let (announce_job, announce_sender) = periodic::block_announce_worker(service.clone());
|
||||
let reporting_job = report_stream.consume(service.clone());
|
||||
|
||||
let bridge = NetworkBridge { service, validator, neighbor_sender };
|
||||
let bridge = NetworkBridge { service, validator, neighbor_sender, announce_sender };
|
||||
|
||||
let startup_work = futures::future::lazy(move || {
|
||||
// lazily spawn these jobs onto their own tasks. the lazy future has access
|
||||
@@ -309,6 +311,8 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
|
||||
let mut executor = tokio_executor::DefaultExecutor::current();
|
||||
executor.spawn(Box::new(rebroadcast_job.select(on_exit.clone()).then(|_| Ok(()))))
|
||||
.expect("failed to spawn grandpa rebroadcast job task");
|
||||
executor.spawn(Box::new(announce_job.select(on_exit.clone()).then(|_| Ok(()))))
|
||||
.expect("failed to spawn grandpa block announce job task");
|
||||
executor.spawn(Box::new(reporting_job.select(on_exit.clone()).then(|_| Ok(()))))
|
||||
.expect("failed to spawn grandpa reporting job task");
|
||||
Ok(())
|
||||
@@ -424,6 +428,7 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
|
||||
network: self.service.clone(),
|
||||
locals,
|
||||
sender: tx,
|
||||
announce_sender: self.announce_sender.clone(),
|
||||
has_voted,
|
||||
};
|
||||
|
||||
@@ -616,6 +621,7 @@ impl<B: BlockT, N: Network<B>> Clone for NetworkBridge<B, N> {
|
||||
service: self.service.clone(),
|
||||
validator: Arc::clone(&self.validator),
|
||||
neighbor_sender: self.neighbor_sender.clone(),
|
||||
announce_sender: self.announce_sender.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -662,6 +668,7 @@ struct OutgoingMessages<Block: BlockT, N: Network<Block>> {
|
||||
set_id: SetIdNumber,
|
||||
locals: Option<(AuthorityPair, AuthorityId)>,
|
||||
sender: mpsc::UnboundedSender<SignedMessage<Block>>,
|
||||
announce_sender: periodic::BlockAnnounceSender<Block>,
|
||||
network: N,
|
||||
has_voted: HasVoted<Block>,
|
||||
}
|
||||
@@ -719,10 +726,10 @@ impl<Block: BlockT, N: Network<Block>> Sink for OutgoingMessages<Block, N>
|
||||
"block" => ?target_hash, "round" => ?self.round, "set_id" => ?self.set_id,
|
||||
);
|
||||
|
||||
// announce our block hash to peers and propagate the
|
||||
// message.
|
||||
self.network.announce(target_hash);
|
||||
// send the target block hash to the background block announcer
|
||||
self.announce_sender.send(target_hash);
|
||||
|
||||
// propagate the message to peers
|
||||
let topic = round_topic::<Block>(self.round, self.set_id);
|
||||
self.network.gossip_message(topic, message.encode(), false);
|
||||
|
||||
|
||||
@@ -16,20 +16,27 @@
|
||||
|
||||
//! Periodic rebroadcast of neighbor packets.
|
||||
|
||||
use super::{gossip::{NeighborPacket, GossipMessage}, Network};
|
||||
use std::collections::VecDeque;
|
||||
use std::time::{Instant, Duration};
|
||||
|
||||
use codec::Encode;
|
||||
use futures::prelude::*;
|
||||
use futures::sync::mpsc;
|
||||
use sr_primitives::traits::{NumberFor, Block as BlockT};
|
||||
use network::PeerId;
|
||||
use tokio_timer::Delay;
|
||||
use log::{debug, warn};
|
||||
use codec::Encode;
|
||||
use tokio_timer::Delay;
|
||||
|
||||
use std::time::{Instant, Duration};
|
||||
use network::PeerId;
|
||||
use sr_primitives::traits::{NumberFor, Block as BlockT};
|
||||
use super::{gossip::{NeighborPacket, GossipMessage}, Network};
|
||||
|
||||
// how often to rebroadcast, if no other
|
||||
const REBROADCAST_AFTER: Duration = Duration::from_secs(2 * 60);
|
||||
|
||||
/// The number of block hashes that we have previously voted on that we should
|
||||
/// keep around for announcement. The current value should be enough for 3
|
||||
/// rounds assuming we have prevoted and precommited on different blocks.
|
||||
const LATEST_VOTED_BLOCKS_TO_ANNOUNCE: usize = 6;
|
||||
|
||||
fn rebroadcast_instant() -> Instant {
|
||||
Instant::now() + REBROADCAST_AFTER
|
||||
}
|
||||
@@ -41,6 +48,7 @@ pub(super) struct NeighborPacketSender<B: BlockT>(
|
||||
);
|
||||
|
||||
impl<B: BlockT> NeighborPacketSender<B> {
|
||||
/// Send a neighbor packet for the background worker to gossip to peers.
|
||||
pub fn send(
|
||||
&self,
|
||||
who: Vec<network::PeerId>,
|
||||
@@ -106,3 +114,142 @@ pub(super) fn neighbor_packet_worker<B, N>(net: N) -> (
|
||||
|
||||
(work, NeighborPacketSender(tx))
|
||||
}
|
||||
|
||||
/// A background worker for performing block announcements.
|
||||
struct BlockAnnouncer<B: BlockT, N> {
|
||||
net: N,
|
||||
block_rx: mpsc::UnboundedReceiver<B::Hash>,
|
||||
latest_voted_blocks: VecDeque<B::Hash>,
|
||||
reannounce_after: Duration,
|
||||
delay: Delay,
|
||||
}
|
||||
|
||||
/// A background worker for announcing block hashes to peers. The worker keeps
|
||||
/// track of `LATEST_VOTED_BLOCKS_TO_ANNOUNCE` and periodically announces these
|
||||
/// blocks to all peers if no new blocks to announce are noted (i.e. presumably
|
||||
/// GRANDPA progress is stalled).
|
||||
pub(super) fn block_announce_worker<B: BlockT, N: Network<B>>(net: N) -> (
|
||||
impl Future<Item = (), Error = ()>,
|
||||
BlockAnnounceSender<B>,
|
||||
) {
|
||||
block_announce_worker_aux(net, REBROADCAST_AFTER)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(super) fn block_announce_worker_with_delay<B: BlockT, N: Network<B>>(
|
||||
net: N,
|
||||
reannounce_after: Duration,
|
||||
) -> (
|
||||
impl Future<Item = (), Error = ()>,
|
||||
BlockAnnounceSender<B>,
|
||||
) {
|
||||
block_announce_worker_aux(net, reannounce_after)
|
||||
}
|
||||
|
||||
fn block_announce_worker_aux<B: BlockT, N: Network<B>>(
|
||||
net: N,
|
||||
reannounce_after: Duration,
|
||||
) -> (
|
||||
impl Future<Item = (), Error = ()>,
|
||||
BlockAnnounceSender<B>,
|
||||
) {
|
||||
let latest_voted_blocks = VecDeque::with_capacity(LATEST_VOTED_BLOCKS_TO_ANNOUNCE);
|
||||
|
||||
let (block_tx, block_rx) = mpsc::unbounded();
|
||||
|
||||
let announcer = BlockAnnouncer {
|
||||
net,
|
||||
block_rx,
|
||||
latest_voted_blocks,
|
||||
reannounce_after,
|
||||
delay: Delay::new(Instant::now() + reannounce_after),
|
||||
};
|
||||
|
||||
(announcer, BlockAnnounceSender(block_tx))
|
||||
}
|
||||
|
||||
|
||||
impl<B: BlockT, N> BlockAnnouncer<B, N> {
|
||||
fn note_block(&mut self, block: B::Hash) -> bool {
|
||||
if !self.latest_voted_blocks.contains(&block) {
|
||||
if self.latest_voted_blocks.len() >= LATEST_VOTED_BLOCKS_TO_ANNOUNCE {
|
||||
self.latest_voted_blocks.pop_front();
|
||||
}
|
||||
|
||||
self.latest_voted_blocks.push_back(block);
|
||||
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
fn reset_delay(&mut self) {
|
||||
self.delay.reset(Instant::now() + self.reannounce_after);
|
||||
}
|
||||
}
|
||||
|
||||
impl<B: BlockT, N: Network<B>> Future for BlockAnnouncer<B, N> {
|
||||
type Item = ();
|
||||
type Error = ();
|
||||
|
||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||
// note any new blocks to announce and announce them
|
||||
loop {
|
||||
match self.block_rx.poll().expect("unbounded receivers do not error; qed") {
|
||||
Async::Ready(None) => return Ok(Async::Ready(())),
|
||||
Async::Ready(Some(block)) => {
|
||||
if self.note_block(block) {
|
||||
self.net.announce(block);
|
||||
self.reset_delay();
|
||||
}
|
||||
},
|
||||
Async::NotReady => break,
|
||||
}
|
||||
}
|
||||
|
||||
// check the reannouncement delay timer, has to be done in a loop
|
||||
// because it needs to be polled after re-scheduling.
|
||||
loop {
|
||||
match self.delay.poll() {
|
||||
Err(e) => {
|
||||
warn!(target: "afg", "Error in periodic block announcer timer: {:?}", e);
|
||||
self.reset_delay();
|
||||
},
|
||||
// after the delay fires announce all blocks that we have
|
||||
// stored. note that this only happens if we don't receive any
|
||||
// new blocks above for the duration of `reannounce_after`.
|
||||
Ok(Async::Ready(())) => {
|
||||
self.reset_delay();
|
||||
|
||||
debug!(
|
||||
target: "afg",
|
||||
"Re-announcing latest voted blocks due to lack of progress: {:?}",
|
||||
self.latest_voted_blocks,
|
||||
);
|
||||
|
||||
for block in self.latest_voted_blocks.iter() {
|
||||
self.net.announce(*block);
|
||||
}
|
||||
},
|
||||
Ok(Async::NotReady) => return Ok(Async::NotReady),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A sender used to send block hashes to announce to a background job.
|
||||
#[derive(Clone)]
|
||||
pub(super) struct BlockAnnounceSender<B: BlockT>(mpsc::UnboundedSender<B::Hash>);
|
||||
|
||||
impl<B: BlockT> BlockAnnounceSender<B> {
|
||||
/// Send a block hash for the background worker to announce.
|
||||
pub fn send(
|
||||
&self,
|
||||
block: B::Hash,
|
||||
) {
|
||||
if let Err(err) = self.0.unbounded_send(block) {
|
||||
debug!(target: "afg", "Failed to send block to background announcer: {:?}", err);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -498,3 +498,79 @@ fn peer_with_higher_view_leads_to_catch_up_request() {
|
||||
|
||||
current_thread::block_on_all(test).unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn periodically_reannounce_voted_blocks_on_stall() {
|
||||
use futures::try_ready;
|
||||
use std::collections::HashSet;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use parking_lot::Mutex;
|
||||
|
||||
let (tester, net) = make_test_network();
|
||||
let (announce_worker, announce_sender) = super::periodic::block_announce_worker_with_delay(
|
||||
net,
|
||||
Duration::from_secs(1),
|
||||
);
|
||||
|
||||
let hashes = Arc::new(Mutex::new(Vec::new()));
|
||||
|
||||
fn wait_all(tester: Tester, hashes: &[Hash]) -> impl Future<Item = Tester, Error = ()> {
|
||||
struct WaitAll {
|
||||
remaining_hashes: Arc<Mutex<HashSet<Hash>>>,
|
||||
events_fut: Box<dyn Future<Item = Tester, Error = ()>>,
|
||||
}
|
||||
|
||||
impl Future for WaitAll {
|
||||
type Item = Tester;
|
||||
type Error = ();
|
||||
|
||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||
let tester = try_ready!(self.events_fut.poll());
|
||||
|
||||
if self.remaining_hashes.lock().is_empty() {
|
||||
return Ok(Async::Ready(tester));
|
||||
}
|
||||
|
||||
let remaining_hashes = self.remaining_hashes.clone();
|
||||
self.events_fut = Box::new(tester.filter_network_events(move |event| match event {
|
||||
Event::Announce(h) =>
|
||||
remaining_hashes.lock().remove(&h) || panic!("unexpected announce"),
|
||||
_ => false,
|
||||
}));
|
||||
|
||||
self.poll()
|
||||
}
|
||||
}
|
||||
|
||||
WaitAll {
|
||||
remaining_hashes: Arc::new(Mutex::new(hashes.iter().cloned().collect())),
|
||||
events_fut: Box::new(futures::future::ok(tester)),
|
||||
}
|
||||
}
|
||||
|
||||
let test = tester
|
||||
.and_then(move |tester| {
|
||||
current_thread::spawn(announce_worker);
|
||||
Ok(tester)
|
||||
})
|
||||
.and_then(|tester| {
|
||||
// announce 12 blocks
|
||||
for _ in 0..=12 {
|
||||
let hash = Hash::random();
|
||||
hashes.lock().push(hash);
|
||||
announce_sender.send(hash);
|
||||
}
|
||||
|
||||
// we should see an event for each of those announcements
|
||||
wait_all(tester, &hashes.lock())
|
||||
})
|
||||
.and_then(|tester| {
|
||||
// after a period of inactivity we should see the last
|
||||
// `LATEST_VOTED_BLOCKS_TO_ANNOUNCE` being rebroadcast
|
||||
wait_all(tester, &hashes.lock()[7..=12])
|
||||
});
|
||||
|
||||
let mut runtime = current_thread::Runtime::new().unwrap();
|
||||
runtime.block_on(test).unwrap();
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user