Fix future-polling loop in availability and add a better early-exit (#2779)

* onto the front

* fix early exit for waiting for requests

* add logging back
This commit is contained in:
Robert Habermeier
2021-03-31 17:35:17 +02:00
committed by GitHub
parent bd9bdf5f5f
commit e65cad69ec
2 changed files with 26 additions and 16 deletions
@@ -18,7 +18,7 @@
#![warn(missing_docs)]
use std::collections::HashMap;
use std::collections::{HashMap, VecDeque};
use std::pin::Pin;
use futures::{channel::oneshot, prelude::*, stream::FuturesUnordered};
@@ -78,7 +78,7 @@ struct RequestFromBackersPhase {
struct RequestChunksPhase {
// a random shuffling of the validators which indicates the order in which we connect to the validators and
// request the chunk from them.
shuffling: Vec<ValidatorIndex>,
shuffling: VecDeque<ValidatorIndex>,
received_chunks: HashMap<ValidatorIndex, ErasureChunk>,
requesting_chunks: FuturesUnordered<BoxFuture<
'static,
@@ -198,12 +198,25 @@ impl RequestChunksPhase {
shuffling.shuffle(&mut rand::thread_rng());
RequestChunksPhase {
shuffling,
shuffling: shuffling.into(),
received_chunks: HashMap::new(),
requesting_chunks: FuturesUnordered::new(),
}
}
fn is_unavailable(&self, params: &InteractionParams) -> bool {
is_unavailable(
self.received_chunks.len(),
self.requesting_chunks.len(),
self.shuffling.len(),
params.threshold,
)
}
fn can_conclude(&self, params: &InteractionParams) -> bool {
self.received_chunks.len() >= params.threshold || self.is_unavailable(params)
}
async fn launch_parallel_requests(
&mut self,
params: &InteractionParams,
@@ -211,7 +224,7 @@ impl RequestChunksPhase {
) {
let max_requests = std::cmp::min(N_PARALLEL, params.threshold);
while self.requesting_chunks.len() < max_requests {
if let Some(validator_index) = self.shuffling.pop() {
if let Some(validator_index) = self.shuffling.pop_back() {
let validator = params.validator_authority_keys[validator_index.0 as usize].clone();
tracing::trace!(
target: LOG_TARGET,
@@ -255,10 +268,8 @@ impl RequestChunksPhase {
&mut self,
params: &InteractionParams,
) {
// Poll for new updates from requesting_chunks.
while let Poll::Ready(Some(request_result))
= futures::poll!(self.requesting_chunks.next())
{
// Wait for all current requests to conclude or time-out, or until we reach enough chunks.
while let Some(request_result) = self.requesting_chunks.next().await {
match request_result {
Ok(Some(chunk)) => {
// Check merkle proofs of any received chunks.
@@ -306,11 +317,15 @@ impl RequestChunksPhase {
match e {
RequestError::InvalidResponse(_) => {}
RequestError::NetworkError(_) | RequestError::Canceled(_) => {
self.shuffling.push(validator_index);
self.shuffling.push_front(validator_index);
}
}
}
}
// Stop waiting for requests when we either can already recover the data
// or have gotten firm 'No' responses from enough validators.
if self.can_conclude(params) { break }
}
}
@@ -320,12 +335,7 @@ impl RequestChunksPhase {
sender: &mut impl SubsystemSender,
) -> Result<AvailableData, RecoveryError> {
loop {
if is_unavailable(
self.received_chunks.len(),
self.requesting_chunks.len(),
self.shuffling.len(),
params.threshold,
) {
if self.is_unavailable(&params) {
tracing::debug!(
target: LOG_TARGET,
candidate_hash = ?params.candidate_hash,