availability recovery type name clarifications (#4203)

* minor changes

* fmt

* rename to expressive types

* chore: fixup

* chore: remove `Data` prefixes

* address review comments

* guide items

* sourcer -> source, add `FromValdiators` suffix
This commit is contained in:
Bernhard Schuster
2021-11-08 14:43:23 +01:00
committed by GitHub
parent 1193a5554b
commit edac78d03c
5 changed files with 246 additions and 217 deletions
+4 -132
View File
@@ -33,10 +33,7 @@ use parity_scale_codec::{Decode, Encode, Error as CodecError, Input};
use bitvec::{order::Lsb0 as BitOrderLsb0, vec::BitVec};
use polkadot_node_primitives::{AvailableData, ErasureChunk};
use polkadot_node_subsystem_util::{
self as util,
metrics::{self, prometheus},
};
use polkadot_node_subsystem_util as util;
use polkadot_primitives::v1::{
BlockNumber, CandidateEvent, CandidateHash, CandidateReceipt, Hash, Header, ValidatorIndex,
};
@@ -47,6 +44,9 @@ use polkadot_subsystem::{
SubsystemError,
};
mod metrics;
pub use self::metrics::*;
#[cfg(test)]
mod tests;
@@ -1273,131 +1273,3 @@ fn prune_all(db: &Arc<dyn KeyValueDB>, config: &Config, clock: &dyn Clock) -> Re
db.write(tx)?;
Ok(())
}
#[derive(Clone)]
struct MetricsInner {
received_availability_chunks_total: prometheus::Counter<prometheus::U64>,
pruning: prometheus::Histogram,
process_block_finalized: prometheus::Histogram,
block_activated: prometheus::Histogram,
process_message: prometheus::Histogram,
store_available_data: prometheus::Histogram,
store_chunk: prometheus::Histogram,
get_chunk: prometheus::Histogram,
}
/// Availability metrics.
#[derive(Default, Clone)]
pub struct Metrics(Option<MetricsInner>);
impl Metrics {
fn on_chunks_received(&self, count: usize) {
if let Some(metrics) = &self.0 {
use core::convert::TryFrom as _;
// assume usize fits into u64
let by = u64::try_from(count).unwrap_or_default();
metrics.received_availability_chunks_total.inc_by(by);
}
}
/// Provide a timer for `prune_povs` which observes on drop.
fn time_pruning(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
self.0.as_ref().map(|metrics| metrics.pruning.start_timer())
}
/// Provide a timer for `process_block_finalized` which observes on drop.
fn time_process_block_finalized(
&self,
) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
self.0.as_ref().map(|metrics| metrics.process_block_finalized.start_timer())
}
/// Provide a timer for `block_activated` which observes on drop.
fn time_block_activated(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
self.0.as_ref().map(|metrics| metrics.block_activated.start_timer())
}
/// Provide a timer for `process_message` which observes on drop.
fn time_process_message(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
self.0.as_ref().map(|metrics| metrics.process_message.start_timer())
}
/// Provide a timer for `store_available_data` which observes on drop.
fn time_store_available_data(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
self.0.as_ref().map(|metrics| metrics.store_available_data.start_timer())
}
/// Provide a timer for `store_chunk` which observes on drop.
fn time_store_chunk(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
self.0.as_ref().map(|metrics| metrics.store_chunk.start_timer())
}
/// Provide a timer for `get_chunk` which observes on drop.
fn time_get_chunk(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
self.0.as_ref().map(|metrics| metrics.get_chunk.start_timer())
}
}
impl metrics::Metrics for Metrics {
fn try_register(registry: &prometheus::Registry) -> Result<Self, prometheus::PrometheusError> {
let metrics = MetricsInner {
received_availability_chunks_total: prometheus::register(
prometheus::Counter::new(
"parachain_received_availability_chunks_total",
"Number of availability chunks received.",
)?,
registry,
)?,
pruning: prometheus::register(
prometheus::Histogram::with_opts(prometheus::HistogramOpts::new(
"parachain_av_store_pruning",
"Time spent within `av_store::prune_all`",
))?,
registry,
)?,
process_block_finalized: prometheus::register(
prometheus::Histogram::with_opts(prometheus::HistogramOpts::new(
"parachain_av_store_process_block_finalized",
"Time spent within `av_store::process_block_finalized`",
))?,
registry,
)?,
block_activated: prometheus::register(
prometheus::Histogram::with_opts(prometheus::HistogramOpts::new(
"parachain_av_store_block_activated",
"Time spent within `av_store::process_block_activated`",
))?,
registry,
)?,
process_message: prometheus::register(
prometheus::Histogram::with_opts(prometheus::HistogramOpts::new(
"parachain_av_store_process_message",
"Time spent within `av_store::process_message`",
))?,
registry,
)?,
store_available_data: prometheus::register(
prometheus::Histogram::with_opts(prometheus::HistogramOpts::new(
"parachain_av_store_store_available_data",
"Time spent within `av_store::store_available_data`",
))?,
registry,
)?,
store_chunk: prometheus::register(
prometheus::Histogram::with_opts(prometheus::HistogramOpts::new(
"parachain_av_store_store_chunk",
"Time spent within `av_store::store_chunk`",
))?,
registry,
)?,
get_chunk: prometheus::register(
prometheus::Histogram::with_opts(prometheus::HistogramOpts::new(
"parachain_av_store_get_chunk",
"Time spent fetching requested chunks.`",
))?,
registry,
)?,
};
Ok(Metrics(Some(metrics)))
}
}
+153
View File
@@ -0,0 +1,153 @@
// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use polkadot_node_subsystem_util::metrics::{self, prometheus};
#[derive(Clone)]
pub(crate) struct MetricsInner {
received_availability_chunks_total: prometheus::Counter<prometheus::U64>,
pruning: prometheus::Histogram,
process_block_finalized: prometheus::Histogram,
block_activated: prometheus::Histogram,
process_message: prometheus::Histogram,
store_available_data: prometheus::Histogram,
store_chunk: prometheus::Histogram,
get_chunk: prometheus::Histogram,
}
/// Availability metrics.
#[derive(Default, Clone)]
pub struct Metrics(Option<MetricsInner>);
impl Metrics {
pub(crate) fn on_chunks_received(&self, count: usize) {
if let Some(metrics) = &self.0 {
use core::convert::TryFrom as _;
// assume usize fits into u64
let by = u64::try_from(count).unwrap_or_default();
metrics.received_availability_chunks_total.inc_by(by);
}
}
/// Provide a timer for `prune_povs` which observes on drop.
pub(crate) fn time_pruning(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
self.0.as_ref().map(|metrics| metrics.pruning.start_timer())
}
/// Provide a timer for `process_block_finalized` which observes on drop.
pub(crate) fn time_process_block_finalized(
&self,
) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
self.0.as_ref().map(|metrics| metrics.process_block_finalized.start_timer())
}
/// Provide a timer for `block_activated` which observes on drop.
pub(crate) fn time_block_activated(
&self,
) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
self.0.as_ref().map(|metrics| metrics.block_activated.start_timer())
}
/// Provide a timer for `process_message` which observes on drop.
pub(crate) fn time_process_message(
&self,
) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
self.0.as_ref().map(|metrics| metrics.process_message.start_timer())
}
/// Provide a timer for `store_available_data` which observes on drop.
pub(crate) fn time_store_available_data(
&self,
) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
self.0.as_ref().map(|metrics| metrics.store_available_data.start_timer())
}
/// Provide a timer for `store_chunk` which observes on drop.
pub(crate) fn time_store_chunk(
&self,
) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
self.0.as_ref().map(|metrics| metrics.store_chunk.start_timer())
}
/// Provide a timer for `get_chunk` which observes on drop.
pub(crate) fn time_get_chunk(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
self.0.as_ref().map(|metrics| metrics.get_chunk.start_timer())
}
}
impl metrics::Metrics for Metrics {
fn try_register(registry: &prometheus::Registry) -> Result<Self, prometheus::PrometheusError> {
let metrics = MetricsInner {
received_availability_chunks_total: prometheus::register(
prometheus::Counter::new(
"parachain_received_availability_chunks_total",
"Number of availability chunks received.",
)?,
registry,
)?,
pruning: prometheus::register(
prometheus::Histogram::with_opts(prometheus::HistogramOpts::new(
"parachain_av_store_pruning",
"Time spent within `av_store::prune_all`",
))?,
registry,
)?,
process_block_finalized: prometheus::register(
prometheus::Histogram::with_opts(prometheus::HistogramOpts::new(
"parachain_av_store_process_block_finalized",
"Time spent within `av_store::process_block_finalized`",
))?,
registry,
)?,
block_activated: prometheus::register(
prometheus::Histogram::with_opts(prometheus::HistogramOpts::new(
"parachain_av_store_block_activated",
"Time spent within `av_store::process_block_activated`",
))?,
registry,
)?,
process_message: prometheus::register(
prometheus::Histogram::with_opts(prometheus::HistogramOpts::new(
"parachain_av_store_process_message",
"Time spent within `av_store::process_message`",
))?,
registry,
)?,
store_available_data: prometheus::register(
prometheus::Histogram::with_opts(prometheus::HistogramOpts::new(
"parachain_av_store_store_available_data",
"Time spent within `av_store::store_available_data`",
))?,
registry,
)?,
store_chunk: prometheus::register(
prometheus::Histogram::with_opts(prometheus::HistogramOpts::new(
"parachain_av_store_store_chunk",
"Time spent within `av_store::store_chunk`",
))?,
registry,
)?,
get_chunk: prometheus::register(
prometheus::Histogram::with_opts(prometheus::HistogramOpts::new(
"parachain_av_store_get_chunk",
"Time spent fetching requested chunks.`",
))?,
registry,
)?,
};
Ok(Metrics(Some(metrics)))
}
}
@@ -73,7 +73,7 @@ mod tests;
const LOG_TARGET: &str = "parachain::availability-recovery";
// How many parallel requests interaction should have going at once.
// How many parallel recovery tasks should be running at once.
const N_PARALLEL: usize = 50;
// Size of the LRU cache where we keep recovered data.
@@ -104,13 +104,13 @@ pub struct AvailabilityRecoverySubsystem {
metrics: Metrics,
}
struct RequestFromBackersPhase {
struct RequestFromBackers {
// a random shuffling of the validators from the backing group which indicates the order
// in which we connect to them and request the chunk.
shuffled_backers: Vec<ValidatorIndex>,
}
struct RequestChunksPhase {
struct RequestChunksFromValidators {
/// How many request have been unsuccessful so far.
error_count: usize,
/// Total number of responses that have been received.
@@ -125,11 +125,11 @@ struct RequestChunksPhase {
requesting_chunks: FuturesUndead<Result<Option<ErasureChunk>, (ValidatorIndex, RequestError)>>,
}
struct InteractionParams {
struct RecoveryParams {
/// Discovery ids of `validators`.
validator_authority_keys: Vec<AuthorityDiscoveryId>,
/// Validators relevant to this `Interaction`.
/// Validators relevant to this `RecoveryTask`.
validators: Vec<ValidatorId>,
/// The number of pieces needed.
@@ -145,33 +145,37 @@ struct InteractionParams {
metrics: Metrics,
}
enum InteractionPhase {
RequestFromBackers(RequestFromBackersPhase),
RequestChunks(RequestChunksPhase),
/// Source the availability data either by means
/// of direct request response protocol to
/// backers (a.k.a. fast-path), or recover from chunks.
enum Source {
RequestFromBackers(RequestFromBackers),
RequestChunks(RequestChunksFromValidators),
}
/// A state of a single interaction reconstructing an available data.
struct Interaction<S> {
/// A stateful reconstruction of availability data in reference to
/// a candidate hash.
struct RecoveryTask<S> {
sender: S,
/// The parameters of the interaction.
params: InteractionParams,
/// The parameters of the recovery process.
params: RecoveryParams,
/// The phase of the interaction.
phase: InteractionPhase,
/// The source to obtain the availability data from.
source: Source,
}
impl RequestFromBackersPhase {
impl RequestFromBackers {
fn new(mut backers: Vec<ValidatorIndex>) -> Self {
backers.shuffle(&mut rand::thread_rng());
RequestFromBackersPhase { shuffled_backers: backers }
RequestFromBackers { shuffled_backers: backers }
}
// Run this phase to completion.
async fn run(
&mut self,
params: &InteractionParams,
params: &RecoveryParams,
sender: &mut impl SubsystemSender,
) -> Result<AvailableData, RecoveryError> {
tracing::trace!(
@@ -186,7 +190,7 @@ impl RequestFromBackersPhase {
self.shuffled_backers.pop().ok_or_else(|| RecoveryError::Unavailable)?;
// Request data.
let (req, res) = OutgoingRequest::new(
let (req, response) = OutgoingRequest::new(
Recipient::Authority(
params.validator_authority_keys[validator_index.0 as usize].clone(),
),
@@ -203,7 +207,7 @@ impl RequestFromBackersPhase {
)
.await;
match res.await {
match response.await {
Ok(req_res::v1::AvailableDataFetchingResponse::AvailableData(data)) => {
if reconstructed_data_matches_root(
params.validators.len(),
@@ -241,12 +245,12 @@ impl RequestFromBackersPhase {
}
}
impl RequestChunksPhase {
impl RequestChunksFromValidators {
fn new(n_validators: u32) -> Self {
let mut shuffling: Vec<_> = (0..n_validators).map(ValidatorIndex).collect();
shuffling.shuffle(&mut rand::thread_rng());
RequestChunksPhase {
RequestChunksFromValidators {
error_count: 0,
total_received_responses: 0,
shuffling: shuffling.into(),
@@ -255,7 +259,7 @@ impl RequestChunksPhase {
}
}
fn is_unavailable(&self, params: &InteractionParams) -> bool {
fn is_unavailable(&self, params: &RecoveryParams) -> bool {
is_unavailable(
self.received_chunks.len(),
self.requesting_chunks.total_len(),
@@ -264,7 +268,7 @@ impl RequestChunksPhase {
)
}
fn can_conclude(&self, params: &InteractionParams) -> bool {
fn can_conclude(&self, params: &RecoveryParams) -> bool {
self.received_chunks.len() >= params.threshold || self.is_unavailable(params)
}
@@ -295,7 +299,7 @@ impl RequestChunksPhase {
async fn launch_parallel_requests(
&mut self,
params: &InteractionParams,
params: &RecoveryParams,
sender: &mut impl SubsystemSender,
) {
let num_requests = self.get_desired_request_count(params.threshold);
@@ -346,7 +350,8 @@ impl RequestChunksPhase {
.await;
}
async fn wait_for_chunks(&mut self, params: &InteractionParams) {
/// Wait for a sufficient amount of chunks to reconstruct according to the provided `params`.
async fn wait_for_chunks(&mut self, params: &RecoveryParams) {
let metrics = &params.metrics;
// Wait for all current requests to conclude or time-out, or until we reach enough chunks.
@@ -448,7 +453,7 @@ impl RequestChunksPhase {
async fn run(
&mut self,
params: &InteractionParams,
params: &RecoveryParams,
sender: &mut impl SubsystemSender,
) -> Result<AvailableData, RecoveryError> {
// First query the store for any chunks we've got.
@@ -559,6 +564,9 @@ const fn is_unavailable(
received_chunks + requesting_chunks + unrequested_validators < threshold
}
/// Re-encode the data into erasure chunks in order to verify
/// the root hash of the provided merkle tree, which is built
/// on-top of the encoded chunks.
fn reconstructed_data_matches_root(
n_validators: usize,
expected_root: &Hash,
@@ -581,7 +589,7 @@ fn reconstructed_data_matches_root(
branches.root() == *expected_root
}
impl<S: SubsystemSender> Interaction<S> {
impl<S: SubsystemSender> RecoveryTask<S> {
async fn run(mut self) -> Result<AvailableData, RecoveryError> {
// First just see if we have the data available locally.
{
@@ -609,18 +617,18 @@ impl<S: SubsystemSender> Interaction<S> {
loop {
// These only fail if we cannot reach the underlying subsystem, which case there is nothing
// meaningful we can do.
match self.phase {
InteractionPhase::RequestFromBackers(ref mut from_backers) => {
match self.source {
Source::RequestFromBackers(ref mut from_backers) => {
match from_backers.run(&self.params, &mut self.sender).await {
Ok(data) => break Ok(data),
Err(RecoveryError::Invalid) => break Err(RecoveryError::Invalid),
Err(RecoveryError::Unavailable) =>
self.phase = InteractionPhase::RequestChunks(RequestChunksPhase::new(
self.source = Source::RequestChunks(RequestChunksFromValidators::new(
self.params.validators.len() as _,
)),
}
},
InteractionPhase::RequestChunks(ref mut from_all) =>
Source::RequestChunks(ref mut from_all) =>
break from_all.run(&self.params, &mut self.sender).await,
}
}
@@ -628,13 +636,13 @@ impl<S: SubsystemSender> Interaction<S> {
}
/// Accumulate all awaiting sides for some particular `AvailableData`.
struct InteractionHandle {
struct RecoveryHandle {
candidate_hash: CandidateHash,
remote: RemoteHandle<Result<AvailableData, RecoveryError>>,
awaiting: Vec<oneshot::Sender<Result<AvailableData, RecoveryError>>>,
}
impl Future for InteractionHandle {
impl Future for RecoveryHandle {
type Output = Option<(CandidateHash, Result<AvailableData, RecoveryError>)>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
@@ -679,9 +687,9 @@ impl Future for InteractionHandle {
}
struct State {
/// Each interaction is implemented as its own async task,
/// Each recovery task is implemented as its own async task,
/// and these handles are for communicating with them.
interactions: FuturesUnordered<InteractionHandle>,
ongoing_recoveries: FuturesUnordered<RecoveryHandle>,
/// A recent block hash for which state should be available.
live_block: (BlockNumber, Hash),
@@ -693,7 +701,7 @@ struct State {
impl Default for State {
fn default() -> Self {
Self {
interactions: FuturesUnordered::new(),
ongoing_recoveries: FuturesUnordered::new(),
live_block: (0, Hash::default()),
availability_lru: LruCache::new(LRU_SIZE),
}
@@ -732,8 +740,8 @@ async fn handle_signal(state: &mut State, signal: OverseerSignal) -> SubsystemRe
}
}
/// Machinery around launching interactions into the background.
async fn launch_interaction<Context>(
/// Machinery around launching recovery tasks into the background.
async fn launch_recovery_task<Context>(
state: &mut State,
ctx: &mut Context,
session_info: SessionInfo,
@@ -748,7 +756,7 @@ where
{
let candidate_hash = receipt.hash();
let params = InteractionParams {
let params = RecoveryParams {
validator_authority_keys: session_info.discovery_keys.clone(),
validators: session_info.validators.clone(),
threshold: recovery_threshold(session_info.validators.len())?,
@@ -759,28 +767,26 @@ where
let phase = backing_group
.and_then(|g| session_info.validator_groups.get(g.0 as usize))
.map(|group| {
InteractionPhase::RequestFromBackers(RequestFromBackersPhase::new(group.clone()))
})
.map(|group| Source::RequestFromBackers(RequestFromBackers::new(group.clone())))
.unwrap_or_else(|| {
InteractionPhase::RequestChunks(RequestChunksPhase::new(params.validators.len() as _))
Source::RequestChunks(RequestChunksFromValidators::new(params.validators.len() as _))
});
let interaction = Interaction { sender: ctx.sender().clone(), params, phase };
let recovery_task = RecoveryTask { sender: ctx.sender().clone(), params, source: phase };
let (remote, remote_handle) = interaction.run().remote_handle();
let (remote, remote_handle) = recovery_task.run().remote_handle();
state.interactions.push(InteractionHandle {
state.ongoing_recoveries.push(RecoveryHandle {
candidate_hash,
remote: remote_handle,
awaiting: vec![response_sender],
});
if let Err(e) = ctx.spawn("recovery interaction", Box::pin(remote)) {
if let Err(e) = ctx.spawn("recovery task", Box::pin(remote)) {
tracing::warn!(
target: LOG_TARGET,
err = ?e,
"Failed to spawn a recovery interaction task",
"Failed to spawn a recovery task",
);
}
@@ -817,7 +823,9 @@ where
return Ok(())
}
if let Some(i) = state.interactions.iter_mut().find(|i| i.candidate_hash == candidate_hash) {
if let Some(i) =
state.ongoing_recoveries.iter_mut().find(|i| i.candidate_hash == candidate_hash)
{
i.awaiting.push(response_sender);
return Ok(())
}
@@ -831,7 +839,7 @@ where
let _span = span.child("session-info-ctx-received");
match session_info {
Some(session_info) =>
launch_interaction(
launch_recovery_task(
state,
ctx,
session_info,
@@ -962,7 +970,7 @@ impl AvailabilityRecoverySubsystem {
}
}
}
output = state.interactions.select_next_some() => {
output = state.ongoing_recoveries.select_next_some() => {
if let Some((candidate_hash, result)) = output {
state.availability_lru.put(candidate_hash, result);
}
@@ -1273,7 +1273,7 @@ fn does_not_query_local_validator() {
fn parallel_request_calculation_works_as_expected() {
let num_validators = 100;
let threshold = recovery_threshold(num_validators).unwrap();
let mut phase = RequestChunksPhase::new(100);
let mut phase = RequestChunksFromValidators::new(100);
assert_eq!(phase.get_desired_request_count(threshold), threshold);
phase.error_count = 1;
phase.total_received_responses = 1;
@@ -21,24 +21,21 @@ Output:
## Functionality
We hold a state which tracks the current recovery interactions we have live, as well as which request IDs correspond to which interactions. An interaction is a structure encapsulating all interaction with the network necessary to recover the available data.
We hold a state which tracks the currently ongoing recovery tasks, as well as which request IDs correspond to which task. A recovery task is a structure encapsulating all recovery tasks with the network necessary to recover the available data in respect to one candidate.
```rust
struct State {
/// Each interaction is implemented as its own remote async task, and these handles are remote
/// for it.
interactions: FuturesUnordered<InteractionHandle>,
/// A multiplexer over receivers from live interactions.
interaction_receivers: FuturesUnordered<ResponseReceiver<Concluded>>,
/// Each recovery is implemented as an independent async task, and the handles only supply information about the result.
ongoing_recoveries: FuturesUnordered<RecoveryHandle>,
/// A recent block hash for which state should be available.
live_block_hash: Hash,
// An LRU cache of recently recovered data.
availability_lru: LruCache<CandidateHash, Result<AvailableData, RecoveryError>>,
}
/// This is a future, which concludes either when a response is received from the interaction,
/// This is a future, which concludes either when a response is received from the recovery tasks,
/// or all the `awaiting` channels have closed.
struct InteractionHandle {
struct RecoveryHandle {
candidate_hash: CandidateHash,
interaction_response: RemoteHandle<Concluded>,
awaiting: Vec<ResponseChannel<Result<AvailableData, RecoveryError>>>,
@@ -47,7 +44,7 @@ struct InteractionHandle {
struct Unavailable;
struct Concluded(CandidateHash, Result<AvailableData, RecoveryError>);
struct InteractionParams {
struct RecoveryTaskParams {
validator_authority_keys: Vec<AuthorityId>,
validators: Vec<ValidatorId>,
// The number of pieces needed.
@@ -56,13 +53,13 @@ struct InteractionParams {
erasure_root: Hash,
}
enum InteractionPhase {
enum RecoveryTask {
RequestFromBackers {
// a random shuffling of the validators from the backing group which indicates the order
// in which we connect to them and request the chunk.
shuffled_backers: Vec<ValidatorIndex>,
}
RequestChunks {
RequestChunksFromValidators {
// a random shuffling of the validators which indicates the order in which we connect to the validators and
// request the chunk from them.
shuffling: Vec<ValidatorIndex>,
@@ -71,10 +68,10 @@ enum InteractionPhase {
}
}
struct Interaction {
struct RecoveryTask {
to_subsystems: SubsystemSender,
params: InteractionParams,
phase: InteractionPhase,
params: RecoveryTaskParams,
source: Source,
}
```
@@ -89,31 +86,24 @@ On `Conclude`, shut down the subsystem.
#### `AvailabilityRecoveryMessage::RecoverAvailableData(receipt, session, Option<backing_group_index>, response)`
1. Check the `availability_lru` for the candidate and return the data if so.
1. Check if there is already an interaction handle for the request. If so, add the response handle to it.
1. Otherwise, load the session info for the given session under the state of `live_block_hash`, and initiate an interaction with *`launch_interaction`*. Add an interaction handle to the state and add the response channel to it.
1. Check if there is already an recovery handle for the request. If so, add the response handle to it.
1. Otherwise, load the session info for the given session under the state of `live_block_hash`, and initiate a recovery task with *`launch_recovery_task`*. Add a recovery handle to the state and add the response channel to it.
1. If the session info is not available, return `RecoveryError::Unavailable` on the response channel.
### From-interaction logic
### Recovery logic
#### `FromInteraction::Concluded`
1. Load the entry from the `interactions` map. It should always exist, if not for logic errors. Send the result to each member of `awaiting`.
1. Add the entry to the `availability_lru`.
### Interaction logic
#### `launch_interaction(session_index, session_info, candidate_receipt, candidate_hash, Option<backing_group_index>)`
#### `launch_recovery_task(session_index, session_info, candidate_receipt, candidate_hash, Option<backing_group_index>)`
1. Compute the threshold from the session info. It should be `f + 1`, where `n = 3f + k`, where `k in {1, 2, 3}`, and `n` is the number of validators.
1. Set the various fields of `InteractionParams` based on the validator lists in `session_info` and information about the candidate.
1. Set the various fields of `RecoveryParams` based on the validator lists in `session_info` and information about the candidate.
1. If the `backing_group_index` is `Some`, start in the `RequestFromBackers` phase with a shuffling of the backing group validator indices and a `None` requesting value.
1. Otherwise, start in the `RequestChunks` phase with `received_chunks`,`requesting_chunks`, and `next_shuffling` all empty.
1. Otherwise, start in the `RequestChunksFromValidators` source with `received_chunks`,`requesting_chunks`, and `next_shuffling` all empty.
1. Set the `to_subsystems` sender to be equal to a clone of the `SubsystemContext`'s sender.
1. Initialize `received_chunks` to an empty set, as well as `requesting_chunks`.
Launch the interaction as a background task running `interaction_loop(interaction)`.
Launch the source as a background task running `run(recovery_task)`.
#### `interaction_loop(interaction) -> Result<AvailableData, RecoeryError>`
#### `run(recovery_task) -> Result<AvailableData, RecoeryError>`
```rust
// How many parallel requests to have going at once.
@@ -121,7 +111,7 @@ const N_PARALLEL: usize = 50;
```
* Request `AvailabilityStoreMessage::QueryAvailableData`. If it exists, return that.
* If the phase is `InteractionPhase::RequestFromBackers`
* If the task contains `RequestFromBackers`
* Loop:
* If the `requesting_pov` is `Some`, poll for updates on it. If it concludes, set `requesting_pov` to `None`.
* If the `requesting_pov` is `None`, take the next backer off the `shuffled_backers`.
@@ -130,14 +120,20 @@ const N_PARALLEL: usize = 50;
* If it concludes with available data, attempt a re-encoding.
* If it has the correct erasure-root, break and issue a `Ok(available_data)`.
* If it has an incorrect erasure-root, return to beginning.
* If the backer is `None`, set the phase to `InteractionPhase::RequestChunks` with a random shuffling of validators and empty `next_shuffling`, `received_chunks`, and `requesting_chunks` and break the loop.
* Send the result to each member of `awaiting`.
* If the backer is `None`, set the source to `RequestChunksFromValidators` with a random shuffling of validators and empty `received_chunks`, and `requesting_chunks` and break the loop.
* If the phase is `InteractionPhase::RequestChunks`:
* If the task contains `RequestChunksFromValidators`:
* Request `AvailabilityStoreMessage::QueryAllChunks`. For each chunk that exists, add it to `received_chunks` and remote the validator from `shuffling`.
* Loop:
* If `received_chunks + requesting_chunks + shuffling` lengths are less than the threshold, break and return `Err(Unavailable)`.
* Poll for new updates from `requesting_chunks`. Check merkle proofs of any received chunks. If the request simply fails due to network issues, insert into the front of `shuffling` to be retried.
* If `received_chunks` has more than `threshold` entries, attempt to recover the data. If that fails, or a re-encoding produces an incorrect erasure-root, break and issue a `Err(RecoveryError::Invalid)`. If correct, break and issue `Ok(available_data)`.
* If `received_chunks` has more than `threshold` entries, attempt to recover the data.
* If that fails, return `Err(RecoveryError::Invalid)`
* If correct:
* If re-encoding produces an incorrect erasure-root, break and issue a `Err(RecoveryError::Invalid)`.
* break and issue `Ok(available_data)`
* Send the result to each member of `awaiting`.
* While there are fewer than `N_PARALLEL` entries in `requesting_chunks`,
* Pop the next item from `shuffling`. If it's empty and `requesting_chunks` is empty, return `Err(RecoveryError::Unavailable)`.
* Issue a `NetworkBridgeMessage::Requests` and wait for the response in `requesting_chunks`.