Refactor availability-recovery strategies (#1457)

Refactors availability-recovery strategies to allow for easily adding
new hotpaths and failover mechanisms.

The new interface allows for chaining multiple `RecoveryStrategy`-es
together, to cleanly express the relationship between them and share
state and code where neccessary/possible:

This was done in order to aid in implementing new hotpaths like
[systematic chunks
recovery](https://github.com/paritytech/polkadot-sdk/issues/598) and
[fetching from approval
checkers](https://github.com/paritytech/polkadot-sdk/issues/575).

Thanks to this design, intermediate state can be shared between the
strategies. For example, if the systematic chunks recovery retrieved
less than the needed amount of chunks, pass them over to the next
FetchChunks strategy, which will only need to recover the remaining
number of chunks.

Draft example of how a systematic chunk recovery strategy would look:
https://github.com/paritytech/polkadot-sdk/commit/667d870bdf1470525d66c13929d5eac7249dd995
(notice how easy it was to add and reuse code)

Note that this PR doesn't itself add any new strategy, it should fully
preserve backwards compatiblity in terms of functionality. Follow-up PRs
to add new strategies will come.
This commit is contained in:
Alin Dima
2023-09-20 15:56:43 +03:00
committed by GitHub
parent 771c3fbde7
commit 6f00edbc55
5 changed files with 953 additions and 754 deletions
@@ -23,11 +23,10 @@ use std::{
iter::Iterator,
num::NonZeroUsize,
pin::Pin,
time::Duration,
};
use futures::{
channel::oneshot::{self, channel},
channel::oneshot,
future::{Future, FutureExt, RemoteHandle},
pin_mut,
prelude::*,
@@ -35,77 +34,55 @@ use futures::{
stream::{FuturesUnordered, StreamExt},
task::{Context, Poll},
};
use rand::seq::SliceRandom;
use schnellru::{ByLength, LruMap};
use task::{FetchChunks, FetchChunksParams, FetchFull, FetchFullParams};
use fatality::Nested;
use polkadot_erasure_coding::{
branch_hash, branches, obtain_chunks_v1, recovery_threshold, Error as ErasureEncodingError,
};
#[cfg(not(test))]
use polkadot_node_network_protocol::request_response::CHUNK_REQUEST_TIMEOUT;
use task::{RecoveryParams, RecoveryStrategy, RecoveryTask};
use polkadot_node_network_protocol::{
request_response::{
self as req_res, outgoing::RequestError, v1 as request_v1, IncomingRequestReceiver,
OutgoingRequest, Recipient, Requests,
},
IfDisconnected, UnifiedReputationChange as Rep,
request_response::{v1 as request_v1, IncomingRequestReceiver},
UnifiedReputationChange as Rep,
};
use polkadot_node_primitives::{AvailableData, ErasureChunk};
use polkadot_node_subsystem::{
errors::RecoveryError,
jaeger,
messages::{AvailabilityRecoveryMessage, AvailabilityStoreMessage, NetworkBridgeTxMessage},
overseer, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, SpawnedSubsystem, SubsystemError,
SubsystemResult,
messages::{AvailabilityRecoveryMessage, AvailabilityStoreMessage},
overseer, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, SpawnedSubsystem,
SubsystemContext, SubsystemError, SubsystemResult,
};
use polkadot_node_subsystem_util::request_session_info;
use polkadot_primitives::{
AuthorityDiscoveryId, BlakeTwo256, BlockNumber, CandidateHash, CandidateReceipt, GroupIndex,
Hash, HashT, IndexedVec, SessionIndex, SessionInfo, ValidatorId, ValidatorIndex,
BlakeTwo256, BlockNumber, CandidateHash, CandidateReceipt, GroupIndex, Hash, HashT,
SessionIndex, SessionInfo, ValidatorIndex,
};
mod error;
mod futures_undead;
mod metrics;
mod task;
use metrics::Metrics;
use futures_undead::FuturesUndead;
use sc_network::{OutboundFailure, RequestFailure};
#[cfg(test)]
mod tests;
const LOG_TARGET: &str = "parachain::availability-recovery";
// How many parallel recovery tasks should be running at once.
const N_PARALLEL: usize = 50;
// Size of the LRU cache where we keep recovered data.
const LRU_SIZE: u32 = 16;
const COST_INVALID_REQUEST: Rep = Rep::CostMajor("Peer sent unparsable request");
/// Time after which we consider a request to have failed
///
/// and we should try more peers. Note in theory the request times out at the network level,
/// measurements have shown, that in practice requests might actually take longer to fail in
/// certain occasions. (The very least, authority discovery is not part of the timeout.)
///
/// For the time being this value is the same as the timeout on the networking layer, but as this
/// timeout is more soft than the networking one, it might make sense to pick different values as
/// well.
#[cfg(not(test))]
const TIMEOUT_START_NEW_REQUESTS: Duration = CHUNK_REQUEST_TIMEOUT;
#[cfg(test)]
const TIMEOUT_START_NEW_REQUESTS: Duration = Duration::from_millis(100);
/// PoV size limit in bytes for which prefer fetching from backers.
const SMALL_POV_LIMIT: usize = 128 * 1024;
#[derive(Clone, PartialEq)]
/// The strategy we use to recover the PoV.
pub enum RecoveryStrategy {
pub enum RecoveryStrategyKind {
/// We always try the backing group first, then fallback to validator chunks.
BackersFirstAlways,
/// We try the backing group first if PoV size is lower than specified, then fallback to
@@ -113,101 +90,25 @@ pub enum RecoveryStrategy {
BackersFirstIfSizeLower(usize),
/// We always recover using validator chunks.
ChunksAlways,
/// Do not request data from the availability store.
/// This is the useful for nodes where the
/// availability-store subsystem is not expected to run,
/// such as collators.
BypassAvailabilityStore,
}
impl RecoveryStrategy {
/// Returns true if the strategy needs backing group index.
pub fn needs_backing_group(&self) -> bool {
match self {
RecoveryStrategy::BackersFirstAlways | RecoveryStrategy::BackersFirstIfSizeLower(_) =>
true,
_ => false,
}
}
/// Returns the PoV size limit in bytes for `BackersFirstIfSizeLower` strategy, otherwise
/// `None`.
pub fn pov_size_limit(&self) -> Option<usize> {
match *self {
RecoveryStrategy::BackersFirstIfSizeLower(limit) => Some(limit),
_ => None,
}
}
}
/// The Availability Recovery Subsystem.
pub struct AvailabilityRecoverySubsystem {
/// PoV recovery strategy to use.
recovery_strategy: RecoveryStrategy,
recovery_strategy_kind: RecoveryStrategyKind,
// If this is true, do not request data from the availability store.
/// This is the useful for nodes where the
/// availability-store subsystem is not expected to run,
/// such as collators.
bypass_availability_store: bool,
/// Receiver for available data requests.
req_receiver: IncomingRequestReceiver<request_v1::AvailableDataFetchingRequest>,
/// Metrics for this subsystem.
metrics: Metrics,
}
struct RequestFromBackers {
// a random shuffling of the validators from the backing group which indicates the order
// in which we connect to them and request the chunk.
shuffled_backers: Vec<ValidatorIndex>,
// channel to the erasure task handler.
erasure_task_tx: futures::channel::mpsc::Sender<ErasureTask>,
}
struct RequestChunksFromValidators {
/// How many request have been unsuccessful so far.
error_count: usize,
/// Total number of responses that have been received.
///
/// including failed ones.
total_received_responses: usize,
/// a random shuffling of the validators which indicates the order in which we connect to the
/// validators and request the chunk from them.
shuffling: VecDeque<ValidatorIndex>,
/// Chunks received so far.
received_chunks: HashMap<ValidatorIndex, ErasureChunk>,
/// Pending chunk requests with soft timeout.
requesting_chunks: FuturesUndead<Result<Option<ErasureChunk>, (ValidatorIndex, RequestError)>>,
// channel to the erasure task handler.
erasure_task_tx: futures::channel::mpsc::Sender<ErasureTask>,
}
struct RecoveryParams {
/// Discovery ids of `validators`.
validator_authority_keys: Vec<AuthorityDiscoveryId>,
/// Validators relevant to this `RecoveryTask`.
validators: IndexedVec<ValidatorIndex, ValidatorId>,
/// The number of pieces needed.
threshold: usize,
/// A hash of the relevant candidate.
candidate_hash: CandidateHash,
/// The root of the erasure encoding of the para block.
erasure_root: Hash,
/// Metrics to report
metrics: Metrics,
/// Do not request data from availability-store
bypass_availability_store: bool,
}
/// Source the availability data either by means
/// of direct request response protocol to
/// backers (a.k.a. fast-path), or recover from chunks.
enum Source {
RequestFromBackers(RequestFromBackers),
RequestChunks(RequestChunksFromValidators),
}
/// Expensive erasure coding computations that we want to run on a blocking thread.
enum ErasureTask {
pub enum ErasureTask {
/// Reconstructs `AvailableData` from chunks given `n_validators`.
Reconstruct(
usize,
@@ -219,486 +120,6 @@ enum ErasureTask {
Reencode(usize, Hash, AvailableData, oneshot::Sender<Option<AvailableData>>),
}
/// A stateful reconstruction of availability data in reference to
/// a candidate hash.
struct RecoveryTask<Sender> {
sender: Sender,
/// The parameters of the recovery process.
params: RecoveryParams,
/// The source to obtain the availability data from.
source: Source,
// channel to the erasure task handler.
erasure_task_tx: futures::channel::mpsc::Sender<ErasureTask>,
}
impl RequestFromBackers {
fn new(
mut backers: Vec<ValidatorIndex>,
erasure_task_tx: futures::channel::mpsc::Sender<ErasureTask>,
) -> Self {
backers.shuffle(&mut rand::thread_rng());
RequestFromBackers { shuffled_backers: backers, erasure_task_tx }
}
// Run this phase to completion.
async fn run(
&mut self,
params: &RecoveryParams,
sender: &mut impl overseer::AvailabilityRecoverySenderTrait,
) -> Result<AvailableData, RecoveryError> {
gum::trace!(
target: LOG_TARGET,
candidate_hash = ?params.candidate_hash,
erasure_root = ?params.erasure_root,
"Requesting from backers",
);
loop {
// Pop the next backer, and proceed to next phase if we're out.
let validator_index =
self.shuffled_backers.pop().ok_or_else(|| RecoveryError::Unavailable)?;
// Request data.
let (req, response) = OutgoingRequest::new(
Recipient::Authority(
params.validator_authority_keys[validator_index.0 as usize].clone(),
),
req_res::v1::AvailableDataFetchingRequest { candidate_hash: params.candidate_hash },
);
sender
.send_message(NetworkBridgeTxMessage::SendRequests(
vec![Requests::AvailableDataFetchingV1(req)],
IfDisconnected::ImmediateError,
))
.await;
match response.await {
Ok(req_res::v1::AvailableDataFetchingResponse::AvailableData(data)) => {
let (reencode_tx, reencode_rx) = channel();
self.erasure_task_tx
.send(ErasureTask::Reencode(
params.validators.len(),
params.erasure_root,
data,
reencode_tx,
))
.await
.map_err(|_| RecoveryError::ChannelClosed)?;
let reencode_response =
reencode_rx.await.map_err(|_| RecoveryError::ChannelClosed)?;
if let Some(data) = reencode_response {
gum::trace!(
target: LOG_TARGET,
candidate_hash = ?params.candidate_hash,
"Received full data",
);
return Ok(data)
} else {
gum::debug!(
target: LOG_TARGET,
candidate_hash = ?params.candidate_hash,
?validator_index,
"Invalid data response",
);
// it doesn't help to report the peer with req/res.
}
},
Ok(req_res::v1::AvailableDataFetchingResponse::NoSuchData) => {},
Err(e) => gum::debug!(
target: LOG_TARGET,
candidate_hash = ?params.candidate_hash,
?validator_index,
err = ?e,
"Error fetching full available data."
),
}
}
}
}
impl RequestChunksFromValidators {
fn new(
n_validators: u32,
erasure_task_tx: futures::channel::mpsc::Sender<ErasureTask>,
) -> Self {
let mut shuffling: Vec<_> = (0..n_validators).map(ValidatorIndex).collect();
shuffling.shuffle(&mut rand::thread_rng());
RequestChunksFromValidators {
error_count: 0,
total_received_responses: 0,
shuffling: shuffling.into(),
received_chunks: HashMap::new(),
requesting_chunks: FuturesUndead::new(),
erasure_task_tx,
}
}
fn is_unavailable(&self, params: &RecoveryParams) -> bool {
is_unavailable(
self.chunk_count(),
self.requesting_chunks.total_len(),
self.shuffling.len(),
params.threshold,
)
}
fn chunk_count(&self) -> usize {
self.received_chunks.len()
}
fn insert_chunk(&mut self, validator_index: ValidatorIndex, chunk: ErasureChunk) {
self.received_chunks.insert(validator_index, chunk);
}
fn can_conclude(&self, params: &RecoveryParams) -> bool {
self.chunk_count() >= params.threshold || self.is_unavailable(params)
}
/// Desired number of parallel requests.
///
/// For the given threshold (total required number of chunks) get the desired number of
/// requests we want to have running in parallel at this time.
fn get_desired_request_count(&self, threshold: usize) -> usize {
// Upper bound for parallel requests.
// We want to limit this, so requests can be processed within the timeout and we limit the
// following feedback loop:
// 1. Requests fail due to timeout
// 2. We request more chunks to make up for it
// 3. Bandwidth is spread out even more, so we get even more timeouts
// 4. We request more chunks to make up for it ...
let max_requests_boundary = std::cmp::min(N_PARALLEL, threshold);
// How many chunks are still needed?
let remaining_chunks = threshold.saturating_sub(self.chunk_count());
// What is the current error rate, so we can make up for it?
let inv_error_rate =
self.total_received_responses.checked_div(self.error_count).unwrap_or(0);
// Actual number of requests we want to have in flight in parallel:
std::cmp::min(
max_requests_boundary,
remaining_chunks + remaining_chunks.checked_div(inv_error_rate).unwrap_or(0),
)
}
async fn launch_parallel_requests<Sender>(
&mut self,
params: &RecoveryParams,
sender: &mut Sender,
) where
Sender: overseer::AvailabilityRecoverySenderTrait,
{
let num_requests = self.get_desired_request_count(params.threshold);
let candidate_hash = &params.candidate_hash;
let already_requesting_count = self.requesting_chunks.len();
gum::debug!(
target: LOG_TARGET,
?candidate_hash,
?num_requests,
error_count= ?self.error_count,
total_received = ?self.total_received_responses,
threshold = ?params.threshold,
?already_requesting_count,
"Requesting availability chunks for a candidate",
);
let mut requests = Vec::with_capacity(num_requests - already_requesting_count);
while self.requesting_chunks.len() < num_requests {
if let Some(validator_index) = self.shuffling.pop_back() {
let validator = params.validator_authority_keys[validator_index.0 as usize].clone();
gum::trace!(
target: LOG_TARGET,
?validator,
?validator_index,
?candidate_hash,
"Requesting chunk",
);
// Request data.
let raw_request = req_res::v1::ChunkFetchingRequest {
candidate_hash: params.candidate_hash,
index: validator_index,
};
let (req, res) = OutgoingRequest::new(Recipient::Authority(validator), raw_request);
requests.push(Requests::ChunkFetchingV1(req));
params.metrics.on_chunk_request_issued();
let timer = params.metrics.time_chunk_request();
self.requesting_chunks.push(Box::pin(async move {
let _timer = timer;
match res.await {
Ok(req_res::v1::ChunkFetchingResponse::Chunk(chunk)) =>
Ok(Some(chunk.recombine_into_chunk(&raw_request))),
Ok(req_res::v1::ChunkFetchingResponse::NoSuchChunk) => Ok(None),
Err(e) => Err((validator_index, e)),
}
}));
} else {
break
}
}
sender
.send_message(NetworkBridgeTxMessage::SendRequests(
requests,
IfDisconnected::TryConnect,
))
.await;
}
/// Wait for a sufficient amount of chunks to reconstruct according to the provided `params`.
async fn wait_for_chunks(&mut self, params: &RecoveryParams) {
let metrics = &params.metrics;
// Wait for all current requests to conclude or time-out, or until we reach enough chunks.
// We also declare requests undead, once `TIMEOUT_START_NEW_REQUESTS` is reached and will
// return in that case for `launch_parallel_requests` to fill up slots again.
while let Some(request_result) =
self.requesting_chunks.next_with_timeout(TIMEOUT_START_NEW_REQUESTS).await
{
self.total_received_responses += 1;
match request_result {
Ok(Some(chunk)) =>
if is_chunk_valid(params, &chunk) {
metrics.on_chunk_request_succeeded();
gum::trace!(
target: LOG_TARGET,
candidate_hash = ?params.candidate_hash,
validator_index = ?chunk.index,
"Received valid chunk",
);
self.insert_chunk(chunk.index, chunk);
} else {
metrics.on_chunk_request_invalid();
self.error_count += 1;
},
Ok(None) => {
metrics.on_chunk_request_no_such_chunk();
self.error_count += 1;
},
Err((validator_index, e)) => {
self.error_count += 1;
gum::trace!(
target: LOG_TARGET,
candidate_hash= ?params.candidate_hash,
err = ?e,
?validator_index,
"Failure requesting chunk",
);
match e {
RequestError::InvalidResponse(_) => {
metrics.on_chunk_request_invalid();
gum::debug!(
target: LOG_TARGET,
candidate_hash = ?params.candidate_hash,
err = ?e,
?validator_index,
"Chunk fetching response was invalid",
);
},
RequestError::NetworkError(err) => {
// No debug logs on general network errors - that became very spammy
// occasionally.
if let RequestFailure::Network(OutboundFailure::Timeout) = err {
metrics.on_chunk_request_timeout();
} else {
metrics.on_chunk_request_error();
}
self.shuffling.push_front(validator_index);
},
RequestError::Canceled(_) => {
metrics.on_chunk_request_error();
self.shuffling.push_front(validator_index);
},
}
},
}
// Stop waiting for requests when we either can already recover the data
// or have gotten firm 'No' responses from enough validators.
if self.can_conclude(params) {
gum::debug!(
target: LOG_TARGET,
candidate_hash = ?params.candidate_hash,
received_chunks_count = ?self.chunk_count(),
requested_chunks_count = ?self.requesting_chunks.len(),
threshold = ?params.threshold,
"Can conclude availability for a candidate",
);
break
}
}
}
async fn run<Sender>(
&mut self,
params: &RecoveryParams,
sender: &mut Sender,
) -> Result<AvailableData, RecoveryError>
where
Sender: overseer::AvailabilityRecoverySenderTrait,
{
let metrics = &params.metrics;
// First query the store for any chunks we've got.
if !params.bypass_availability_store {
let (tx, rx) = oneshot::channel();
sender
.send_message(AvailabilityStoreMessage::QueryAllChunks(params.candidate_hash, tx))
.await;
match rx.await {
Ok(chunks) => {
// This should either be length 1 or 0. If we had the whole data,
// we wouldn't have reached this stage.
let chunk_indices: Vec<_> = chunks.iter().map(|c| c.index).collect();
self.shuffling.retain(|i| !chunk_indices.contains(i));
for chunk in chunks {
if is_chunk_valid(params, &chunk) {
gum::trace!(
target: LOG_TARGET,
candidate_hash = ?params.candidate_hash,
validator_index = ?chunk.index,
"Found valid chunk on disk"
);
self.insert_chunk(chunk.index, chunk);
} else {
gum::error!(
target: LOG_TARGET,
"Loaded invalid chunk from disk! Disk/Db corruption _very_ likely - please fix ASAP!"
);
};
}
},
Err(oneshot::Canceled) => {
gum::warn!(
target: LOG_TARGET,
candidate_hash = ?params.candidate_hash,
"Failed to reach the availability store"
);
},
}
}
let _recovery_timer = metrics.time_full_recovery();
loop {
if self.is_unavailable(&params) {
gum::debug!(
target: LOG_TARGET,
candidate_hash = ?params.candidate_hash,
erasure_root = ?params.erasure_root,
received = %self.chunk_count(),
requesting = %self.requesting_chunks.len(),
total_requesting = %self.requesting_chunks.total_len(),
n_validators = %params.validators.len(),
"Data recovery is not possible",
);
metrics.on_recovery_failed();
return Err(RecoveryError::Unavailable)
}
self.launch_parallel_requests(params, sender).await;
self.wait_for_chunks(params).await;
// If received_chunks has more than threshold entries, attempt to recover the data.
// If that fails, or a re-encoding of it doesn't match the expected erasure root,
// return Err(RecoveryError::Invalid)
if self.chunk_count() >= params.threshold {
let recovery_duration = metrics.time_erasure_recovery();
// Send request to reconstruct available data from chunks.
let (avilable_data_tx, available_data_rx) = channel();
self.erasure_task_tx
.send(ErasureTask::Reconstruct(
params.validators.len(),
std::mem::take(&mut self.received_chunks),
avilable_data_tx,
))
.await
.map_err(|_| RecoveryError::ChannelClosed)?;
let available_data_response =
available_data_rx.await.map_err(|_| RecoveryError::ChannelClosed)?;
return match available_data_response {
Ok(data) => {
// Send request to re-encode the chunks and check merkle root.
let (reencode_tx, reencode_rx) = channel();
self.erasure_task_tx
.send(ErasureTask::Reencode(
params.validators.len(),
params.erasure_root,
data,
reencode_tx,
))
.await
.map_err(|_| RecoveryError::ChannelClosed)?;
let reencode_response =
reencode_rx.await.map_err(|_| RecoveryError::ChannelClosed)?;
if let Some(data) = reencode_response {
gum::trace!(
target: LOG_TARGET,
candidate_hash = ?params.candidate_hash,
erasure_root = ?params.erasure_root,
"Data recovery complete",
);
metrics.on_recovery_succeeded();
Ok(data)
} else {
recovery_duration.map(|rd| rd.stop_and_discard());
gum::trace!(
target: LOG_TARGET,
candidate_hash = ?params.candidate_hash,
erasure_root = ?params.erasure_root,
"Data recovery - root mismatch",
);
metrics.on_recovery_invalid();
Err(RecoveryError::Invalid)
}
},
Err(err) => {
recovery_duration.map(|rd| rd.stop_and_discard());
gum::trace!(
target: LOG_TARGET,
candidate_hash = ?params.candidate_hash,
erasure_root = ?params.erasure_root,
?err,
"Data recovery error ",
);
metrics.on_recovery_invalid();
Err(RecoveryError::Invalid)
},
}
}
}
}
}
const fn is_unavailable(
received_chunks: usize,
requesting_chunks: usize,
@@ -777,60 +198,6 @@ fn reconstructed_data_matches_root(
branches.root() == *expected_root
}
impl<Sender> RecoveryTask<Sender>
where
Sender: overseer::AvailabilityRecoverySenderTrait,
{
async fn run(mut self) -> Result<AvailableData, RecoveryError> {
// First just see if we have the data available locally.
if !self.params.bypass_availability_store {
let (tx, rx) = oneshot::channel();
self.sender
.send_message(AvailabilityStoreMessage::QueryAvailableData(
self.params.candidate_hash,
tx,
))
.await;
match rx.await {
Ok(Some(data)) => return Ok(data),
Ok(None) => {},
Err(oneshot::Canceled) => {
gum::warn!(
target: LOG_TARGET,
candidate_hash = ?self.params.candidate_hash,
"Failed to reach the availability store",
)
},
}
}
self.params.metrics.on_recovery_started();
loop {
// These only fail if we cannot reach the underlying subsystem, which case there is
// nothing meaningful we can do.
match self.source {
Source::RequestFromBackers(ref mut from_backers) => {
match from_backers.run(&self.params, &mut self.sender).await {
Ok(data) => break Ok(data),
Err(RecoveryError::Invalid) => break Err(RecoveryError::Invalid),
Err(RecoveryError::ChannelClosed) =>
break Err(RecoveryError::ChannelClosed),
Err(RecoveryError::Unavailable) =>
self.source = Source::RequestChunks(RequestChunksFromValidators::new(
self.params.validators.len() as _,
self.erasure_task_tx.clone(),
)),
}
},
Source::RequestChunks(ref mut from_all) =>
break from_all.run(&self.params, &mut self.sender).await,
}
}
}
}
/// Accumulate all awaiting sides for some particular `AvailableData`.
struct RecoveryHandle {
candidate_hash: CandidateHash,
@@ -973,65 +340,23 @@ async fn launch_recovery_task<Context>(
ctx: &mut Context,
session_info: SessionInfo,
receipt: CandidateReceipt,
mut backing_group: Option<GroupIndex>,
response_sender: oneshot::Sender<Result<AvailableData, RecoveryError>>,
metrics: &Metrics,
recovery_strategy: &RecoveryStrategy,
erasure_task_tx: futures::channel::mpsc::Sender<ErasureTask>,
recovery_strategies: VecDeque<Box<dyn RecoveryStrategy<<Context as SubsystemContext>::Sender>>>,
bypass_availability_store: bool,
) -> error::Result<()> {
let candidate_hash = receipt.hash();
let params = RecoveryParams {
validator_authority_keys: session_info.discovery_keys.clone(),
validators: session_info.validators.clone(),
n_validators: session_info.validators.len(),
threshold: recovery_threshold(session_info.validators.len())?,
candidate_hash,
erasure_root: receipt.descriptor.erasure_root,
metrics: metrics.clone(),
bypass_availability_store: recovery_strategy == &RecoveryStrategy::BypassAvailabilityStore,
bypass_availability_store,
};
if let Some(small_pov_limit) = recovery_strategy.pov_size_limit() {
// Get our own chunk size to get an estimate of the PoV size.
let chunk_size: Result<Option<usize>, error::Error> =
query_chunk_size(ctx, candidate_hash).await;
if let Ok(Some(chunk_size)) = chunk_size {
let pov_size_estimate = chunk_size.saturating_mul(session_info.validators.len()) / 3;
let prefer_backing_group = pov_size_estimate < small_pov_limit;
gum::trace!(
target: LOG_TARGET,
?candidate_hash,
pov_size_estimate,
small_pov_limit,
enabled = prefer_backing_group,
"Prefer fetch from backing group",
);
backing_group = backing_group.filter(|_| {
// We keep the backing group only if `1/3` of chunks sum up to less than
// `small_pov_limit`.
prefer_backing_group
});
}
}
let phase = backing_group
.and_then(|g| session_info.validator_groups.get(g))
.map(|group| {
Source::RequestFromBackers(RequestFromBackers::new(
group.clone(),
erasure_task_tx.clone(),
))
})
.unwrap_or_else(|| {
Source::RequestChunks(RequestChunksFromValidators::new(
params.validators.len() as _,
erasure_task_tx.clone(),
))
});
let recovery_task =
RecoveryTask { sender: ctx.sender().clone(), params, source: phase, erasure_task_tx };
let recovery_task = RecoveryTask::new(ctx.sender().clone(), params, recovery_strategies);
let (remote, remote_handle) = recovery_task.run().remote_handle();
@@ -1062,8 +387,9 @@ async fn handle_recover<Context>(
backing_group: Option<GroupIndex>,
response_sender: oneshot::Sender<Result<AvailableData, RecoveryError>>,
metrics: &Metrics,
recovery_strategy: &RecoveryStrategy,
erasure_task_tx: futures::channel::mpsc::Sender<ErasureTask>,
recovery_strategy_kind: RecoveryStrategyKind,
bypass_availability_store: bool,
) -> error::Result<()> {
let candidate_hash = receipt.hash();
@@ -1098,19 +424,71 @@ async fn handle_recover<Context>(
let _span = span.child("session-info-ctx-received");
match session_info {
Some(session_info) =>
Some(session_info) => {
let mut recovery_strategies: VecDeque<
Box<dyn RecoveryStrategy<<Context as SubsystemContext>::Sender>>,
> = VecDeque::with_capacity(2);
if let Some(backing_group) = backing_group {
if let Some(backing_validators) = session_info.validator_groups.get(backing_group) {
let mut small_pov_size = true;
if let RecoveryStrategyKind::BackersFirstIfSizeLower(small_pov_limit) =
recovery_strategy_kind
{
// Get our own chunk size to get an estimate of the PoV size.
let chunk_size: Result<Option<usize>, error::Error> =
query_chunk_size(ctx, candidate_hash).await;
if let Ok(Some(chunk_size)) = chunk_size {
let pov_size_estimate =
chunk_size.saturating_mul(session_info.validators.len()) / 3;
small_pov_size = pov_size_estimate < small_pov_limit;
gum::trace!(
target: LOG_TARGET,
?candidate_hash,
pov_size_estimate,
small_pov_limit,
enabled = small_pov_size,
"Prefer fetch from backing group",
);
} else {
// we have a POV limit but were not able to query the chunk size, so
// don't use the backing group.
small_pov_size = false;
}
};
match (&recovery_strategy_kind, small_pov_size) {
(RecoveryStrategyKind::BackersFirstAlways, _) |
(RecoveryStrategyKind::BackersFirstIfSizeLower(_), true) => recovery_strategies.push_back(
Box::new(FetchFull::new(FetchFullParams {
validators: backing_validators.to_vec(),
erasure_task_tx: erasure_task_tx.clone(),
})),
),
_ => {},
};
}
}
recovery_strategies.push_back(Box::new(FetchChunks::new(FetchChunksParams {
n_validators: session_info.validators.len(),
erasure_task_tx,
})));
launch_recovery_task(
state,
ctx,
session_info,
receipt,
backing_group,
response_sender,
metrics,
recovery_strategy,
erasure_task_tx,
recovery_strategies,
bypass_availability_store,
)
.await,
.await
},
None => {
gum::warn!(target: LOG_TARGET, "SessionInfo is `None` at {:?}", state.live_block);
response_sender
@@ -1155,7 +533,12 @@ impl AvailabilityRecoverySubsystem {
req_receiver: IncomingRequestReceiver<request_v1::AvailableDataFetchingRequest>,
metrics: Metrics,
) -> Self {
Self { recovery_strategy: RecoveryStrategy::BypassAvailabilityStore, req_receiver, metrics }
Self {
recovery_strategy_kind: RecoveryStrategyKind::BackersFirstIfSizeLower(SMALL_POV_LIMIT),
bypass_availability_store: true,
req_receiver,
metrics,
}
}
/// Create a new instance of `AvailabilityRecoverySubsystem` which starts with a fast path to
@@ -1164,7 +547,12 @@ impl AvailabilityRecoverySubsystem {
req_receiver: IncomingRequestReceiver<request_v1::AvailableDataFetchingRequest>,
metrics: Metrics,
) -> Self {
Self { recovery_strategy: RecoveryStrategy::BackersFirstAlways, req_receiver, metrics }
Self {
recovery_strategy_kind: RecoveryStrategyKind::BackersFirstAlways,
bypass_availability_store: false,
req_receiver,
metrics,
}
}
/// Create a new instance of `AvailabilityRecoverySubsystem` which requests only chunks
@@ -1172,7 +560,12 @@ impl AvailabilityRecoverySubsystem {
req_receiver: IncomingRequestReceiver<request_v1::AvailableDataFetchingRequest>,
metrics: Metrics,
) -> Self {
Self { recovery_strategy: RecoveryStrategy::ChunksAlways, req_receiver, metrics }
Self {
recovery_strategy_kind: RecoveryStrategyKind::ChunksAlways,
bypass_availability_store: false,
req_receiver,
metrics,
}
}
/// Create a new instance of `AvailabilityRecoverySubsystem` which requests chunks if PoV is
@@ -1182,7 +575,8 @@ impl AvailabilityRecoverySubsystem {
metrics: Metrics,
) -> Self {
Self {
recovery_strategy: RecoveryStrategy::BackersFirstIfSizeLower(SMALL_POV_LIMIT),
recovery_strategy_kind: RecoveryStrategyKind::BackersFirstIfSizeLower(SMALL_POV_LIMIT),
bypass_availability_store: false,
req_receiver,
metrics,
}
@@ -1190,7 +584,8 @@ impl AvailabilityRecoverySubsystem {
async fn run<Context>(self, mut ctx: Context) -> SubsystemResult<()> {
let mut state = State::default();
let Self { recovery_strategy, mut req_receiver, metrics } = self;
let Self { mut req_receiver, metrics, recovery_strategy_kind, bypass_availability_store } =
self;
let (erasure_task_tx, erasure_task_rx) = futures::channel::mpsc::channel(16);
let mut erasure_task_rx = erasure_task_rx.fuse();
@@ -1275,11 +670,12 @@ impl AvailabilityRecoverySubsystem {
&mut ctx,
receipt,
session_index,
maybe_backing_group.filter(|_| recovery_strategy.needs_backing_group()),
maybe_backing_group,
response_sender,
&metrics,
&recovery_strategy,
erasure_task_tx.clone(),
recovery_strategy_kind.clone(),
bypass_availability_store
).await {
gum::warn!(
target: LOG_TARGET,
@@ -1295,7 +691,7 @@ impl AvailabilityRecoverySubsystem {
in_req = recv_req => {
match in_req.into_nested().map_err(|fatal| SubsystemError::with_origin("availability-recovery", fatal))? {
Ok(req) => {
if recovery_strategy == RecoveryStrategy::BypassAvailabilityStore {
if bypass_availability_store {
gum::debug!(
target: LOG_TARGET,
"Skipping request to availability-store.",