diff --git a/polkadot/node/network/availability-recovery/src/lib.rs b/polkadot/node/network/availability-recovery/src/lib.rs index 3a0d3e761f..9f97c87bed 100644 --- a/polkadot/node/network/availability-recovery/src/lib.rs +++ b/polkadot/node/network/availability-recovery/src/lib.rs @@ -18,7 +18,7 @@ #![warn(missing_docs)] -use std::collections::HashMap; +use std::collections::{HashMap, VecDeque}; use std::pin::Pin; use futures::{channel::oneshot, prelude::*, stream::FuturesUnordered}; @@ -78,7 +78,7 @@ struct RequestFromBackersPhase { struct RequestChunksPhase { // a random shuffling of the validators which indicates the order in which we connect to the validators and // request the chunk from them. - shuffling: Vec, + shuffling: VecDeque, received_chunks: HashMap, requesting_chunks: FuturesUnordered bool { + is_unavailable( + self.received_chunks.len(), + self.requesting_chunks.len(), + self.shuffling.len(), + params.threshold, + ) + } + + fn can_conclude(&self, params: &InteractionParams) -> bool { + self.received_chunks.len() >= params.threshold || self.is_unavailable(params) + } + async fn launch_parallel_requests( &mut self, params: &InteractionParams, @@ -211,7 +224,7 @@ impl RequestChunksPhase { ) { let max_requests = std::cmp::min(N_PARALLEL, params.threshold); while self.requesting_chunks.len() < max_requests { - if let Some(validator_index) = self.shuffling.pop() { + if let Some(validator_index) = self.shuffling.pop_back() { let validator = params.validator_authority_keys[validator_index.0 as usize].clone(); tracing::trace!( target: LOG_TARGET, @@ -255,10 +268,8 @@ impl RequestChunksPhase { &mut self, params: &InteractionParams, ) { - // Poll for new updates from requesting_chunks. - while let Poll::Ready(Some(request_result)) - = futures::poll!(self.requesting_chunks.next()) - { + // Wait for all current requests to conclude or time-out, or until we reach enough chunks. + while let Some(request_result) = self.requesting_chunks.next().await { match request_result { Ok(Some(chunk)) => { // Check merkle proofs of any received chunks. @@ -306,11 +317,15 @@ impl RequestChunksPhase { match e { RequestError::InvalidResponse(_) => {} RequestError::NetworkError(_) | RequestError::Canceled(_) => { - self.shuffling.push(validator_index); + self.shuffling.push_front(validator_index); } } } } + + // Stop waiting for requests when we either can already recover the data + // or have gotten firm 'No' responses from enough validators. + if self.can_conclude(params) { break } } } @@ -320,12 +335,7 @@ impl RequestChunksPhase { sender: &mut impl SubsystemSender, ) -> Result { loop { - if is_unavailable( - self.received_chunks.len(), - self.requesting_chunks.len(), - self.shuffling.len(), - params.threshold, - ) { + if self.is_unavailable(¶ms) { tracing::debug!( target: LOG_TARGET, candidate_hash = ?params.candidate_hash, diff --git a/polkadot/roadmap/implementers-guide/src/node/availability/availability-recovery.md b/polkadot/roadmap/implementers-guide/src/node/availability/availability-recovery.md index 01e839710e..609282789e 100644 --- a/polkadot/roadmap/implementers-guide/src/node/availability/availability-recovery.md +++ b/polkadot/roadmap/implementers-guide/src/node/availability/availability-recovery.md @@ -133,7 +133,7 @@ Loop: * If the phase is `InteractionPhase::RequestChunks`: * If `received_chunks + requesting_chunks + shuffling` lengths are less than the threshold, break and return `Err(Unavailable)`. - * Poll for new updates from `requesting_chunks`. Check merkle proofs of any received chunks. If the request simply fails due to network issues, push onto the back of `shuffling` to be retried. + * Poll for new updates from `requesting_chunks`. Check merkle proofs of any received chunks. If the request simply fails due to network issues, insert into the front of `shuffling` to be retried. * If `received_chunks` has more than `threshold` entries, attempt to recover the data. If that fails, or a re-encoding produces an incorrect erasure-root, break and issue a `Err(RecoveryError::Invalid)`. If correct, break and issue `Ok(available_data)`. * While there are fewer than `N_PARALLEL` entries in `requesting_chunks`, * Pop the next item from `shuffling`. If it's empty and `requesting_chunks` is empty, return `Err(RecoveryError::Unavailable)`.