diff --git a/substrate/client/network/src/protocol/sync.rs b/substrate/client/network/src/protocol/sync.rs index d98c0d2c04..f31afc828b 100644 --- a/substrate/client/network/src/protocol/sync.rs +++ b/substrate/client/network/src/protocol/sync.rs @@ -141,12 +141,12 @@ mod rep { pub const BAD_RESPONSE: Rep = Rep::new(-(1 << 12), "Incomplete response"); } -enum PendingRequests { +enum AllowedRequests { Some(HashSet), All, } -impl PendingRequests { +impl AllowedRequests { fn add(&mut self, id: &PeerId) { if let Self::Some(ref mut set) = self { set.insert(*id); @@ -174,9 +174,13 @@ impl PendingRequests { Self::All => false, } } + + fn clear(&mut self) { + std::mem::take(self); + } } -impl Default for PendingRequests { +impl Default for AllowedRequests { fn default() -> Self { Self::Some(HashSet::default()) } @@ -211,7 +215,7 @@ pub struct ChainSync { /// Fork sync targets. fork_targets: HashMap>, /// A set of peers for which there might be potential block requests - pending_requests: PendingRequests, + allowed_requests: AllowedRequests, /// A type to check incoming block announcements. block_announce_validator: Box + Send>, /// Maximum number of peers to ask the same blocks in parallel. @@ -549,7 +553,7 @@ impl ChainSync { mode, queue_blocks: Default::default(), fork_targets: Default::default(), - pending_requests: Default::default(), + allowed_requests: Default::default(), block_announce_validator, max_parallel_downloads, downloaded_blocks: 0, @@ -730,7 +734,7 @@ impl ChainSync { ) }; - self.pending_requests.add(&who); + self.allowed_requests.add(&who); self.peers.insert( who, PeerSync { @@ -774,7 +778,7 @@ impl ChainSync { state: PeerSyncState::Available, }, ); - self.pending_requests.add(&who); + self.allowed_requests.add(&who); Ok(None) }, } @@ -841,7 +845,7 @@ impl ChainSync { peer.best_number = number; peer.best_hash = *hash; } - self.pending_requests.add(peer_id); + self.allowed_requests.add(peer_id); } } @@ -883,7 +887,7 @@ impl ChainSync { /// Get an iterator over all block requests of all peers. pub fn block_requests(&mut self) -> impl Iterator)> + '_ { - if self.pending_requests.is_empty() || + if self.allowed_requests.is_empty() || self.state_sync.is_some() || self.mode == SyncMode::Warp { @@ -903,11 +907,11 @@ impl ChainSync { let best_queued = self.best_queued_number; let client = &self.client; let queue = &self.queue_blocks; - let pending_requests = self.pending_requests.take(); + let allowed_requests = self.allowed_requests.take(); let max_parallel = if major_sync { 1 } else { self.max_parallel_downloads }; let gap_sync = &mut self.gap_sync; let iter = self.peers.iter_mut().filter_map(move |(id, peer)| { - if !peer.state.is_available() || !pending_requests.contains(id) { + if !peer.state.is_available() || !allowed_requests.contains(id) { return None } @@ -994,7 +998,12 @@ impl ChainSync { /// Get a state request, if any. pub fn state_request(&mut self) -> Option<(PeerId, StateRequest)> { - if self.peers.iter().any(|(_, peer)| peer.state == PeerSyncState::DownloadingState) { + if self.allowed_requests.is_empty() { + return None + } + if (self.state_sync.is_some() || self.warp_sync.is_some()) && + self.peers.iter().any(|(_, peer)| peer.state == PeerSyncState::DownloadingState) + { // Only one pending state request is allowed. return None } @@ -1002,11 +1011,13 @@ impl ChainSync { if sync.is_complete() { return None } + for (id, peer) in self.peers.iter_mut() { if peer.state.is_available() && peer.common_number >= sync.target_block_num() { peer.state = PeerSyncState::DownloadingState; let request = sync.next_request(); trace!(target: "sync", "New StateRequest for {}: {:?}", id, request); + self.allowed_requests.clear(); return Some((*id, request)) } } @@ -1022,6 +1033,7 @@ impl ChainSync { if peer.state.is_available() && peer.best_number >= target { trace!(target: "sync", "New StateRequest for {}: {:?}", id, request); peer.state = PeerSyncState::DownloadingState; + self.allowed_requests.clear(); return Some((*id, request)) } } @@ -1032,16 +1044,14 @@ impl ChainSync { /// Get a warp sync request, if any. pub fn warp_sync_request(&mut self) -> Option<(PeerId, WarpProofRequest)> { - if self - .peers - .iter() - .any(|(_, peer)| peer.state == PeerSyncState::DownloadingWarpProof) - { - // Only one pending state request is allowed. - return None - } if let Some(sync) = &self.warp_sync { - if sync.is_complete() { + if self.allowed_requests.is_empty() || + sync.is_complete() || + self.peers + .iter() + .any(|(_, peer)| peer.state == PeerSyncState::DownloadingWarpProof) + { + // Only one pending state request is allowed. return None } if let Some(request) = sync.next_warp_poof_request() { @@ -1054,6 +1064,7 @@ impl ChainSync { if peer.state.is_available() && peer.best_number >= median { trace!(target: "sync", "New WarpProofRequest for {}", id); peer.state = PeerSyncState::DownloadingWarpProof; + self.allowed_requests.clear(); return Some((*id, request)) } } @@ -1087,7 +1098,7 @@ impl ChainSync { trace!(target: "sync", "Reversing incoming block list"); blocks.reverse() } - self.pending_requests.add(who); + self.allowed_requests.add(who); if let Some(request) = request { match &mut peer.state { PeerSyncState::DownloadingNew(_) => { @@ -1306,6 +1317,7 @@ impl ChainSync { if let Some(peer) = self.peers.get_mut(&who) { if let PeerSyncState::DownloadingState = peer.state { peer.state = PeerSyncState::Available; + self.allowed_requests.set_all(); } } let import_result = if let Some(sync) = &mut self.state_sync { @@ -1368,6 +1380,7 @@ impl ChainSync { if let Some(peer) = self.peers.get_mut(&who) { if let PeerSyncState::DownloadingWarpProof = peer.state { peer.state = PeerSyncState::Available; + self.allowed_requests.set_all(); } } let import_result = if let Some(sync) = &mut self.warp_sync { @@ -1448,7 +1461,7 @@ impl ChainSync { return Ok(OnBlockJustification::Nothing) }; - self.pending_requests.add(&who); + self.allowed_requests.add(&who); if let PeerSyncState::DownloadingJustification(hash) = peer.state { peer.state = PeerSyncState::Available; @@ -1638,7 +1651,7 @@ impl ChainSync { }; } - self.pending_requests.set_all(); + self.allowed_requests.set_all(); output.into_iter() } @@ -1648,7 +1661,7 @@ impl ChainSync { let finalization_result = if success { Ok((hash, number)) } else { Err(()) }; self.extra_justifications .try_finalize_root((hash, number), finalization_result, true); - self.pending_requests.set_all(); + self.allowed_requests.set_all(); } /// Notify about finalization of the given block. @@ -1675,6 +1688,7 @@ impl ChainSync { ); self.state_sync = Some(StateSync::new(self.client.clone(), header, *skip_proofs)); + self.allowed_requests.set_all(); } } } @@ -1725,7 +1739,7 @@ impl ChainSync { peer.common_number = new_common_number; } } - self.pending_requests.set_all(); + self.allowed_requests.set_all(); } /// Checks if there is a slot for a block announce validation. @@ -1994,7 +2008,7 @@ impl ChainSync { peer.update_common_number(number - One::one()); } } - self.pending_requests.add(&who); + self.allowed_requests.add(&who); // known block case if known || self.is_already_downloading(&hash) { @@ -2060,7 +2074,7 @@ impl ChainSync { } self.peers.remove(who); self.extra_justifications.peer_disconnected(who); - self.pending_requests.set_all(); + self.allowed_requests.set_all(); self.fork_targets.retain(|_, target| { target.peers.remove(who); !target.peers.is_empty() @@ -2083,7 +2097,7 @@ impl ChainSync { if let Err(e) = self.reset_sync_start_point() { warn!(target: "sync", "💔 Unable to restart sync: {}", e); } - self.pending_requests.set_all(); + self.allowed_requests.set_all(); debug!(target:"sync", "Restarted with {} ({})", self.best_queued_number, self.best_queued_hash); let old_peers = std::mem::take(&mut self.peers);