mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-15 04:31:08 +00:00
Retry availability until the receiver of the request is dropped (#2763)
* guide updates
* keep interactions alive until receivers drop
* retry indefinitely
* cancel approval tasks on finality
* use swap_remove instead of remove
This commit is contained in:
committed by
GitHub
parent
6514e00144
commit
08d5b268a0
@@ -19,9 +19,11 @@
|
||||
#![warn(missing_docs)]
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::pin::Pin;
|
||||
|
||||
use futures::{channel::{oneshot, mpsc}, prelude::*, stream::FuturesUnordered};
|
||||
use futures::future::BoxFuture;
|
||||
use futures::{channel::oneshot, prelude::*, stream::FuturesUnordered};
|
||||
use futures::future::{BoxFuture, RemoteHandle, FutureExt};
|
||||
use futures::task::{Context, Poll};
|
||||
use lru::LruCache;
|
||||
use rand::seq::SliceRandom;
|
||||
|
||||
@@ -33,7 +35,7 @@ use polkadot_primitives::v1::{
|
||||
use polkadot_node_primitives::{ErasureChunk, AvailableData};
|
||||
use polkadot_subsystem::{
|
||||
SubsystemContext, SubsystemResult, SubsystemError, Subsystem, SpawnedSubsystem, FromOverseer,
|
||||
OverseerSignal, ActiveLeavesUpdate,
|
||||
OverseerSignal, ActiveLeavesUpdate, SubsystemSender,
|
||||
errors::RecoveryError,
|
||||
jaeger,
|
||||
messages::{
|
||||
@@ -67,21 +69,6 @@ pub struct AvailabilityRecoverySubsystem {
|
||||
fast_path: bool,
|
||||
}
|
||||
|
||||
/// Accumulate all awaiting sides for some particular `AvailableData`.
|
||||
struct InteractionHandle {
|
||||
awaiting: Vec<oneshot::Sender<Result<AvailableData, RecoveryError>>>,
|
||||
}
|
||||
|
||||
/// A message received by main code from an async `Interaction` task.
|
||||
#[derive(Debug)]
|
||||
enum FromInteraction {
|
||||
/// An interaction concluded.
|
||||
Concluded(CandidateHash, Result<AvailableData, RecoveryError>),
|
||||
|
||||
/// Send a request on the network service.
|
||||
NetworkRequest(Requests),
|
||||
}
|
||||
|
||||
struct RequestFromBackersPhase {
|
||||
// a random shuffling of the validators from the backing group which indicates the order
|
||||
// in which we connect to them and request the chunk.
|
||||
@@ -95,7 +82,7 @@ struct RequestChunksPhase {
|
||||
received_chunks: HashMap<ValidatorIndex, ErasureChunk>,
|
||||
requesting_chunks: FuturesUnordered<BoxFuture<
|
||||
'static,
|
||||
Result<Option<ErasureChunk>, RequestError>>,
|
||||
Result<Option<ErasureChunk>, (ValidatorIndex, RequestError)>>,
|
||||
>,
|
||||
}
|
||||
|
||||
@@ -122,9 +109,8 @@ enum InteractionPhase {
|
||||
}
|
||||
|
||||
/// A state of a single interaction reconstructing an available data.
|
||||
struct Interaction {
|
||||
/// A communication channel with the `State`.
|
||||
to_state: mpsc::Sender<FromInteraction>,
|
||||
struct Interaction<S> {
|
||||
sender: S,
|
||||
|
||||
/// The parameters of the interaction.
|
||||
params: InteractionParams,
|
||||
@@ -142,13 +128,12 @@ impl RequestFromBackersPhase {
|
||||
}
|
||||
}
|
||||
|
||||
// Run this phase to completion, returning `true` if data was successfully recovered and
|
||||
// false otherwise.
|
||||
// Run this phase to completion.
|
||||
async fn run(
|
||||
&mut self,
|
||||
params: &InteractionParams,
|
||||
to_state: &mut mpsc::Sender<FromInteraction>
|
||||
) -> Result<bool, mpsc::SendError> {
|
||||
sender: &mut impl SubsystemSender,
|
||||
) -> Result<AvailableData, RecoveryError> {
|
||||
tracing::trace!(
|
||||
target: LOG_TARGET,
|
||||
candidate_hash = ?params.candidate_hash,
|
||||
@@ -158,7 +143,7 @@ impl RequestFromBackersPhase {
|
||||
loop {
|
||||
// Pop the next backer, and proceed to next phase if we're out.
|
||||
let validator_index = match self.shuffled_backers.pop() {
|
||||
None => return Ok(false),
|
||||
None => return Err(RecoveryError::Unavailable),
|
||||
Some(i) => i,
|
||||
};
|
||||
|
||||
@@ -168,21 +153,21 @@ impl RequestFromBackersPhase {
|
||||
req_res::v1::AvailableDataFetchingRequest { candidate_hash: params.candidate_hash },
|
||||
);
|
||||
|
||||
to_state.send(FromInteraction::NetworkRequest(Requests::AvailableDataFetching(req))).await?;
|
||||
sender.send_message(NetworkBridgeMessage::SendRequests(
|
||||
vec![Requests::AvailableDataFetching(req)],
|
||||
IfDisconnected::TryConnect,
|
||||
).into()).await;
|
||||
|
||||
match res.await {
|
||||
Ok(req_res::v1::AvailableDataFetchingResponse::AvailableData(data)) => {
|
||||
if reconstructed_data_matches_root(params.validators.len(), ¶ms.erasure_root, &data) {
|
||||
to_state.send(
|
||||
FromInteraction::Concluded(params.candidate_hash.clone(), Ok(data))
|
||||
).await?;
|
||||
|
||||
tracing::trace!(
|
||||
target: LOG_TARGET,
|
||||
candidate_hash = ?params.candidate_hash,
|
||||
"Received full data",
|
||||
);
|
||||
return Ok(true);
|
||||
|
||||
return Ok(data);
|
||||
} else {
|
||||
tracing::debug!(
|
||||
target: LOG_TARGET,
|
||||
@@ -222,8 +207,8 @@ impl RequestChunksPhase {
|
||||
async fn launch_parallel_requests(
|
||||
&mut self,
|
||||
params: &InteractionParams,
|
||||
to_state: &mut mpsc::Sender<FromInteraction>,
|
||||
) -> Result<(), mpsc::SendError> {
|
||||
sender: &mut impl SubsystemSender,
|
||||
) {
|
||||
let max_requests = std::cmp::min(N_PARALLEL, params.threshold);
|
||||
while self.requesting_chunks.len() < max_requests {
|
||||
if let Some(validator_index) = self.shuffling.pop() {
|
||||
@@ -247,39 +232,36 @@ impl RequestChunksPhase {
|
||||
raw_request.clone(),
|
||||
);
|
||||
|
||||
to_state.send(FromInteraction::NetworkRequest(Requests::ChunkFetching(req))).await?;
|
||||
sender.send_message(NetworkBridgeMessage::SendRequests(
|
||||
vec![Requests::ChunkFetching(req)],
|
||||
IfDisconnected::TryConnect,
|
||||
).into()).await;
|
||||
|
||||
self.requesting_chunks.push(Box::pin(async move {
|
||||
match res.await {
|
||||
Ok(req_res::v1::ChunkFetchingResponse::Chunk(chunk))
|
||||
=> Ok(Some(chunk.recombine_into_chunk(&raw_request))),
|
||||
Ok(req_res::v1::ChunkFetchingResponse::NoSuchChunk) => Ok(None),
|
||||
Err(e) => Err(e),
|
||||
Err(e) => Err((validator_index, e)),
|
||||
}
|
||||
}));
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn wait_for_chunks(
|
||||
&mut self,
|
||||
params: &InteractionParams,
|
||||
) -> Result<(), mpsc::SendError> {
|
||||
// Return early when no chunk requests are in flight, so that we don't poll an empty set to completion.
|
||||
if self.requesting_chunks.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
) {
|
||||
// Poll for new updates from requesting_chunks.
|
||||
while let Some(request_result) = self.requesting_chunks.next().await {
|
||||
while let Poll::Ready(Some(request_result))
|
||||
= futures::poll!(self.requesting_chunks.next())
|
||||
{
|
||||
match request_result {
|
||||
Ok(Some(chunk)) => {
|
||||
// Check merkle proofs of any received chunks, and any failures should
|
||||
// lead to issuance of a FromInteraction::ReportPeer message.
|
||||
// Check merkle proofs of any received chunks.
|
||||
|
||||
let validator_index = chunk.index;
|
||||
|
||||
@@ -313,24 +295,30 @@ impl RequestChunksPhase {
|
||||
}
|
||||
}
|
||||
Ok(None) => {}
|
||||
Err(e) => {
|
||||
Err((validator_index, e)) => {
|
||||
tracing::debug!(
|
||||
target: LOG_TARGET,
|
||||
err = ?e,
|
||||
?validator_index,
|
||||
"Failure requesting chunk",
|
||||
);
|
||||
|
||||
match e {
|
||||
RequestError::InvalidResponse(_) => {}
|
||||
RequestError::NetworkError(_) | RequestError::Canceled(_) => {
|
||||
self.shuffling.push(validator_index);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn run(
|
||||
&mut self,
|
||||
params: &InteractionParams,
|
||||
to_state: &mut mpsc::Sender<FromInteraction>,
|
||||
) -> Result<(), mpsc::SendError> {
|
||||
sender: &mut impl SubsystemSender,
|
||||
) -> Result<AvailableData, RecoveryError> {
|
||||
loop {
|
||||
if is_unavailable(
|
||||
self.received_chunks.len(),
|
||||
@@ -347,23 +335,18 @@ impl RequestChunksPhase {
|
||||
n_validators = %params.validators.len(),
|
||||
"Data recovery is not possible",
|
||||
);
|
||||
to_state.send(FromInteraction::Concluded(
|
||||
params.candidate_hash,
|
||||
Err(RecoveryError::Unavailable),
|
||||
)).await?;
|
||||
|
||||
return Ok(());
|
||||
return Err(RecoveryError::Unavailable);
|
||||
}
|
||||
|
||||
self.launch_parallel_requests(params, to_state).await?;
|
||||
self.wait_for_chunks(params).await?;
|
||||
self.launch_parallel_requests(params, sender).await;
|
||||
self.wait_for_chunks(params).await;
|
||||
|
||||
// If received_chunks has at least `threshold` entries, attempt to recover the data.
|
||||
// If that fails, or a re-encoding of it doesn't match the expected erasure root,
|
||||
// break and issue a FromInteraction::Concluded(RecoveryError::Invalid).
|
||||
// Otherwise, issue a FromInteraction::Concluded(Ok(())).
|
||||
// return Err(RecoveryError::Invalid)
|
||||
if self.received_chunks.len() >= params.threshold {
|
||||
let concluded = match polkadot_erasure_coding::reconstruct_v1(
|
||||
return match polkadot_erasure_coding::reconstruct_v1(
|
||||
params.validators.len(),
|
||||
self.received_chunks.values().map(|c| (&c.chunk[..], c.index.0 as usize)),
|
||||
) {
|
||||
@@ -375,7 +358,8 @@ impl RequestChunksPhase {
|
||||
erasure_root = ?params.erasure_root,
|
||||
"Data recovery complete",
|
||||
);
|
||||
FromInteraction::Concluded(params.candidate_hash.clone(), Ok(data))
|
||||
|
||||
Ok(data)
|
||||
} else {
|
||||
tracing::trace!(
|
||||
target: LOG_TARGET,
|
||||
@@ -383,10 +367,8 @@ impl RequestChunksPhase {
|
||||
erasure_root = ?params.erasure_root,
|
||||
"Data recovery - root mismatch",
|
||||
);
|
||||
FromInteraction::Concluded(
|
||||
params.candidate_hash.clone(),
|
||||
Err(RecoveryError::Invalid),
|
||||
)
|
||||
|
||||
Err(RecoveryError::Invalid)
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
@@ -397,15 +379,10 @@ impl RequestChunksPhase {
|
||||
?err,
|
||||
"Data recovery error ",
|
||||
);
|
||||
FromInteraction::Concluded(
|
||||
params.candidate_hash.clone(),
|
||||
Err(RecoveryError::Invalid),
|
||||
)
|
||||
|
||||
Err(RecoveryError::Invalid)
|
||||
},
|
||||
};
|
||||
|
||||
to_state.send(concluded).await?;
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -442,59 +419,99 @@ fn reconstructed_data_matches_root(
|
||||
branches.root() == *expected_root
|
||||
}
|
||||
|
||||
impl Interaction {
|
||||
async fn run(mut self) -> error::Result<()> {
|
||||
impl<S: SubsystemSender> Interaction<S> {
|
||||
async fn run(mut self) -> Result<AvailableData, RecoveryError> {
|
||||
loop {
|
||||
// These only fail if we cannot reach the underlying subsystem, in which case there is nothing
|
||||
// meaningful we can do.
|
||||
match self.phase {
|
||||
InteractionPhase::RequestFromBackers(ref mut from_backers) => {
|
||||
if from_backers.run(&self.params, &mut self.to_state).await
|
||||
.map_err(error::Error::ClosedToState)?
|
||||
{
|
||||
break Ok(())
|
||||
} else {
|
||||
self.phase = InteractionPhase::RequestChunks(
|
||||
RequestChunksPhase::new(self.params.validators.len() as _)
|
||||
);
|
||||
match from_backers.run(&self.params, &mut self.sender).await {
|
||||
Ok(data) => break Ok(data),
|
||||
Err(RecoveryError::Invalid) => break Err(RecoveryError::Invalid),
|
||||
Err(RecoveryError::Unavailable) => {
|
||||
self.phase = InteractionPhase::RequestChunks(
|
||||
RequestChunksPhase::new(self.params.validators.len() as _)
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
InteractionPhase::RequestChunks(ref mut from_all) => {
|
||||
break from_all.run(&self.params, &mut self.to_state).await
|
||||
.map_err(error::Error::ClosedToState)
|
||||
break from_all.run(&self.params, &mut self.sender).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Accumulate all awaiting sides for some particular `AvailableData`.
|
||||
struct InteractionHandle {
|
||||
candidate_hash: CandidateHash,
|
||||
remote: RemoteHandle<Result<AvailableData, RecoveryError>>,
|
||||
awaiting: Vec<oneshot::Sender<Result<AvailableData, RecoveryError>>>,
|
||||
}
|
||||
|
||||
impl Future for InteractionHandle {
|
||||
type Output = Option<(CandidateHash, Result<AvailableData, RecoveryError>)>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let mut indices_to_remove = Vec::new();
|
||||
for (i, awaiting) in self.awaiting.iter_mut().enumerate().rev() {
|
||||
if let Poll::Ready(()) = awaiting.poll_canceled(cx) {
|
||||
indices_to_remove.push(i);
|
||||
}
|
||||
}
|
||||
|
||||
// indices are in descending order, so `swap_remove` never moves an element that is still pending removal.
|
||||
for index in indices_to_remove {
|
||||
tracing::debug!(
|
||||
target: LOG_TARGET,
|
||||
candidate_hash = ?self.candidate_hash,
|
||||
"Receiver for available data dropped.",
|
||||
);
|
||||
|
||||
self.awaiting.swap_remove(index);
|
||||
}
|
||||
|
||||
if self.awaiting.is_empty() {
|
||||
tracing::debug!(
|
||||
target: LOG_TARGET,
|
||||
candidate_hash = ?self.candidate_hash,
|
||||
"All receivers for available data dropped.",
|
||||
);
|
||||
|
||||
return Poll::Ready(None);
|
||||
}
|
||||
|
||||
let remote = &mut self.remote;
|
||||
futures::pin_mut!(remote);
|
||||
let result = futures::ready!(remote.poll(cx));
|
||||
|
||||
for awaiting in self.awaiting.drain(..) {
|
||||
let _ = awaiting.send(result.clone());
|
||||
}
|
||||
|
||||
Poll::Ready(Some((self.candidate_hash, result)))
|
||||
}
|
||||
}
|
||||
|
||||
struct State {
|
||||
/// Each interaction is implemented as its own async task,
|
||||
/// and these handles are for communicating with them.
|
||||
interactions: HashMap<CandidateHash, InteractionHandle>,
|
||||
interactions: FuturesUnordered<InteractionHandle>,
|
||||
|
||||
/// A recent block hash for which state should be available.
|
||||
live_block: (BlockNumber, Hash),
|
||||
|
||||
/// interaction communication. This is cloned and given to interactions that are spun up.
|
||||
from_interaction_tx: mpsc::Sender<FromInteraction>,
|
||||
|
||||
/// receiver for messages from interactions.
|
||||
from_interaction_rx: mpsc::Receiver<FromInteraction>,
|
||||
|
||||
/// An LRU cache of recently recovered data.
|
||||
availability_lru: LruCache<CandidateHash, Result<AvailableData, RecoveryError>>,
|
||||
}
|
||||
|
||||
impl Default for State {
|
||||
fn default() -> Self {
|
||||
let (from_interaction_tx, from_interaction_rx) = mpsc::channel(16);
|
||||
|
||||
Self {
|
||||
interactions: HashMap::new(),
|
||||
interactions: FuturesUnordered::new(),
|
||||
live_block: (0, Hash::default()),
|
||||
from_interaction_tx,
|
||||
from_interaction_rx,
|
||||
availability_lru: LruCache::new(LRU_SIZE),
|
||||
}
|
||||
}
|
||||
@@ -546,15 +563,7 @@ async fn launch_interaction(
|
||||
backing_group: Option<GroupIndex>,
|
||||
response_sender: oneshot::Sender<Result<AvailableData, RecoveryError>>,
|
||||
) -> error::Result<()> {
|
||||
let to_state = state.from_interaction_tx.clone();
|
||||
|
||||
let candidate_hash = receipt.hash();
|
||||
state.interactions.insert(
|
||||
candidate_hash.clone(),
|
||||
InteractionHandle {
|
||||
awaiting: vec![response_sender],
|
||||
}
|
||||
);
|
||||
|
||||
let params = InteractionParams {
|
||||
validator_authority_keys: session_info.discovery_keys.clone(),
|
||||
@@ -574,22 +583,20 @@ async fn launch_interaction(
|
||||
));
|
||||
|
||||
let interaction = Interaction {
|
||||
to_state,
|
||||
sender: ctx.sender().clone(),
|
||||
params,
|
||||
phase,
|
||||
};
|
||||
|
||||
let future = async move {
|
||||
if let Err(e) = interaction.run().await {
|
||||
tracing::debug!(
|
||||
target: LOG_TARGET,
|
||||
err = ?e,
|
||||
"Interaction finished with an error",
|
||||
);
|
||||
}
|
||||
}.boxed();
|
||||
let (remote, remote_handle) = interaction.run().remote_handle();
|
||||
|
||||
if let Err(e) = ctx.spawn("recovery interaction", future).await {
|
||||
state.interactions.push(InteractionHandle {
|
||||
candidate_hash,
|
||||
remote: remote_handle,
|
||||
awaiting: vec![response_sender],
|
||||
});
|
||||
|
||||
if let Err(e) = ctx.spawn("recovery interaction", Box::pin(remote)).await {
|
||||
tracing::warn!(
|
||||
target: LOG_TARGET,
|
||||
err = ?e,
|
||||
@@ -626,8 +633,8 @@ async fn handle_recover(
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
if let Some(interaction) = state.interactions.get_mut(&candidate_hash) {
|
||||
interaction.awaiting.push(response_sender);
|
||||
if let Some(i) = state.interactions.iter_mut().find(|i| i.candidate_hash == candidate_hash) {
|
||||
i.awaiting.push(response_sender);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
@@ -678,48 +685,6 @@ async fn query_full_data(
|
||||
Ok(rx.await.map_err(error::Error::CanceledQueryFullData)?)
|
||||
}
|
||||
|
||||
/// Handles message from interaction.
|
||||
#[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))]
|
||||
async fn handle_from_interaction(
|
||||
state: &mut State,
|
||||
ctx: &mut impl SubsystemContext<Message = AvailabilityRecoveryMessage>,
|
||||
from_interaction: FromInteraction,
|
||||
) -> error::Result<()> {
|
||||
match from_interaction {
|
||||
FromInteraction::Concluded(candidate_hash, result) => {
|
||||
// Load the entry from the interactions map.
|
||||
// It should always exist, if not for logic errors.
|
||||
if let Some(interaction) = state.interactions.remove(&candidate_hash) {
|
||||
// Send the result to each member of awaiting.
|
||||
for awaiting in interaction.awaiting {
|
||||
if let Err(_) = awaiting.send(result.clone()) {
|
||||
tracing::debug!(
|
||||
target: LOG_TARGET,
|
||||
"An awaiting side of the interaction has been canceled",
|
||||
);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
tracing::warn!(
|
||||
target: LOG_TARGET,
|
||||
"Interaction under candidate hash {} is missing",
|
||||
candidate_hash,
|
||||
);
|
||||
}
|
||||
|
||||
state.availability_lru.put(candidate_hash, result);
|
||||
}
|
||||
FromInteraction::NetworkRequest(request) => {
|
||||
ctx.send_message(NetworkBridgeMessage::SendRequests(
|
||||
vec![request],
|
||||
IfDisconnected::TryConnect,
|
||||
).into()).await;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
impl AvailabilityRecoverySubsystem {
|
||||
/// Create a new instance of `AvailabilityRecoverySubsystem` which starts with a fast path to request data from backers.
|
||||
pub fn with_fast_path() -> Self {
|
||||
@@ -790,19 +755,9 @@ impl AvailabilityRecoverySubsystem {
|
||||
}
|
||||
}
|
||||
}
|
||||
from_interaction = state.from_interaction_rx.next() => {
|
||||
if let Some(from_interaction) = from_interaction {
|
||||
if let Err(e) = handle_from_interaction(
|
||||
&mut state,
|
||||
&mut ctx,
|
||||
from_interaction,
|
||||
).await {
|
||||
tracing::warn!(
|
||||
target: LOG_TARGET,
|
||||
err = ?e,
|
||||
"Error handling message from interaction",
|
||||
);
|
||||
}
|
||||
output = state.interactions.next() => {
|
||||
if let Some((candidate_hash, result)) = output.flatten() {
|
||||
state.availability_lru.put(candidate_hash, result);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user