From 9ae1316c86559c5c4a341b028594b725ba53d876 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Mon, 21 Jan 2019 16:33:07 -0300 Subject: [PATCH] a wrapper network for rebroadcasting GRANDPA messages periodically (#1513) --- .../finality-grandpa/src/communication.rs | 156 +++++++++++++++++- substrate/core/finality-grandpa/src/lib.rs | 15 +- substrate/core/finality-grandpa/src/tests.rs | 2 +- 3 files changed, 168 insertions(+), 5 deletions(-) diff --git a/substrate/core/finality-grandpa/src/communication.rs b/substrate/core/finality-grandpa/src/communication.rs index b8435cef6c..32b2cc3a76 100644 --- a/substrate/core/finality-grandpa/src/communication.rs +++ b/substrate/core/finality-grandpa/src/communication.rs @@ -22,6 +22,7 @@ use futures::sync::mpsc; use codec::{Encode, Decode}; use substrate_primitives::{ed25519, Ed25519AuthorityId}; use runtime_primitives::traits::Block as BlockT; +use tokio::timer::Interval; use {Error, Network, Message, SignedMessage, Commit, CompactCommit}; use std::collections::HashMap; @@ -31,6 +32,159 @@ fn localized_payload(round: u64, set_id: u64, message: &E) -> Vec (message, round, set_id).encode() } +enum Broadcast { + // set_id, round, encoded commit. + Commit(u64, u64, Vec), + // set_id, round, encoded signed message. + Message(u64, u64, Vec), +} + +impl Broadcast { + fn set_id(&self) -> u64 { + match *self { + Broadcast::Commit(s, _, _) => s, + Broadcast::Message(s, _, _) => s, + } + } +} + +/// Produces a future that should be run in the background and proxies +/// and rebroadcasts messages. +pub(crate) fn rebroadcasting_network(network: N) -> (BroadcastWorker, BroadcastHandle) { + use std::time::Duration; + const REBROADCAST_PERIOD: Duration = Duration::from_secs(60); + + let (tx, rx) = mpsc::unbounded(); + + ( + BroadcastWorker { + interval: Interval::new_interval(REBROADCAST_PERIOD), + set_id: 0, // will be overwritten on first item to broadcast. + last_commit: None, + round_messages: (0, Vec::new()), + network: network.clone(), + incoming_broadcast: rx, + }, + BroadcastHandle { + relay: tx, + network, + }, + ) +} + +// A worker which broadcasts messages to the background, potentially +// rebroadcasting. +#[must_use = "network rebroadcast future must be driven to completion"] +pub(crate) struct BroadcastWorker { + interval: Interval, + set_id: u64, + last_commit: Option<(u64, Vec)>, + round_messages: (u64, Vec>), + network: N, + incoming_broadcast: mpsc::UnboundedReceiver, +} + +/// A handle used by communication work to broadcast to network. +#[derive(Clone)] +pub(crate) struct BroadcastHandle { + relay: mpsc::UnboundedSender, + network: N, +} + +impl Future for BroadcastWorker { + type Item = (); + type Error = Error; + + fn poll(&mut self) -> Poll<(), Error> { + { + let mut rebroadcast = false; + loop { + match self.interval.poll().map_err(Error::Timer)? { + Async::NotReady => break, + Async::Ready(_) => { rebroadcast = true; } + } + } + + if rebroadcast { + if let Some((c_round, ref c_commit)) = self.last_commit { + self.network.send_commit(c_round, self.set_id, c_commit.clone()); + } + + let round = self.round_messages.0; + for message in self.round_messages.1.iter().cloned() { + self.network.send_message(round, self.set_id, message); + } + } + } + loop { + match self.incoming_broadcast.poll().expect("UnboundedReceiver does not yield errors; qed") { + Async::NotReady => return Ok(Async::NotReady), + Async::Ready(None) => return Err(Error::Network( + "all broadcast handles dropped, connection to network severed".into() + )), + Async::Ready(Some(item)) => { + if item.set_id() > self.set_id { + self.set_id = item.set_id(); + self.last_commit = None; + self.round_messages = (0, Vec::new()); + } + + match item { + Broadcast::Commit(set_id, round, commit) => { + if self.set_id == set_id { + if round >= self.last_commit.as_ref().map_or(0, |&(r, _)| r) { + self.last_commit = Some((round, commit.clone())); + } + } + + // always send out to network. + self.network.send_commit(round, self.set_id, commit); + } + Broadcast::Message(set_id, round, message) => { + if self.set_id == set_id { + if round > self.round_messages.0 { + self.round_messages = (round, vec![message.clone()]); + } else if round == self.round_messages.0 { + self.round_messages.1.push(message.clone()); + }; + + // ignore messages from earlier rounds. + } + + // always send out to network. + self.network.send_message(round, set_id, message); + } + } + } + } + } + } +} + +impl Network for BroadcastHandle { + type In = N::In; + + fn messages_for(&self, round: u64, set_id: u64) -> Self::In { + self.network.messages_for(round, set_id) + } + + fn send_message(&self, round: u64, set_id: u64, message: Vec) { + let _ = self.relay.unbounded_send(Broadcast::Message(set_id, round, message)); + } + + fn drop_messages(&self, round: u64, set_id: u64) { + self.network.drop_messages(round, set_id); + } + + fn commit_messages(&self, set_id: u64) -> Self::In { + self.network.commit_messages(set_id) + } + + fn send_commit(&self, round: u64, set_id: u64, message: Vec) { + let _ = self.relay.unbounded_send(Broadcast::Commit(round, set_id, message)); + } +} + // check a message. pub(crate) fn check_message_sig( message: &Message, @@ -271,7 +425,7 @@ impl Sink for CommitsOut { auth_data }; - self.network.send_commit(self.set_id, Encode::encode(&(round, compact_commit))); + self.network.send_commit(round, self.set_id, Encode::encode(&(round, compact_commit))); Ok(AsyncSink::Ready) } diff --git a/substrate/core/finality-grandpa/src/lib.rs b/substrate/core/finality-grandpa/src/lib.rs index cadbeff5ef..ba7888e0d7 100644 --- a/substrate/core/finality-grandpa/src/lib.rs +++ b/substrate/core/finality-grandpa/src/lib.rs @@ -238,7 +238,7 @@ pub trait Network: Clone { fn commit_messages(&self, set_id: u64) -> Self::In; /// Send message over the commit channel. - fn send_commit(&self, set_id: u64, message: Vec); + fn send_commit(&self, round: u64, set_id: u64, message: Vec); } /// Bridge between NetworkService, gossiping consensus messages and Grandpa @@ -289,7 +289,7 @@ impl, H: ExHashT self.service.consensus_gossip().write().messages_for(commit_topic::(set_id)) } - fn send_commit(&self, set_id: u64, message: Vec) { + fn send_commit(&self, _round: u64, set_id: u64, message: Vec) { let topic = commit_topic::(set_id); self.service.gossip_consensus_message(topic, message, true); } @@ -1496,6 +1496,10 @@ pub fn run_grandpa, N, RA>( let chain_info = client.info()?; let genesis_hash = chain_info.chain.genesis_hash; + // we shadow network with the wrapping/rebroadcasting network to avoid + // accidental reuse. + let (broadcast_worker, network) = communication::rebroadcasting_network(network); + let (last_round_number, last_state) = match Backend::get_aux(&**client.backend(), LAST_COMPLETED_KEY)? { None => (0, RoundState::genesis((genesis_hash, >::zero()))), Some(raw) => LastCompleted::decode(&mut &raw[..]) @@ -1601,7 +1605,12 @@ pub fn run_grandpa, N, RA>( trigger_authority_set_change(new, authority_set_change) }, })) - }).map_err(|e| warn!("GRANDPA Voter failed: {:?}", e)); + }); + + let voter_work = voter_work + .join(broadcast_worker) + .map(|((), ())| ()) + .map_err(|e| warn!("GRANDPA Voter failed: {:?}", e)); Ok(voter_work.select(on_exit).then(|_| Ok(()))) } diff --git a/substrate/core/finality-grandpa/src/tests.rs b/substrate/core/finality-grandpa/src/tests.rs index d973481af2..b3f1d1e11c 100644 --- a/substrate/core/finality-grandpa/src/tests.rs +++ b/substrate/core/finality-grandpa/src/tests.rs @@ -222,7 +222,7 @@ impl Network for MessageRouting { Box::new(messages) } - fn send_commit(&self, set_id: u64, message: Vec) { + fn send_commit(&self, _round: u64, set_id: u64, message: Vec) { let mut inner = self.inner.lock(); inner.peer(self.peer_id).gossip_message(make_commit_topic(set_id), message, true); inner.route_until_complete();