mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-05-06 18:28:03 +00:00
grandpa: remove the periodic block announcer (#4062)
* grandpa: remove the periodic block announcer * grandpa: remove periodic block announcer test
This commit is contained in:
@@ -276,7 +276,6 @@ pub(crate) struct NetworkBridge<B: BlockT, N: Network<B>> {
|
||||
service: N,
|
||||
validator: Arc<GossipValidator<B>>,
|
||||
neighbor_sender: periodic::NeighborPacketSender<B>,
|
||||
announce_sender: periodic::BlockAnnounceSender<B>,
|
||||
}
|
||||
|
||||
impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
|
||||
@@ -341,10 +340,9 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
|
||||
}
|
||||
|
||||
let (rebroadcast_job, neighbor_sender) = periodic::neighbor_packet_worker(service.clone());
|
||||
let (announce_job, announce_sender) = periodic::block_announce_worker(service.clone());
|
||||
let reporting_job = report_stream.consume(service.clone());
|
||||
|
||||
let bridge = NetworkBridge { service, validator, neighbor_sender, announce_sender };
|
||||
let bridge = NetworkBridge { service, validator, neighbor_sender };
|
||||
|
||||
let startup_work = futures::future::lazy(move || {
|
||||
// lazily spawn these jobs onto their own tasks. the lazy future has access
|
||||
@@ -352,8 +350,6 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
|
||||
let mut executor = tokio_executor::DefaultExecutor::current();
|
||||
executor.spawn(Box::new(rebroadcast_job.select(on_exit.clone()).then(|_| Ok(()))))
|
||||
.expect("failed to spawn grandpa rebroadcast job task");
|
||||
executor.spawn(Box::new(announce_job.select(on_exit.clone()).then(|_| Ok(()))))
|
||||
.expect("failed to spawn grandpa block announce job task");
|
||||
executor.spawn(Box::new(reporting_job.select(on_exit.clone()).then(|_| Ok(()))))
|
||||
.expect("failed to spawn grandpa reporting job task");
|
||||
Ok(())
|
||||
@@ -470,7 +466,6 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
|
||||
network: self.service.clone(),
|
||||
locals,
|
||||
sender: tx,
|
||||
announce_sender: self.announce_sender.clone(),
|
||||
has_voted,
|
||||
};
|
||||
|
||||
@@ -676,7 +671,6 @@ impl<B: BlockT, N: Network<B>> Clone for NetworkBridge<B, N> {
|
||||
service: self.service.clone(),
|
||||
validator: Arc::clone(&self.validator),
|
||||
neighbor_sender: self.neighbor_sender.clone(),
|
||||
announce_sender: self.announce_sender.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -723,7 +717,6 @@ struct OutgoingMessages<Block: BlockT, N: Network<Block>> {
|
||||
set_id: SetIdNumber,
|
||||
locals: Option<(AuthorityPair, AuthorityId)>,
|
||||
sender: mpsc::UnboundedSender<SignedMessage<Block>>,
|
||||
announce_sender: periodic::BlockAnnounceSender<Block>,
|
||||
network: N,
|
||||
has_voted: HasVoted<Block>,
|
||||
}
|
||||
@@ -781,8 +774,8 @@ impl<Block: BlockT, N: Network<Block>> Sink for OutgoingMessages<Block, N>
|
||||
"block" => ?target_hash, "round" => ?self.round, "set_id" => ?self.set_id,
|
||||
);
|
||||
|
||||
// send the target block hash to the background block announcer
|
||||
self.announce_sender.send(target_hash, Vec::new());
|
||||
// announce the block we voted on to our peers.
|
||||
self.network.announce(target_hash, Vec::new());
|
||||
|
||||
// propagate the message to peers
|
||||
let topic = round_topic::<Block>(self.round, self.set_id);
|
||||
|
||||
@@ -16,7 +16,6 @@
|
||||
|
||||
//! Periodic rebroadcast of neighbor packets.
|
||||
|
||||
use std::collections::VecDeque;
|
||||
use std::time::{Instant, Duration};
|
||||
|
||||
use codec::Encode;
|
||||
@@ -32,11 +31,6 @@ use super::{gossip::{NeighborPacket, GossipMessage}, Network};
|
||||
// how often to rebroadcast, if no other
|
||||
const REBROADCAST_AFTER: Duration = Duration::from_secs(2 * 60);
|
||||
|
||||
/// The number of block hashes that we have previously voted on that we should
|
||||
/// keep around for announcement. The current value should be enough for 3
|
||||
/// rounds assuming we have prevoted and precommited on different blocks.
|
||||
const LATEST_VOTED_BLOCKS_TO_ANNOUNCE: usize = 6;
|
||||
|
||||
fn rebroadcast_instant() -> Instant {
|
||||
Instant::now() + REBROADCAST_AFTER
|
||||
}
|
||||
@@ -114,139 +108,3 @@ pub(super) fn neighbor_packet_worker<B, N>(net: N) -> (
|
||||
|
||||
(work, NeighborPacketSender(tx))
|
||||
}
|
||||
|
||||
/// A background worker for performing block announcements.
|
||||
struct BlockAnnouncer<B: BlockT, N> {
|
||||
net: N,
|
||||
block_rx: mpsc::UnboundedReceiver<(B::Hash, Vec<u8>)>,
|
||||
latest_voted_blocks: VecDeque<B::Hash>,
|
||||
reannounce_after: Duration,
|
||||
delay: Delay,
|
||||
}
|
||||
|
||||
/// A background worker for announcing block hashes to peers. The worker keeps
|
||||
/// track of `LATEST_VOTED_BLOCKS_TO_ANNOUNCE` and periodically announces these
|
||||
/// blocks to all peers if no new blocks to announce are noted (i.e. presumably
|
||||
/// GRANDPA progress is stalled).
|
||||
pub(super) fn block_announce_worker<B: BlockT, N: Network<B>>(net: N) -> (
|
||||
impl Future<Item = (), Error = ()>,
|
||||
BlockAnnounceSender<B>,
|
||||
) {
|
||||
block_announce_worker_aux(net, REBROADCAST_AFTER)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(super) fn block_announce_worker_with_delay<B: BlockT, N: Network<B>>(
|
||||
net: N,
|
||||
reannounce_after: Duration,
|
||||
) -> (
|
||||
impl Future<Item = (), Error = ()>,
|
||||
BlockAnnounceSender<B>,
|
||||
) {
|
||||
block_announce_worker_aux(net, reannounce_after)
|
||||
}
|
||||
|
||||
fn block_announce_worker_aux<B: BlockT, N: Network<B>>(
|
||||
net: N,
|
||||
reannounce_after: Duration,
|
||||
) -> (
|
||||
impl Future<Item = (), Error = ()>,
|
||||
BlockAnnounceSender<B>,
|
||||
) {
|
||||
let latest_voted_blocks = VecDeque::with_capacity(LATEST_VOTED_BLOCKS_TO_ANNOUNCE);
|
||||
|
||||
let (block_tx, block_rx) = mpsc::unbounded();
|
||||
|
||||
let announcer = BlockAnnouncer {
|
||||
net,
|
||||
block_rx,
|
||||
latest_voted_blocks,
|
||||
reannounce_after,
|
||||
delay: Delay::new(Instant::now() + reannounce_after),
|
||||
};
|
||||
|
||||
(announcer, BlockAnnounceSender(block_tx))
|
||||
}
|
||||
|
||||
|
||||
impl<B: BlockT, N> BlockAnnouncer<B, N> {
|
||||
fn note_block(&mut self, block: B::Hash) -> bool {
|
||||
if !self.latest_voted_blocks.contains(&block) {
|
||||
if self.latest_voted_blocks.len() >= LATEST_VOTED_BLOCKS_TO_ANNOUNCE {
|
||||
self.latest_voted_blocks.pop_front();
|
||||
}
|
||||
|
||||
self.latest_voted_blocks.push_back(block);
|
||||
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
fn reset_delay(&mut self) {
|
||||
self.delay.reset(Instant::now() + self.reannounce_after);
|
||||
}
|
||||
}
|
||||
|
||||
impl<B: BlockT, N: Network<B>> Future for BlockAnnouncer<B, N> {
|
||||
type Item = ();
|
||||
type Error = ();
|
||||
|
||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||
// note any new blocks to announce and announce them
|
||||
loop {
|
||||
match self.block_rx.poll().expect("unbounded receivers do not error; qed") {
|
||||
Async::Ready(None) => return Ok(Async::Ready(())),
|
||||
Async::Ready(Some(block)) => {
|
||||
if self.note_block(block.0) {
|
||||
self.net.announce(block.0, block.1);
|
||||
self.reset_delay();
|
||||
}
|
||||
},
|
||||
Async::NotReady => break,
|
||||
}
|
||||
}
|
||||
|
||||
// check the reannouncement delay timer, has to be done in a loop
|
||||
// because it needs to be polled after re-scheduling.
|
||||
loop {
|
||||
match self.delay.poll() {
|
||||
Err(e) => {
|
||||
warn!(target: "afg", "Error in periodic block announcer timer: {:?}", e);
|
||||
self.reset_delay();
|
||||
},
|
||||
// after the delay fires announce all blocks that we have
|
||||
// stored. note that this only happens if we don't receive any
|
||||
// new blocks above for the duration of `reannounce_after`.
|
||||
Ok(Async::Ready(())) => {
|
||||
self.reset_delay();
|
||||
|
||||
debug!(
|
||||
target: "afg",
|
||||
"Re-announcing latest voted blocks due to lack of progress: {:?}",
|
||||
self.latest_voted_blocks,
|
||||
);
|
||||
|
||||
for block in self.latest_voted_blocks.iter() {
|
||||
self.net.announce(*block, Vec::new());
|
||||
}
|
||||
},
|
||||
Ok(Async::NotReady) => return Ok(Async::NotReady),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A sender used to send block hashes to announce to a background job.
|
||||
#[derive(Clone)]
|
||||
pub(super) struct BlockAnnounceSender<B: BlockT>(mpsc::UnboundedSender<(B::Hash, Vec<u8>)>);
|
||||
|
||||
impl<B: BlockT> BlockAnnounceSender<B> {
|
||||
/// Send a block hash for the background worker to announce.
|
||||
pub fn send(&self, block: B::Hash, associated_data: Vec<u8>) {
|
||||
if let Err(err) = self.0.unbounded_send((block, associated_data)) {
|
||||
debug!(target: "afg", "Failed to send block to background announcer: {:?}", err);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -504,79 +504,3 @@ fn peer_with_higher_view_leads_to_catch_up_request() {
|
||||
|
||||
current_thread::block_on_all(test).unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn periodically_reannounce_voted_blocks_on_stall() {
|
||||
use futures::try_ready;
|
||||
use std::collections::HashSet;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use parking_lot::Mutex;
|
||||
|
||||
let (tester, net) = make_test_network();
|
||||
let (announce_worker, announce_sender) = super::periodic::block_announce_worker_with_delay(
|
||||
net,
|
||||
Duration::from_secs(1),
|
||||
);
|
||||
|
||||
let hashes = Arc::new(Mutex::new(Vec::new()));
|
||||
|
||||
fn wait_all(tester: Tester, hashes: &[Hash]) -> impl Future<Item = Tester, Error = ()> {
|
||||
struct WaitAll {
|
||||
remaining_hashes: Arc<Mutex<HashSet<Hash>>>,
|
||||
events_fut: Box<dyn Future<Item = Tester, Error = ()>>,
|
||||
}
|
||||
|
||||
impl Future for WaitAll {
|
||||
type Item = Tester;
|
||||
type Error = ();
|
||||
|
||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||
let tester = try_ready!(self.events_fut.poll());
|
||||
|
||||
if self.remaining_hashes.lock().is_empty() {
|
||||
return Ok(Async::Ready(tester));
|
||||
}
|
||||
|
||||
let remaining_hashes = self.remaining_hashes.clone();
|
||||
self.events_fut = Box::new(tester.filter_network_events(move |event| match event {
|
||||
Event::Announce(h) =>
|
||||
remaining_hashes.lock().remove(&h) || panic!("unexpected announce"),
|
||||
_ => false,
|
||||
}));
|
||||
|
||||
self.poll()
|
||||
}
|
||||
}
|
||||
|
||||
WaitAll {
|
||||
remaining_hashes: Arc::new(Mutex::new(hashes.iter().cloned().collect())),
|
||||
events_fut: Box::new(futures::future::ok(tester)),
|
||||
}
|
||||
}
|
||||
|
||||
let test = tester
|
||||
.and_then(move |tester| {
|
||||
current_thread::spawn(announce_worker);
|
||||
Ok(tester)
|
||||
})
|
||||
.and_then(|tester| {
|
||||
// announce 12 blocks
|
||||
for _ in 0..=12 {
|
||||
let hash = Hash::random();
|
||||
hashes.lock().push(hash);
|
||||
announce_sender.send(hash, Vec::new());
|
||||
}
|
||||
|
||||
// we should see an event for each of those announcements
|
||||
wait_all(tester, &hashes.lock())
|
||||
})
|
||||
.and_then(|tester| {
|
||||
// after a period of inactivity we should see the last
|
||||
// `LATEST_VOTED_BLOCKS_TO_ANNOUNCE` being rebroadcast
|
||||
wait_all(tester, &hashes.lock()[7..=12])
|
||||
});
|
||||
|
||||
let mut runtime = current_thread::Runtime::new().unwrap();
|
||||
runtime.block_on(test).unwrap();
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user