diff --git a/polkadot/node/core/av-store/src/lib.rs b/polkadot/node/core/av-store/src/lib.rs index 2227442eb9..6dc2684847 100644 --- a/polkadot/node/core/av-store/src/lib.rs +++ b/polkadot/node/core/av-store/src/lib.rs @@ -33,10 +33,7 @@ use parity_scale_codec::{Decode, Encode, Error as CodecError, Input}; use bitvec::{order::Lsb0 as BitOrderLsb0, vec::BitVec}; use polkadot_node_primitives::{AvailableData, ErasureChunk}; -use polkadot_node_subsystem_util::{ - self as util, - metrics::{self, prometheus}, -}; +use polkadot_node_subsystem_util as util; use polkadot_primitives::v1::{ BlockNumber, CandidateEvent, CandidateHash, CandidateReceipt, Hash, Header, ValidatorIndex, }; @@ -47,6 +44,9 @@ use polkadot_subsystem::{ SubsystemError, }; +mod metrics; +pub use self::metrics::*; + #[cfg(test)] mod tests; @@ -1273,131 +1273,3 @@ fn prune_all(db: &Arc, config: &Config, clock: &dyn Clock) -> Re db.write(tx)?; Ok(()) } - -#[derive(Clone)] -struct MetricsInner { - received_availability_chunks_total: prometheus::Counter, - pruning: prometheus::Histogram, - process_block_finalized: prometheus::Histogram, - block_activated: prometheus::Histogram, - process_message: prometheus::Histogram, - store_available_data: prometheus::Histogram, - store_chunk: prometheus::Histogram, - get_chunk: prometheus::Histogram, -} - -/// Availability metrics. -#[derive(Default, Clone)] -pub struct Metrics(Option); - -impl Metrics { - fn on_chunks_received(&self, count: usize) { - if let Some(metrics) = &self.0 { - use core::convert::TryFrom as _; - // assume usize fits into u64 - let by = u64::try_from(count).unwrap_or_default(); - metrics.received_availability_chunks_total.inc_by(by); - } - } - - /// Provide a timer for `prune_povs` which observes on drop. - fn time_pruning(&self) -> Option { - self.0.as_ref().map(|metrics| metrics.pruning.start_timer()) - } - - /// Provide a timer for `process_block_finalized` which observes on drop. - fn time_process_block_finalized( - &self, - ) -> Option { - self.0.as_ref().map(|metrics| metrics.process_block_finalized.start_timer()) - } - - /// Provide a timer for `block_activated` which observes on drop. - fn time_block_activated(&self) -> Option { - self.0.as_ref().map(|metrics| metrics.block_activated.start_timer()) - } - - /// Provide a timer for `process_message` which observes on drop. - fn time_process_message(&self) -> Option { - self.0.as_ref().map(|metrics| metrics.process_message.start_timer()) - } - - /// Provide a timer for `store_available_data` which observes on drop. - fn time_store_available_data(&self) -> Option { - self.0.as_ref().map(|metrics| metrics.store_available_data.start_timer()) - } - - /// Provide a timer for `store_chunk` which observes on drop. - fn time_store_chunk(&self) -> Option { - self.0.as_ref().map(|metrics| metrics.store_chunk.start_timer()) - } - - /// Provide a timer for `get_chunk` which observes on drop. - fn time_get_chunk(&self) -> Option { - self.0.as_ref().map(|metrics| metrics.get_chunk.start_timer()) - } -} - -impl metrics::Metrics for Metrics { - fn try_register(registry: &prometheus::Registry) -> Result { - let metrics = MetricsInner { - received_availability_chunks_total: prometheus::register( - prometheus::Counter::new( - "parachain_received_availability_chunks_total", - "Number of availability chunks received.", - )?, - registry, - )?, - pruning: prometheus::register( - prometheus::Histogram::with_opts(prometheus::HistogramOpts::new( - "parachain_av_store_pruning", - "Time spent within `av_store::prune_all`", - ))?, - registry, - )?, - process_block_finalized: prometheus::register( - prometheus::Histogram::with_opts(prometheus::HistogramOpts::new( - "parachain_av_store_process_block_finalized", - "Time spent within `av_store::process_block_finalized`", - ))?, - registry, - )?, - block_activated: prometheus::register( - prometheus::Histogram::with_opts(prometheus::HistogramOpts::new( - "parachain_av_store_block_activated", - "Time spent within `av_store::process_block_activated`", - ))?, - registry, - )?, - process_message: prometheus::register( - prometheus::Histogram::with_opts(prometheus::HistogramOpts::new( - "parachain_av_store_process_message", - "Time spent within `av_store::process_message`", - ))?, - registry, - )?, - store_available_data: prometheus::register( - prometheus::Histogram::with_opts(prometheus::HistogramOpts::new( - "parachain_av_store_store_available_data", - "Time spent within `av_store::store_available_data`", - ))?, - registry, - )?, - store_chunk: prometheus::register( - prometheus::Histogram::with_opts(prometheus::HistogramOpts::new( - "parachain_av_store_store_chunk", - "Time spent within `av_store::store_chunk`", - ))?, - registry, - )?, - get_chunk: prometheus::register( - prometheus::Histogram::with_opts(prometheus::HistogramOpts::new( - "parachain_av_store_get_chunk", - "Time spent fetching requested chunks.`", - ))?, - registry, - )?, - }; - Ok(Metrics(Some(metrics))) - } -} diff --git a/polkadot/node/core/av-store/src/metrics.rs b/polkadot/node/core/av-store/src/metrics.rs new file mode 100644 index 0000000000..fddacca662 --- /dev/null +++ b/polkadot/node/core/av-store/src/metrics.rs @@ -0,0 +1,153 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +use polkadot_node_subsystem_util::metrics::{self, prometheus}; + +#[derive(Clone)] +pub(crate) struct MetricsInner { + received_availability_chunks_total: prometheus::Counter, + pruning: prometheus::Histogram, + process_block_finalized: prometheus::Histogram, + block_activated: prometheus::Histogram, + process_message: prometheus::Histogram, + store_available_data: prometheus::Histogram, + store_chunk: prometheus::Histogram, + get_chunk: prometheus::Histogram, +} + +/// Availability metrics. +#[derive(Default, Clone)] +pub struct Metrics(Option); + +impl Metrics { + pub(crate) fn on_chunks_received(&self, count: usize) { + if let Some(metrics) = &self.0 { + use core::convert::TryFrom as _; + // assume usize fits into u64 + let by = u64::try_from(count).unwrap_or_default(); + metrics.received_availability_chunks_total.inc_by(by); + } + } + + /// Provide a timer for `prune_povs` which observes on drop. + pub(crate) fn time_pruning(&self) -> Option { + self.0.as_ref().map(|metrics| metrics.pruning.start_timer()) + } + + /// Provide a timer for `process_block_finalized` which observes on drop. + pub(crate) fn time_process_block_finalized( + &self, + ) -> Option { + self.0.as_ref().map(|metrics| metrics.process_block_finalized.start_timer()) + } + + /// Provide a timer for `block_activated` which observes on drop. + pub(crate) fn time_block_activated( + &self, + ) -> Option { + self.0.as_ref().map(|metrics| metrics.block_activated.start_timer()) + } + + /// Provide a timer for `process_message` which observes on drop. + pub(crate) fn time_process_message( + &self, + ) -> Option { + self.0.as_ref().map(|metrics| metrics.process_message.start_timer()) + } + + /// Provide a timer for `store_available_data` which observes on drop. + pub(crate) fn time_store_available_data( + &self, + ) -> Option { + self.0.as_ref().map(|metrics| metrics.store_available_data.start_timer()) + } + + /// Provide a timer for `store_chunk` which observes on drop. + pub(crate) fn time_store_chunk( + &self, + ) -> Option { + self.0.as_ref().map(|metrics| metrics.store_chunk.start_timer()) + } + + /// Provide a timer for `get_chunk` which observes on drop. + pub(crate) fn time_get_chunk(&self) -> Option { + self.0.as_ref().map(|metrics| metrics.get_chunk.start_timer()) + } +} + +impl metrics::Metrics for Metrics { + fn try_register(registry: &prometheus::Registry) -> Result { + let metrics = MetricsInner { + received_availability_chunks_total: prometheus::register( + prometheus::Counter::new( + "parachain_received_availability_chunks_total", + "Number of availability chunks received.", + )?, + registry, + )?, + pruning: prometheus::register( + prometheus::Histogram::with_opts(prometheus::HistogramOpts::new( + "parachain_av_store_pruning", + "Time spent within `av_store::prune_all`", + ))?, + registry, + )?, + process_block_finalized: prometheus::register( + prometheus::Histogram::with_opts(prometheus::HistogramOpts::new( + "parachain_av_store_process_block_finalized", + "Time spent within `av_store::process_block_finalized`", + ))?, + registry, + )?, + block_activated: prometheus::register( + prometheus::Histogram::with_opts(prometheus::HistogramOpts::new( + "parachain_av_store_block_activated", + "Time spent within `av_store::process_block_activated`", + ))?, + registry, + )?, + process_message: prometheus::register( + prometheus::Histogram::with_opts(prometheus::HistogramOpts::new( + "parachain_av_store_process_message", + "Time spent within `av_store::process_message`", + ))?, + registry, + )?, + store_available_data: prometheus::register( + prometheus::Histogram::with_opts(prometheus::HistogramOpts::new( + "parachain_av_store_store_available_data", + "Time spent within `av_store::store_available_data`", + ))?, + registry, + )?, + store_chunk: prometheus::register( + prometheus::Histogram::with_opts(prometheus::HistogramOpts::new( + "parachain_av_store_store_chunk", + "Time spent within `av_store::store_chunk`", + ))?, + registry, + )?, + get_chunk: prometheus::register( + prometheus::Histogram::with_opts(prometheus::HistogramOpts::new( + "parachain_av_store_get_chunk", + "Time spent fetching requested chunks.`", + ))?, + registry, + )?, + }; + Ok(Metrics(Some(metrics))) + } +} diff --git a/polkadot/node/network/availability-recovery/src/lib.rs b/polkadot/node/network/availability-recovery/src/lib.rs index 0b973e03bd..98785b6d39 100644 --- a/polkadot/node/network/availability-recovery/src/lib.rs +++ b/polkadot/node/network/availability-recovery/src/lib.rs @@ -73,7 +73,7 @@ mod tests; const LOG_TARGET: &str = "parachain::availability-recovery"; -// How many parallel requests interaction should have going at once. +// How many parallel recovery tasks should be running at once. const N_PARALLEL: usize = 50; // Size of the LRU cache where we keep recovered data. @@ -104,13 +104,13 @@ pub struct AvailabilityRecoverySubsystem { metrics: Metrics, } -struct RequestFromBackersPhase { +struct RequestFromBackers { // a random shuffling of the validators from the backing group which indicates the order // in which we connect to them and request the chunk. shuffled_backers: Vec, } -struct RequestChunksPhase { +struct RequestChunksFromValidators { /// How many request have been unsuccessful so far. error_count: usize, /// Total number of responses that have been received. @@ -125,11 +125,11 @@ struct RequestChunksPhase { requesting_chunks: FuturesUndead, (ValidatorIndex, RequestError)>>, } -struct InteractionParams { +struct RecoveryParams { /// Discovery ids of `validators`. validator_authority_keys: Vec, - /// Validators relevant to this `Interaction`. + /// Validators relevant to this `RecoveryTask`. validators: Vec, /// The number of pieces needed. @@ -145,33 +145,37 @@ struct InteractionParams { metrics: Metrics, } -enum InteractionPhase { - RequestFromBackers(RequestFromBackersPhase), - RequestChunks(RequestChunksPhase), +/// Source the availability data either by means +/// of direct request response protocol to +/// backers (a.k.a. fast-path), or recover from chunks. +enum Source { + RequestFromBackers(RequestFromBackers), + RequestChunks(RequestChunksFromValidators), } -/// A state of a single interaction reconstructing an available data. -struct Interaction { +/// A stateful reconstruction of availability data in reference to +/// a candidate hash. +struct RecoveryTask { sender: S, - /// The parameters of the interaction. - params: InteractionParams, + /// The parameters of the recovery process. + params: RecoveryParams, - /// The phase of the interaction. - phase: InteractionPhase, + /// The source to obtain the availability data from. + source: Source, } -impl RequestFromBackersPhase { +impl RequestFromBackers { fn new(mut backers: Vec) -> Self { backers.shuffle(&mut rand::thread_rng()); - RequestFromBackersPhase { shuffled_backers: backers } + RequestFromBackers { shuffled_backers: backers } } // Run this phase to completion. async fn run( &mut self, - params: &InteractionParams, + params: &RecoveryParams, sender: &mut impl SubsystemSender, ) -> Result { tracing::trace!( @@ -186,7 +190,7 @@ impl RequestFromBackersPhase { self.shuffled_backers.pop().ok_or_else(|| RecoveryError::Unavailable)?; // Request data. - let (req, res) = OutgoingRequest::new( + let (req, response) = OutgoingRequest::new( Recipient::Authority( params.validator_authority_keys[validator_index.0 as usize].clone(), ), @@ -203,7 +207,7 @@ impl RequestFromBackersPhase { ) .await; - match res.await { + match response.await { Ok(req_res::v1::AvailableDataFetchingResponse::AvailableData(data)) => { if reconstructed_data_matches_root( params.validators.len(), @@ -241,12 +245,12 @@ impl RequestFromBackersPhase { } } -impl RequestChunksPhase { +impl RequestChunksFromValidators { fn new(n_validators: u32) -> Self { let mut shuffling: Vec<_> = (0..n_validators).map(ValidatorIndex).collect(); shuffling.shuffle(&mut rand::thread_rng()); - RequestChunksPhase { + RequestChunksFromValidators { error_count: 0, total_received_responses: 0, shuffling: shuffling.into(), @@ -255,7 +259,7 @@ impl RequestChunksPhase { } } - fn is_unavailable(&self, params: &InteractionParams) -> bool { + fn is_unavailable(&self, params: &RecoveryParams) -> bool { is_unavailable( self.received_chunks.len(), self.requesting_chunks.total_len(), @@ -264,7 +268,7 @@ impl RequestChunksPhase { ) } - fn can_conclude(&self, params: &InteractionParams) -> bool { + fn can_conclude(&self, params: &RecoveryParams) -> bool { self.received_chunks.len() >= params.threshold || self.is_unavailable(params) } @@ -295,7 +299,7 @@ impl RequestChunksPhase { async fn launch_parallel_requests( &mut self, - params: &InteractionParams, + params: &RecoveryParams, sender: &mut impl SubsystemSender, ) { let num_requests = self.get_desired_request_count(params.threshold); @@ -346,7 +350,8 @@ impl RequestChunksPhase { .await; } - async fn wait_for_chunks(&mut self, params: &InteractionParams) { + /// Wait for a sufficient amount of chunks to reconstruct according to the provided `params`. + async fn wait_for_chunks(&mut self, params: &RecoveryParams) { let metrics = ¶ms.metrics; // Wait for all current requests to conclude or time-out, or until we reach enough chunks. @@ -448,7 +453,7 @@ impl RequestChunksPhase { async fn run( &mut self, - params: &InteractionParams, + params: &RecoveryParams, sender: &mut impl SubsystemSender, ) -> Result { // First query the store for any chunks we've got. @@ -559,6 +564,9 @@ const fn is_unavailable( received_chunks + requesting_chunks + unrequested_validators < threshold } +/// Re-encode the data into erasure chunks in order to verify +/// the root hash of the provided merkle tree, which is built +/// on-top of the encoded chunks. fn reconstructed_data_matches_root( n_validators: usize, expected_root: &Hash, @@ -581,7 +589,7 @@ fn reconstructed_data_matches_root( branches.root() == *expected_root } -impl Interaction { +impl RecoveryTask { async fn run(mut self) -> Result { // First just see if we have the data available locally. { @@ -609,18 +617,18 @@ impl Interaction { loop { // These only fail if we cannot reach the underlying subsystem, which case there is nothing // meaningful we can do. - match self.phase { - InteractionPhase::RequestFromBackers(ref mut from_backers) => { + match self.source { + Source::RequestFromBackers(ref mut from_backers) => { match from_backers.run(&self.params, &mut self.sender).await { Ok(data) => break Ok(data), Err(RecoveryError::Invalid) => break Err(RecoveryError::Invalid), Err(RecoveryError::Unavailable) => - self.phase = InteractionPhase::RequestChunks(RequestChunksPhase::new( + self.source = Source::RequestChunks(RequestChunksFromValidators::new( self.params.validators.len() as _, )), } }, - InteractionPhase::RequestChunks(ref mut from_all) => + Source::RequestChunks(ref mut from_all) => break from_all.run(&self.params, &mut self.sender).await, } } @@ -628,13 +636,13 @@ impl Interaction { } /// Accumulate all awaiting sides for some particular `AvailableData`. -struct InteractionHandle { +struct RecoveryHandle { candidate_hash: CandidateHash, remote: RemoteHandle>, awaiting: Vec>>, } -impl Future for InteractionHandle { +impl Future for RecoveryHandle { type Output = Option<(CandidateHash, Result)>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { @@ -679,9 +687,9 @@ impl Future for InteractionHandle { } struct State { - /// Each interaction is implemented as its own async task, + /// Each recovery task is implemented as its own async task, /// and these handles are for communicating with them. - interactions: FuturesUnordered, + ongoing_recoveries: FuturesUnordered, /// A recent block hash for which state should be available. live_block: (BlockNumber, Hash), @@ -693,7 +701,7 @@ struct State { impl Default for State { fn default() -> Self { Self { - interactions: FuturesUnordered::new(), + ongoing_recoveries: FuturesUnordered::new(), live_block: (0, Hash::default()), availability_lru: LruCache::new(LRU_SIZE), } @@ -732,8 +740,8 @@ async fn handle_signal(state: &mut State, signal: OverseerSignal) -> SubsystemRe } } -/// Machinery around launching interactions into the background. -async fn launch_interaction( +/// Machinery around launching recovery tasks into the background. +async fn launch_recovery_task( state: &mut State, ctx: &mut Context, session_info: SessionInfo, @@ -748,7 +756,7 @@ where { let candidate_hash = receipt.hash(); - let params = InteractionParams { + let params = RecoveryParams { validator_authority_keys: session_info.discovery_keys.clone(), validators: session_info.validators.clone(), threshold: recovery_threshold(session_info.validators.len())?, @@ -759,28 +767,26 @@ where let phase = backing_group .and_then(|g| session_info.validator_groups.get(g.0 as usize)) - .map(|group| { - InteractionPhase::RequestFromBackers(RequestFromBackersPhase::new(group.clone())) - }) + .map(|group| Source::RequestFromBackers(RequestFromBackers::new(group.clone()))) .unwrap_or_else(|| { - InteractionPhase::RequestChunks(RequestChunksPhase::new(params.validators.len() as _)) + Source::RequestChunks(RequestChunksFromValidators::new(params.validators.len() as _)) }); - let interaction = Interaction { sender: ctx.sender().clone(), params, phase }; + let recovery_task = RecoveryTask { sender: ctx.sender().clone(), params, source: phase }; - let (remote, remote_handle) = interaction.run().remote_handle(); + let (remote, remote_handle) = recovery_task.run().remote_handle(); - state.interactions.push(InteractionHandle { + state.ongoing_recoveries.push(RecoveryHandle { candidate_hash, remote: remote_handle, awaiting: vec![response_sender], }); - if let Err(e) = ctx.spawn("recovery interaction", Box::pin(remote)) { + if let Err(e) = ctx.spawn("recovery task", Box::pin(remote)) { tracing::warn!( target: LOG_TARGET, err = ?e, - "Failed to spawn a recovery interaction task", + "Failed to spawn a recovery task", ); } @@ -817,7 +823,9 @@ where return Ok(()) } - if let Some(i) = state.interactions.iter_mut().find(|i| i.candidate_hash == candidate_hash) { + if let Some(i) = + state.ongoing_recoveries.iter_mut().find(|i| i.candidate_hash == candidate_hash) + { i.awaiting.push(response_sender); return Ok(()) } @@ -831,7 +839,7 @@ where let _span = span.child("session-info-ctx-received"); match session_info { Some(session_info) => - launch_interaction( + launch_recovery_task( state, ctx, session_info, @@ -962,7 +970,7 @@ impl AvailabilityRecoverySubsystem { } } } - output = state.interactions.select_next_some() => { + output = state.ongoing_recoveries.select_next_some() => { if let Some((candidate_hash, result)) = output { state.availability_lru.put(candidate_hash, result); } diff --git a/polkadot/node/network/availability-recovery/src/tests.rs b/polkadot/node/network/availability-recovery/src/tests.rs index a918b8c615..190507ae4d 100644 --- a/polkadot/node/network/availability-recovery/src/tests.rs +++ b/polkadot/node/network/availability-recovery/src/tests.rs @@ -1273,7 +1273,7 @@ fn does_not_query_local_validator() { fn parallel_request_calculation_works_as_expected() { let num_validators = 100; let threshold = recovery_threshold(num_validators).unwrap(); - let mut phase = RequestChunksPhase::new(100); + let mut phase = RequestChunksFromValidators::new(100); assert_eq!(phase.get_desired_request_count(threshold), threshold); phase.error_count = 1; phase.total_received_responses = 1; diff --git a/polkadot/roadmap/implementers-guide/src/node/availability/availability-recovery.md b/polkadot/roadmap/implementers-guide/src/node/availability/availability-recovery.md index 2c9da3fbb5..d7d822188c 100644 --- a/polkadot/roadmap/implementers-guide/src/node/availability/availability-recovery.md +++ b/polkadot/roadmap/implementers-guide/src/node/availability/availability-recovery.md @@ -21,24 +21,21 @@ Output: ## Functionality -We hold a state which tracks the current recovery interactions we have live, as well as which request IDs correspond to which interactions. An interaction is a structure encapsulating all interaction with the network necessary to recover the available data. +We hold a state which tracks the currently ongoing recovery tasks, as well as which request IDs correspond to which task. A recovery task is a structure encapsulating all recovery tasks with the network necessary to recover the available data in respect to one candidate. ```rust struct State { - /// Each interaction is implemented as its own remote async task, and these handles are remote - /// for it. - interactions: FuturesUnordered, - /// A multiplexer over receivers from live interactions. - interaction_receivers: FuturesUnordered>, + /// Each recovery is implemented as an independent async task, and the handles only supply information about the result. + ongoing_recoveries: FuturesUnordered, /// A recent block hash for which state should be available. live_block_hash: Hash, // An LRU cache of recently recovered data. availability_lru: LruCache>, } -/// This is a future, which concludes either when a response is received from the interaction, +/// This is a future, which concludes either when a response is received from the recovery tasks, /// or all the `awaiting` channels have closed. -struct InteractionHandle { +struct RecoveryHandle { candidate_hash: CandidateHash, interaction_response: RemoteHandle, awaiting: Vec>>, @@ -47,7 +44,7 @@ struct InteractionHandle { struct Unavailable; struct Concluded(CandidateHash, Result); -struct InteractionParams { +struct RecoveryTaskParams { validator_authority_keys: Vec, validators: Vec, // The number of pieces needed. @@ -56,13 +53,13 @@ struct InteractionParams { erasure_root: Hash, } -enum InteractionPhase { +enum RecoveryTask { RequestFromBackers { // a random shuffling of the validators from the backing group which indicates the order // in which we connect to them and request the chunk. shuffled_backers: Vec, } - RequestChunks { + RequestChunksFromValidators { // a random shuffling of the validators which indicates the order in which we connect to the validators and // request the chunk from them. shuffling: Vec, @@ -71,10 +68,10 @@ enum InteractionPhase { } } -struct Interaction { +struct RecoveryTask { to_subsystems: SubsystemSender, - params: InteractionParams, - phase: InteractionPhase, + params: RecoveryTaskParams, + source: Source, } ``` @@ -89,31 +86,24 @@ On `Conclude`, shut down the subsystem. #### `AvailabilityRecoveryMessage::RecoverAvailableData(receipt, session, Option, response)` 1. Check the `availability_lru` for the candidate and return the data if so. -1. Check if there is already an interaction handle for the request. If so, add the response handle to it. -1. Otherwise, load the session info for the given session under the state of `live_block_hash`, and initiate an interaction with *`launch_interaction`*. Add an interaction handle to the state and add the response channel to it. +1. Check if there is already an recovery handle for the request. If so, add the response handle to it. +1. Otherwise, load the session info for the given session under the state of `live_block_hash`, and initiate a recovery task with *`launch_recovery_task`*. Add a recovery handle to the state and add the response channel to it. 1. If the session info is not available, return `RecoveryError::Unavailable` on the response channel. -### From-interaction logic +### Recovery logic -#### `FromInteraction::Concluded` - -1. Load the entry from the `interactions` map. It should always exist, if not for logic errors. Send the result to each member of `awaiting`. -1. Add the entry to the `availability_lru`. - -### Interaction logic - -#### `launch_interaction(session_index, session_info, candidate_receipt, candidate_hash, Option)` +#### `launch_recovery_task(session_index, session_info, candidate_receipt, candidate_hash, Option)` 1. Compute the threshold from the session info. It should be `f + 1`, where `n = 3f + k`, where `k in {1, 2, 3}`, and `n` is the number of validators. -1. Set the various fields of `InteractionParams` based on the validator lists in `session_info` and information about the candidate. +1. Set the various fields of `RecoveryParams` based on the validator lists in `session_info` and information about the candidate. 1. If the `backing_group_index` is `Some`, start in the `RequestFromBackers` phase with a shuffling of the backing group validator indices and a `None` requesting value. -1. Otherwise, start in the `RequestChunks` phase with `received_chunks`,`requesting_chunks`, and `next_shuffling` all empty. +1. Otherwise, start in the `RequestChunksFromValidators` source with `received_chunks`,`requesting_chunks`, and `next_shuffling` all empty. 1. Set the `to_subsystems` sender to be equal to a clone of the `SubsystemContext`'s sender. 1. Initialize `received_chunks` to an empty set, as well as `requesting_chunks`. -Launch the interaction as a background task running `interaction_loop(interaction)`. +Launch the source as a background task running `run(recovery_task)`. -#### `interaction_loop(interaction) -> Result` +#### `run(recovery_task) -> Result` ```rust // How many parallel requests to have going at once. @@ -121,7 +111,7 @@ const N_PARALLEL: usize = 50; ``` * Request `AvailabilityStoreMessage::QueryAvailableData`. If it exists, return that. -* If the phase is `InteractionPhase::RequestFromBackers` +* If the task contains `RequestFromBackers` * Loop: * If the `requesting_pov` is `Some`, poll for updates on it. If it concludes, set `requesting_pov` to `None`. * If the `requesting_pov` is `None`, take the next backer off the `shuffled_backers`. @@ -130,14 +120,20 @@ const N_PARALLEL: usize = 50; * If it concludes with available data, attempt a re-encoding. * If it has the correct erasure-root, break and issue a `Ok(available_data)`. * If it has an incorrect erasure-root, return to beginning. - * If the backer is `None`, set the phase to `InteractionPhase::RequestChunks` with a random shuffling of validators and empty `next_shuffling`, `received_chunks`, and `requesting_chunks` and break the loop. + * Send the result to each member of `awaiting`. + * If the backer is `None`, set the source to `RequestChunksFromValidators` with a random shuffling of validators and empty `received_chunks`, and `requesting_chunks` and break the loop. -* If the phase is `InteractionPhase::RequestChunks`: +* If the task contains `RequestChunksFromValidators`: * Request `AvailabilityStoreMessage::QueryAllChunks`. For each chunk that exists, add it to `received_chunks` and remote the validator from `shuffling`. * Loop: * If `received_chunks + requesting_chunks + shuffling` lengths are less than the threshold, break and return `Err(Unavailable)`. * Poll for new updates from `requesting_chunks`. Check merkle proofs of any received chunks. If the request simply fails due to network issues, insert into the front of `shuffling` to be retried. - * If `received_chunks` has more than `threshold` entries, attempt to recover the data. If that fails, or a re-encoding produces an incorrect erasure-root, break and issue a `Err(RecoveryError::Invalid)`. If correct, break and issue `Ok(available_data)`. + * If `received_chunks` has more than `threshold` entries, attempt to recover the data. + * If that fails, return `Err(RecoveryError::Invalid)` + * If correct: + * If re-encoding produces an incorrect erasure-root, break and issue a `Err(RecoveryError::Invalid)`. + * break and issue `Ok(available_data)` + * Send the result to each member of `awaiting`. * While there are fewer than `N_PARALLEL` entries in `requesting_chunks`, * Pop the next item from `shuffling`. If it's empty and `requesting_chunks` is empty, return `Err(RecoveryError::Unavailable)`. * Issue a `NetworkBridgeMessage::Requests` and wait for the response in `requesting_chunks`.