diff --git a/substrate/core/finality-grandpa/src/communication.rs b/substrate/core/finality-grandpa/src/communication.rs index 926b0e01e2..9d661ae31b 100644 --- a/substrate/core/finality-grandpa/src/communication.rs +++ b/substrate/core/finality-grandpa/src/communication.rs @@ -32,25 +32,31 @@ fn localized_payload(round: u64, set_id: u64, message: &E) -> Vec (message, round, set_id).encode() } -enum Broadcast { +enum Broadcast { // set_id, round, encoded commit. Commit(u64, u64, Vec), // set_id, round, encoded signed message. Message(u64, u64, Vec), + // set_id, round, announcement of block hash that should be downloaded + Announcement(u64, u64, Block::Hash), + // set_id, round being dropped. + DropRound(u64, u64), } -impl Broadcast { +impl Broadcast { fn set_id(&self) -> u64 { match *self { Broadcast::Commit(s, _, _) => s, Broadcast::Message(s, _, _) => s, + Broadcast::Announcement(s, _, _) => s, + Broadcast::DropRound(s, _) => s, } } } /// Produces a future that should be run in the background and proxies /// and rebroadcasts messages. -pub(crate) fn rebroadcasting_network(network: N) -> (BroadcastWorker, BroadcastHandle) { +pub(crate) fn rebroadcasting_network>(network: N) -> (BroadcastWorker, BroadcastHandle) { use std::time::Duration; const REBROADCAST_PERIOD: Duration = Duration::from_secs(60); @@ -62,6 +68,7 @@ pub(crate) fn rebroadcasting_network(network: N) -> (BroadcastWorker set_id: 0, // will be overwritten on first item to broadcast. last_commit: None, round_messages: (0, Vec::new()), + announcements: HashMap::new(), network: network.clone(), incoming_broadcast: rx, }, @@ -75,23 +82,24 @@ pub(crate) fn rebroadcasting_network(network: N) -> (BroadcastWorker // A worker which broadcasts messages to the background, potentially // rebroadcasting. #[must_use = "network rebroadcast future must be driven to completion"] -pub(crate) struct BroadcastWorker { +pub(crate) struct BroadcastWorker> { interval: Interval, set_id: u64, last_commit: Option<(u64, Vec)>, round_messages: (u64, Vec>), + announcements: HashMap, network: N, - incoming_broadcast: mpsc::UnboundedReceiver, + incoming_broadcast: mpsc::UnboundedReceiver>, } /// A handle used by communication work to broadcast to network. #[derive(Clone)] -pub(crate) struct BroadcastHandle { - relay: mpsc::UnboundedSender, +pub(crate) struct BroadcastHandle { + relay: mpsc::UnboundedSender>, network: N, } -impl Future for BroadcastWorker { +impl> Future for BroadcastWorker { type Item = (); type Error = Error; @@ -114,6 +122,10 @@ impl Future for BroadcastWorker { for message in self.round_messages.1.iter().cloned() { self.network.send_message(round, self.set_id, message); } + + for (&announce_hash, &round) in &self.announcements { + self.network.announce(round, self.set_id, announce_hash); + } } } loop { @@ -127,6 +139,7 @@ impl Future for BroadcastWorker { self.set_id = item.set_id(); self.last_commit = None; self.round_messages = (0, Vec::new()); + self.announcements.clear(); } match item { @@ -154,6 +167,19 @@ impl Future for BroadcastWorker { // always send out to network. self.network.send_message(round, set_id, message); } + Broadcast::Announcement(set_id, round, hash) => { + if self.set_id == set_id { + self.announcements.insert(hash, round); + } + + // always send out. + self.network.announce(round, set_id, hash); + } + Broadcast::DropRound(set_id, round) => { + // stop making announcements for any dead rounds. + self.announcements.retain(|_, &mut r| r > round); + self.network.drop_messages(round, set_id); + } } } } @@ -161,7 +187,7 @@ impl Future for BroadcastWorker { } } -impl Network for BroadcastHandle { +impl> Network for BroadcastHandle { type In = N::In; fn messages_for(&self, round: u64, set_id: u64) -> Self::In { @@ -173,7 +199,7 @@ impl Network for BroadcastHandle { } fn drop_messages(&self, round: u64, set_id: u64) { - self.network.drop_messages(round, set_id); + let _ = self.relay.unbounded_send(Broadcast::DropRound(set_id, round)); } fn commit_messages(&self, set_id: u64) -> Self::In { @@ -183,6 +209,10 @@ impl Network for BroadcastHandle { fn send_commit(&self, round: u64, set_id: u64, message: Vec) { let _ = self.relay.unbounded_send(Broadcast::Commit(round, set_id, message)); } + + fn announce(&self, round: u64, set_id: u64, block: B::Hash) { + let _ = self.relay.unbounded_send(Broadcast::Announcement(round, set_id, block)); + } } // check a message. @@ -243,7 +273,7 @@ pub(crate) fn checked_message_stream( .map_err(|()| Error::Network(format!("Failed to receive message on unbounded stream"))) } -struct OutgoingMessages { +pub(crate) struct OutgoingMessages> { round: u64, set_id: u64, locals: Option<(Arc, Ed25519AuthorityId)>, @@ -251,7 +281,8 @@ struct OutgoingMessages { network: N, } -impl Sink for OutgoingMessages { +impl> Sink for OutgoingMessages +{ type SinkItem = Message; type SinkError = Error; @@ -260,14 +291,20 @@ impl Sink for OutgoingMessages { if let Some((ref pair, local_id)) = self.locals { let encoded = localized_payload(self.round, self.set_id, &msg); let signature = pair.sign(&encoded[..]); + + let target_hash = msg.target().0.clone(); let signed = SignedMessage:: { message: msg, signature, id: local_id, }; - // forward to network and to inner sender. + // announce our block hash to peers and propagate the + // message. + self.network.announce(self.round, self.set_id, target_hash); self.network.send_message(self.round, self.set_id, signed.encode()); + + // forward the message to the inner sender. let _ = self.sender.unbounded_send(signed); } @@ -282,7 +319,7 @@ impl Sink for OutgoingMessages { } } -impl Drop for OutgoingMessages { +impl> Drop for OutgoingMessages { fn drop(&mut self) { self.network.drop_messages(self.round, self.set_id); } @@ -293,7 +330,7 @@ impl Drop for OutgoingMessages { /// /// A future can push unsigned messages into the sink. They will be automatically /// broadcast to the network. The returned stream should be combined with other input. -pub(crate) fn outgoing_messages( +pub(crate) fn outgoing_messages>( round: u64, set_id: u64, local_key: Option>, @@ -301,7 +338,7 @@ pub(crate) fn outgoing_messages( network: N, ) -> ( impl Stream,Error=Error>, - impl Sink,SinkError=Error>, + OutgoingMessages, ) { let locals = local_key.and_then(|pair| { let public = pair.public(); @@ -410,7 +447,7 @@ impl CommitsOut { } } -impl Sink for CommitsOut { +impl> Sink for CommitsOut { type SinkItem = (u64, Commit); type SinkError = Error; diff --git a/substrate/core/finality-grandpa/src/lib.rs b/substrate/core/finality-grandpa/src/lib.rs index ddf3874546..d84cf00728 100644 --- a/substrate/core/finality-grandpa/src/lib.rs +++ b/substrate/core/finality-grandpa/src/lib.rs @@ -219,7 +219,7 @@ impl From for Error { /// handle to a gossip service or similar. /// /// Intended to be a lightweight handle such as an `Arc`. -pub trait Network: Clone { +pub trait Network: Clone { /// A stream of input messages for a topic. type In: Stream,Error=()>; @@ -239,6 +239,9 @@ pub trait Network: Clone { /// Send message over the commit channel. fn send_commit(&self, round: u64, set_id: u64, message: Vec); + + /// Inform peers that a block with given hash should be downloaded. + fn announce(&self, round: u64, set_id: u64, block: Block::Hash); } /// Bridge between NetworkService, gossiping consensus messages and Grandpa @@ -269,7 +272,7 @@ fn commit_topic(set_id: u64) -> B::Hash { <::Hashing as HashT>::hash(format!("{}-COMMITS", set_id).as_bytes()) } -impl, H: ExHashT> Network for NetworkBridge { +impl, H: ExHashT> Network for NetworkBridge { type In = mpsc::UnboundedReceiver; fn messages_for(&self, round: u64, set_id: u64) -> Self::In { self.service.consensus_gossip().write().messages_for(message_topic::(round, set_id)) @@ -293,6 +296,11 @@ impl, H: ExHashT let topic = commit_topic::(set_id); self.service.gossip_consensus_message(topic, message, true); } + + fn announce(&self, round: u64, _set_id: u64, block: B::Hash) { + debug!(target: "afg", "Announcing block {} to peers which we voted on in round {}", block, round); + self.service.announce_block(block) + } } /// Something which can determine if a block is known. @@ -368,7 +376,7 @@ impl ConsensusChanges { type SharedConsensusChanges = Arc>>; /// The environment we run GRANDPA in. -struct Environment { +struct Environment, RA> { inner: Arc>, voters: Arc>, config: Config, @@ -382,7 +390,7 @@ impl, B, E, N, RA> grandpa::Chain + 'static, E: CallExecutor + 'static, - N: Network + 'static, + N: Network + 'static, N::In: 'static, NumberFor: BlockNumberOps, { @@ -540,7 +548,7 @@ impl, N, RA> voter::Environment + 'static, E: CallExecutor + 'static + Send + Sync, - N: Network + 'static + Send, + N: Network + 'static + Send, N::In: 'static + Send, RA: 'static + Send + Sync, NumberFor: BlockNumberOps, @@ -1476,7 +1484,7 @@ fn committer_communication, B, E, N, RA>( ) where B: Backend, E: CallExecutor + Send + Sync, - N: Network, + N: Network, RA: Send + Sync, NumberFor: BlockNumberOps, DigestItemFor: DigestItem, @@ -1522,7 +1530,7 @@ pub fn run_grandpa, N, RA>( Block::Hash: Ord, B: Backend + 'static, E: CallExecutor + Send + Sync + 'static, - N: Network + Send + Sync + 'static, + N: Network + Send + Sync + 'static, N::In: Send + 'static, NumberFor: BlockNumberOps, DigestFor: Encode, diff --git a/substrate/core/finality-grandpa/src/tests.rs b/substrate/core/finality-grandpa/src/tests.rs index 61147a4b22..1e2b25ad4b 100644 --- a/substrate/core/finality-grandpa/src/tests.rs +++ b/substrate/core/finality-grandpa/src/tests.rs @@ -175,7 +175,7 @@ fn make_commit_topic(set_id: u64) -> Hash { hash } -impl Network for MessageRouting { +impl Network for MessageRouting { type In = Box,Error=()> + Send>; fn messages_for(&self, round: u64, set_id: u64) -> Self::In { @@ -229,6 +229,10 @@ impl Network for MessageRouting { inner.peer(self.peer_id).gossip_message(make_commit_topic(set_id), message, true); inner.route_until_complete(); } + + fn announce(&self, _round: u64, _set_id: u64, _block: H256) { + + } } #[derive(Default, Clone)] diff --git a/substrate/core/network/src/protocol.rs b/substrate/core/network/src/protocol.rs index d31bf43716..02b2ff7963 100644 --- a/substrate/core/network/src/protocol.rs +++ b/substrate/core/network/src/protocol.rs @@ -686,6 +686,34 @@ impl, H: ExHashT> Protocol { self.transaction_pool.on_broadcasted(propagated_to); } + /// Make sure an important block is propagated to peers. + /// + /// In chain-based consensus, we often need to make sure non-best forks are + /// at least temporarily synced. + pub fn announce_block(&self, io: &mut SyncIo, hash: B::Hash) { + let header = match self.context_data.chain.header(&BlockId::Hash(hash)) { + Ok(Some(header)) => header, + Ok(None) => { + warn!("Trying to announce unknown block: {}", hash); + return; + } + Err(e) => { + warn!("Error reading block header {}: {:?}", hash, e); + return; + } + }; + let mut peers = self.context_data.peers.write(); + let hash = header.hash(); + for (who, ref mut peer) in peers.iter_mut() { + if peer.known_blocks.insert(hash.clone()) { + trace!(target: "sync", "Reannouncing block {:?} to {}", hash, who); + self.send_message(io, *who, GenericMessage::BlockAnnounce(message::BlockAnnounce { + header: header.clone() + })); + } + } + } + /// Send Status message fn send_status(&self, io: &mut SyncIo, who: NodeIndex) { if let Ok(info) = self.context_data.chain.info() { diff --git a/substrate/core/network/src/service.rs b/substrate/core/network/src/service.rs index 91840063b2..05b455aead 100644 --- a/substrate/core/network/src/service.rs +++ b/substrate/core/network/src/service.rs @@ -200,6 +200,14 @@ impl, H: ExHashT> Service, broadcast: bool) { self.handler.gossip_consensus_message( diff --git a/substrate/core/network/src/sync.rs b/substrate/core/network/src/sync.rs index 629e8e3d1b..dcb38d7829 100644 --- a/substrate/core/network/src/sync.rs +++ b/substrate/core/network/src/sync.rs @@ -38,12 +38,15 @@ const MAX_IMPORTING_BLOCKS: usize = 2048; const MAJOR_SYNC_BLOCKS: usize = 5; // Time to wait before trying to get a justification from the same peer. const JUSTIFICATION_RETRY_WAIT: Duration = Duration::from_secs(10); +// Number of recently announced blocks to track for each peer. +const ANNOUNCE_HISTORY_SIZE: usize = 64; struct PeerSync { pub common_number: NumberFor, pub best_hash: B::Hash, pub best_number: NumberFor, pub state: PeerSyncState, + pub recently_announced: VecDeque, } #[derive(Copy, Clone, Eq, PartialEq, Debug)] @@ -334,6 +337,7 @@ impl ChainSync { best_hash: info.best_hash, best_number: info.best_number, state: PeerSyncState::Available, + recently_announced: Default::default(), }); } (Ok(BlockStatus::Unknown), _) => { @@ -346,6 +350,7 @@ impl ChainSync { best_hash: info.best_hash, best_number: info.best_number, state: PeerSyncState::AncestorSearch(common_best), + recently_announced: Default::default(), }); Self::request_ancestry(protocol, who, common_best) } else { @@ -356,6 +361,7 @@ impl ChainSync { best_hash: info.best_hash, best_number: info.best_number, state: PeerSyncState::Available, + recently_announced: Default::default(), }); self.download_new(protocol, who) } @@ -367,6 +373,7 @@ impl ChainSync { best_hash: info.best_hash, best_number: info.best_number, state: PeerSyncState::Available, + recently_announced: Default::default(), }); } } @@ -457,13 +464,16 @@ impl ChainSync { Vec::new() }; - let best_seen = self.best_seen_block(); - let is_best = new_blocks.first().and_then(|b| b.header.as_ref()).map(|h| best_seen.as_ref().map_or(false, |n| h.number() >= n)); - let origin = if is_best.unwrap_or_default() { BlockOrigin::NetworkBroadcast } else { BlockOrigin::NetworkInitialSync }; + let is_recent = new_blocks + .first() + .map(|block| self.peers.iter().any(|(_, peer)| peer.recently_announced.contains(&block.hash))) + .unwrap_or(false); + let origin = if is_recent { BlockOrigin::NetworkBroadcast } else { BlockOrigin::NetworkInitialSync }; if let Some((hash, number)) = new_blocks.last() .and_then(|b| b.header.as_ref().map(|h| (b.hash.clone(), *h.number()))) { + trace!(target:"sync", "Accepted {} blocks ({:?}) with origin {:?}", new_blocks.len(), hash, origin); self.block_queued(&hash, number); } self.maintain_sync(protocol); @@ -586,6 +596,10 @@ impl ChainSync { let known_parent = self.is_known(protocol, &header.parent_hash()); let known = self.is_known(protocol, &hash); if let Some(ref mut peer) = self.peers.get_mut(&who) { + while peer.recently_announced.len() >= ANNOUNCE_HISTORY_SIZE { + peer.recently_announced.pop_front(); + } + peer.recently_announced.push_back(hash.clone()); if number > peer.best_number { // update their best block peer.best_number = number;