diff --git a/substrate/core/network/src/protocol.rs b/substrate/core/network/src/protocol.rs index 2d7799b5bb..baca6f3bf6 100644 --- a/substrate/core/network/src/protocol.rs +++ b/substrate/core/network/src/protocol.rs @@ -521,8 +521,9 @@ impl, H: ExHashT> Protocol { } }, GenericMessage::BlockAnnounce(announce) => { - self.on_block_announce(network_out, who.clone(), announce); + let outcome = self.on_block_announce(network_out, who.clone(), announce); self.update_peer_info(&who); + return outcome; }, GenericMessage::Transactions(m) => self.on_extrinsics(network_out, transaction_pool, who, m), @@ -1019,7 +1020,7 @@ impl, H: ExHashT> Protocol { mut network_out: &mut dyn NetworkOut, who: PeerId, announce: message::BlockAnnounce - ) { + ) -> CustomMessageOutcome { let header = announce.header; let hash = header.hash(); { @@ -1028,12 +1029,55 @@ impl, H: ExHashT> Protocol { } } self.on_demand_core.on_block_announce(&mut network_out, who.clone(), *header.number()); - self.sync.on_block_announce( + let try_import = self.sync.on_block_announce( &mut ProtocolContext::new(&mut self.context_data, network_out), who.clone(), hash, &header, ); + + // try_import is only true when we have all data required to import block + // in the BlockAnnounce message. This is only when: + // 1) we're on light client; + // AND + // - EITHER 2.1) announced block is stale; + // - OR 2.2) announced block is NEW and we normally only want to download this single block (i.e. + // there are no ascendants of this block scheduled for retrieval) + if !try_import { + return CustomMessageOutcome::None; + } + + // to import header from announced block let's construct response to request that normally would have + // been sent over network (but it is not in our case) + let blocks_to_import = self.sync.on_block_data( + &mut ProtocolContext::new(&mut self.context_data, network_out), + who.clone(), + message::generic::BlockRequest { + id: 0, + fields: BlockAttributes::HEADER, + from: message::FromBlock::Hash(hash), + to: None, + direction: message::Direction::Ascending, + max: Some(1), + }, + message::generic::BlockResponse { + id: 0, + blocks: vec![ + message::generic::BlockData { + hash: hash, + header: Some(header), + body: None, + receipt: None, + message_queue: None, + justification: None, + }, + ], + }, + ); + match blocks_to_import { + Some((origin, blocks)) => CustomMessageOutcome::BlockImport(origin, blocks), + None => CustomMessageOutcome::None, + } } /// Call this when a block has been imported in the import queue and we should announce it on diff --git a/substrate/core/network/src/sync.rs b/substrate/core/network/src/sync.rs index fdd36a024d..e451372847 100644 --- a/substrate/core/network/src/sync.rs +++ b/substrate/core/network/src/sync.rs @@ -31,6 +31,7 @@ //! use std::cmp::max; +use std::ops::Range; use std::collections::{HashMap, VecDeque}; use log::{debug, trace, warn, info}; use crate::protocol::PeerInfo as ProtocolPeerInfo; @@ -135,6 +136,7 @@ pub struct ChainSync { blocks: BlockCollection, best_queued_number: NumberFor, best_queued_hash: B::Hash, + role: Roles, required_block_attributes: message::BlockAttributes, extra_requests: ExtraRequestsAggregator, queue_blocks: HashSet, @@ -195,6 +197,7 @@ impl ChainSync { best_queued_hash: info.best_queued_hash.unwrap_or(info.chain.best_hash), best_queued_number: info.best_queued_number.unwrap_or(info.chain.best_number), extra_requests: ExtraRequestsAggregator::new(), + role, required_block_attributes, queue_blocks: Default::default(), best_importing_number: Zero::zero(), @@ -665,12 +668,21 @@ impl ChainSync { // Abort search. peer.state = PeerSyncState::Available; } - trace!(target: "sync", "Updating peer {} info, ours={}, common={}, their best={}", n, number, peer.common_number, peer.best_number); - if peer.best_number >= number { - peer.common_number = number; + let new_common_number = if peer.best_number >= number { + number } else { - peer.common_number = peer.best_number; - } + peer.best_number + }; + trace!( + target: "sync", + "Updating peer {} info, ours={}, common={}->{}, their best={}", + n, + number, + peer.common_number, + new_common_number, + peer.best_number, + ); + peer.common_number = new_common_number; } } @@ -681,12 +693,22 @@ impl ChainSync { } /// Call when a node announces a new block. - pub(crate) fn on_block_announce(&mut self, protocol: &mut Context, who: PeerId, hash: B::Hash, header: &B::Header) { + /// + /// If true is returned, then the caller MUST try to import passed header (call `on_block_data). + /// The network request isn't sent in this case. + #[must_use] + pub(crate) fn on_block_announce( + &mut self, + protocol: &mut Context, + who: PeerId, + hash: B::Hash, + header: &B::Header, + ) -> bool { let number = *header.number(); debug!(target: "sync", "Received block announcement with number {:?}", number); if number.is_zero() { warn!(target: "sync", "Ignored invalid block announcement from {}: {}", who, hash); - return; + return false; } let parent_status = block_status(&*protocol.client(), &self.queue_blocks, header.parent_hash().clone()).ok() .unwrap_or(BlockStatus::Unknown); @@ -705,7 +727,7 @@ impl ChainSync { peer.best_hash = hash; } if let PeerSyncState::AncestorSearch(_, _) = peer.state { - return; + return false; } if header.parent_hash() == &self.best_queued_hash || known_parent { peer.common_number = number - One::one(); @@ -713,39 +735,77 @@ impl ChainSync { peer.common_number = number } } else { - return; + return false; } - if !(known || self.is_already_downloading(&hash)) { - let stale = number <= self.best_queued_number; - if stale { - if !(known_parent || self.is_already_downloading(header.parent_hash())) { - if protocol.client().block_status(&BlockId::Number(*header.number())) - .unwrap_or(BlockStatus::Unknown) == BlockStatus::InChainPruned - { - trace!(target: "sync", "Ignored unknown ancient block announced from {}: {} {:?}", who, hash, header); + // known block case + if known || self.is_already_downloading(&hash) { + trace!(target: "sync", "Known block announce from {}: {}", who, hash); + return false; + } + + // stale block case + let requires_additional_data = !self.role.is_light(); + let stale = number <= self.best_queued_number; + if stale { + if !(known_parent || self.is_already_downloading(header.parent_hash())) { + if protocol.client().block_status(&BlockId::Number(*header.number())) + .unwrap_or(BlockStatus::Unknown) == BlockStatus::InChainPruned + { + trace!(target: "sync", "Ignored unknown ancient block announced from {}: {} {:?}", who, hash, header); + return false; + } + + trace!(target: "sync", "Considering new unknown stale block announced from {}: {} {:?}", who, hash, header); + let request = self.download_unknown_stale(&who, &hash); + match request { + Some(request) => if requires_additional_data { + protocol.send_block_request(who, request); + return false; } else { - trace!(target: "sync", "Considering new unknown stale block announced from {}: {} {:?}", who, hash, header); - self.download_unknown_stale(protocol, who, &hash); - } - } else { - if ancient_parent { - trace!(target: "sync", "Ignored ancient stale block announced from {}: {} {:?}", who, hash, header); - } else { - self.download_stale(protocol, who, &hash); - } + return true; + }, + None => return false, } } else { if ancient_parent { - trace!(target: "sync", "Ignored ancient block announced from {}: {} {:?}", who, hash, header); - } else { - trace!(target: "sync", "Considering new block announced from {}: {} {:?}", who, hash, header); - self.download_new(protocol, who); + trace!(target: "sync", "Ignored ancient stale block announced from {}: {} {:?}", who, hash, header); + return false; + } + + let request = self.download_stale(&who, &hash); + match request { + Some(request) => if requires_additional_data { + protocol.send_block_request(who, request); + return false; + } else { + return true; + }, + None => return false, } } - } else { - trace!(target: "sync", "Known block announce from {}: {}", who, hash); } + + if ancient_parent { + trace!(target: "sync", "Ignored ancient block announced from {}: {} {:?}", who, hash, header); + return false; + } + + trace!(target: "sync", "Considering new block announced from {}: {} {:?}", who, hash, header); + let (range, request) = match self.select_new_blocks(who.clone()) { + Some((range, request)) => (range, request), + None => return false, + }; + let is_required_data_available = + !requires_additional_data && + range.end - range.start == One::one() && + range.start == *header.number(); + if !is_required_data_available { + protocol.send_block_request(who, request); + return false; + } + + true } fn is_already_downloading(&self, hash: &B::Hash) -> bool { @@ -788,76 +848,105 @@ impl ChainSync { } // Download old block with known parent. - fn download_stale(&mut self, protocol: &mut Context, who: PeerId, hash: &B::Hash) { - if let Some(ref mut peer) = self.peers.get_mut(&who) { - match peer.state { - PeerSyncState::Available => { - let request = message::generic::BlockRequest { - id: 0, - fields: self.required_block_attributes.clone(), - from: message::FromBlock::Hash(*hash), - to: None, - direction: message::Direction::Ascending, - max: Some(1), - }; - peer.state = PeerSyncState::DownloadingStale(*hash); - protocol.send_block_request(who, request); - }, - _ => (), - } + fn download_stale( + &mut self, + who: &PeerId, + hash: &B::Hash, + ) -> Option> { + let peer = self.peers.get_mut(who)?; + match peer.state { + PeerSyncState::Available => { + peer.state = PeerSyncState::DownloadingStale(*hash); + Some(message::generic::BlockRequest { + id: 0, + fields: self.required_block_attributes.clone(), + from: message::FromBlock::Hash(*hash), + to: None, + direction: message::Direction::Ascending, + max: Some(1), + }) + }, + _ => None, } } // Download old block with unknown parent. - fn download_unknown_stale(&mut self, protocol: &mut Context, who: PeerId, hash: &B::Hash) { - if let Some(ref mut peer) = self.peers.get_mut(&who) { - match peer.state { - PeerSyncState::Available => { - let request = message::generic::BlockRequest { - id: 0, - fields: self.required_block_attributes.clone(), - from: message::FromBlock::Hash(*hash), - to: None, - direction: message::Direction::Descending, - max: Some(MAX_UNKNOWN_FORK_DOWNLOAD_LEN), - }; - peer.state = PeerSyncState::DownloadingStale(*hash); - protocol.send_block_request(who, request); - }, - _ => (), - } + fn download_unknown_stale( + &mut self, + who: &PeerId, + hash: &B::Hash, + ) -> Option> { + let peer = self.peers.get_mut(who)?; + match peer.state { + PeerSyncState::Available => { + peer.state = PeerSyncState::DownloadingStale(*hash); + Some(message::generic::BlockRequest { + id: 0, + fields: self.required_block_attributes.clone(), + from: message::FromBlock::Hash(*hash), + to: None, + direction: message::Direction::Descending, + max: Some(MAX_UNKNOWN_FORK_DOWNLOAD_LEN), + }) + }, + _ => None, } } - // Issue a request for a peer to download new blocks, if any are available + // Issue a request for a peer to download new blocks, if any are available. fn download_new(&mut self, protocol: &mut Context, who: PeerId) { - if let Some(ref mut peer) = self.peers.get_mut(&who) { - // when there are too many blocks in the queue => do not try to download new blocks - if self.queue_blocks.len() > MAX_IMPORTING_BLOCKS { - trace!(target: "sync", "Too many blocks in the queue."); - return; - } - match peer.state { - PeerSyncState::Available => { - trace!(target: "sync", "Considering new block download from {}, common block is {}, best is {:?}", who, peer.common_number, peer.best_number); - if let Some(range) = self.blocks.needed_blocks(who.clone(), MAX_BLOCKS_TO_REQUEST, peer.best_number, peer.common_number) { + if let Some((_, request)) = self.select_new_blocks(who.clone()) { + protocol.send_block_request(who, request); + } + } + + // Select a range of NEW blocks to download from peer. + fn select_new_blocks(&mut self, who: PeerId) -> Option<(Range>, message::BlockRequest)> { + // when there are too many blocks in the queue => do not try to download new blocks + if self.queue_blocks.len() > MAX_IMPORTING_BLOCKS { + trace!(target: "sync", "Too many blocks in the queue."); + return None; + } + + let peer = self.peers.get_mut(&who)?; + match peer.state { + PeerSyncState::Available => { + trace!( + target: "sync", + "Considering new block download from {}, common block is {}, best is {:?}", + who, + peer.common_number, + peer.best_number, + ); + let range = self.blocks.needed_blocks(who.clone(), MAX_BLOCKS_TO_REQUEST, peer.best_number, peer.common_number); + match range { + Some(range) => { trace!(target: "sync", "Requesting blocks from {}, ({} to {})", who, range.start, range.end); - let request = message::generic::BlockRequest { - id: 0, - fields: self.required_block_attributes.clone(), - from: message::FromBlock::Number(range.start), - to: None, - direction: message::Direction::Ascending, - max: Some((range.end - range.start).saturated_into::()), - }; + let from = message::FromBlock::Number(range.start); + let max = Some((range.end - range.start).saturated_into::()); peer.state = PeerSyncState::DownloadingNew(range.start); - protocol.send_block_request(who, request); - } else { + Some(( + range, + message::generic::BlockRequest { + id: 0, + fields: self.required_block_attributes.clone(), + from, + to: None, + direction: message::Direction::Ascending, + max, + }, + )) + }, + None => { trace!(target: "sync", "Nothing to request"); - } - }, - _ => trace!(target: "sync", "Peer {} is busy", who), - } + None + }, + } + }, + _ => { + trace!(target: "sync", "Peer {} is busy", who); + None + }, } } diff --git a/substrate/core/network/src/test/mod.rs b/substrate/core/network/src/test/mod.rs index f31a93cf28..6a7a9ccddd 100644 --- a/substrate/core/network/src/test/mod.rs +++ b/substrate/core/network/src/test/mod.rs @@ -521,7 +521,7 @@ impl> Peer { /// Synchronize with import queue. #[cfg(any(test, feature = "test-helpers"))] - fn import_queue_sync(&self) { + pub fn import_queue_sync(&self) { self.import_queue.synchronize(); let _ = self.net_proto_channel.wait_sync(); } @@ -675,7 +675,7 @@ impl> Peer { /// Push blocks to the peer (simplified: with or without a TX) starting from /// given hash. - fn push_blocks_at(&self, at: BlockId, count: usize, with_tx: bool) -> H256 { + pub fn push_blocks_at(&self, at: BlockId, count: usize, with_tx: bool) -> H256 { let mut nonce = 0; if with_tx { self.generate_blocks_at(at, count, BlockOrigin::File, |mut builder| { diff --git a/substrate/core/network/src/test/sync.rs b/substrate/core/network/src/test/sync.rs index 8462304e42..50b00ac62a 100644 --- a/substrate/core/network/src/test/sync.rs +++ b/substrate/core/network/src/test/sync.rs @@ -452,3 +452,36 @@ fn can_not_sync_from_light_peer() { // check that light #1 has disconnected from #2 assert_eq!(net.peer(1).protocol_status().num_peers, 1); } + +#[test] +fn light_peer_imports_header_from_announce() { + let _ = ::env_logger::try_init(); + + fn import_with_announce(net: &mut TestNet, hash: H256) { + let header = net.peer(0).client().header(&BlockId::Hash(hash)).unwrap().unwrap(); + net.peer(1).receive_message( + &net.peer(0).peer_id, + message::generic::Message::BlockAnnounce(message::generic::BlockAnnounce { + header, + }), + ); + + net.peer(1).import_queue_sync(); + assert!(net.peer(1).client().header(&BlockId::Hash(hash)).unwrap().is_some()); + } + + // given the network with 1 full nodes (#0) and 1 light node (#1) + let mut net = TestNet::new(1); + net.add_light_peer(&Default::default()); + + // let them connect to each other + net.sync(); + + // check that NEW block is imported from announce message + let new_hash = net.peer(0).push_blocks(1, false); + import_with_announce(&mut net, new_hash); + + // check that KNOWN STALE block is imported from announce message + let known_stale_hash = net.peer(0).push_blocks_at(BlockId::Number(0), 1, true); + import_with_announce(&mut net, known_stale_hash); +}