diff --git a/substrate/client/network/src/protocol.rs b/substrate/client/network/src/protocol.rs index 29e5137a3d..846440ac39 100644 --- a/substrate/client/network/src/protocol.rs +++ b/substrate/client/network/src/protocol.rs @@ -556,6 +556,11 @@ impl Protocol { self.sync.status().queued_blocks } + /// Number of processed blocks. + pub fn num_processed_blocks(&self) -> usize { + self.sync.num_processed_blocks() + } + /// Number of active sync requests. pub fn num_sync_requests(&self) -> usize { self.sync.num_sync_requests() diff --git a/substrate/client/network/src/protocol/sync.rs b/substrate/client/network/src/protocol/sync.rs index ffbe6a096a..683f3c3139 100644 --- a/substrate/client/network/src/protocol/sync.rs +++ b/substrate/client/network/src/protocol/sync.rs @@ -85,7 +85,7 @@ mod rep { pub const INCOMPLETE_HEADER: Rep = Rep::new(-(1 << 20), "Incomplete header"); /// Reputation change for peers which send us a block which we fail to verify. - pub const VERIFICATION_FAIL: Rep = Rep::new(-(1 << 20), "Block verification failed"); + pub const VERIFICATION_FAIL: Rep = Rep::new(-(1 << 29), "Block verification failed"); /// Reputation change for peers which send us a known bad block. pub const BAD_BLOCK: Rep = Rep::new(-(1 << 29), "Bad block"); @@ -138,6 +138,8 @@ pub struct ChainSync { block_announce_validator: Box + Send>, /// Maximum number of peers to ask the same blocks in parallel. max_parallel_downloads: u32, + /// Total number of processed blocks (imported or failed). + processed_blocks: usize, } /// All the data we have about a Peer that we are trying to sync with @@ -318,6 +320,7 @@ impl ChainSync { is_idle: false, block_announce_validator, max_parallel_downloads, + processed_blocks: 0, } } @@ -357,6 +360,11 @@ impl ChainSync { self.fork_targets.len() } + /// Number of processed blocks. + pub fn num_processed_blocks(&self) -> usize { + self.processed_blocks + } + /// Handle a new connected peer. /// /// Call this method whenever we connect to a new peer. @@ -649,7 +657,7 @@ impl ChainSync { pub fn on_block_data (&mut self, who: PeerId, request: Option>, response: BlockResponse) -> Result, BadPeer> { - let new_blocks: Vec> = + let mut new_blocks: Vec> = if let Some(peer) = self.peers.get_mut(&who) { let mut blocks = response.blocks; if request.as_ref().map_or(false, |r| r.direction == message::Direction::Descending) { @@ -768,6 +776,12 @@ impl ChainSync { Vec::new() }; + let orig_len = new_blocks.len(); + new_blocks.retain(|b| !self.queue_blocks.contains(&b.hash)); + if new_blocks.len() != orig_len { + debug!(target: "sync", "Ignoring {} blocks that are already queued", orig_len - new_blocks.len()); + } + let is_recent = new_blocks.first() .map(|block| { self.peers.iter().any(|(_, peer)| peer.recently_announced.contains(&block.hash)) @@ -895,10 +909,12 @@ impl ChainSync { let mut output = Vec::new(); let mut has_error = false; - let mut hashes = vec![]; - for (result, hash) in results { - hashes.push(hash); + for (_, hash) in &results { + self.queue_blocks.remove(&hash); + } + self.processed_blocks += results.len(); + for (result, hash) in results { if has_error { continue; } @@ -943,43 +959,39 @@ impl ChainSync { }, Err(BlockImportError::IncompleteHeader(who)) => { if let Some(peer) = who { - info!("Peer sent block with incomplete header to import"); + warn!("Peer sent block with incomplete header to import"); output.push(Err(BadPeer(peer, rep::INCOMPLETE_HEADER))); output.extend(self.restart()); } }, Err(BlockImportError::VerificationFailed(who, e)) => { if let Some(peer) = who { - info!("Verification failed from peer: {}", e); + warn!("Verification failed for block {:?} received from peer: {}, {:?}", hash, peer, e); output.push(Err(BadPeer(peer, rep::VERIFICATION_FAIL))); output.extend(self.restart()); } }, Err(BlockImportError::BadBlock(who)) => { if let Some(peer) = who { - info!("Block received from peer has been blacklisted"); + info!("Block {:?} received from peer {} has been blacklisted", hash, peer); output.push(Err(BadPeer(peer, rep::BAD_BLOCK))); - output.extend(self.restart()); } }, Err(BlockImportError::MissingState) => { // This may happen if the chain we were requesting upon has been discarded // in the meantime because other chain has been finalized. // Don't mark it as bad as it still may be synced if explicitly requested. - trace!(target: "sync", "Obsolete block"); + trace!(target: "sync", "Obsolete block {:?}", hash); }, - Err(BlockImportError::UnknownParent) | - Err(BlockImportError::Cancelled) | - Err(BlockImportError::Other(_)) => { + e @ Err(BlockImportError::UnknownParent) | + e @ Err(BlockImportError::Other(_)) => { + warn!(target: "sync", "Error importing block {:?}: {:?}", hash, e); output.extend(self.restart()); }, + Err(BlockImportError::Cancelled) => {} }; } - for hash in hashes { - self.queue_blocks.remove(&hash); - } - self.is_idle = false; output.into_iter() } @@ -1094,9 +1106,9 @@ impl ChainSync { if let PeerSyncState::AncestorSearch(_, _) = peer.state { return OnBlockAnnounce::Nothing } - // If the announced block is the best they have seen, our common number + // If the announced block is the best they have and is not ahead of us, our common number // is either one further ahead or it's the one they just announced, if we know about it. - if is_best { + if is_best && self.best_queued_number >= number { if known { peer.common_number = number } else if header.parent_hash() == &self.best_queued_hash || known_parent { @@ -1168,7 +1180,7 @@ impl ChainSync { /// Restart the sync process. fn restart<'a>(&'a mut self) -> impl Iterator), BadPeer>> + 'a { - self.queue_blocks.clear(); + self.processed_blocks = 0; self.blocks.clear(); let info = self.client.info(); self.best_queued_hash = info.best_hash; diff --git a/substrate/client/network/src/service.rs b/substrate/client/network/src/service.rs index 811690f433..81bea868b4 100644 --- a/substrate/client/network/src/service.rs +++ b/substrate/client/network/src/service.rs @@ -375,6 +375,11 @@ impl NetworkWorker { self.network_service.user_protocol().num_queued_blocks() } + /// Returns the number of processed blocks. + pub fn num_processed_blocks(&self) -> usize { + self.network_service.user_protocol().num_processed_blocks() + } + /// Number of active sync requests. pub fn num_sync_requests(&self) -> usize { self.network_service.user_protocol().num_sync_requests() diff --git a/substrate/client/network/test/src/lib.rs b/substrate/client/network/test/src/lib.rs index e98cf8bada..8ff06fc5ac 100644 --- a/substrate/client/network/test/src/lib.rs +++ b/substrate/client/network/test/src/lib.rs @@ -207,6 +207,11 @@ impl Peer { self.network.num_connected_peers() } + /// Returns the number of processed blocks. + pub fn num_processed_blocks(&self) -> usize { + self.network.num_processed_blocks() + } + /// Returns true if we have no peer. pub fn is_offline(&self) -> bool { self.num_peers() == 0 diff --git a/substrate/client/network/test/src/sync.rs b/substrate/client/network/test/src/sync.rs index 3882575168..5453747220 100644 --- a/substrate/client/network/test/src/sync.rs +++ b/substrate/client/network/test/src/sync.rs @@ -657,3 +657,40 @@ fn full_sync_requires_block_body() { net.block_until_idle(); assert_eq!(net.peer(1).client.info().best_number, 0); } + +#[test] +fn imports_stale_once() { + let _ = ::env_logger::try_init(); + + fn import_with_announce(net: &mut TestNet, hash: H256) { + // Announce twice + net.peer(0).announce_block(hash, Vec::new()); + net.peer(0).announce_block(hash, Vec::new()); + + block_on(futures::future::poll_fn::<(), _>(|cx| { + net.poll(cx); + if net.peer(1).client().header(&BlockId::Hash(hash)).unwrap().is_some() { + Poll::Ready(()) + } else { + Poll::Pending + } + })); + } + + // given the network with 2 full nodes + let mut net = TestNet::new(2); + + // let them connect to each other + net.block_until_sync(); + + // check that NEW block is imported from announce message + let new_hash = net.peer(0).push_blocks(1, false); + import_with_announce(&mut net, new_hash); + assert_eq!(net.peer(1).num_processed_blocks(), 1); + + // check that KNOWN STALE block is imported from announce message + let known_stale_hash = net.peer(0).push_blocks_at(BlockId::Number(0), 1, true); + import_with_announce(&mut net, known_stale_hash); + assert_eq!(net.peer(1).num_processed_blocks(), 2); +} +