diff --git a/substrate/core/finality-grandpa/src/communication/mod.rs b/substrate/core/finality-grandpa/src/communication/mod.rs index 732c14c1a9..ea78c3ea9d 100644 --- a/substrate/core/finality-grandpa/src/communication/mod.rs +++ b/substrate/core/finality-grandpa/src/communication/mod.rs @@ -328,18 +328,12 @@ impl> NetworkBridge { self.validator.note_set( set_id, voters.voters().iter().map(|(v, _)| v.clone()).collect(), - |to, neighbor| self.service.send_message( - to, - GossipMessage::::from(neighbor).encode() - ), + |to, neighbor| self.neighbor_sender.send(to, neighbor), ); self.validator.note_round( round, - |to, neighbor| self.service.send_message( - to, - GossipMessage::::from(neighbor).encode() - ), + |to, neighbor| self.neighbor_sender.send(to, neighbor), ); } @@ -455,18 +449,25 @@ impl> NetworkBridge { self.validator.note_set( set_id, voters.voters().iter().map(|(v, _)| v.clone()).collect(), - |to, neighbor| self.service.send_message(to, GossipMessage::::from(neighbor).encode()), + |to, neighbor| self.neighbor_sender.send(to, neighbor), ); let service = self.service.clone(); let topic = global_topic::(set_id.0); - let incoming = incoming_global(service, topic, voters, self.validator.clone()); + let incoming = incoming_global( + service, + topic, + voters, + self.validator.clone(), + self.neighbor_sender.clone(), + ); let outgoing = CommitsOut::::new( self.service.clone(), set_id.0, is_voter, self.validator.clone(), + self.neighbor_sender.clone(), ); let outgoing = outgoing.with(|out| { @@ -483,6 +484,7 @@ fn incoming_global>( topic: B::Hash, voters: Arc>, gossip_validator: Arc>, + neighbor_sender: periodic::NeighborPacketSender, ) -> impl Stream, Error = Error> { let process_commit = move | msg: FullCommitMessage, @@ -520,6 +522,7 @@ fn incoming_global>( let finalized_number = commit.target_number; let gossip_validator = gossip_validator.clone(); let service = service.clone(); + let neighbor_sender = neighbor_sender.clone(); let cb = move |outcome| match outcome { voter::CommitProcessingOutcome::Good(_) => { // if it checks out, gossip it. not accounting for @@ -527,10 +530,7 @@ fn incoming_global>( // finalized number. gossip_validator.note_commit_finalized( finalized_number, - |to, neighbor_msg| service.send_message( - to, - GossipMessage::::from(neighbor_msg).encode(), - ), + |to, neighbor| neighbor_sender.send(to, neighbor), ); service.gossip_message(topic, notification.message.clone(), false); @@ -915,6 +915,7 @@ struct CommitsOut> { set_id: SetId, is_voter: bool, gossip_validator: Arc>, + neighbor_sender: periodic::NeighborPacketSender, } impl> CommitsOut { @@ -924,12 +925,14 @@ impl> CommitsOut { set_id: SetIdNumber, is_voter: bool, gossip_validator: Arc>, + neighbor_sender: periodic::NeighborPacketSender, ) -> Self { CommitsOut { network, set_id: SetId(set_id), is_voter, gossip_validator, + neighbor_sender, } } } @@ -972,10 +975,7 @@ impl> Sink for CommitsOut { // before gossiping self.gossip_validator.note_commit_finalized( commit.target_number, - |to, neighbor| self.network.send_message( - to, - GossipMessage::::from(neighbor).encode(), - ), + |to, neighbor| self.neighbor_sender.send(to, neighbor), ); self.network.gossip_message(topic, message.encode(), false); diff --git a/substrate/core/finality-grandpa/src/communication/periodic.rs b/substrate/core/finality-grandpa/src/communication/periodic.rs index 7265fe34a2..54383bb63d 100644 --- a/substrate/core/finality-grandpa/src/communication/periodic.rs +++ b/substrate/core/finality-grandpa/src/communication/periodic.rs @@ -22,7 +22,7 @@ use futures::sync::mpsc; use sr_primitives::traits::{NumberFor, Block as BlockT}; use network::PeerId; use tokio_timer::Delay; -use log::warn; +use log::{debug, warn}; use codec::Encode; use std::time::{Instant, Duration}; @@ -35,7 +35,22 @@ fn rebroadcast_instant() -> Instant { } /// A sender used to send neighbor packets to a background job. -pub(super) type NeighborPacketSender = mpsc::UnboundedSender<(Vec, NeighborPacket>)>; +#[derive(Clone)] +pub(super) struct NeighborPacketSender( + mpsc::UnboundedSender<(Vec, NeighborPacket>)> +); + +impl NeighborPacketSender { + pub fn send( + &self, + who: Vec, + neighbor_packet: NeighborPacket>, + ) { + if let Err(err) = self.0.unbounded_send((who, neighbor_packet)) { + debug!(target: "afg", "Failed to send neighbor packet: {:?}", err); + } + } +} /// Does the work of sending neighbor packets, asynchronously. /// @@ -89,5 +104,5 @@ pub(super) fn neighbor_packet_worker(net: N) -> ( } }); - (work, tx) + (work, NeighborPacketSender(tx)) }