From 08d5b268a059c50870fe9801d75f006c16da06d2 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Tue, 30 Mar 2021 17:33:38 +0200 Subject: [PATCH] Retry availability until the receiver of the request is dropped (#2763) * guide updates * keep interactions alive until receivers drop * retry indefinitely * cancel approval tasks on finality * use swap_remove instead of remove --- polkadot/node/core/approval-voting/src/lib.rs | 54 +++- .../approval-voting/src/persisted_entries.rs | 5 + .../availability-recovery/src/error.rs | 5 +- .../network/availability-recovery/src/lib.rs | 305 ++++++++---------- .../availability-recovery/src/tests.rs | 189 ++++++++--- .../availability/availability-recovery.md | 49 ++- 6 files changed, 341 insertions(+), 266 deletions(-) diff --git a/polkadot/node/core/approval-voting/src/lib.rs b/polkadot/node/core/approval-voting/src/lib.rs index 5b60554d24..f7a1e4b078 100644 --- a/polkadot/node/core/approval-voting/src/lib.rs +++ b/polkadot/node/core/approval-voting/src/lib.rs @@ -53,6 +53,7 @@ use sp_application_crypto::Pair; use kvdb::KeyValueDB; use futures::prelude::*; +use futures::future::RemoteHandle; use futures::channel::{mpsc, oneshot}; use std::collections::{BTreeMap, HashMap}; @@ -444,6 +445,7 @@ enum Action { WriteCandidateEntry(CandidateHash, CandidateEntry), LaunchApproval { indirect_cert: IndirectAssignmentCert, + relay_block_number: BlockNumber, candidate_index: CandidateIndex, session: SessionIndex, candidate: CandidateReceipt, @@ -452,6 +454,8 @@ enum Action { Conclude, } +type BackgroundTaskMap = BTreeMap>>; + async fn run( mut ctx: C, subsystem: ApprovalVotingSubsystem, @@ -472,6 +476,9 @@ async fn run( let mut wakeups = Wakeups::default(); + // map block numbers to background work. + let mut background_tasks = BTreeMap::new(); + let mut last_finalized_height: Option = None; let mut background_rx = background_rx.fuse(); @@ -489,7 +496,7 @@ async fn run( )? } next_msg = ctx.recv().fuse() => { - handle_from_overseer( + let actions = handle_from_overseer( &mut ctx, &mut state, &subsystem.metrics, @@ -497,7 +504,13 @@ async fn run( next_msg?, &mut last_finalized_height, &wakeups, - ).await? + ).await?; + + if let Some(finalized_height) = last_finalized_height { + cleanup_background_tasks(finalized_height, &mut background_tasks); + } + + actions } background_request = background_rx.next().fuse() => { if let Some(req) = background_request { @@ -519,6 +532,7 @@ async fn run( &mut wakeups, db_writer, &background_tx, + &mut background_tasks, actions, ).await? { break; @@ -535,6 +549,7 @@ async fn handle_actions( wakeups: &mut Wakeups, db: &dyn KeyValueDB, background_tx: &mpsc::Sender, + background_tasks: &mut BackgroundTaskMap, actions: impl IntoIterator, ) -> SubsystemResult { let mut transaction = approval_db::v1::Transaction::default(); @@ -555,6 +570,7 @@ async fn handle_actions( } Action::LaunchApproval { indirect_cert, + relay_block_number, candidate_index, session, candidate, @@ -569,7 +585,7 @@ async fn handle_actions( candidate_index, ).into()); - launch_approval( + let handle = launch_approval( ctx, background_tx.clone(), session, @@ -578,7 +594,11 @@ async fn handle_actions( block_hash, candidate_index as _, backing_group, - ).await? + ).await?; + + if let Some(handle) = handle { + background_tasks.entry(relay_block_number).or_default().push(handle); + } } Action::Conclude => { conclude = true; } } @@ -594,6 +614,19 @@ async fn handle_actions( Ok(conclude) } +// Clean up all background tasks which are no longer needed as they correspond to a +// finalized block. +fn cleanup_background_tasks( + current_finalized_block: BlockNumber, + tasks: &mut BackgroundTaskMap, +) { + let after = tasks.split_off(&(current_finalized_block + 1)); + *tasks = after; + + // tasks up to the finalized block are dropped, and `RemoteHandle` cancels + // the task on drop. +} + // Handle an incoming signal from the overseer. Returns true if execution should conclude. async fn handle_from_overseer( ctx: &mut impl SubsystemContext, @@ -1533,6 +1566,7 @@ fn process_wakeup( // sanity: should always be present. actions.push(Action::LaunchApproval { indirect_cert, + relay_block_number: block_entry.block_number(), candidate_index: i as _, session: block_entry.session(), candidate: candidate_entry.candidate_receipt().clone(), @@ -1566,6 +1600,9 @@ fn process_wakeup( Ok(actions) } +// Launch approval work, returning an `AbortHandle` which corresponds to the background task +// spawned. When the background work is no longer needed, the `AbortHandle` should be dropped +// to cancel the background work and any requests it has spawned. async fn launch_approval( ctx: &mut impl SubsystemContext, mut background_tx: mpsc::Sender, @@ -1575,7 +1612,7 @@ async fn launch_approval( block_hash: Hash, candidate_index: usize, backing_group: GroupIndex, -) -> SubsystemResult<()> { +) -> SubsystemResult>> { let (a_tx, a_rx) = oneshot::channel(); let (code_tx, code_rx) = oneshot::channel(); let (context_num_tx, context_num_rx) = oneshot::channel(); @@ -1610,7 +1647,7 @@ async fn launch_approval( candidate.descriptor.relay_parent, ); - return Ok(()); + return Ok(None); } }; @@ -1719,7 +1756,10 @@ async fn launch_approval( } }; - ctx.spawn("approval-checks", Box::pin(background)).await + let (background, remote_handle) = background.remote_handle(); + ctx.spawn("approval-checks", Box::pin(background)) + .await + .map(move |()| Some(remote_handle)) } // Issue and import a local approval vote. Should only be invoked after approval checks diff --git a/polkadot/node/core/approval-voting/src/persisted_entries.rs b/polkadot/node/core/approval-voting/src/persisted_entries.rs index 855e1e8b29..715e959981 100644 --- a/polkadot/node/core/approval-voting/src/persisted_entries.rs +++ b/polkadot/node/core/approval-voting/src/persisted_entries.rs @@ -397,6 +397,11 @@ impl BlockEntry { pub fn candidates(&self) -> &[(CoreIndex, CandidateHash)] { &self.candidates } + + /// Access the block number of the block entry. + pub fn block_number(&self) -> BlockNumber { + self.block_number + } } impl From for BlockEntry { diff --git a/polkadot/node/network/availability-recovery/src/error.rs b/polkadot/node/network/availability-recovery/src/error.rs index bf9a3125b3..50a596ba2a 100644 --- a/polkadot/node/network/availability-recovery/src/error.rs +++ b/polkadot/node/network/availability-recovery/src/error.rs @@ -16,7 +16,7 @@ //! The `Error` and `Result` types used by the subsystem. -use futures::channel::{mpsc, oneshot}; +use futures::channel::oneshot; use thiserror::Error; /// Error type used by the Availability Recovery subsystem. @@ -34,9 +34,6 @@ pub enum Error { #[error("failed to send response")] CanceledResponseSender, - #[error("to_state channel is closed")] - ClosedToState(#[source] mpsc::SendError), - #[error(transparent)] Runtime(#[from] polkadot_subsystem::errors::RuntimeApiError), diff --git a/polkadot/node/network/availability-recovery/src/lib.rs b/polkadot/node/network/availability-recovery/src/lib.rs index 2033521bdc..3a0d3e761f 100644 --- a/polkadot/node/network/availability-recovery/src/lib.rs +++ b/polkadot/node/network/availability-recovery/src/lib.rs @@ -19,9 +19,11 @@ #![warn(missing_docs)] use std::collections::HashMap; +use std::pin::Pin; -use futures::{channel::{oneshot, mpsc}, prelude::*, stream::FuturesUnordered}; -use futures::future::BoxFuture; +use futures::{channel::oneshot, prelude::*, stream::FuturesUnordered}; +use futures::future::{BoxFuture, RemoteHandle, FutureExt}; +use futures::task::{Context, Poll}; use lru::LruCache; use rand::seq::SliceRandom; @@ -33,7 +35,7 @@ use polkadot_primitives::v1::{ use polkadot_node_primitives::{ErasureChunk, AvailableData}; use polkadot_subsystem::{ SubsystemContext, SubsystemResult, SubsystemError, Subsystem, SpawnedSubsystem, FromOverseer, - OverseerSignal, ActiveLeavesUpdate, + OverseerSignal, ActiveLeavesUpdate, SubsystemSender, errors::RecoveryError, jaeger, messages::{ @@ -67,21 +69,6 @@ pub struct AvailabilityRecoverySubsystem { fast_path: bool, } -/// Accumulate all awaiting sides for some particular `AvailableData`. -struct InteractionHandle { - awaiting: Vec>>, -} - -/// A message received by main code from an async `Interaction` task. -#[derive(Debug)] -enum FromInteraction { - /// An interaction concluded. - Concluded(CandidateHash, Result), - - /// Send a request on the network service. - NetworkRequest(Requests), -} - struct RequestFromBackersPhase { // a random shuffling of the validators from the backing group which indicates the order // in which we connect to them and request the chunk. @@ -95,7 +82,7 @@ struct RequestChunksPhase { received_chunks: HashMap, requesting_chunks: FuturesUnordered, RequestError>>, + Result, (ValidatorIndex, RequestError)>>, >, } @@ -122,9 +109,8 @@ enum InteractionPhase { } /// A state of a single interaction reconstructing an available data. -struct Interaction { - /// A communication channel with the `State`. - to_state: mpsc::Sender, +struct Interaction { + sender: S, /// The parameters of the interaction. params: InteractionParams, @@ -142,13 +128,12 @@ impl RequestFromBackersPhase { } } - // Run this phase to completion, returning `true` if data was successfully recovered and - // false otherwise. + // Run this phase to completion. async fn run( &mut self, params: &InteractionParams, - to_state: &mut mpsc::Sender - ) -> Result { + sender: &mut impl SubsystemSender, + ) -> Result { tracing::trace!( target: LOG_TARGET, candidate_hash = ?params.candidate_hash, @@ -158,7 +143,7 @@ impl RequestFromBackersPhase { loop { // Pop the next backer, and proceed to next phase if we're out. let validator_index = match self.shuffled_backers.pop() { - None => return Ok(false), + None => return Err(RecoveryError::Unavailable), Some(i) => i, }; @@ -168,21 +153,21 @@ impl RequestFromBackersPhase { req_res::v1::AvailableDataFetchingRequest { candidate_hash: params.candidate_hash }, ); - to_state.send(FromInteraction::NetworkRequest(Requests::AvailableDataFetching(req))).await?; + sender.send_message(NetworkBridgeMessage::SendRequests( + vec![Requests::AvailableDataFetching(req)], + IfDisconnected::TryConnect, + ).into()).await; match res.await { Ok(req_res::v1::AvailableDataFetchingResponse::AvailableData(data)) => { if reconstructed_data_matches_root(params.validators.len(), ¶ms.erasure_root, &data) { - to_state.send( - FromInteraction::Concluded(params.candidate_hash.clone(), Ok(data)) - ).await?; - tracing::trace!( target: LOG_TARGET, candidate_hash = ?params.candidate_hash, "Received full data", ); - return Ok(true); + + return Ok(data); } else { tracing::debug!( target: LOG_TARGET, @@ -222,8 +207,8 @@ impl RequestChunksPhase { async fn launch_parallel_requests( &mut self, params: &InteractionParams, - to_state: &mut mpsc::Sender, - ) -> Result<(), mpsc::SendError> { + sender: &mut impl SubsystemSender, + ) { let max_requests = std::cmp::min(N_PARALLEL, params.threshold); while self.requesting_chunks.len() < max_requests { if let Some(validator_index) = self.shuffling.pop() { @@ -247,39 +232,36 @@ impl RequestChunksPhase { raw_request.clone(), ); - to_state.send(FromInteraction::NetworkRequest(Requests::ChunkFetching(req))).await?; + sender.send_message(NetworkBridgeMessage::SendRequests( + vec![Requests::ChunkFetching(req)], + IfDisconnected::TryConnect, + ).into()).await; self.requesting_chunks.push(Box::pin(async move { match res.await { Ok(req_res::v1::ChunkFetchingResponse::Chunk(chunk)) => Ok(Some(chunk.recombine_into_chunk(&raw_request))), Ok(req_res::v1::ChunkFetchingResponse::NoSuchChunk) => Ok(None), - Err(e) => Err(e), + Err(e) => Err((validator_index, e)), } })); } else { break; } } - - Ok(()) } async fn wait_for_chunks( &mut self, params: &InteractionParams, - ) -> Result<(), mpsc::SendError> { - // Check if the requesting chunks is not empty not to poll to completion. - if self.requesting_chunks.is_empty() { - return Ok(()); - } - + ) { // Poll for new updates from requesting_chunks. - while let Some(request_result) = self.requesting_chunks.next().await { + while let Poll::Ready(Some(request_result)) + = futures::poll!(self.requesting_chunks.next()) + { match request_result { Ok(Some(chunk)) => { - // Check merkle proofs of any received chunks, and any failures should - // lead to issuance of a FromInteraction::ReportPeer message. + // Check merkle proofs of any received chunks. let validator_index = chunk.index; @@ -313,24 +295,30 @@ impl RequestChunksPhase { } } Ok(None) => {} - Err(e) => { + Err((validator_index, e)) => { tracing::debug!( target: LOG_TARGET, err = ?e, + ?validator_index, "Failure requesting chunk", ); + + match e { + RequestError::InvalidResponse(_) => {} + RequestError::NetworkError(_) | RequestError::Canceled(_) => { + self.shuffling.push(validator_index); + } + } } } } - - Ok(()) } async fn run( &mut self, params: &InteractionParams, - to_state: &mut mpsc::Sender, - ) -> Result<(), mpsc::SendError> { + sender: &mut impl SubsystemSender, + ) -> Result { loop { if is_unavailable( self.received_chunks.len(), @@ -347,23 +335,18 @@ impl RequestChunksPhase { n_validators = %params.validators.len(), "Data recovery is not possible", ); - to_state.send(FromInteraction::Concluded( - params.candidate_hash, - Err(RecoveryError::Unavailable), - )).await?; - return Ok(()); + return Err(RecoveryError::Unavailable); } - self.launch_parallel_requests(params, to_state).await?; - self.wait_for_chunks(params).await?; + self.launch_parallel_requests(params, sender).await; + self.wait_for_chunks(params).await; // If received_chunks has more than threshold entries, attempt to recover the data. // If that fails, or a re-encoding of it doesn't match the expected erasure root, - // break and issue a FromInteraction::Concluded(RecoveryError::Invalid). - // Otherwise, issue a FromInteraction::Concluded(Ok(())). + // return Err(RecoveryError::Invalid) if self.received_chunks.len() >= params.threshold { - let concluded = match polkadot_erasure_coding::reconstruct_v1( + return match polkadot_erasure_coding::reconstruct_v1( params.validators.len(), self.received_chunks.values().map(|c| (&c.chunk[..], c.index.0 as usize)), ) { @@ -375,7 +358,8 @@ impl RequestChunksPhase { erasure_root = ?params.erasure_root, "Data recovery complete", ); - FromInteraction::Concluded(params.candidate_hash.clone(), Ok(data)) + + Ok(data) } else { tracing::trace!( target: LOG_TARGET, @@ -383,10 +367,8 @@ impl RequestChunksPhase { erasure_root = ?params.erasure_root, "Data recovery - root mismatch", ); - FromInteraction::Concluded( - params.candidate_hash.clone(), - Err(RecoveryError::Invalid), - ) + + Err(RecoveryError::Invalid) } } Err(err) => { @@ -397,15 +379,10 @@ impl RequestChunksPhase { ?err, "Data recovery error ", ); - FromInteraction::Concluded( - params.candidate_hash.clone(), - Err(RecoveryError::Invalid), - ) + + Err(RecoveryError::Invalid) }, }; - - to_state.send(concluded).await?; - return Ok(()); } } } @@ -442,59 +419,99 @@ fn reconstructed_data_matches_root( branches.root() == *expected_root } -impl Interaction { - async fn run(mut self) -> error::Result<()> { +impl Interaction { + async fn run(mut self) -> Result { loop { // These only fail if we cannot reach the underlying subsystem, which case there is nothing // meaningful we can do. match self.phase { InteractionPhase::RequestFromBackers(ref mut from_backers) => { - if from_backers.run(&self.params, &mut self.to_state).await - .map_err(error::Error::ClosedToState)? - { - break Ok(()) - } else { - self.phase = InteractionPhase::RequestChunks( - RequestChunksPhase::new(self.params.validators.len() as _) - ); + match from_backers.run(&self.params, &mut self.sender).await { + Ok(data) => break Ok(data), + Err(RecoveryError::Invalid) => break Err(RecoveryError::Invalid), + Err(RecoveryError::Unavailable) => { + self.phase = InteractionPhase::RequestChunks( + RequestChunksPhase::new(self.params.validators.len() as _) + ) + } } } InteractionPhase::RequestChunks(ref mut from_all) => { - break from_all.run(&self.params, &mut self.to_state).await - .map_err(error::Error::ClosedToState) + break from_all.run(&self.params, &mut self.sender).await; } } } } } +/// Accumulate all awaiting sides for some particular `AvailableData`. +struct InteractionHandle { + candidate_hash: CandidateHash, + remote: RemoteHandle>, + awaiting: Vec>>, +} + +impl Future for InteractionHandle { + type Output = Option<(CandidateHash, Result)>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut indices_to_remove = Vec::new(); + for (i, awaiting) in self.awaiting.iter_mut().enumerate().rev() { + if let Poll::Ready(()) = awaiting.poll_canceled(cx) { + indices_to_remove.push(i); + } + } + + // these are reverse order, so remove is fine. + for index in indices_to_remove { + tracing::debug!( + target: LOG_TARGET, + candidate_hash = ?self.candidate_hash, + "Receiver for available data dropped.", + ); + + self.awaiting.swap_remove(index); + } + + if self.awaiting.is_empty() { + tracing::debug!( + target: LOG_TARGET, + candidate_hash = ?self.candidate_hash, + "All receivers for available data dropped.", + ); + + return Poll::Ready(None); + } + + let remote = &mut self.remote; + futures::pin_mut!(remote); + let result = futures::ready!(remote.poll(cx)); + + for awaiting in self.awaiting.drain(..) { + let _ = awaiting.send(result.clone()); + } + + Poll::Ready(Some((self.candidate_hash, result))) + } +} + struct State { /// Each interaction is implemented as its own async task, /// and these handles are for communicating with them. - interactions: HashMap, + interactions: FuturesUnordered, /// A recent block hash for which state should be available. live_block: (BlockNumber, Hash), - /// interaction communication. This is cloned and given to interactions that are spun up. - from_interaction_tx: mpsc::Sender, - - /// receiver for messages from interactions. - from_interaction_rx: mpsc::Receiver, - /// An LRU cache of recently recovered data. availability_lru: LruCache>, } impl Default for State { fn default() -> Self { - let (from_interaction_tx, from_interaction_rx) = mpsc::channel(16); - Self { - interactions: HashMap::new(), + interactions: FuturesUnordered::new(), live_block: (0, Hash::default()), - from_interaction_tx, - from_interaction_rx, availability_lru: LruCache::new(LRU_SIZE), } } @@ -546,15 +563,7 @@ async fn launch_interaction( backing_group: Option, response_sender: oneshot::Sender>, ) -> error::Result<()> { - let to_state = state.from_interaction_tx.clone(); - let candidate_hash = receipt.hash(); - state.interactions.insert( - candidate_hash.clone(), - InteractionHandle { - awaiting: vec![response_sender], - } - ); let params = InteractionParams { validator_authority_keys: session_info.discovery_keys.clone(), @@ -574,22 +583,20 @@ async fn launch_interaction( )); let interaction = Interaction { - to_state, + sender: ctx.sender().clone(), params, phase, }; - let future = async move { - if let Err(e) = interaction.run().await { - tracing::debug!( - target: LOG_TARGET, - err = ?e, - "Interaction finished with an error", - ); - } - }.boxed(); + let (remote, remote_handle) = interaction.run().remote_handle(); - if let Err(e) = ctx.spawn("recovery interaction", future).await { + state.interactions.push(InteractionHandle { + candidate_hash, + remote: remote_handle, + awaiting: vec![response_sender], + }); + + if let Err(e) = ctx.spawn("recovery interaction", Box::pin(remote)).await { tracing::warn!( target: LOG_TARGET, err = ?e, @@ -626,8 +633,8 @@ async fn handle_recover( return Ok(()); } - if let Some(interaction) = state.interactions.get_mut(&candidate_hash) { - interaction.awaiting.push(response_sender); + if let Some(i) = state.interactions.iter_mut().find(|i| i.candidate_hash == candidate_hash) { + i.awaiting.push(response_sender); return Ok(()); } @@ -678,48 +685,6 @@ async fn query_full_data( Ok(rx.await.map_err(error::Error::CanceledQueryFullData)?) } -/// Handles message from interaction. -#[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))] -async fn handle_from_interaction( - state: &mut State, - ctx: &mut impl SubsystemContext, - from_interaction: FromInteraction, -) -> error::Result<()> { - match from_interaction { - FromInteraction::Concluded(candidate_hash, result) => { - // Load the entry from the interactions map. - // It should always exist, if not for logic errors. - if let Some(interaction) = state.interactions.remove(&candidate_hash) { - // Send the result to each member of awaiting. - for awaiting in interaction.awaiting { - if let Err(_) = awaiting.send(result.clone()) { - tracing::debug!( - target: LOG_TARGET, - "An awaiting side of the interaction has been canceled", - ); - } - } - } else { - tracing::warn!( - target: LOG_TARGET, - "Interaction under candidate hash {} is missing", - candidate_hash, - ); - } - - state.availability_lru.put(candidate_hash, result); - } - FromInteraction::NetworkRequest(request) => { - ctx.send_message(NetworkBridgeMessage::SendRequests( - vec![request], - IfDisconnected::TryConnect, - ).into()).await; - } - } - - Ok(()) -} - impl AvailabilityRecoverySubsystem { /// Create a new instance of `AvailabilityRecoverySubsystem` which starts with a fast path to request data from backers. pub fn with_fast_path() -> Self { @@ -790,19 +755,9 @@ impl AvailabilityRecoverySubsystem { } } } - from_interaction = state.from_interaction_rx.next() => { - if let Some(from_interaction) = from_interaction { - if let Err(e) = handle_from_interaction( - &mut state, - &mut ctx, - from_interaction, - ).await { - tracing::warn!( - target: LOG_TARGET, - err = ?e, - "Error handling message from interaction", - ); - } + output = state.interactions.next() => { + if let Some((candidate_hash, result)) = output.flatten() { + state.availability_lru.put(candidate_hash, result); } } } diff --git a/polkadot/node/network/availability-recovery/src/tests.rs b/polkadot/node/network/availability-recovery/src/tests.rs index 70a9240874..ff14cf0706 100644 --- a/polkadot/node/network/availability-recovery/src/tests.rs +++ b/polkadot/node/network/availability-recovery/src/tests.rs @@ -141,11 +141,19 @@ async fn overseer_recv( use sp_keyring::Sr25519Keyring; -#[derive(Debug, Clone)] +#[derive(Debug)] enum Has { No, Yes, - Timeout, + NetworkError(sc_network::RequestFailure), +} + +impl Has { + fn timeout() -> Self { + Has::NetworkError(sc_network::RequestFailure::Network( + sc_network::OutboundFailure::Timeout + )) + } } #[derive(Clone)] @@ -172,18 +180,6 @@ impl TestState { self.validators.len() - self.threshold() + 1 } - fn all_have(&self) -> Vec { - (0..self.validators.len()).map(|_| Has::Yes).collect() - } - - fn all_dont_have(&self) -> Vec { - (0..self.validators.len()).map(|_| Has::Yes).collect() - } - - fn all_timeout(&self) -> Vec { - (0..self.validators.len()).map(|_| Has::Timeout).collect() - } - async fn test_runtime_api( &self, virtual_overseer: &mut VirtualOverseer, @@ -216,7 +212,7 @@ impl TestState { candidate_hash: CandidateHash, virtual_overseer: &mut VirtualOverseer, n: usize, - who_has: &[Has], + who_has: impl Fn(usize) -> Has, ) { // arbitrary order. for _ in 0..n { @@ -237,14 +233,10 @@ impl TestState { assert_eq!(req.payload.candidate_hash, candidate_hash); let validator_index = req.payload.index.0 as usize; - let available_data = match who_has[validator_index] { + let available_data = match who_has(validator_index) { Has::No => Ok(None), Has::Yes => Ok(Some(self.chunks[validator_index].clone().into())), - Has::Timeout => { - Err(sc_network::RequestFailure::Network( - sc_network::OutboundFailure::Timeout - )) - } + Has::NetworkError(e) => Err(e), }; let _ = req.pending_response.send( @@ -263,7 +255,7 @@ impl TestState { &self, candidate_hash: CandidateHash, virtual_overseer: &mut VirtualOverseer, - who_has: &[Has], + who_has: impl Fn(usize) -> Has, ) { for _ in 0..self.validators.len() { // Receive a request for a chunk. @@ -286,27 +278,21 @@ impl TestState { .position(|a| Recipient::Authority(a.clone()) == req.peer) .unwrap(); - let available_data = match who_has[validator_index] { + let available_data = match who_has(validator_index) { Has::No => Ok(None), Has::Yes => Ok(Some(self.available_data.clone())), - Has::Timeout => { - Err(sc_network::RequestFailure::Network( - sc_network::OutboundFailure::Timeout - )) - } + Has::NetworkError(e) => Err(e), }; + let done = available_data.as_ref().ok().map_or(false, |x| x.is_some()); + let _ = req.pending_response.send( available_data.map(|r| req_res::v1::AvailableDataFetchingResponse::from(r).encode() ) ); - match who_has[validator_index].clone() { - Has::Yes => break, // done - Has::No => {} - Has::Timeout => {} - } + if done { break } } ) } @@ -448,7 +434,7 @@ fn availability_is_recovered_from_chunks_if_no_group_provided() { candidate_hash, &mut virtual_overseer, test_state.threshold(), - &test_state.all_have(), + |_| Has::Yes, ).await; // Recovered data should match the original one. @@ -477,7 +463,7 @@ fn availability_is_recovered_from_chunks_if_no_group_provided() { new_candidate.hash(), &mut virtual_overseer, test_state.impossibility_threshold(), - &test_state.all_dont_have(), + |_| Has::No, ).await; // A request times out with `Unavailable` error. @@ -524,7 +510,7 @@ fn availability_is_recovered_from_chunks_even_if_backing_group_supplied_if_chunk candidate_hash, &mut virtual_overseer, test_state.threshold(), - &test_state.all_have(), + |_| Has::Yes, ).await; // Recovered data should match the original one. @@ -553,7 +539,7 @@ fn availability_is_recovered_from_chunks_even_if_backing_group_supplied_if_chunk new_candidate.hash(), &mut virtual_overseer, test_state.impossibility_threshold(), - &test_state.all_dont_have(), + |_| Has::No, ).await; // A request times out with `Unavailable` error. @@ -607,7 +593,7 @@ fn bad_merkle_path_leads_to_recovery_error() { candidate_hash, &mut virtual_overseer, test_state.impossibility_threshold(), - &test_state.all_have(), + |_| Has::Yes, ).await; // A request times out with `Unavailable` error. @@ -656,14 +642,11 @@ fn wrong_chunk_index_leads_to_recovery_error() { test_state.chunks[3] = test_state.chunks[0].clone(); test_state.chunks[4] = test_state.chunks[0].clone(); - let mut have = test_state.all_dont_have(); - have[0] = Has::No; - test_state.test_chunk_requests( candidate_hash, &mut virtual_overseer, test_state.impossibility_threshold(), - &have, + |_| Has::No, ).await; // A request times out with `Unavailable` error as there are no good peers. @@ -726,7 +709,7 @@ fn invalid_erasure_coding_leads_to_invalid_error() { candidate_hash, &mut virtual_overseer, test_state.threshold(), - &test_state.all_have(), + |_| Has::Yes, ).await; // f+1 'valid' chunks can't produce correct data. @@ -769,13 +752,15 @@ fn fast_path_backing_group_recovers() { let candidate_hash = test_state.candidate.hash(); - let mut who_has = test_state.all_dont_have(); - who_has[3] = Has::Yes; + let who_has = |i| match i { + 3 => Has::Yes, + _ => Has::No, + }; test_state.test_full_data_requests( candidate_hash, &mut virtual_overseer, - &who_has, + who_has, ).await; // Recovered data should match the original one. @@ -819,24 +804,124 @@ fn no_answers_in_fast_path_causes_chunk_requests() { let candidate_hash = test_state.candidate.hash(); // mix of timeout and no. - let mut who_has = test_state.all_timeout(); - who_has[0] = Has::No; - who_has[3] = Has::No; - + let who_has = |i| match i { + 0 | 3 => Has::No, + _ => Has::timeout(), + }; test_state.test_full_data_requests( candidate_hash, &mut virtual_overseer, - &who_has, + who_has, ).await; test_state.test_chunk_requests( candidate_hash, &mut virtual_overseer, test_state.threshold(), - &test_state.all_have(), + |_| Has::Yes, ).await; // Recovered data should match the original one. assert_eq!(rx.await.unwrap().unwrap(), test_state.available_data); }); } + +#[test] +fn task_canceled_when_receivers_dropped() { + let test_state = TestState::default(); + + test_harness_chunks_only(|test_harness| async move { + let TestHarness { mut virtual_overseer } = test_harness; + + overseer_signal( + &mut virtual_overseer, + OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { + activated: smallvec![ActivatedLeaf { + hash: test_state.current.clone(), + number: 1, + span: Arc::new(jaeger::Span::Disabled), + }], + deactivated: smallvec![], + }), + ).await; + + let (tx, _) = oneshot::channel(); + + overseer_send( + &mut virtual_overseer, + AvailabilityRecoveryMessage::RecoverAvailableData( + test_state.candidate.clone(), + test_state.session_index, + None, + tx, + ) + ).await; + + test_state.test_runtime_api(&mut virtual_overseer).await; + + for _ in 0..test_state.validators.len() { + match virtual_overseer.recv().timeout(TIMEOUT).await { + None => return, + Some(_) => continue, + } + } + + panic!("task requested all validators without concluding") + }); +} + +#[test] +fn chunks_retry_until_all_nodes_respond() { + let test_state = TestState::default(); + + test_harness_chunks_only(|test_harness| async move { + let TestHarness { mut virtual_overseer } = test_harness; + + overseer_signal( + &mut virtual_overseer, + OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { + activated: smallvec![ActivatedLeaf { + hash: test_state.current.clone(), + number: 1, + span: Arc::new(jaeger::Span::Disabled), + }], + deactivated: smallvec![], + }), + ).await; + + let (tx, rx) = oneshot::channel(); + + overseer_send( + &mut virtual_overseer, + AvailabilityRecoveryMessage::RecoverAvailableData( + test_state.candidate.clone(), + test_state.session_index, + Some(GroupIndex(0)), + tx, + ) + ).await; + + test_state.test_runtime_api(&mut virtual_overseer).await; + + let candidate_hash = test_state.candidate.hash(); + + test_state.test_chunk_requests( + candidate_hash, + &mut virtual_overseer, + test_state.validators.len(), + |_| Has::timeout(), + ).await; + + // we get to go another round! + + test_state.test_chunk_requests( + candidate_hash, + &mut virtual_overseer, + test_state.impossibility_threshold(), + |_| Has::No, + ).await; + + // Recovered data should match the original one. + assert_eq!(rx.await.unwrap().unwrap_err(), RecoveryError::Unavailable); + }); +} diff --git a/polkadot/roadmap/implementers-guide/src/node/availability/availability-recovery.md b/polkadot/roadmap/implementers-guide/src/node/availability/availability-recovery.md index 86026b67cc..01e839710e 100644 --- a/polkadot/roadmap/implementers-guide/src/node/availability/availability-recovery.md +++ b/polkadot/roadmap/implementers-guide/src/node/availability/availability-recovery.md @@ -25,31 +25,27 @@ We hold a state which tracks the current recovery interactions we have live, as ```rust struct State { - /// Each interaction is implemented as its own async task, and these handles are for communicating with them. - interactions: Map, + /// Each interaction is implemented as its own remote async task, and these handles are remote + /// for it. + interactions: FuturesUnordered, + /// A multiplexer over receivers from live interactions. + interaction_receivers: FuturesUnordered>, /// A recent block hash for which state should be available. live_block_hash: Hash, - - /// interaction communication. This is cloned and given to interactions that are spun up. - from_interaction_tx: Sender, - /// receiver for messages from interactions. - from_interaction_rx: Receiver, - // An LRU cache of recently recovered data. availability_lru: LruCache>, } +/// This is a future, which concludes either when a response is received from the interaction, +/// or all the `awaiting` channels have closed. struct InteractionHandle { + candidate_hash: CandidateHash, + interaction_response: RemoteHandle, awaiting: Vec>>, } struct Unavailable; -enum FromInteraction { - // An interaction concluded. - Concluded(CandidateHash, Result), - // Send a request on the network. - NetworkRequest(Requests), -} +struct Concluded(CandidateHash, Result); struct InteractionParams { validator_authority_keys: Vec, @@ -71,12 +67,12 @@ enum InteractionPhase { // request the chunk from them. shuffling: Vec, received_chunks: Map, - requesting_chunks: FuturesUnordered>>, + requesting_chunks: FuturesUnordered>, } } struct Interaction { - to_state: Sender, + to_subsystems: SubsystemSender, params: InteractionParams, phase: InteractionPhase, } @@ -104,10 +100,6 @@ On `Conclude`, shut down the subsystem. 1. Load the entry from the `interactions` map. It should always exist, if not for logic errors. Send the result to each member of `awaiting`. 1. Add the entry to the availability_lru. -#### `FromInteraction::NetworkRequest(requests)` - -1. Forward with `NetworkBridgeMessage::SendRequests`. - ### Interaction logic #### `launch_interaction(session_index, session_info, candidate_receipt, candidate_hash, Option)` @@ -115,13 +107,13 @@ On `Conclude`, shut down the subsystem. 1. Compute the threshold from the session info. It should be `f + 1`, where `n = 3f + k`, where `k in {1, 2, 3}`, and `n` is the number of validators. 1. Set the various fields of `InteractionParams` based on the validator lists in `session_info` and information about the candidate. 1. If the `backing_group_index` is `Some`, start in the `RequestFromBackers` phase with a shuffling of the backing group validator indices and a `None` requesting value. -1. Otherwise, start in the `RequestChunks` phase with `received_chunks` and `requesting_chunks` both empty. -1. Set the `to_state` sender to be equal to a clone of `state.from_interaction_tx`. +1. Otherwise, start in the `RequestChunks` phase with `received_chunks`,`requesting_chunks`, and `next_shuffling` all empty. +1. Set the `to_subsystems` sender to be equal to a clone of the `SubsystemContext`'s sender. 1. Initialize `received_chunks` to an empty set, as well as `requesting_chunks`. Launch the interaction as a background task running `interaction_loop(interaction)`. -#### `interaction_loop(interaction)` +#### `interaction_loop(interaction) -> Result` ```rust // How many parallel requests to have going at once. @@ -135,13 +127,14 @@ Loop: * If the backer is `Some`, issue a `FromInteraction::NetworkRequest` with a network request for the `AvailableData` and wait for the response. * If it concludes with a `None` result, return to beginning. * If it concludes with available data, attempt a re-encoding. - * If it has the correct erasure-root, break and issue a `Concluded(Ok(available_data))`. + * If it has the correct erasure-root, break and issue a `Ok(available_data)`. * If it has an incorrect erasure-root, issue a `FromInteraction::ReportPeer` message and return to beginning. - * If the backer is `None`, set the phase to `InteractionPhase::RequestChunks` with a random shuffling of validators and empty `received_chunks` and `requesting_chunks`. + * If the backer is `None`, set the phase to `InteractionPhase::RequestChunks` with a random shuffling of validators and empty `next_shuffling`, `received_chunks`, and `requesting_chunks`. * If the phase is `InteractionPhase::RequestChunks`: - * Poll for new updates from `requesting_chunks`. Check merkle proofs of any received chunks, and any failures should lead to issuance of a `FromInteraction::ReportPeer` message. - * If `received_chunks` has more than `threshold` entries, attempt to recover the data. If that fails, or a re-encoding produces an incorrect erasure-root, break and issue a `Concluded(RecoveryError::Invalid)`. If correct, break and issue `Concluded(Ok(available_data))`. + * If `received_chunks + requesting_chunks + shuffling` lengths are less than the threshold, break and return `Err(Unavailable)`. + * Poll for new updates from `requesting_chunks`. Check merkle proofs of any received chunks. If the request simply fails due to network issues, push onto the back of `shuffling` to be retried. + * If `received_chunks` has more than `threshold` entries, attempt to recover the data. If that fails, or a re-encoding produces an incorrect erasure-root, break and issue a `Err(RecoveryError::Invalid)`. If correct, break and issue `Ok(available_data)`. * While there are fewer than `N_PARALLEL` entries in `requesting_chunks`, - * Pop the next item from `shuffling`. If it's empty and `requesting_chunks` is empty, break and set the phase to `Concluded(None)`. + * Pop the next item from `shuffling`. If it's empty and `requesting_chunks` is empty, return `Err(RecoveryError::Unavailable)`. * Issue a `FromInteraction::NetworkRequest` and wait for the response in `requesting_chunks`.