grandpa: use periodic neighbor packet sender (#3594)

This commit is contained in:
André Silva
2019-09-11 17:08:15 +01:00
committed by Gavin Wood
parent d908f32c30
commit 433d752831
2 changed files with 36 additions and 21 deletions
@@ -328,18 +328,12 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
self.validator.note_set(
set_id,
voters.voters().iter().map(|(v, _)| v.clone()).collect(),
|to, neighbor| self.service.send_message(
to,
GossipMessage::<B>::from(neighbor).encode()
),
|to, neighbor| self.neighbor_sender.send(to, neighbor),
);
self.validator.note_round(
round,
|to, neighbor| self.service.send_message(
to,
GossipMessage::<B>::from(neighbor).encode()
),
|to, neighbor| self.neighbor_sender.send(to, neighbor),
);
}
@@ -455,18 +449,25 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
self.validator.note_set(
set_id,
voters.voters().iter().map(|(v, _)| v.clone()).collect(),
|to, neighbor| self.service.send_message(to, GossipMessage::<B>::from(neighbor).encode()),
|to, neighbor| self.neighbor_sender.send(to, neighbor),
);
let service = self.service.clone();
let topic = global_topic::<B>(set_id.0);
let incoming = incoming_global(service, topic, voters, self.validator.clone());
let incoming = incoming_global(
service,
topic,
voters,
self.validator.clone(),
self.neighbor_sender.clone(),
);
let outgoing = CommitsOut::<B, N>::new(
self.service.clone(),
set_id.0,
is_voter,
self.validator.clone(),
self.neighbor_sender.clone(),
);
let outgoing = outgoing.with(|out| {
@@ -483,6 +484,7 @@ fn incoming_global<B: BlockT, N: Network<B>>(
topic: B::Hash,
voters: Arc<VoterSet<AuthorityId>>,
gossip_validator: Arc<GossipValidator<B>>,
neighbor_sender: periodic::NeighborPacketSender<B>,
) -> impl Stream<Item = CommunicationIn<B>, Error = Error> {
let process_commit = move |
msg: FullCommitMessage<B>,
@@ -520,6 +522,7 @@ fn incoming_global<B: BlockT, N: Network<B>>(
let finalized_number = commit.target_number;
let gossip_validator = gossip_validator.clone();
let service = service.clone();
let neighbor_sender = neighbor_sender.clone();
let cb = move |outcome| match outcome {
voter::CommitProcessingOutcome::Good(_) => {
// if it checks out, gossip it. not accounting for
@@ -527,10 +530,7 @@ fn incoming_global<B: BlockT, N: Network<B>>(
// finalized number.
gossip_validator.note_commit_finalized(
finalized_number,
|to, neighbor_msg| service.send_message(
to,
GossipMessage::<B>::from(neighbor_msg).encode(),
),
|to, neighbor| neighbor_sender.send(to, neighbor),
);
service.gossip_message(topic, notification.message.clone(), false);
@@ -915,6 +915,7 @@ struct CommitsOut<Block: BlockT, N: Network<Block>> {
set_id: SetId,
is_voter: bool,
gossip_validator: Arc<GossipValidator<Block>>,
neighbor_sender: periodic::NeighborPacketSender<Block>,
}
impl<Block: BlockT, N: Network<Block>> CommitsOut<Block, N> {
@@ -924,12 +925,14 @@ impl<Block: BlockT, N: Network<Block>> CommitsOut<Block, N> {
set_id: SetIdNumber,
is_voter: bool,
gossip_validator: Arc<GossipValidator<Block>>,
neighbor_sender: periodic::NeighborPacketSender<Block>,
) -> Self {
CommitsOut {
network,
set_id: SetId(set_id),
is_voter,
gossip_validator,
neighbor_sender,
}
}
}
@@ -972,10 +975,7 @@ impl<Block: BlockT, N: Network<Block>> Sink for CommitsOut<Block, N> {
// before gossiping
self.gossip_validator.note_commit_finalized(
commit.target_number,
|to, neighbor| self.network.send_message(
to,
GossipMessage::<Block>::from(neighbor).encode(),
),
|to, neighbor| self.neighbor_sender.send(to, neighbor),
);
self.network.gossip_message(topic, message.encode(), false);
@@ -22,7 +22,7 @@ use futures::sync::mpsc;
use sr_primitives::traits::{NumberFor, Block as BlockT};
use network::PeerId;
use tokio_timer::Delay;
use log::warn;
use log::{debug, warn};
use codec::Encode;
use std::time::{Instant, Duration};
@@ -35,7 +35,22 @@ fn rebroadcast_instant() -> Instant {
}
/// A sender used to send neighbor packets to a background job.
pub(super) type NeighborPacketSender<B> = mpsc::UnboundedSender<(Vec<PeerId>, NeighborPacket<NumberFor<B>>)>;
#[derive(Clone)]
pub(super) struct NeighborPacketSender<B: BlockT>(
mpsc::UnboundedSender<(Vec<PeerId>, NeighborPacket<NumberFor<B>>)>
);
impl<B: BlockT> NeighborPacketSender<B> {
pub fn send(
&self,
who: Vec<network::PeerId>,
neighbor_packet: NeighborPacket<NumberFor<B>>,
) {
if let Err(err) = self.0.unbounded_send((who, neighbor_packet)) {
debug!(target: "afg", "Failed to send neighbor packet: {:?}", err);
}
}
}
/// Does the work of sending neighbor packets, asynchronously.
///
@@ -89,5 +104,5 @@ pub(super) fn neighbor_packet_worker<B, N>(net: N) -> (
}
});
(work, tx)
(work, NeighborPacketSender(tx))
}