From 6e572a947750d147b0d7c3f743f70c631d9948a2 Mon Sep 17 00:00:00 2001 From: Arkadiy Paronyan Date: Thu, 9 Jan 2020 19:00:57 +0100 Subject: [PATCH] Prioritize new blocks over old forks when syncing (#4414) * Prioritize new blocks over old forks when syncing * Fixed some test cases --- substrate/client/network/src/protocol.rs | 16 +- substrate/client/network/src/protocol/sync.rs | 252 ++++++++++-------- substrate/client/network/src/service.rs | 5 + substrate/client/network/test/src/lib.rs | 3 + 4 files changed, 157 insertions(+), 119 deletions(-) diff --git a/substrate/client/network/src/protocol.rs b/substrate/client/network/src/protocol.rs index 1b30da59de..9287312f09 100644 --- a/substrate/client/network/src/protocol.rs +++ b/substrate/client/network/src/protocol.rs @@ -547,6 +547,11 @@ impl, H: ExHashT> Protocol { self.sync.status().queued_blocks } + /// Number of active sync requests. + pub fn num_sync_requests(&self) -> usize { + self.sync.num_sync_requests() + } + /// Starts a new data demand request. /// /// The parameter contains a `Sender` where the result, once received, must be sent. @@ -892,7 +897,7 @@ impl, H: ExHashT> Protocol { } } } else { - match self.sync.on_block_data(peer, request, response) { + match self.sync.on_block_data(peer, Some(request), response) { Ok(sync::OnBlockData::Import(origin, blocks)) => CustomMessageOutcome::BlockImport(origin, blocks), Ok(sync::OnBlockData::Request(peer, req)) => { @@ -1320,14 +1325,7 @@ impl, H: ExHashT> Protocol { // been sent over network (but it is not in our case) let blocks_to_import = self.sync.on_block_data( who.clone(), - message::generic::BlockRequest { - id: 0, - fields: BlockAttributes::HEADER, - from: message::FromBlock::Hash(hash), - to: None, - direction: message::Direction::Ascending, - max: Some(1), - }, + None, message::generic::BlockResponse { id: 0, blocks: vec![ diff --git a/substrate/client/network/src/protocol/sync.rs b/substrate/client/network/src/protocol/sync.rs index 5ae641b475..8929fc75f8 100644 --- a/substrate/client/network/src/protocol/sync.rs +++ b/substrate/client/network/src/protocol/sync.rs @@ -352,6 +352,11 @@ impl ChainSync { } } + /// Number of active sync requests. + pub fn num_sync_requests(&self) -> usize { + self.fork_targets.len() + } + /// Handle a new connected peer. /// /// Call this method whenever we connect to a new peer. @@ -473,7 +478,7 @@ impl ChainSync { debug!( target: "sync", "Explicit sync request for block {:?} with no peers specified. \ - Syncing from all connected peers {:?} instead.", + Syncing from all connected peers {:?} instead.", hash, peers, ); @@ -588,7 +593,27 @@ impl ChainSync { trace!(target: "sync", "Peer {} is busy", id); return None } - if let Some((hash, req)) = fork_sync_request( + if let Some((range, req)) = peer_block_request( + id, + peer, + blocks, + attrs, + max_parallel, + last_finalized, + best_queued, + ) { + peer.state = PeerSyncState::DownloadingNew(range.start); + trace!( + target: "sync", + "New block request for {}, (best:{}, common:{}) {:?}", + id, + peer.best_number, + peer.common_number, + req, + ); + have_requests = true; + Some((id.clone(), req)) + } else if let Some((hash, req)) = fork_sync_request( id, fork_targets, best_queued, @@ -604,25 +629,6 @@ impl ChainSync { peer.state = PeerSyncState::DownloadingStale(hash); have_requests = true; Some((id.clone(), req)) - } else if let Some((range, req)) = peer_block_request( - id, - peer, - blocks, - attrs, - max_parallel, - last_finalized - ) { - peer.state = PeerSyncState::DownloadingNew(range.start); - trace!( - target: "sync", - "New block request for {}, (best:{}, common:{}) {:?}", - id, - peer.best_number, - peer.common_number, - req, - ); - have_requests = true; - Some((id.clone(), req)) } else { None } @@ -636,111 +642,127 @@ impl ChainSync { /// Handle a response from the remote to a block request that we made. /// /// `request` must be the original request that triggered `response`. + /// or `None` if data comes from the block announcement. /// /// If this corresponds to a valid block, this outputs the block that /// must be imported in the import queue. pub fn on_block_data - (&mut self, who: PeerId, request: BlockRequest, response: BlockResponse) -> Result, BadPeer> + (&mut self, who: PeerId, request: Option>, response: BlockResponse) -> Result, BadPeer> { let new_blocks: Vec> = if let Some(peer) = self.peers.get_mut(&who) { let mut blocks = response.blocks; - if request.direction == message::Direction::Descending { + if request.as_ref().map_or(false, |r| r.direction == message::Direction::Descending) { trace!(target: "sync", "Reversing incoming block list"); blocks.reverse() } self.is_idle = false; - match &mut peer.state { - PeerSyncState::DownloadingNew(start_block) => { - self.blocks.clear_peer_download(&who); - self.blocks.insert(*start_block, blocks, who); - peer.state = PeerSyncState::Available; - self.blocks - .drain(self.best_queued_number + One::one()) - .into_iter() - .map(|block_data| { + if request.is_some() { + match &mut peer.state { + PeerSyncState::DownloadingNew(start_block) => { + self.blocks.clear_peer_download(&who); + self.blocks.insert(*start_block, blocks, who); + peer.state = PeerSyncState::Available; + self.blocks + .drain(self.best_queued_number + One::one()) + .into_iter() + .map(|block_data| { + IncomingBlock { + hash: block_data.block.hash, + header: block_data.block.header, + body: block_data.block.body, + justification: block_data.block.justification, + origin: block_data.origin, + allow_missing_state: true, + import_existing: false, + } + }).collect() + } + PeerSyncState::DownloadingStale(_) => { + peer.state = PeerSyncState::Available; + blocks.into_iter().map(|b| { IncomingBlock { - hash: block_data.block.hash, - header: block_data.block.header, - body: block_data.block.body, - justification: block_data.block.justification, - origin: block_data.origin, - allow_missing_state: false, + hash: b.hash, + header: b.header, + body: b.body, + justification: b.justification, + origin: Some(who.clone()), + allow_missing_state: true, import_existing: false, } }).collect() - } - PeerSyncState::DownloadingStale(_) => { - peer.state = PeerSyncState::Available; - blocks.into_iter().map(|b| { - IncomingBlock { - hash: b.hash, - header: b.header, - body: b.body, - justification: b.justification, - origin: Some(who.clone()), - allow_missing_state: true, - import_existing: false, - } - }).collect() - } - PeerSyncState::AncestorSearch(num, state) => { - let matching_hash = match (blocks.get(0), self.client.block_hash(*num)) { - (Some(block), Ok(maybe_our_block_hash)) => { - trace!(target: "sync", "Got ancestry block #{} ({}) from peer {}", num, block.hash, who); - maybe_our_block_hash.filter(|x| x == &block.hash) - }, - (None, _) => { - debug!(target: "sync", "Invalid response when searching for ancestor from {}", who); - return Err(BadPeer(who, rep::UNKNOWN_ANCESTOR)) - }, - (_, Err(e)) => { - info!("Error answering legitimate blockchain query: {:?}", e); - return Err(BadPeer(who, rep::BLOCKCHAIN_READ_ERROR)) - } - }; - if matching_hash.is_some() && peer.common_number < *num { - peer.common_number = *num; } - if matching_hash.is_none() && num.is_zero() { - trace!(target:"sync", "Ancestry search: genesis mismatch for peer {}", who); - return Err(BadPeer(who, rep::GENESIS_MISMATCH)) - } - if let Some((next_state, next_num)) = handle_ancestor_search_state(state, *num, matching_hash.is_some()) { - peer.state = PeerSyncState::AncestorSearch(next_num, next_state); - return Ok(OnBlockData::Request(who, ancestry_request::(next_num))) - } else { - // Ancestry search is complete. Check if peer is on a stale fork unknown to us and - // add it to sync targets if necessary. - trace!(target: "sync", "Ancestry search complete. Ours={} ({}), Theirs={} ({}), Common={:?} ({})", - self.best_queued_hash, - self.best_queued_number, - peer.best_hash, - peer.best_number, - matching_hash, - peer.common_number, - ); - if peer.common_number < peer.best_number - && peer.best_number < self.best_queued_number - { - trace!(target: "sync", "Added fork target {} for {}" , peer.best_hash, who); - self.fork_targets - .entry(peer.best_hash.clone()) - .or_insert_with(|| ForkTarget { - number: peer.best_number, - parent_hash: None, - peers: Default::default(), - }) - .peers.insert(who); + PeerSyncState::AncestorSearch(num, state) => { + let matching_hash = match (blocks.get(0), self.client.block_hash(*num)) { + (Some(block), Ok(maybe_our_block_hash)) => { + trace!(target: "sync", "Got ancestry block #{} ({}) from peer {}", num, block.hash, who); + maybe_our_block_hash.filter(|x| x == &block.hash) + }, + (None, _) => { + debug!(target: "sync", "Invalid response when searching for ancestor from {}", who); + return Err(BadPeer(who, rep::UNKNOWN_ANCESTOR)) + }, + (_, Err(e)) => { + info!("Error answering legitimate blockchain query: {:?}", e); + return Err(BadPeer(who, rep::BLOCKCHAIN_READ_ERROR)) + } + }; + if matching_hash.is_some() && peer.common_number < *num { + peer.common_number = *num; + } + if matching_hash.is_none() && num.is_zero() { + trace!(target:"sync", "Ancestry search: genesis mismatch for peer {}", who); + return Err(BadPeer(who, rep::GENESIS_MISMATCH)) + } + if let Some((next_state, next_num)) = handle_ancestor_search_state(state, *num, matching_hash.is_some()) { + peer.state = PeerSyncState::AncestorSearch(next_num, next_state); + return Ok(OnBlockData::Request(who, ancestry_request::(next_num))) + } else { + // Ancestry search is complete. Check if peer is on a stale fork unknown to us and + // add it to sync targets if necessary. + trace!(target: "sync", "Ancestry search complete. Ours={} ({}), Theirs={} ({}), Common={:?} ({})", + self.best_queued_hash, + self.best_queued_number, + peer.best_hash, + peer.best_number, + matching_hash, + peer.common_number, + ); + if peer.common_number < peer.best_number + && peer.best_number < self.best_queued_number + { + trace!(target: "sync", "Added fork target {} for {}" , peer.best_hash, who); + self.fork_targets + .entry(peer.best_hash.clone()) + .or_insert_with(|| ForkTarget { + number: peer.best_number, + parent_hash: None, + peers: Default::default(), + }) + .peers.insert(who); + } + peer.state = PeerSyncState::Available; + Vec::new() } - peer.state = PeerSyncState::Available; - Vec::new() } - } - | PeerSyncState::Available - | PeerSyncState::DownloadingJustification(..) - | PeerSyncState::DownloadingFinalityProof(..) => Vec::new() + | PeerSyncState::Available + | PeerSyncState::DownloadingJustification(..) + | PeerSyncState::DownloadingFinalityProof(..) => Vec::new() + } + } else { + // When request.is_none() just acccept blocks + blocks.into_iter().map(|b| { + IncomingBlock { + hash: b.hash, + header: b.header, + body: b.body, + justification: b.justification, + origin: Some(who.clone()), + allow_missing_state: true, + import_existing: false, + } + }).collect() } } else { Vec::new() @@ -1255,10 +1277,15 @@ fn peer_block_request( attrs: &message::BlockAttributes, max_parallel_downloads: u32, finalized: NumberFor, + best_num: NumberFor, ) -> Option<(Range>, BlockRequest)> { if peer.common_number < finalized { return None; } + if best_num >= peer.best_number { + // Will be downloaded as alternative fork instead. + return None; + } if let Some(range) = blocks.needed_blocks( id.clone(), MAX_BLOCKS_TO_REQUEST, @@ -1291,11 +1318,16 @@ fn fork_sync_request( check_block: impl Fn(&B::Hash) -> BlockStatus, ) -> Option<(B::Hash, BlockRequest)> { - targets.retain(|hash, r| if r.number > finalized { + targets.retain(|hash, r| { + if r.number <= finalized { + trace!(target: "sync", "Removed expired fork sync request {:?} (#{})", hash, r.number); + return false; + } + if check_block(hash) != BlockStatus::Unknown { + trace!(target: "sync", "Removed obsolete fork sync request {:?} (#{})", hash, r.number); + return false; + } true - } else { - trace!(target: "sync", "Removed expired fork sync request {:?} (#{})", hash, r.number); - false }); for (hash, r) in targets { if !r.peers.contains(id) { diff --git a/substrate/client/network/src/service.rs b/substrate/client/network/src/service.rs index 3785335925..5f18a5c10c 100644 --- a/substrate/client/network/src/service.rs +++ b/substrate/client/network/src/service.rs @@ -319,6 +319,11 @@ impl, H: ExHashT> NetworkWorker self.network_service.user_protocol().num_queued_blocks() } + /// Number of active sync requests. + pub fn num_sync_requests(&self) -> usize { + self.network_service.user_protocol().num_sync_requests() + } + /// Adds an address for a node. pub fn add_known_address(&mut self, peer_id: PeerId, addr: Multiaddr) { self.network_service.add_known_address(peer_id, addr); diff --git a/substrate/client/network/test/src/lib.rs b/substrate/client/network/test/src/lib.rs index 5912933294..e76e58d4af 100644 --- a/substrate/client/network/test/src/lib.rs +++ b/substrate/client/network/test/src/lib.rs @@ -689,6 +689,9 @@ pub trait TestNetFactory: Sized { if peer.is_major_syncing() || peer.network.num_queued_blocks() != 0 { return Async::NotReady } + if peer.network.num_sync_requests() != 0 { + return Async::NotReady + } match (highest, peer.client.info().best_hash) { (None, b) => highest = Some(b), (Some(ref a), ref b) if a == b => {},