Track allowed requests for state/warp sync (#10843)

* Track allowed requests for state/warp sync

* Added missing allowed_requests resets

* Apply suggestions from code review

Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>

* fmt

Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>
This commit is contained in:
Arkadiy Paronyan
2022-02-17 12:37:54 +01:00
committed by GitHub
parent 1a61cd40fa
commit f5af00fe1b
+43 -29
View File
@@ -141,12 +141,12 @@ mod rep {
pub const BAD_RESPONSE: Rep = Rep::new(-(1 << 12), "Incomplete response");
}
enum PendingRequests {
enum AllowedRequests {
Some(HashSet<PeerId>),
All,
}
impl PendingRequests {
impl AllowedRequests {
fn add(&mut self, id: &PeerId) {
if let Self::Some(ref mut set) = self {
set.insert(*id);
@@ -174,9 +174,13 @@ impl PendingRequests {
Self::All => false,
}
}
fn clear(&mut self) {
std::mem::take(self);
}
}
impl Default for PendingRequests {
impl Default for AllowedRequests {
fn default() -> Self {
Self::Some(HashSet::default())
}
@@ -211,7 +215,7 @@ pub struct ChainSync<B: BlockT> {
/// Fork sync targets.
fork_targets: HashMap<B::Hash, ForkTarget<B>>,
/// A set of peers for which there might be potential block requests
pending_requests: PendingRequests,
allowed_requests: AllowedRequests,
/// A type to check incoming block announcements.
block_announce_validator: Box<dyn BlockAnnounceValidator<B> + Send>,
/// Maximum number of peers to ask the same blocks in parallel.
@@ -549,7 +553,7 @@ impl<B: BlockT> ChainSync<B> {
mode,
queue_blocks: Default::default(),
fork_targets: Default::default(),
pending_requests: Default::default(),
allowed_requests: Default::default(),
block_announce_validator,
max_parallel_downloads,
downloaded_blocks: 0,
@@ -730,7 +734,7 @@ impl<B: BlockT> ChainSync<B> {
)
};
self.pending_requests.add(&who);
self.allowed_requests.add(&who);
self.peers.insert(
who,
PeerSync {
@@ -774,7 +778,7 @@ impl<B: BlockT> ChainSync<B> {
state: PeerSyncState::Available,
},
);
self.pending_requests.add(&who);
self.allowed_requests.add(&who);
Ok(None)
},
}
@@ -841,7 +845,7 @@ impl<B: BlockT> ChainSync<B> {
peer.best_number = number;
peer.best_hash = *hash;
}
self.pending_requests.add(peer_id);
self.allowed_requests.add(peer_id);
}
}
@@ -883,7 +887,7 @@ impl<B: BlockT> ChainSync<B> {
/// Get an iterator over all block requests of all peers.
pub fn block_requests(&mut self) -> impl Iterator<Item = (&PeerId, BlockRequest<B>)> + '_ {
if self.pending_requests.is_empty() ||
if self.allowed_requests.is_empty() ||
self.state_sync.is_some() ||
self.mode == SyncMode::Warp
{
@@ -903,11 +907,11 @@ impl<B: BlockT> ChainSync<B> {
let best_queued = self.best_queued_number;
let client = &self.client;
let queue = &self.queue_blocks;
let pending_requests = self.pending_requests.take();
let allowed_requests = self.allowed_requests.take();
let max_parallel = if major_sync { 1 } else { self.max_parallel_downloads };
let gap_sync = &mut self.gap_sync;
let iter = self.peers.iter_mut().filter_map(move |(id, peer)| {
if !peer.state.is_available() || !pending_requests.contains(id) {
if !peer.state.is_available() || !allowed_requests.contains(id) {
return None
}
@@ -994,7 +998,12 @@ impl<B: BlockT> ChainSync<B> {
/// Get a state request, if any.
pub fn state_request(&mut self) -> Option<(PeerId, StateRequest)> {
if self.peers.iter().any(|(_, peer)| peer.state == PeerSyncState::DownloadingState) {
if self.allowed_requests.is_empty() {
return None
}
if (self.state_sync.is_some() || self.warp_sync.is_some()) &&
self.peers.iter().any(|(_, peer)| peer.state == PeerSyncState::DownloadingState)
{
// Only one pending state request is allowed.
return None
}
@@ -1002,11 +1011,13 @@ impl<B: BlockT> ChainSync<B> {
if sync.is_complete() {
return None
}
for (id, peer) in self.peers.iter_mut() {
if peer.state.is_available() && peer.common_number >= sync.target_block_num() {
peer.state = PeerSyncState::DownloadingState;
let request = sync.next_request();
trace!(target: "sync", "New StateRequest for {}: {:?}", id, request);
self.allowed_requests.clear();
return Some((*id, request))
}
}
@@ -1022,6 +1033,7 @@ impl<B: BlockT> ChainSync<B> {
if peer.state.is_available() && peer.best_number >= target {
trace!(target: "sync", "New StateRequest for {}: {:?}", id, request);
peer.state = PeerSyncState::DownloadingState;
self.allowed_requests.clear();
return Some((*id, request))
}
}
@@ -1032,16 +1044,14 @@ impl<B: BlockT> ChainSync<B> {
/// Get a warp sync request, if any.
pub fn warp_sync_request(&mut self) -> Option<(PeerId, WarpProofRequest<B>)> {
if self
.peers
.iter()
.any(|(_, peer)| peer.state == PeerSyncState::DownloadingWarpProof)
{
// Only one pending state request is allowed.
return None
}
if let Some(sync) = &self.warp_sync {
if sync.is_complete() {
if self.allowed_requests.is_empty() ||
sync.is_complete() ||
self.peers
.iter()
.any(|(_, peer)| peer.state == PeerSyncState::DownloadingWarpProof)
{
// Only one pending state request is allowed.
return None
}
if let Some(request) = sync.next_warp_poof_request() {
@@ -1054,6 +1064,7 @@ impl<B: BlockT> ChainSync<B> {
if peer.state.is_available() && peer.best_number >= median {
trace!(target: "sync", "New WarpProofRequest for {}", id);
peer.state = PeerSyncState::DownloadingWarpProof;
self.allowed_requests.clear();
return Some((*id, request))
}
}
@@ -1087,7 +1098,7 @@ impl<B: BlockT> ChainSync<B> {
trace!(target: "sync", "Reversing incoming block list");
blocks.reverse()
}
self.pending_requests.add(who);
self.allowed_requests.add(who);
if let Some(request) = request {
match &mut peer.state {
PeerSyncState::DownloadingNew(_) => {
@@ -1306,6 +1317,7 @@ impl<B: BlockT> ChainSync<B> {
if let Some(peer) = self.peers.get_mut(&who) {
if let PeerSyncState::DownloadingState = peer.state {
peer.state = PeerSyncState::Available;
self.allowed_requests.set_all();
}
}
let import_result = if let Some(sync) = &mut self.state_sync {
@@ -1368,6 +1380,7 @@ impl<B: BlockT> ChainSync<B> {
if let Some(peer) = self.peers.get_mut(&who) {
if let PeerSyncState::DownloadingWarpProof = peer.state {
peer.state = PeerSyncState::Available;
self.allowed_requests.set_all();
}
}
let import_result = if let Some(sync) = &mut self.warp_sync {
@@ -1448,7 +1461,7 @@ impl<B: BlockT> ChainSync<B> {
return Ok(OnBlockJustification::Nothing)
};
self.pending_requests.add(&who);
self.allowed_requests.add(&who);
if let PeerSyncState::DownloadingJustification(hash) = peer.state {
peer.state = PeerSyncState::Available;
@@ -1638,7 +1651,7 @@ impl<B: BlockT> ChainSync<B> {
};
}
self.pending_requests.set_all();
self.allowed_requests.set_all();
output.into_iter()
}
@@ -1648,7 +1661,7 @@ impl<B: BlockT> ChainSync<B> {
let finalization_result = if success { Ok((hash, number)) } else { Err(()) };
self.extra_justifications
.try_finalize_root((hash, number), finalization_result, true);
self.pending_requests.set_all();
self.allowed_requests.set_all();
}
/// Notify about finalization of the given block.
@@ -1675,6 +1688,7 @@ impl<B: BlockT> ChainSync<B> {
);
self.state_sync =
Some(StateSync::new(self.client.clone(), header, *skip_proofs));
self.allowed_requests.set_all();
}
}
}
@@ -1725,7 +1739,7 @@ impl<B: BlockT> ChainSync<B> {
peer.common_number = new_common_number;
}
}
self.pending_requests.set_all();
self.allowed_requests.set_all();
}
/// Checks if there is a slot for a block announce validation.
@@ -1994,7 +2008,7 @@ impl<B: BlockT> ChainSync<B> {
peer.update_common_number(number - One::one());
}
}
self.pending_requests.add(&who);
self.allowed_requests.add(&who);
// known block case
if known || self.is_already_downloading(&hash) {
@@ -2060,7 +2074,7 @@ impl<B: BlockT> ChainSync<B> {
}
self.peers.remove(who);
self.extra_justifications.peer_disconnected(who);
self.pending_requests.set_all();
self.allowed_requests.set_all();
self.fork_targets.retain(|_, target| {
target.peers.remove(who);
!target.peers.is_empty()
@@ -2083,7 +2097,7 @@ impl<B: BlockT> ChainSync<B> {
if let Err(e) = self.reset_sync_start_point() {
warn!(target: "sync", "💔 Unable to restart sync: {}", e);
}
self.pending_requests.set_all();
self.allowed_requests.set_all();
debug!(target:"sync", "Restarted with {} ({})", self.best_queued_number, self.best_queued_hash);
let old_peers = std::mem::take(&mut self.peers);