// Copyright 2017-2019 Parity Technologies (UK) Ltd. // This file is part of Substrate. // Substrate is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // Substrate is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // You should have received a copy of the GNU General Public License // along with Substrate. If not, see . //! Communication streams for the polite-grandpa networking protocol. //! //! GRANDPA nodes communicate over a gossip network, where messages are not sent to //! peers until they have reached a given round. //! //! Rather than expressing protocol rules, //! polite-grandpa just carries a notion of impoliteness. Nodes which pass some arbitrary //! threshold of impoliteness are removed. Messages are either costly, or beneficial. //! //! For instance, it is _impolite_ to send the same message more than once. //! In the future, there will be a fallback for allowing sending the same message //! under certain conditions that are used to un-stick the protocol. use std::sync::Arc; use grandpa::{voter, voter_set::VoterSet}; use grandpa::Message::{Prevote, Precommit, PrimaryPropose}; use futures::prelude::*; use futures::sync::{oneshot, mpsc}; use log::{debug, trace}; use tokio_executor::Executor; use parity_codec::{Encode, Decode}; use substrate_primitives::{ed25519, Pair}; use substrate_telemetry::{telemetry, CONSENSUS_DEBUG, CONSENSUS_INFO}; use runtime_primitives::traits::{Block as BlockT, Hash as HashT, Header as HeaderT}; use network::{consensus_gossip as network_gossip, NetworkService}; use network_gossip::ConsensusMessage; use crate::{ CatchUp, Commit, CommunicationIn, CommunicationOut, CompactCommit, Error, Message, SignedMessage, }; use crate::environment::HasVoted; use gossip::{ GossipMessage, FullCatchUpMessage, FullCommitMessage, VoteOrPrecommitMessage, GossipValidator }; use substrate_primitives::ed25519::{Public as AuthorityId, Signature as AuthoritySignature}; pub mod gossip; mod periodic; #[cfg(test)] mod tests; pub use fg_primitives::GRANDPA_ENGINE_ID; // cost scalars for reporting peers. mod cost { pub(super) const PAST_REJECTION: i32 = -50; pub(super) const BAD_SIGNATURE: i32 = -100; pub(super) const MALFORMED_CATCH_UP: i32 = -1000; pub(super) const MALFORMED_COMMIT: i32 = -1000; pub(super) const FUTURE_MESSAGE: i32 = -500; pub(super) const UNKNOWN_VOTER: i32 = -150; pub(super) const INVALID_VIEW_CHANGE: i32 = -500; pub(super) const PER_UNDECODABLE_BYTE: i32 = -5; pub(super) const PER_SIGNATURE_CHECKED: i32 = -25; pub(super) const PER_BLOCK_LOADED: i32 = -10; pub(super) const INVALID_CATCH_UP: i32 = -5000; pub(super) const INVALID_COMMIT: i32 = -5000; pub(super) const OUT_OF_SCOPE_MESSAGE: i32 = -500; pub(super) const CATCH_UP_REQUEST_TIMEOUT: i32 = -200; // cost of answering a catch up request pub(super) const CATCH_UP_REPLY: i32 = -200; pub(super) const HONEST_OUT_OF_SCOPE_CATCH_UP: i32 = -200; } // benefit scalars for reporting peers. mod benefit { pub(super) const NEIGHBOR_MESSAGE: i32 = 100; pub(super) const ROUND_MESSAGE: i32 = 100; pub(super) const BASIC_VALIDATED_CATCH_UP: i32 = 200; pub(super) const BASIC_VALIDATED_COMMIT: i32 = 100; pub(super) const PER_EQUIVOCATION: i32 = 10; } /// A handle to the network. This is generally implemented by providing some /// handle to a gossip service or similar. /// /// Intended to be a lightweight handle such as an `Arc`. pub trait Network: Clone + Send + 'static { /// A stream of input messages for a topic. type In: Stream; /// Get a stream of messages for a specific gossip topic. fn messages_for(&self, topic: Block::Hash) -> Self::In; /// Register a gossip validator. fn register_validator(&self, validator: Arc>); /// Gossip a message out to all connected peers. /// /// Force causes it to be sent to all peers, even if they've seen it already. /// Only should be used in case of consensus stall. fn gossip_message(&self, topic: Block::Hash, data: Vec, force: bool); /// Register a message with the gossip service, it isn't broadcast right /// away to any peers, but may be sent to new peers joining or when asked to /// broadcast the topic. Useful to register previous messages on node /// startup. fn register_gossip_message(&self, topic: Block::Hash, data: Vec); /// Send a message to a bunch of specific peers, even if they've seen it already. fn send_message(&self, who: Vec, data: Vec); /// Report a peer's cost or benefit after some action. fn report(&self, who: network::PeerId, cost_benefit: i32); /// Inform peers that a block with given hash should be downloaded. fn announce(&self, block: Block::Hash); } /// Create a unique topic for a round and set-id combo. pub(crate) fn round_topic(round: u64, set_id: u64) -> B::Hash { <::Hashing as HashT>::hash(format!("{}-{}", set_id, round).as_bytes()) } /// Create a unique topic for global messages on a set ID. pub(crate) fn global_topic(set_id: u64) -> B::Hash { <::Hashing as HashT>::hash(format!("{}-GLOBAL", set_id).as_bytes()) } impl Network for Arc> where B: BlockT, S: network::specialization::NetworkSpecialization, H: network::ExHashT, { type In = NetworkStream; fn messages_for(&self, topic: B::Hash) -> Self::In { let (tx, rx) = oneshot::channel(); self.with_gossip(move |gossip, _| { let inner_rx = gossip.messages_for(GRANDPA_ENGINE_ID, topic); let _ = tx.send(inner_rx); }); NetworkStream { outer: rx, inner: None } } fn register_validator(&self, validator: Arc>) { self.with_gossip( move |gossip, context| gossip.register_validator(context, GRANDPA_ENGINE_ID, validator) ) } fn gossip_message(&self, topic: B::Hash, data: Vec, force: bool) { let msg = ConsensusMessage { engine_id: GRANDPA_ENGINE_ID, data, }; self.with_gossip( move |gossip, ctx| gossip.multicast(ctx, topic, msg, force) ) } fn register_gossip_message(&self, topic: B::Hash, data: Vec) { let msg = ConsensusMessage { engine_id: GRANDPA_ENGINE_ID, data, }; self.with_gossip(move |gossip, _| gossip.register_message(topic, msg)) } fn send_message(&self, who: Vec, data: Vec) { let msg = ConsensusMessage { engine_id: GRANDPA_ENGINE_ID, data, }; self.with_gossip(move |gossip, ctx| for who in &who { gossip.send_message(ctx, who, msg.clone()) }) } fn report(&self, who: network::PeerId, cost_benefit: i32) { self.report_peer(who, cost_benefit) } fn announce(&self, block: B::Hash) { self.announce_block(block) } } /// A stream used by NetworkBridge in its implementation of Network. pub struct NetworkStream { inner: Option>, outer: oneshot::Receiver> } impl Stream for NetworkStream { type Item = network_gossip::TopicNotification; type Error = (); fn poll(&mut self) -> Poll, Self::Error> { if let Some(ref mut inner) = self.inner { return inner.poll(); } match self.outer.poll() { Ok(futures::Async::Ready(mut inner)) => { let poll_result = inner.poll(); self.inner = Some(inner); poll_result }, Ok(futures::Async::NotReady) => Ok(futures::Async::NotReady), Err(_) => Err(()) } } } /// Bridge between the underlying network service, gossiping consensus messages and Grandpa pub(crate) struct NetworkBridge> { service: N, validator: Arc>, neighbor_sender: periodic::NeighborPacketSender, } impl> NetworkBridge { /// Create a new NetworkBridge to the given NetworkService. Returns the service /// handle and a future that must be polled to completion to finish startup. /// If a voter set state is given it registers previous round votes with the /// gossip service. pub(crate) fn new( service: N, config: crate::Config, set_state: crate::environment::SharedVoterSetState, on_exit: impl Future + Clone + Send + 'static, ) -> ( Self, impl futures::Future + Send + 'static, ) { let (validator, report_stream) = GossipValidator::new(config, set_state.clone()); let validator = Arc::new(validator); service.register_validator(validator.clone()); { // register all previous votes with the gossip service so that they're // available to peers potentially stuck on a previous round. let completed = set_state.read().completed_rounds(); let (set_id, voters) = completed.set_info(); validator.note_set(SetId(set_id), voters.to_vec(), |_, _| {}); for round in completed.iter() { let topic = round_topic::(round.number, set_id); // we need to note the round with the gossip validator otherwise // messages will be ignored. validator.note_round(Round(round.number), |_, _| {}); for signed in round.votes.iter() { let message = gossip::GossipMessage::VoteOrPrecommit( gossip::VoteOrPrecommitMessage:: { message: signed.clone(), round: Round(round.number), set_id: SetId(set_id), } ); service.register_gossip_message( topic, message.encode(), ); } trace!(target: "afg", "Registered {} messages for topic {:?} (round: {}, set_id: {})", round.votes.len(), topic, round.number, set_id, ); } } let (rebroadcast_job, neighbor_sender) = periodic::neighbor_packet_worker(service.clone()); let reporting_job = report_stream.consume(service.clone()); let bridge = NetworkBridge { service, validator, neighbor_sender }; let startup_work = futures::future::lazy(move || { // lazily spawn these jobs onto their own tasks. the lazy future has access // to tokio globals, which aren't available outside. let mut executor = tokio_executor::DefaultExecutor::current(); executor.spawn(Box::new(rebroadcast_job.select(on_exit.clone()).then(|_| Ok(())))) .expect("failed to spawn grandpa rebroadcast job task"); executor.spawn(Box::new(reporting_job.select(on_exit.clone()).then(|_| Ok(())))) .expect("failed to spawn grandpa reporting job task"); Ok(()) }); (bridge, startup_work) } /// Note the beginning of a new round to the `GossipValidator`. pub(crate) fn note_round( &self, round: Round, set_id: SetId, voters: &VoterSet, ) { // is a no-op if currently in that set. self.validator.note_set( set_id, voters.voters().iter().map(|(v, _)| v.clone()).collect(), |to, neighbor| self.service.send_message( to, GossipMessage::::from(neighbor).encode() ), ); self.validator.note_round( round, |to, neighbor| self.service.send_message( to, GossipMessage::::from(neighbor).encode() ), ); } /// Get the round messages for a round in the current set ID. These are signature-checked. pub(crate) fn round_communication( &self, round: Round, set_id: SetId, voters: Arc>, local_key: Option>, has_voted: HasVoted, ) -> ( impl Stream,Error=Error>, impl Sink,SinkError=Error>, ) { self.note_round( round, set_id, &*voters, ); let locals = local_key.and_then(|pair| { let public = pair.public(); let id = AuthorityId(public.0); if voters.contains_key(&id) { Some((pair, id)) } else { None } }); let topic = round_topic::(round.0, set_id.0); let incoming = self.service.messages_for(topic) .filter_map(|notification| { let decoded = GossipMessage::::decode(&mut ¬ification.message[..]); if decoded.is_none() { debug!(target: "afg", "Skipping malformed message {:?}", notification); } decoded }) .and_then(move |msg| { match msg { GossipMessage::VoteOrPrecommit(msg) => { // check signature. if !voters.contains_key(&msg.message.id) { debug!(target: "afg", "Skipping message from unknown voter {}", msg.message.id); return Ok(None); } match &msg.message.message { PrimaryPropose(propose) => { telemetry!(CONSENSUS_INFO; "afg.received_propose"; "voter" => ?format!("{}", msg.message.id), "target_number" => ?propose.target_number, "target_hash" => ?propose.target_hash, ); }, Prevote(prevote) => { telemetry!(CONSENSUS_INFO; "afg.received_prevote"; "voter" => ?format!("{}", msg.message.id), "target_number" => ?prevote.target_number, "target_hash" => ?prevote.target_hash, ); }, Precommit(precommit) => { telemetry!(CONSENSUS_INFO; "afg.received_precommit"; "voter" => ?format!("{}", msg.message.id), "target_number" => ?precommit.target_number, "target_hash" => ?precommit.target_hash, ); }, }; Ok(Some(msg.message)) } _ => { debug!(target: "afg", "Skipping unknown message type"); return Ok(None); } } }) .filter_map(|x| x) .map_err(|()| Error::Network(format!("Failed to receive message on unbounded stream"))); let (tx, out_rx) = mpsc::unbounded(); let outgoing = OutgoingMessages:: { round: round.0, set_id: set_id.0, network: self.service.clone(), locals, sender: tx, has_voted, }; let out_rx = out_rx.map_err(move |()| Error::Network( format!("Failed to receive on unbounded receiver for round {}", round.0) )); let incoming = incoming.select(out_rx); (incoming, outgoing) } /// Set up the global communication streams. pub(crate) fn global_communication( &self, set_id: SetId, voters: Arc>, is_voter: bool, ) -> ( impl Stream, Error = Error>, impl Sink, SinkError = Error>, ) { self.validator.note_set( set_id, voters.voters().iter().map(|(v, _)| v.clone()).collect(), |to, neighbor| self.service.send_message(to, GossipMessage::::from(neighbor).encode()), ); let service = self.service.clone(); let topic = global_topic::(set_id.0); let incoming = incoming_global(service, topic, voters, self.validator.clone()); let outgoing = CommitsOut::::new( self.service.clone(), set_id.0, is_voter, self.validator.clone(), ); let outgoing = outgoing.with(|out| { let voter::CommunicationOut::Commit(round, commit) = out; Ok((round, commit)) }); (incoming, outgoing) } } fn incoming_global>( mut service: N, topic: B::Hash, voters: Arc>, gossip_validator: Arc>, ) -> impl Stream, Error = Error> { let process_commit = move | msg: FullCommitMessage, mut notification: network_gossip::TopicNotification, service: &mut N, gossip_validator: &Arc>, voters: &VoterSet, | { let precommits_signed_by: Vec = msg.message.auth_data.iter().map(move |(_, a)| { format!("{}", a) }).collect(); telemetry!(CONSENSUS_INFO; "afg.received_commit"; "contains_precommits_signed_by" => ?precommits_signed_by, "target_number" => ?msg.message.target_number.clone(), "target_hash" => ?msg.message.target_hash.clone(), ); if let Err(cost) = check_compact_commit::( &msg.message, voters, msg.round, msg.set_id, ) { if let Some(who) = notification.sender { service.report(who, cost); } return None; } let round = msg.round.0; let commit = msg.message; let finalized_number = commit.target_number; let gossip_validator = gossip_validator.clone(); let service = service.clone(); let cb = move |outcome| match outcome { voter::CommitProcessingOutcome::Good(_) => { // if it checks out, gossip it. not accounting for // any discrepancy between the actual ghost and the claimed // finalized number. gossip_validator.note_commit_finalized( finalized_number, |to, neighbor_msg| service.send_message( to, GossipMessage::::from(neighbor_msg).encode(), ), ); service.gossip_message(topic, notification.message.clone(), false); } voter::CommitProcessingOutcome::Bad(_) => { // report peer and do not gossip. if let Some(who) = notification.sender.take() { service.report(who, cost::INVALID_COMMIT); } } }; let cb = voter::Callback::Work(Box::new(cb)); Some(voter::CommunicationIn::Commit(round, commit, cb)) }; let process_catch_up = move | msg: FullCatchUpMessage, mut notification: network_gossip::TopicNotification, service: &mut N, gossip_validator: &Arc>, voters: &VoterSet, | { let gossip_validator = gossip_validator.clone(); let service = service.clone(); if let Err(cost) = check_catch_up::( &msg.message, voters, msg.set_id, ) { if let Some(who) = notification.sender { service.report(who, cost); } return None; } let cb = move |outcome| { if let voter::CatchUpProcessingOutcome::Bad(_) = outcome { // report peer if let Some(who) = notification.sender.take() { service.report(who, cost::INVALID_CATCH_UP); } } gossip_validator.note_catch_up_message_processed(); }; let cb = voter::Callback::Work(Box::new(cb)); Some(voter::CommunicationIn::CatchUp(msg.message, cb)) }; service.messages_for(topic) .filter_map(|notification| { // this could be optimized by decoding piecewise. let decoded = GossipMessage::::decode(&mut ¬ification.message[..]); if decoded.is_none() { trace!(target: "afg", "Skipping malformed commit message {:?}", notification); } decoded.map(move |d| (notification, d)) }) .filter_map(move |(notification, msg)| { match msg { GossipMessage::Commit(msg) => process_commit(msg, notification, &mut service, &gossip_validator, &*voters), GossipMessage::CatchUp(msg) => process_catch_up(msg, notification, &mut service, &gossip_validator, &*voters), _ => { debug!(target: "afg", "Skipping unknown message type"); return None; } } }) .map_err(|()| Error::Network(format!("Failed to receive message on unbounded stream"))) } impl> Clone for NetworkBridge { fn clone(&self) -> Self { NetworkBridge { service: self.service.clone(), validator: Arc::clone(&self.validator), neighbor_sender: self.neighbor_sender.clone(), } } } fn localized_payload(round: u64, set_id: u64, message: &E) -> Vec { (message, round, set_id).encode() } /// Type-safe wrapper around u64 when indicating that it's a round number. #[derive(Debug, Clone, Copy, Eq, PartialEq, PartialOrd, Ord, Encode, Decode)] pub struct Round(pub u64); /// Type-safe wrapper around u64 when indicating that it's a set ID. #[derive(Debug, Clone, Copy, Eq, PartialEq, PartialOrd, Ord, Encode, Decode)] pub struct SetId(pub u64); // check a message. pub(crate) fn check_message_sig( message: &Message, id: &AuthorityId, signature: &AuthoritySignature, round: u64, set_id: u64, ) -> Result<(), ()> { let as_public = AuthorityId::from_raw(id.0); let encoded_raw = localized_payload(round, set_id, message); if ed25519::Pair::verify(signature, &encoded_raw, as_public) { Ok(()) } else { debug!(target: "afg", "Bad signature on message from {:?}", id); Err(()) } } /// A sink for outgoing messages to the network. Any messages that are sent will /// be replaced, as appropriate, according to the given `HasVoted`. /// NOTE: The votes are stored unsigned, which means that the signatures need to /// be "stable", i.e. we should end up with the exact same signed message if we /// use the same raw message and key to sign. This is currently true for /// `ed25519` and `BLS` signatures (which we might use in the future), care must /// be taken when switching to different key types. struct OutgoingMessages> { round: u64, set_id: u64, locals: Option<(Arc, AuthorityId)>, sender: mpsc::UnboundedSender>, network: N, has_voted: HasVoted, } impl> Sink for OutgoingMessages { type SinkItem = Message; type SinkError = Error; fn start_send(&mut self, mut msg: Message) -> StartSend, Error> { // if we've voted on this round previously under the same key, send that vote instead match &mut msg { grandpa::Message::PrimaryPropose(ref mut vote) => if let Some(propose) = self.has_voted.propose() { *vote = propose.clone(); }, grandpa::Message::Prevote(ref mut vote) => if let Some(prevote) = self.has_voted.prevote() { *vote = prevote.clone(); }, grandpa::Message::Precommit(ref mut vote) => if let Some(precommit) = self.has_voted.precommit() { *vote = precommit.clone(); }, } // when locals exist, sign messages on import if let Some((ref pair, ref local_id)) = self.locals { let encoded = localized_payload(self.round, self.set_id, &msg); let signature = pair.sign(&encoded[..]); let target_hash = msg.target().0.clone(); let signed = SignedMessage:: { message: msg, signature, id: local_id.clone(), }; let message = GossipMessage::VoteOrPrecommit(VoteOrPrecommitMessage:: { message: signed.clone(), round: Round(self.round), set_id: SetId(self.set_id), }); debug!( target: "afg", "Announcing block {} to peers which we voted on in round {} in set {}", target_hash, self.round, self.set_id, ); telemetry!( CONSENSUS_DEBUG; "afg.announcing_blocks_to_voted_peers"; "block" => ?target_hash, "round" => ?self.round, "set_id" => ?self.set_id, ); // announce our block hash to peers and propagate the // message. self.network.announce(target_hash); let topic = round_topic::(self.round, self.set_id); self.network.gossip_message(topic, message.encode(), false); // forward the message to the inner sender. let _ = self.sender.unbounded_send(signed); } Ok(AsyncSink::Ready) } fn poll_complete(&mut self) -> Poll<(), Error> { Ok(Async::Ready(())) } fn close(&mut self) -> Poll<(), Error> { // ignore errors since we allow this inner sender to be closed already. self.sender.close().or_else(|_| Ok(Async::Ready(()))) } } // checks a compact commit. returns the cost associated with processing it if // the commit was bad. fn check_compact_commit( msg: &CompactCommit, voters: &VoterSet, round: Round, set_id: SetId, ) -> Result<(), i32> { // 4f + 1 = equivocations from f voters. let f = voters.total_weight() - voters.threshold(); let full_threshold = voters.total_weight() + f; // check total weight is not out of range. let mut total_weight = 0; for (_, ref id) in &msg.auth_data { if let Some(weight) = voters.info(id).map(|info| info.weight()) { total_weight += weight; if total_weight > full_threshold { return Err(cost::MALFORMED_COMMIT); } } else { debug!(target: "afg", "Skipping commit containing unknown voter {}", id); return Err(cost::MALFORMED_COMMIT); } } if total_weight < voters.threshold() { return Err(cost::MALFORMED_COMMIT); } // check signatures on all contained precommits. for (i, (precommit, &(ref sig, ref id))) in msg.precommits.iter() .zip(&msg.auth_data) .enumerate() { use crate::communication::gossip::Misbehavior; use grandpa::Message as GrandpaMessage; if let Err(()) = check_message_sig::( &GrandpaMessage::Precommit(precommit.clone()), id, sig, round.0, set_id.0, ) { debug!(target: "afg", "Bad commit message signature {}", id); telemetry!(CONSENSUS_DEBUG; "afg.bad_commit_msg_signature"; "id" => ?id); let cost = Misbehavior::BadCommitMessage { signatures_checked: i as i32, blocks_loaded: 0, equivocations_caught: 0, }.cost(); return Err(cost); } } Ok(()) } // checks a catch up. returns the cost associated with processing it if // the catch up was bad. fn check_catch_up( msg: &CatchUp, voters: &VoterSet, set_id: SetId, ) -> Result<(), i32> { // 4f + 1 = equivocations from f voters. let f = voters.total_weight() - voters.threshold(); let full_threshold = voters.total_weight() + f; // check total weight is not out of range for a set of votes. fn check_weight<'a>( voters: &'a VoterSet, votes: impl Iterator, full_threshold: u64, ) -> Result<(), i32> { let mut total_weight = 0; for id in votes { if let Some(weight) = voters.info(&id).map(|info| info.weight()) { total_weight += weight; if total_weight > full_threshold { return Err(cost::MALFORMED_CATCH_UP); } } else { debug!(target: "afg", "Skipping catch up message containing unknown voter {}", id); return Err(cost::MALFORMED_CATCH_UP); } } if total_weight < voters.threshold() { return Err(cost::MALFORMED_CATCH_UP); } Ok(()) }; check_weight( voters, msg.prevotes.iter().map(|vote| &vote.id), full_threshold, )?; check_weight( voters, msg.precommits.iter().map(|vote| &vote.id), full_threshold, )?; fn check_signatures<'a, B, I>( messages: I, round: u64, set_id: u64, mut signatures_checked: usize, ) -> Result where B: BlockT, I: Iterator, &'a AuthorityId, &'a AuthoritySignature)>, { use crate::communication::gossip::Misbehavior; for (msg, id, sig) in messages { signatures_checked += 1; if let Err(()) = check_message_sig::( &msg, id, sig, round, set_id, ) { debug!(target: "afg", "Bad catch up message signature {}", id); telemetry!(CONSENSUS_DEBUG; "afg.bad_catch_up_msg_signature"; "id" => ?id); let cost = Misbehavior::BadCatchUpMessage { signatures_checked: signatures_checked as i32, }.cost(); return Err(cost); } } Ok(signatures_checked) } // check signatures on all contained prevotes. let signatures_checked = check_signatures::( msg.prevotes.iter().map(|vote| { (grandpa::Message::Prevote(vote.prevote.clone()), &vote.id, &vote.signature) }), msg.round_number, set_id.0, 0, )?; // check signatures on all contained precommits. let _ = check_signatures::( msg.precommits.iter().map(|vote| { (grandpa::Message::Precommit(vote.precommit.clone()), &vote.id, &vote.signature) }), msg.round_number, set_id.0, signatures_checked, )?; Ok(()) } /// An output sink for commit messages. struct CommitsOut> { network: N, set_id: SetId, is_voter: bool, gossip_validator: Arc>, } impl> CommitsOut { /// Create a new commit output stream. pub(crate) fn new( network: N, set_id: u64, is_voter: bool, gossip_validator: Arc>, ) -> Self { CommitsOut { network, set_id: SetId(set_id), is_voter, gossip_validator, } } } impl> Sink for CommitsOut { type SinkItem = (u64, Commit); type SinkError = Error; fn start_send(&mut self, input: (u64, Commit)) -> StartSend { if !self.is_voter { return Ok(AsyncSink::Ready); } let (round, commit) = input; let round = Round(round); telemetry!(CONSENSUS_DEBUG; "afg.commit_issued"; "target_number" => ?commit.target_number, "target_hash" => ?commit.target_hash, ); let (precommits, auth_data) = commit.precommits.into_iter() .map(|signed| (signed.precommit, (signed.signature, signed.id))) .unzip(); let compact_commit = CompactCommit:: { target_hash: commit.target_hash, target_number: commit.target_number, precommits, auth_data }; let message = GossipMessage::Commit(FullCommitMessage:: { round: round, set_id: self.set_id, message: compact_commit, }); let topic = global_topic::(self.set_id.0); // the gossip validator needs to be made aware of the best commit-height we know of // before gossiping self.gossip_validator.note_commit_finalized( commit.target_number, |to, neighbor| self.network.send_message( to, GossipMessage::::from(neighbor).encode(), ), ); self.network.gossip_message(topic, message.encode(), false); Ok(AsyncSink::Ready) } fn close(&mut self) -> Poll<(), Error> { Ok(Async::Ready(())) } fn poll_complete(&mut self) -> Poll<(), Error> { Ok(Async::Ready(())) } }