mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-04-26 12:17:58 +00:00
client/finality-grandpa: Reintegrate gossip validator report stream (#4661)
* client/finality-grandpa: Reintegrate gossip validator report stream
The `finality-grandpa` `GossipValidator` is called by the `GossipEngine`
in a synchronous fashion on each gossip message. Its main task is to
decide whether to gossip the given message on, or whether to drop it.
In addition it also updates the reputation of a node's peers based on
the incoming gossip messages. To do so it needs to be able to report the
reputation change which it does through an unbounded channel (in order
to stay synchronous).
Previously the receiving side of this channel would be handled by a new
task, polling the channel and forwarding the changes to a clone of the
`GossipEngine` that it would own.
Instead the receiver of the above mentioned channel is now being polled
by the `NetworkBridge` within its `Future::poll` implementation.
Reputation changes are reported through the already existing
`GossipEngine` instance within `NetworkBridge`.
For details on the overall goal, see d9837d7dd.
* client/finality-grandpa: Remove exit future from test NetworkBridges
This commit is contained in:
@@ -83,15 +83,15 @@
|
||||
//! We only send polite messages to peers,
|
||||
|
||||
use sp_runtime::traits::{NumberFor, Block as BlockT, Zero};
|
||||
use sc_network_gossip::{GossipEngine, MessageIntent, ValidatorContext};
|
||||
use sc_network_gossip::{MessageIntent, ValidatorContext};
|
||||
use sc_network::{config::Roles, PeerId, ReputationChange};
|
||||
use parity_scale_codec::{Encode, Decode};
|
||||
use sp_finality_grandpa::AuthorityId;
|
||||
|
||||
use sc_telemetry::{telemetry, CONSENSUS_DEBUG};
|
||||
use log::{trace, debug, warn};
|
||||
use log::{trace, debug};
|
||||
use futures::prelude::*;
|
||||
use futures::sync::mpsc;
|
||||
use futures03::channel::mpsc;
|
||||
use rand::seq::SliceRandom;
|
||||
|
||||
use crate::{environment, CatchUp, CompactCommit, SignedMessage};
|
||||
@@ -1178,7 +1178,7 @@ impl<Block: BlockT> GossipValidator<Block> {
|
||||
pub(super) fn new(
|
||||
config: crate::Config,
|
||||
set_state: environment::SharedVoterSetState<Block>,
|
||||
) -> (GossipValidator<Block>, ReportStream) {
|
||||
) -> (GossipValidator<Block>, mpsc::UnboundedReceiver<PeerReport>) {
|
||||
let (tx, rx) = mpsc::unbounded();
|
||||
let val = GossipValidator {
|
||||
inner: parking_lot::RwLock::new(Inner::new(config)),
|
||||
@@ -1186,7 +1186,7 @@ impl<Block: BlockT> GossipValidator<Block> {
|
||||
report_sender: tx,
|
||||
};
|
||||
|
||||
(val, ReportStream { reports: rx })
|
||||
(val, rx)
|
||||
}
|
||||
|
||||
/// Note a round in the current set has started.
|
||||
@@ -1445,57 +1445,9 @@ impl<Block: BlockT> sc_network_gossip::Validator<Block> for GossipValidator<Bloc
|
||||
}
|
||||
}
|
||||
|
||||
struct PeerReport {
|
||||
who: PeerId,
|
||||
cost_benefit: ReputationChange,
|
||||
}
|
||||
|
||||
// wrapper around a stream of reports.
|
||||
#[must_use = "The report stream must be consumed"]
|
||||
pub(super) struct ReportStream {
|
||||
reports: mpsc::UnboundedReceiver<PeerReport>,
|
||||
}
|
||||
|
||||
impl ReportStream {
|
||||
/// Consume the report stream, converting it into a future that
|
||||
/// handles all reports.
|
||||
pub(super) fn consume<B>(self, net: GossipEngine<B>)
|
||||
-> impl Future<Item=(),Error=()> + Send + 'static
|
||||
where
|
||||
B: BlockT,
|
||||
{
|
||||
ReportingTask {
|
||||
reports: self.reports,
|
||||
net,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A future for reporting peers.
|
||||
#[must_use = "Futures do nothing unless polled"]
|
||||
struct ReportingTask<B: BlockT> {
|
||||
reports: mpsc::UnboundedReceiver<PeerReport>,
|
||||
net: GossipEngine<B>,
|
||||
}
|
||||
|
||||
impl<B: BlockT> Future for ReportingTask<B> {
|
||||
type Item = ();
|
||||
type Error = ();
|
||||
|
||||
fn poll(&mut self) -> Poll<(), ()> {
|
||||
loop {
|
||||
match self.reports.poll() {
|
||||
Err(_) => {
|
||||
warn!(target: "afg", "Report stream terminated unexpectedly");
|
||||
return Ok(Async::Ready(()))
|
||||
}
|
||||
Ok(Async::Ready(None)) => return Ok(Async::Ready(())),
|
||||
Ok(Async::Ready(Some(PeerReport { who, cost_benefit }))) =>
|
||||
self.net.report(who, cost_benefit),
|
||||
Ok(Async::NotReady) => return Ok(Async::NotReady),
|
||||
}
|
||||
}
|
||||
}
|
||||
pub(super) struct PeerReport {
|
||||
pub who: PeerId,
|
||||
pub cost_benefit: ReputationChange,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -27,11 +27,12 @@
|
||||
//! In the future, there will be a fallback for allowing sending the same message
|
||||
//! under certain conditions that are used to un-stick the protocol.
|
||||
|
||||
use futures::{prelude::*, future::Executor as _, sync::mpsc};
|
||||
use futures::{prelude::*, sync::mpsc};
|
||||
use futures03::{
|
||||
channel::mpsc as mpsc03,
|
||||
compat::Compat,
|
||||
future::{Future as Future03},
|
||||
stream::StreamExt,
|
||||
future::{Future as Future03, FutureExt as _, TryFutureExt as _},
|
||||
};
|
||||
use log::{debug, trace};
|
||||
use parking_lot::Mutex;
|
||||
@@ -52,7 +53,12 @@ use crate::{
|
||||
};
|
||||
use crate::environment::HasVoted;
|
||||
use gossip::{
|
||||
GossipMessage, FullCatchUpMessage, FullCommitMessage, VoteMessage, GossipValidator
|
||||
FullCatchUpMessage,
|
||||
FullCommitMessage,
|
||||
GossipMessage,
|
||||
GossipValidator,
|
||||
PeerReport,
|
||||
VoteMessage,
|
||||
};
|
||||
use sp_finality_grandpa::{
|
||||
AuthorityPair, AuthorityId, AuthoritySignature, SetId as SetIdNumber, RoundNumber,
|
||||
@@ -148,9 +154,18 @@ pub(crate) struct NetworkBridge<B: BlockT, N: Network<B>> {
|
||||
|
||||
/// `NeighborPacketWorker` processing packets sent through the `NeighborPacketSender`.
|
||||
//
|
||||
// NetworkBridge is required to be clonable, thus one needs to be able to clone its children,
|
||||
// thus one has to wrap neighor_packet_worker with an Arc Mutex.
|
||||
// `NetworkBridge` is required to be clonable, thus one needs to be able to clone its children,
|
||||
// thus one has to wrap neighor_packet_worker with an `Arc` `Mutex`.
|
||||
neighbor_packet_worker: Arc<Mutex<periodic::NeighborPacketWorker<B>>>,
|
||||
|
||||
/// Receiver side of the peer report stream populated by the gossip validator, forwarded to the
|
||||
/// gossip engine.
|
||||
//
|
||||
// `NetworkBridge` is required to be clonable, thus one needs to be able to clone its children,
|
||||
// thus one has to wrap gossip_validator_report_stream with an `Arc` `Mutex`. Given that it is
|
||||
// just an `UnboundedReceiver`, one could also switch to a multi-producer-*multi*-consumer
|
||||
// channel implementation.
|
||||
gossip_validator_report_stream: Arc<Mutex<mpsc03::UnboundedReceiver<PeerReport>>>,
|
||||
}
|
||||
|
||||
impl<B: BlockT, N: Network<B>> Unpin for NetworkBridge<B, N> {}
|
||||
@@ -165,7 +180,6 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
|
||||
config: crate::Config,
|
||||
set_state: crate::environment::SharedVoterSetState<B>,
|
||||
executor: &impl futures03::task::Spawn,
|
||||
on_exit: impl futures03::Future<Output = ()> + Clone + Send + Unpin + 'static,
|
||||
) -> Self {
|
||||
let (validator, report_stream) = GossipValidator::new(
|
||||
config,
|
||||
@@ -214,7 +228,6 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
|
||||
}
|
||||
|
||||
let (neighbor_packet_worker, neighbor_packet_sender) = periodic::NeighborPacketWorker::new();
|
||||
let reporting_job = report_stream.consume(gossip_engine.clone());
|
||||
|
||||
let bridge = NetworkBridge {
|
||||
service,
|
||||
@@ -222,12 +235,9 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
|
||||
validator,
|
||||
neighbor_sender: neighbor_packet_sender,
|
||||
neighbor_packet_worker: Arc::new(Mutex::new(neighbor_packet_worker)),
|
||||
gossip_validator_report_stream: Arc::new(Mutex::new(report_stream)),
|
||||
};
|
||||
|
||||
let executor = Compat::new(executor);
|
||||
executor.execute(Box::new(reporting_job.select(on_exit.clone().map(Ok).compat()).then(|_| Ok(()))))
|
||||
.expect("failed to spawn grandpa reporting job task");
|
||||
|
||||
bridge
|
||||
}
|
||||
|
||||
@@ -418,13 +428,30 @@ impl<B: BlockT, N: Network<B>> Future03 for NetworkBridge<B, N> {
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll03<Self::Output> {
|
||||
loop {
|
||||
match futures03::ready!((self.neighbor_packet_worker.lock()).poll_next_unpin(cx)) {
|
||||
None => return Poll03::Ready(
|
||||
Err(Error::Network("NeighborPacketWorker stream closed.".into()))
|
||||
match self.neighbor_packet_worker.lock().poll_next_unpin(cx) {
|
||||
Poll03::Ready(Some((to, packet))) => {
|
||||
self.gossip_engine.send_message(to, packet.encode());
|
||||
},
|
||||
Poll03::Ready(None) => return Poll03::Ready(
|
||||
Err(Error::Network("Neighbor packet worker stream closed.".into()))
|
||||
),
|
||||
Some((to, packet)) => self.gossip_engine.send_message(to, packet.encode()),
|
||||
Poll03::Pending => break,
|
||||
}
|
||||
}
|
||||
|
||||
loop {
|
||||
match self.gossip_validator_report_stream.lock().poll_next_unpin(cx) {
|
||||
Poll03::Ready(Some(PeerReport { who, cost_benefit })) => {
|
||||
self.gossip_engine.report(who, cost_benefit);
|
||||
},
|
||||
Poll03::Ready(None) => return Poll03::Ready(
|
||||
Err(Error::Network("Gossip validator report stream closed.".into()))
|
||||
),
|
||||
Poll03::Pending => break,
|
||||
}
|
||||
}
|
||||
|
||||
Poll03::Pending
|
||||
}
|
||||
}
|
||||
|
||||
@@ -568,6 +595,7 @@ impl<B: BlockT, N: Network<B>> Clone for NetworkBridge<B, N> {
|
||||
validator: Arc::clone(&self.validator),
|
||||
neighbor_sender: self.neighbor_sender.clone(),
|
||||
neighbor_packet_worker: self.neighbor_packet_worker.clone(),
|
||||
gossip_validator_report_stream: self.gossip_validator_report_stream.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -172,7 +172,6 @@ fn make_test_network(executor: &impl futures03::task::Spawn) -> (
|
||||
config(),
|
||||
voter_set_state(),
|
||||
executor,
|
||||
Exit,
|
||||
);
|
||||
|
||||
(
|
||||
|
||||
@@ -584,7 +584,6 @@ pub fn run_grandpa_voter<B, E, Block: BlockT, N, RA, SC, VR, X, Sp>(
|
||||
config.clone(),
|
||||
persistent_data.set_state.clone(),
|
||||
&executor,
|
||||
on_exit.clone(),
|
||||
);
|
||||
|
||||
register_finality_tracker_inherent_data_provider(client.clone(), &inherent_data_providers)?;
|
||||
|
||||
@@ -178,7 +178,6 @@ pub fn run_grandpa_observer<B, E, Block: BlockT, N, RA, SC, Sp>(
|
||||
config.clone(),
|
||||
persistent_data.set_state.clone(),
|
||||
&executor,
|
||||
on_exit.clone(),
|
||||
);
|
||||
|
||||
let observer_work = ObserverWork::new(
|
||||
|
||||
@@ -25,7 +25,7 @@ use sc_network_test::{
|
||||
use sc_network::config::{ProtocolConfig, Roles, BoxFinalityProofRequestBuilder};
|
||||
use parking_lot::Mutex;
|
||||
use futures_timer::Delay;
|
||||
use futures03::{StreamExt as _, TryStreamExt as _};
|
||||
use futures03::TryStreamExt as _;
|
||||
use tokio::runtime::current_thread;
|
||||
use sp_keyring::Ed25519Keyring;
|
||||
use sc_client::LongestChain;
|
||||
@@ -1270,7 +1270,6 @@ fn voter_persists_its_votes() {
|
||||
config.clone(),
|
||||
set_state,
|
||||
&threads_pool,
|
||||
Exit,
|
||||
);
|
||||
|
||||
let (round_rx, round_tx) = network.round_communication(
|
||||
@@ -1675,7 +1674,6 @@ fn grandpa_environment_respects_voting_rules() {
|
||||
config.clone(),
|
||||
set_state.clone(),
|
||||
&threads_pool,
|
||||
Exit,
|
||||
);
|
||||
|
||||
Environment {
|
||||
|
||||
Reference in New Issue
Block a user