client/finality-grandpa: Reintegrate gossip validator report stream (#4661)

* client/finality-grandpa: Reintegrate gossip validator report stream

The `finality-grandpa` `GossipValidator` is called by the `GossipEngine`
in a synchronous fashion on each gossip message. Its main task is to
decide whether to gossip the given message on, or whether to drop it.

In addition it also updates the reputation of a node's peers based on
the incoming gossip messages. To do so it needs to be able to report the
reputation change which it does through an unbounded channel (in order
to stay synchronous).

Previously the receiving side of this channel would be handled by a new
task, polling the channel and forwarding the changes to a clone of the
`GossipEngine` that it would own.

Instead the receiver of the above mentioned channel is now being polled
by the `NetworkBridge` within its `Future::poll` implementation.
Reputation changes are reported through the already existing
`GossipEngine` instance within `NetworkBridge`.

For details on the overall goal, see d9837d7dd.

* client/finality-grandpa: Remove exit future from test NetworkBridges
This commit is contained in:
Max Inden
2020-01-17 18:28:32 +01:00
committed by Gavin Wood
parent 8c789806f2
commit 5f80929dce
6 changed files with 52 additions and 77 deletions
@@ -83,15 +83,15 @@
//! We only send polite messages to peers,
use sp_runtime::traits::{NumberFor, Block as BlockT, Zero};
use sc_network_gossip::{GossipEngine, MessageIntent, ValidatorContext};
use sc_network_gossip::{MessageIntent, ValidatorContext};
use sc_network::{config::Roles, PeerId, ReputationChange};
use parity_scale_codec::{Encode, Decode};
use sp_finality_grandpa::AuthorityId;
use sc_telemetry::{telemetry, CONSENSUS_DEBUG};
use log::{trace, debug, warn};
use log::{trace, debug};
use futures::prelude::*;
use futures::sync::mpsc;
use futures03::channel::mpsc;
use rand::seq::SliceRandom;
use crate::{environment, CatchUp, CompactCommit, SignedMessage};
@@ -1178,7 +1178,7 @@ impl<Block: BlockT> GossipValidator<Block> {
pub(super) fn new(
config: crate::Config,
set_state: environment::SharedVoterSetState<Block>,
) -> (GossipValidator<Block>, ReportStream) {
) -> (GossipValidator<Block>, mpsc::UnboundedReceiver<PeerReport>) {
let (tx, rx) = mpsc::unbounded();
let val = GossipValidator {
inner: parking_lot::RwLock::new(Inner::new(config)),
@@ -1186,7 +1186,7 @@ impl<Block: BlockT> GossipValidator<Block> {
report_sender: tx,
};
(val, ReportStream { reports: rx })
(val, rx)
}
/// Note a round in the current set has started.
@@ -1445,57 +1445,9 @@ impl<Block: BlockT> sc_network_gossip::Validator<Block> for GossipValidator<Bloc
}
}
struct PeerReport {
who: PeerId,
cost_benefit: ReputationChange,
}
// wrapper around a stream of reports.
#[must_use = "The report stream must be consumed"]
pub(super) struct ReportStream {
reports: mpsc::UnboundedReceiver<PeerReport>,
}
impl ReportStream {
/// Consume the report stream, converting it into a future that
/// handles all reports.
pub(super) fn consume<B>(self, net: GossipEngine<B>)
-> impl Future<Item=(),Error=()> + Send + 'static
where
B: BlockT,
{
ReportingTask {
reports: self.reports,
net,
}
}
}
/// A future for reporting peers.
#[must_use = "Futures do nothing unless polled"]
struct ReportingTask<B: BlockT> {
reports: mpsc::UnboundedReceiver<PeerReport>,
net: GossipEngine<B>,
}
impl<B: BlockT> Future for ReportingTask<B> {
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<(), ()> {
loop {
match self.reports.poll() {
Err(_) => {
warn!(target: "afg", "Report stream terminated unexpectedly");
return Ok(Async::Ready(()))
}
Ok(Async::Ready(None)) => return Ok(Async::Ready(())),
Ok(Async::Ready(Some(PeerReport { who, cost_benefit }))) =>
self.net.report(who, cost_benefit),
Ok(Async::NotReady) => return Ok(Async::NotReady),
}
}
}
pub(super) struct PeerReport {
pub who: PeerId,
pub cost_benefit: ReputationChange,
}
#[cfg(test)]