diff --git a/substrate/core/network/src/protocol.rs b/substrate/core/network/src/protocol.rs index 6dab5a2a54..ea993bdce0 100644 --- a/substrate/core/network/src/protocol.rs +++ b/substrate/core/network/src/protocol.rs @@ -64,9 +64,9 @@ const TICK_TIMEOUT: time::Duration = time::Duration::from_millis(1100); const PROPAGATE_TIMEOUT: time::Duration = time::Duration::from_millis(2900); /// Current protocol version. -pub(crate) const CURRENT_VERSION: u32 = 3; +pub(crate) const CURRENT_VERSION: u32 = 4; /// Lowest version we support -pub(crate) const MIN_VERSION: u32 = 2; +pub(crate) const MIN_VERSION: u32 = 3; // Maximum allowed entries in `BlockResponse` const MAX_BLOCK_DATA_RESPONSE: u32 = 128; @@ -1028,14 +1028,33 @@ impl, H: ExHashT> Protocol { return; } + let is_best = self.context_data.chain.info().chain.best_hash == hash; + debug!(target: "sync", "Reannouncing block {:?}", hash); + self.send_announcement(&header, is_best, true) + } + + fn send_announcement(&mut self, header: &B::Header, is_best: bool, force: bool) { let hash = header.hash(); - let message = GenericMessage::BlockAnnounce(message::BlockAnnounce { header: header.clone() }); - for (who, ref mut peer) in self.context_data.peers.iter_mut() { - trace!(target: "sync", "Reannouncing block {:?} to {}", hash, who); - peer.known_blocks.insert(hash); - self.behaviour.send_packet(who, message.clone()) + trace!(target: "sync", "Announcing block {:?} to {}", hash, who); + let inserted = peer.known_blocks.insert(hash); + if inserted || force { + let message = GenericMessage::BlockAnnounce(message::BlockAnnounce { + header: header.clone(), + state: if peer.info.protocol_version >= 4 { + if is_best { + Some(message::BlockState::Best) + } else { + Some(message::BlockState::Normal) + } + } else { + None + } + }); + + self.behaviour.send_packet(who, message) + } } } @@ -1072,7 +1091,12 @@ impl, H: ExHashT> Protocol { peerset: self.peerset_handle.clone(), }, who.clone(), *header.number()); - match self.sync.on_block_announce(who.clone(), hash, &header) { + let is_their_best = match announce.state.unwrap_or(message::BlockState::Best) { + message::BlockState::Best => true, + message::BlockState::Normal => false, + }; + + match self.sync.on_block_announce(who.clone(), hash, &header, is_their_best) { sync::OnBlockAnnounce::Request(peer, req) => { self.send_message(peer, GenericMessage::BlockRequest(req)); return CustomMessageOutcome::None @@ -1132,8 +1156,10 @@ impl, H: ExHashT> Protocol { /// Call this when a block has been imported in the import queue and we should announce it on /// the network. - pub fn on_block_imported(&mut self, hash: B::Hash, header: &B::Header) { - self.sync.update_chain_info(header); + pub fn on_block_imported(&mut self, hash: B::Hash, header: &B::Header, is_best: bool) { + if is_best { + self.sync.update_chain_info(header); + } self.specialization.on_block_imported( &mut ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle), hash.clone(), @@ -1146,15 +1172,7 @@ impl, H: ExHashT> Protocol { } // send out block announcements - - let message = GenericMessage::BlockAnnounce(message::BlockAnnounce { header: header.clone() }); - - for (who, ref mut peer) in self.context_data.peers.iter_mut() { - if peer.known_blocks.insert(hash.clone()) { - trace!(target: "sync", "Announcing block {:?} to {}", hash, who); - self.behaviour.send_packet(who, message.clone()) - } - } + self.send_announcement(&header, is_best, false); } /// Call this when a block has been finalized. The sync layer may have some additional diff --git a/substrate/core/network/src/protocol/message.rs b/substrate/core/network/src/protocol/message.rs index cb02845735..2faa68a4e2 100644 --- a/substrate/core/network/src/protocol/message.rs +++ b/substrate/core/network/src/protocol/message.rs @@ -107,6 +107,15 @@ pub enum Direction { Descending = 1, } +/// Block state in the chain. +#[derive(Debug, PartialEq, Eq, Clone, Copy, Encode, Decode)] +pub enum BlockState { + /// Block is not part of the best chain. + Normal, + /// Latest best block. + Best, +} + /// Remote call response. #[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)] pub struct RemoteCallResponse { @@ -127,12 +136,13 @@ pub struct RemoteReadResponse { /// Generic types. pub mod generic { - use codec::{Encode, Decode}; + use codec::{Encode, Decode, Input, Output}; use sr_primitives::Justification; use crate::config::Roles; use super::{ RemoteReadResponse, Transactions, Direction, RequestId, BlockAttributes, RemoteCallResponse, ConsensusEngineId, + BlockState, }; /// Consensus is mostly opaque to us #[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)] @@ -257,10 +267,35 @@ pub mod generic { } /// Announce a new complete relay chain block on the network. - #[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)] + #[derive(Debug, PartialEq, Eq, Clone)] pub struct BlockAnnounce { /// New block header. pub header: H, + /// Block state. TODO: Remove `Option` and custom encoding when v4 becomes common. + pub state: Option, + } + + // Custom Encode/Decode impl to maintain backwards compatibility with v3. + // This assumes that the packet contains nothing but the announcement message. + // TODO: Get rid of it once protocol v4 is common. + impl Encode for BlockAnnounce { + fn encode_to(&self, dest: &mut T) { + self.header.encode_to(dest); + if let Some(state) = &self.state { + state.encode_to(dest); + } + } + } + + impl Decode for BlockAnnounce { + fn decode(input: &mut I) -> Result { + let header = H::decode(input)?; + let state = BlockState::decode(input).ok(); + Ok(BlockAnnounce { + header, + state, + }) + } } #[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)] diff --git a/substrate/core/network/src/protocol/sync.rs b/substrate/core/network/src/protocol/sync.rs index 5963ebf0f2..52e88034a3 100644 --- a/substrate/core/network/src/protocol/sync.rs +++ b/substrate/core/network/src/protocol/sync.rs @@ -885,7 +885,9 @@ impl ChainSync { /// header (call `on_block_data`). The network request isn't sent /// in this case. Both hash and header is passed as an optimization /// to avoid rehashing the header. - pub fn on_block_announce(&mut self, who: PeerId, hash: B::Hash, header: &B::Header) -> OnBlockAnnounce { + pub fn on_block_announce(&mut self, who: PeerId, hash: B::Hash, header: &B::Header, is_best: bool) + -> OnBlockAnnounce + { let number = *header.number(); debug!(target: "sync", "Received block announcement with number {:?}", number); if number.is_zero() { @@ -907,7 +909,7 @@ impl ChainSync { peer.recently_announced.pop_front(); } peer.recently_announced.push_back(hash.clone()); - if number > peer.best_number { + if is_best && number > peer.best_number { // update their best block peer.best_number = number; peer.best_hash = hash; @@ -915,9 +917,9 @@ impl ChainSync { if let PeerSyncState::AncestorSearch(_, _) = peer.state { return OnBlockAnnounce::Nothing } - // We assume that the announced block is the latest they have seen, and so our common number + // If the announced block is the best they have seen, our common number // is either one further ahead or it's the one they just announced, if we know about it. - if known { + if known && is_best { peer.common_number = number } else if header.parent_hash() == &self.best_queued_hash || known_parent { peer.common_number = number - One::one(); diff --git a/substrate/core/network/src/service.rs b/substrate/core/network/src/service.rs index ac6bd1ac05..a1cba0395e 100644 --- a/substrate/core/network/src/service.rs +++ b/substrate/core/network/src/service.rs @@ -297,8 +297,8 @@ impl, H: ExHashT> NetworkWorker } /// You must call this when a new block is imported by the client. - pub fn on_block_imported(&mut self, hash: B::Hash, header: B::Header) { - self.network_service.user_protocol_mut().on_block_imported(hash, &header); + pub fn on_block_imported(&mut self, hash: B::Hash, header: B::Header, is_best: bool) { + self.network_service.user_protocol_mut().on_block_imported(hash, &header, is_best); } /// You must call this when a new block is finalized by the client. diff --git a/substrate/core/network/src/test/mod.rs b/substrate/core/network/src/test/mod.rs index b6a6103071..5fd67acd1b 100644 --- a/substrate/core/network/src/test/mod.rs +++ b/substrate/core/network/src/test/mod.rs @@ -300,7 +300,7 @@ impl> Peer { Default::default() }; self.block_import.import_block(import_block, cache).expect("block_import failed"); - self.network.on_block_imported(hash, header); + self.network.on_block_imported(hash, header, true); at = hash; } @@ -662,7 +662,7 @@ pub trait TestNetFactory: Sized { // We poll `imported_blocks_stream`. while let Ok(Async::Ready(Some(notification))) = peer.imported_blocks_stream.poll() { - peer.network.on_block_imported(notification.hash, notification.header); + peer.network.on_block_imported(notification.hash, notification.header, true); } // We poll `finality_notification_stream`, but we only take the last event. diff --git a/substrate/core/service/src/lib.rs b/substrate/core/service/src/lib.rs index a8900eedd9..431776b31e 100644 --- a/substrate/core/service/src/lib.rs +++ b/substrate/core/service/src/lib.rs @@ -682,7 +682,7 @@ fn build_network_future< // We poll `imported_blocks_stream`. while let Ok(Async::Ready(Some(notification))) = imported_blocks_stream.poll() { - network.on_block_imported(notification.hash, notification.header); + network.on_block_imported(notification.hash, notification.header, notification.is_new_best); } // We poll `finality_notification_stream`, but we only take the last event.