mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-05-31 02:51:01 +00:00
client/finality-grandpa: Reintegrate periodic neighbor packet worker (#4631)
The `NeighborPacketWorker` within `client/finality-grandpa` does two things: 1. It receives neighbor packets from components within `client/finality-grandpa`, sends them down to the `GossipEngine` in order for neighboring nodes to receive. 2. It periodically sends out the most recent neighbor packet to the `GossipEngine`. In order to send out packets it had a clone to a `GossipEgine` within an atomic reference counter and a mutex. The `NeighborPacketWorker` was then spawned onto its own asynchronous task. Instead of running in its own task, this patch reintegrates the `NeighborPacketWorker` into the main `client/finality-grandpa` task not requiring the `NeighborPacketWorker` to own a clone of the `GossipEngine`. The greater picture This is a tiny change within a greater refactoring. The overall goal is to **simplify** how finality-grandpa interacts with the network and to **reduce** the amount of **unbounded channels** within the logic. Why no unbounded channels: Bounding channels is needed for backpressure and proper scheduling. With unbounded channels there is no way of telling the producer side to slow down for the consumer side to catch up. Rephrased, there is no way for the scheduler to know when to favour the consumer task over the producer task on a crowded channel and the other way round for an empty channel. Reducing the amount of shared ownership simplifies the logic and enables one to use async-await syntax-suggar, given that one does not need to hold a lock across poll invocations. Using async-await enables one to use bounded channels without complex logic.
This commit is contained in:
@@ -27,13 +27,18 @@
|
||||
//! In the future, there will be a fallback for allowing sending the same message
|
||||
//! under certain conditions that are used to un-stick the protocol.
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use futures::{prelude::*, future::Executor as _, sync::mpsc};
|
||||
use futures03::{compat::Compat, stream::StreamExt, future::FutureExt as _, future::TryFutureExt as _};
|
||||
use futures03::{
|
||||
compat::Compat,
|
||||
stream::StreamExt,
|
||||
future::{Future as Future03, FutureExt as _, TryFutureExt as _},
|
||||
};
|
||||
use log::{debug, trace};
|
||||
use parking_lot::Mutex;
|
||||
use std::{pin::Pin, sync::Arc, task::{Context, Poll as Poll03}};
|
||||
|
||||
use finality_grandpa::Message::{Prevote, Precommit, PrimaryPropose};
|
||||
use finality_grandpa::{voter, voter_set::VoterSet};
|
||||
use log::{debug, trace};
|
||||
use sc_network::{NetworkService, ReputationChange};
|
||||
use sc_network_gossip::{GossipEngine, Network as GossipNetwork};
|
||||
use parity_scale_codec::{Encode, Decode};
|
||||
@@ -134,9 +139,22 @@ pub(crate) struct NetworkBridge<B: BlockT, N: Network<B>> {
|
||||
service: N,
|
||||
gossip_engine: GossipEngine<B>,
|
||||
validator: Arc<GossipValidator<B>>,
|
||||
|
||||
/// Sender side of the neighbor packet channel.
|
||||
///
|
||||
/// Packets sent into this channel are processed by the `NeighborPacketWorker` and passed on to
|
||||
/// the underlying `GossipEngine`.
|
||||
neighbor_sender: periodic::NeighborPacketSender<B>,
|
||||
|
||||
/// `NeighborPacketWorker` processing packets sent through the `NeighborPacketSender`.
|
||||
//
|
||||
// NetworkBridge is required to be clonable, thus one needs to be able to clone its children,
|
||||
// thus one has to wrap neighor_packet_worker with an Arc Mutex.
|
||||
neighbor_packet_worker: Arc<Mutex<periodic::NeighborPacketWorker<B>>>,
|
||||
}
|
||||
|
||||
impl<B: BlockT, N: Network<B>> Unpin for NetworkBridge<B, N> {}
|
||||
|
||||
impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
|
||||
/// Create a new NetworkBridge to the given NetworkService. Returns the service
|
||||
/// handle.
|
||||
@@ -195,14 +213,18 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
|
||||
}
|
||||
}
|
||||
|
||||
let (rebroadcast_job, neighbor_sender) = periodic::neighbor_packet_worker(gossip_engine.clone());
|
||||
let (neighbor_packet_worker, neighbor_packet_sender) = periodic::NeighborPacketWorker::new();
|
||||
let reporting_job = report_stream.consume(gossip_engine.clone());
|
||||
|
||||
let bridge = NetworkBridge { service, gossip_engine, validator, neighbor_sender };
|
||||
let bridge = NetworkBridge {
|
||||
service,
|
||||
gossip_engine,
|
||||
validator,
|
||||
neighbor_sender: neighbor_packet_sender,
|
||||
neighbor_packet_worker: Arc::new(Mutex::new(neighbor_packet_worker)),
|
||||
};
|
||||
|
||||
let executor = Compat::new(executor);
|
||||
executor.execute(Box::new(rebroadcast_job.select(on_exit.clone().map(Ok).compat()).then(|_| Ok(()))))
|
||||
.expect("failed to spawn grandpa rebroadcast job task");
|
||||
executor.execute(Box::new(reporting_job.select(on_exit.clone().map(Ok).compat()).then(|_| Ok(()))))
|
||||
.expect("failed to spawn grandpa reporting job task");
|
||||
|
||||
@@ -391,6 +413,21 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<B: BlockT, N: Network<B>> Future03 for NetworkBridge<B, N> {
|
||||
type Output = Result<(), Error>;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll03<Self::Output> {
|
||||
loop {
|
||||
match futures03::ready!((self.neighbor_packet_worker.lock()).poll_next_unpin(cx)) {
|
||||
None => return Poll03::Ready(
|
||||
Err(Error::Network("NeighborPacketWorker stream closed.".into()))
|
||||
),
|
||||
Some((to, packet)) => self.gossip_engine.send_message(to, packet.encode()),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn incoming_global<B: BlockT>(
|
||||
mut gossip_engine: GossipEngine<B>,
|
||||
topic: B::Hash,
|
||||
@@ -530,6 +567,7 @@ impl<B: BlockT, N: Network<B>> Clone for NetworkBridge<B, N> {
|
||||
gossip_engine: self.gossip_engine.clone(),
|
||||
validator: Arc::clone(&self.validator),
|
||||
neighbor_sender: self.neighbor_sender.clone(),
|
||||
neighbor_packet_worker: self.neighbor_packet_worker.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user