Availability recovery subsystem (#2122)

* Adds message types

* Add code skeleton

* Adds subsystem code.

* Adds a first test

* Adds interaction result to availability_lru

* Use LruCache instead of a HashMap

* Whitespaces to tabs

* Do not ignore errors

* Change error type

* Add a timeout to chunk requests

* Add custom errors and log them

* Adds replace_availability_recovery method

* recovery_threshold computed by erasure crate

* change core to std

* adds docs to error type

* Adds a test for invalid reconstruction

* refactors interaction run into multiple methods

* Cleanup AwaitedChunks

* Even more fixes

* Test that recovery with wrong root is an error

* Break to launch another requests

* Styling fixes

* Add SessionIndex to API

* Proper relay parents for MakeRequest

* Remove validator_discovery and use message

* Remove a stream on exhaustion

* On cleanup free the request streams

* Fix merge and refactor
This commit is contained in:
Fedor Sakharov
2021-01-15 05:06:25 +03:00
committed by GitHub
parent 00d2b4bf3e
commit 90a686266f
13 changed files with 1682 additions and 23 deletions
+28
View File
@@ -4942,6 +4942,33 @@ dependencies = [
"tracing-futures",
]
[[package]]
name = "polkadot-availability-recovery"
version = "0.1.0"
dependencies = [
"assert_matches",
"env_logger 0.8.2",
"futures 0.3.10",
"futures-timer 3.0.2",
"log",
"lru",
"polkadot-erasure-coding",
"polkadot-node-network-protocol",
"polkadot-node-subsystem",
"polkadot-node-subsystem-test-helpers",
"polkadot-node-subsystem-util",
"polkadot-primitives",
"rand 0.7.3",
"smallvec 1.6.1",
"sp-application-crypto",
"sp-core",
"sp-keyring",
"streamunordered",
"thiserror",
"tracing",
"tracing-futures",
]
[[package]]
name = "polkadot-cli"
version = "0.8.27"
@@ -5650,6 +5677,7 @@ dependencies = [
"pallet-transaction-payment-rpc-runtime-api",
"polkadot-availability-bitfield-distribution",
"polkadot-availability-distribution",
"polkadot-availability-recovery",
"polkadot-collator-protocol",
"polkadot-network-bridge",
"polkadot-node-collation-generation",
+1
View File
@@ -57,6 +57,7 @@ members = [
"node/network/statement-distribution",
"node/network/bitfield-distribution",
"node/network/availability-distribution",
"node/network/availability-recovery",
"node/network/collator-protocol",
"node/overseer",
"node/primitives",
+1
View File
@@ -119,6 +119,7 @@ impl CodeParams {
.expect("this struct is not created with invalid shard number; qed")
}
}
/// Returns the maximum number of allowed, faulty chunks
/// which does not prevent recovery given all other pieces
/// are correct.
@@ -0,0 +1,34 @@
[package]
name = "polkadot-availability-recovery"
version = "0.1.0"
authors = ["Parity Technologies <admin@parity.io>"]
edition = "2018"
[dependencies]
futures = "0.3.8"
lru = "0.6.1"
rand = "0.7.3"
thiserror = "1.0.21"
tracing = "0.1.22"
tracing-futures = "0.2.4"
polkadot-erasure-coding = { path = "../../../erasure-coding" }
polkadot-primitives = { path = "../../../primitives" }
polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" }
polkadot-node-subsystem-util = { path = "../../subsystem-util" }
polkadot-node-network-protocol = { path = "../../network/protocol" }
futures-timer = "3.0.2"
streamunordered = "0.5.1"
[dev-dependencies]
assert_matches = "1.4.0"
env_logger = "0.8.1"
futures-timer = "3.0.2"
log = "0.4.11"
smallvec = "1.5.1"
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-application-crypto = { git = "https://github.com/paritytech/substrate", branch = "master" }
polkadot-subsystem-testhelpers = { package = "polkadot-node-subsystem-test-helpers", path = "../../subsystem-test-helpers" }
@@ -0,0 +1,50 @@
// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! The `Error` and `Result` types used by the subsystem.
use futures::channel::{mpsc, oneshot};
use thiserror::Error;
/// Error type used by the Availability Recovery subsystem.
#[derive(Debug, Error)]
pub enum Error {
#[error(transparent)]
Subsystem(#[from] polkadot_subsystem::SubsystemError),
#[error("failed to query a chunk from store")]
CanceledQueryChunk(#[source] oneshot::Canceled),
#[error("failed to query session info")]
CanceledSessionInfo(#[source] oneshot::Canceled),
#[error("failed to send response")]
CanceledResponseSender,
#[error("to_state channel is closed")]
ClosedToState(#[source] mpsc::SendError),
#[error(transparent)]
Runtime(#[from] polkadot_subsystem::errors::RuntimeApiError),
#[error(transparent)]
Erasure(#[from] polkadot_erasure_coding::Error),
#[error(transparent)]
Util(#[from] polkadot_node_subsystem_util::Error),
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -0,0 +1,854 @@
// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Availability Recovery Subsystem of Polkadot.
#![warn(missing_docs)]
use std::collections::HashMap;
use std::time::Duration;
use std::pin::Pin;
use futures::{channel::{oneshot, mpsc}, prelude::*, stream::FuturesUnordered};
use futures_timer::Delay;
use lru::LruCache;
use rand::{seq::SliceRandom, thread_rng};
use streamunordered::{StreamUnordered, StreamYield};
use polkadot_primitives::v1::{
AuthorityDiscoveryId, AvailableData, CandidateReceipt, CandidateHash,
Hash, ErasureChunk, ValidatorId, ValidatorIndex,
SessionInfo, SessionIndex, BlakeTwo256, HashT,
};
use polkadot_subsystem::{
SubsystemContext, SubsystemResult, SubsystemError, Subsystem, SpawnedSubsystem, FromOverseer,
OverseerSignal, ActiveLeavesUpdate,
errors::RecoveryError,
messages::{
AvailabilityStoreMessage, AvailabilityRecoveryMessage, AllMessages, NetworkBridgeMessage,
},
};
use polkadot_node_network_protocol::{
v1 as protocol_v1, NetworkBridgeEvent, PeerId, ReputationChange as Rep, RequestId,
};
use polkadot_node_subsystem_util::{
Timeout, TimeoutExt,
request_session_info_ctx,
};
use polkadot_erasure_coding::{branches, branch_hash, recovery_threshold, obtain_chunks_v1};
mod error;
#[cfg(test)]
mod tests;
const LOG_TARGET: &str = "availability_recovery";
const COST_MERKLE_PROOF_INVALID: Rep = Rep::new(-100, "Merkle proof was invalid");
const COST_UNEXPECTED_CHUNK: Rep = Rep::new(-100, "Peer has sent an unexpected chunk");
// How many parallel requests interaction should have going at once.
const N_PARALLEL: usize = 50;
// Size of the LRU cache where we keep recovered data.
const LRU_SIZE: usize = 16;
// A timeout for a chunk request.
const CHUNK_REQUEST_TIMEOUT: Duration = Duration::from_secs(3);
// A period to poll and clean AwaitedChunks.
const AWAITED_CHUNKS_CLEANUP_INTERVAL: Duration = Duration::from_secs(1);
/// The Availability Recovery Subsystem.
pub struct AvailabilityRecoverySubsystem;
type ChunkResponse = Result<(PeerId, ErasureChunk), RecoveryError>;
/// Data we keep around for every chunk that we are awaiting.
struct AwaitedChunk {
/// Index of the validator we have requested this chunk from.
validator_index: ValidatorIndex,
/// The hash of the candidate the chunks belongs to.
candidate_hash: CandidateHash,
/// Token to cancel the connection request to the validator.
token: usize,
/// Result sender.
response: oneshot::Sender<ChunkResponse>,
}
/// Accumulate all awaiting sides for some particular `AvailableData`.
struct InteractionHandle {
awaiting: Vec<oneshot::Sender<Result<AvailableData, RecoveryError>>>,
}
/// A message received by main code from an async `Interaction` task.
#[derive(Debug)]
enum FromInteraction {
/// An interaction concluded.
Concluded(CandidateHash, Result<AvailableData, RecoveryError>),
/// Make a request of a particular chunk from a particular validator.
MakeRequest(
AuthorityDiscoveryId,
CandidateHash,
ValidatorIndex,
oneshot::Sender<ChunkResponse>,
),
/// Report a peer.
ReportPeer(
PeerId,
Rep,
),
}
/// A state of a single interaction reconstructing an available data.
struct Interaction {
/// A communication channel with the `State`.
to_state: mpsc::Sender<FromInteraction>,
/// Discovery ids of `validators`.
validator_authority_keys: Vec<AuthorityDiscoveryId>,
/// Validators relevant to this `Interaction`.
validators: Vec<ValidatorId>,
/// A random shuffling of the validators which indicates the order in which we connect
/// to the validators and request the chunk from them.
shuffling: Vec<ValidatorIndex>,
/// The number of pieces needed.
threshold: usize,
/// A hash of the relevant candidate.
candidate_hash: CandidateHash,
/// The root of the erasure encoding of the para block.
erasure_root: Hash,
/// The chunks that we have received from peers.
received_chunks: HashMap<PeerId, ErasureChunk>,
/// The chunk requests that are waiting to complete.
requesting_chunks: FuturesUnordered<Timeout<oneshot::Receiver<ChunkResponse>>>,
}
const fn is_unavailable(
received_chunks: usize,
requesting_chunks: usize,
n_validators: usize,
threshold: usize,
) -> bool {
received_chunks + requesting_chunks + n_validators < threshold
}
impl Interaction {
async fn launch_parallel_requests(&mut self) -> error::Result<()> {
while self.requesting_chunks.len() < N_PARALLEL {
if let Some(validator_index) = self.shuffling.pop() {
let (tx, rx) = oneshot::channel();
self.to_state.send(FromInteraction::MakeRequest(
self.validator_authority_keys[validator_index as usize].clone(),
self.candidate_hash.clone(),
validator_index,
tx,
)).await.map_err(error::Error::ClosedToState)?;
self.requesting_chunks.push(rx.timeout(CHUNK_REQUEST_TIMEOUT));
} else {
break;
}
}
Ok(())
}
async fn wait_for_chunks(&mut self) -> error::Result<()> {
// Check if the requesting chunks is not empty not to poll to completion.
if self.requesting_chunks.is_empty() {
return Ok(());
}
// Poll for new updates from requesting_chunks.
while let Some(request_result) = self.requesting_chunks.next().await {
match request_result {
Some(Ok(Ok((peer_id, chunk)))) => {
// Check merkle proofs of any received chunks, and any failures should
// lead to issuance of a FromInteraction::ReportPeer message.
if let Ok(anticipated_hash) = branch_hash(
&self.erasure_root,
&chunk.proof,
chunk.index as usize,
) {
let erasure_chunk_hash = BlakeTwo256::hash(&chunk.chunk);
if erasure_chunk_hash != anticipated_hash {
self.to_state.send(FromInteraction::ReportPeer(
peer_id.clone(),
COST_MERKLE_PROOF_INVALID,
)).await.map_err(error::Error::ClosedToState)?;
}
} else {
self.to_state.send(FromInteraction::ReportPeer(
peer_id.clone(),
COST_MERKLE_PROOF_INVALID,
)).await.map_err(error::Error::ClosedToState)?;
}
self.received_chunks.insert(peer_id, chunk);
}
Some(Err(e)) => {
tracing::debug!(
target: LOG_TARGET,
err = ?e,
"A response channel was cacelled while waiting for a chunk",
);
}
Some(Ok(Err(e))) => {
tracing::debug!(
target: LOG_TARGET,
err = ?e,
"A chunk request ended with an error",
);
}
None => {
tracing::debug!(
target: LOG_TARGET,
"A chunk request has timed out",
);
// we break here to launch another request.
break;
}
}
}
Ok(())
}
async fn run(mut self) -> error::Result<()> {
loop {
if is_unavailable(
self.received_chunks.len(),
self.requesting_chunks.len(),
self.shuffling.len(),
self.threshold,
) {
self.to_state.send(FromInteraction::Concluded(
self.candidate_hash,
Err(RecoveryError::Unavailable),
)).await.map_err(error::Error::ClosedToState)?;
return Ok(());
}
self.launch_parallel_requests().await?;
self.wait_for_chunks().await?;
// If received_chunks has more than threshold entries, attempt to recover the data.
// If that fails, or a re-encoding of it doesn't match the expected erasure root,
// break and issue a FromInteraction::Concluded(RecoveryError::Invalid).
// Otherwise, issue a FromInteraction::Concluded(Ok(())).
if self.received_chunks.len() >= self.threshold {
let concluded = match polkadot_erasure_coding::reconstruct_v1(
self.validators.len(),
self.received_chunks.values().map(|c| (&c.chunk[..], c.index as usize)),
) {
Ok(data) => {
if reconstructed_data_matches_root(self.validators.len(), &self.erasure_root, &data) {
FromInteraction::Concluded(self.candidate_hash.clone(), Ok(data))
} else {
FromInteraction::Concluded(
self.candidate_hash.clone(),
Err(RecoveryError::Invalid),
)
}
}
Err(_) => FromInteraction::Concluded(
self.candidate_hash.clone(),
Err(RecoveryError::Invalid),
),
};
self.to_state.send(concluded).await.map_err(error::Error::ClosedToState)?;
return Ok(());
}
}
}
}
fn reconstructed_data_matches_root(
n_validators: usize,
expected_root: &Hash,
data: &AvailableData,
) -> bool {
let chunks = match obtain_chunks_v1(n_validators, data) {
Ok(chunks) => chunks,
Err(e) => {
tracing::debug!(
target: LOG_TARGET,
err = ?e,
"Failed to obtain chunks",
);
return false;
}
};
let branches = branches(&chunks);
branches.root() == *expected_root
}
struct State {
/// Each interaction is implemented as its own async task,
/// and these handles are for communicating with them.
interactions: HashMap<CandidateHash, InteractionHandle>,
/// A recent block hash for which state should be available.
live_block_hash: Hash,
/// We are waiting for these validators to connect and as soon as they
/// do to request the needed chunks we are awaitinf for.
discovering_validators: HashMap<AuthorityDiscoveryId, Vec<AwaitedChunk>>,
/// Requests that we have issued to the already connected validators
/// about the chunks we are interested in.
live_chunk_requests: HashMap<RequestId, (PeerId, AwaitedChunk)>,
/// Derive request ids from this.
next_request_id: RequestId,
connecting_validators: StreamUnordered<mpsc::Receiver<(AuthorityDiscoveryId, PeerId)>>,
/// interaction communication. This is cloned and given to interactions that are spun up.
from_interaction_tx: mpsc::Sender<FromInteraction>,
/// receiver for messages from interactions.
from_interaction_rx: mpsc::Receiver<FromInteraction>,
/// An LRU cache of recently recovered data.
availability_lru: LruCache<CandidateHash, Result<AvailableData, RecoveryError>>,
}
impl Default for State {
fn default() -> Self {
let (from_interaction_tx, from_interaction_rx) = mpsc::channel(16);
Self {
from_interaction_tx,
from_interaction_rx,
interactions: HashMap::new(),
live_block_hash: Hash::default(),
discovering_validators: HashMap::new(),
live_chunk_requests: HashMap::new(),
next_request_id: 0,
connecting_validators: StreamUnordered::new(),
availability_lru: LruCache::new(LRU_SIZE),
}
}
}
impl<C> Subsystem<C> for AvailabilityRecoverySubsystem
where C: SubsystemContext<Message = AvailabilityRecoveryMessage>
{
fn start(self, ctx: C) -> SpawnedSubsystem {
let future = self.run(ctx)
.map_err(|e| SubsystemError::with_origin("availability-recovery", e))
.boxed();
SpawnedSubsystem {
name: "availability-recovery-subsystem",
future,
}
}
}
/// Handles a signal from the overseer.
async fn handle_signal(
state: &mut State,
signal: OverseerSignal,
) -> SubsystemResult<bool> {
match signal {
OverseerSignal::Conclude => Ok(true),
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { activated, .. }) => {
// if activated is non-empty, set state.live_block_hash to the first block in Activated.
if let Some(hash) = activated.get(0) {
state.live_block_hash = hash.0;
}
Ok(false)
}
OverseerSignal::BlockFinalized(_, _) => Ok(false)
}
}
/// Report a reputation change for a peer.
async fn report_peer(
ctx: &mut impl SubsystemContext<Message = AvailabilityRecoveryMessage>,
peer: PeerId,
rep: Rep,
) {
ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::ReportPeer(peer, rep))).await;
}
/// Machinery around launching interactions into the background.
#[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))]
async fn launch_interaction(
state: &mut State,
ctx: &mut impl SubsystemContext<Message = AvailabilityRecoveryMessage>,
session_index: SessionIndex,
session_info: SessionInfo,
receipt: CandidateReceipt,
response_sender: oneshot::Sender<Result<AvailableData, RecoveryError>>,
) -> error::Result<()> {
let threshold = recovery_threshold(session_info.validators.len())?;
let to_state = state.from_interaction_tx.clone();
let candidate_hash = receipt.hash();
let erasure_root = receipt.descriptor.erasure_root;
let validators = session_info.validators.clone();
let validator_authority_keys = session_info.discovery_keys.clone();
let mut shuffling: Vec<_> = (0..validators.len() as ValidatorIndex).collect();
state.interactions.insert(
candidate_hash.clone(),
InteractionHandle {
awaiting: vec![response_sender],
}
);
{
// make borrow checker happy.
let mut rng = thread_rng();
shuffling.shuffle(&mut rng);
}
let interaction = Interaction {
to_state,
validator_authority_keys,
validators,
shuffling,
threshold,
candidate_hash,
erasure_root,
received_chunks: HashMap::new(),
requesting_chunks: FuturesUnordered::new(),
};
let future = async move {
if let Err(e) = interaction.run().await {
tracing::debug!(
target: LOG_TARGET,
err = ?e,
"Interaction finished with an error",
);
}
}.boxed();
if let Err(e) = ctx.spawn("recovery interaction", future).await {
tracing::warn!(
target: LOG_TARGET,
err = ?e,
"Failed to spawn a recovery interaction task",
);
}
Ok(())
}
/// Handles an availability recovery request.
#[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))]
async fn handle_recover(
state: &mut State,
ctx: &mut impl SubsystemContext<Message = AvailabilityRecoveryMessage>,
receipt: CandidateReceipt,
session_index: SessionIndex,
response_sender: oneshot::Sender<Result<AvailableData, RecoveryError>>,
) -> error::Result<()> {
let candidate_hash = receipt.hash();
if let Some(result) = state.availability_lru.get(&candidate_hash) {
if let Err(e) = response_sender.send(result.clone()) {
tracing::warn!(
target: LOG_TARGET,
err = ?e,
"Error responding with an availability recovery result",
);
}
return Ok(());
}
if let Some(interaction) = state.interactions.get_mut(&candidate_hash) {
interaction.awaiting.push(response_sender);
return Ok(());
}
let session_info = request_session_info_ctx(
state.live_block_hash,
session_index,
ctx,
).await?.await.map_err(error::Error::CanceledSessionInfo)??;
match session_info {
Some(session_info) => {
launch_interaction(
state,
ctx,
session_index,
session_info,
receipt,
response_sender,
).await
}
None => {
tracing::warn!(
target: LOG_TARGET,
"SessionInfo is `None` at {}", state.live_block_hash,
);
response_sender
.send(Err(RecoveryError::Unavailable))
.map_err(|_| error::Error::CanceledResponseSender)?;
Ok(())
}
}
}
/// Queries a chunk from av-store.
#[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))]
async fn query_chunk(
ctx: &mut impl SubsystemContext<Message = AvailabilityRecoveryMessage>,
candidate_hash: CandidateHash,
validator_index: ValidatorIndex,
) -> error::Result<Option<ErasureChunk>> {
let (tx, rx) = oneshot::channel();
ctx.send_message(AllMessages::AvailabilityStore(
AvailabilityStoreMessage::QueryChunk(candidate_hash, validator_index, tx),
)).await;
Ok(rx.await.map_err(error::Error::CanceledQueryChunk)?)
}
/// Handles message from interaction.
#[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))]
async fn handle_from_interaction(
state: &mut State,
ctx: &mut impl SubsystemContext<Message = AvailabilityRecoveryMessage>,
from_interaction: FromInteraction,
) -> error::Result<()> {
match from_interaction {
FromInteraction::Concluded(candidate_hash, result) => {
// Load the entry from the interactions map.
// It should always exist, if not for logic errors.
if let Some(interaction) = state.interactions.remove(&candidate_hash) {
// Send the result to each member of awaiting.
for awaiting in interaction.awaiting {
if let Err(_) = awaiting.send(result.clone()) {
tracing::debug!(
target: LOG_TARGET,
"An awaiting side of the interaction has been canceled",
);
}
}
} else {
tracing::warn!(
target: LOG_TARGET,
"Interaction under candidate hash {} is missing",
candidate_hash,
);
}
state.availability_lru.put(candidate_hash, result);
}
FromInteraction::MakeRequest(id, candidate_hash, validator_index, response) => {
let (tx, rx) = mpsc::channel(2);
let message = NetworkBridgeMessage::ConnectToValidators {
validator_ids: vec![id.clone()],
connected: tx,
};
ctx.send_message(AllMessages::NetworkBridge(message)).await;
let token = state.connecting_validators.push(rx);
state.discovering_validators.entry(id).or_default().push(AwaitedChunk {
validator_index,
candidate_hash,
token,
response,
});
}
FromInteraction::ReportPeer(peer_id, rep) => {
report_peer(ctx, peer_id, rep).await;
}
}
Ok(())
}
/// Handles a network bridge update.
#[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))]
async fn handle_network_update(
state: &mut State,
ctx: &mut impl SubsystemContext<Message = AvailabilityRecoveryMessage>,
update: NetworkBridgeEvent<protocol_v1::AvailabilityRecoveryMessage>,
) -> error::Result<()> {
match update {
NetworkBridgeEvent::PeerMessage(peer, message) => {
match message {
protocol_v1::AvailabilityRecoveryMessage::RequestChunk(
request_id,
candidate_hash,
validator_index,
) => {
// Issue a
// AvailabilityStore::QueryChunk(candidate-hash, validator_index, response)
// message.
let chunk = query_chunk(ctx, candidate_hash, validator_index).await?;
// Whatever the result, issue an
// AvailabilityRecoveryV1Message::Chunk(r_id, response) message.
let wire_message = protocol_v1::AvailabilityRecoveryMessage::Chunk(
request_id,
chunk,
);
ctx.send_message(AllMessages::NetworkBridge(
NetworkBridgeMessage::SendValidationMessage(
vec![peer],
protocol_v1::ValidationProtocol::AvailabilityRecovery(wire_message),
),
)).await;
}
protocol_v1::AvailabilityRecoveryMessage::Chunk(request_id, chunk) => {
match state.live_chunk_requests.remove(&request_id) {
None => {
// If there doesn't exist one, report the peer and return.
report_peer(ctx, peer, COST_UNEXPECTED_CHUNK).await;
}
Some((peer_id, awaited_chunk)) if peer_id == peer => {
// If there exists an entry under r_id, remove it.
// Send the chunk response on the awaited_chunk for the interaction to handle.
if let Some(chunk) = chunk {
if awaited_chunk.response.send(Ok((peer_id, chunk))).is_err() {
tracing::debug!(
target: LOG_TARGET,
"A sending side of the recovery request is closed",
);
}
}
}
Some(a) => {
// If the peer in the entry doesn't match the sending peer,
// reinstate the entry, report the peer, and return
state.live_chunk_requests.insert(request_id, a);
report_peer(ctx, peer, COST_UNEXPECTED_CHUNK).await;
}
}
}
}
}
// We do not really need to track the peers' views in this subsystem
// since the peers are _required_ to have the data we are interested in.
NetworkBridgeEvent::PeerViewChange(_, _) => {}
NetworkBridgeEvent::OurViewChange(_) => {}
// All peer connections are handled via validator discovery API.
NetworkBridgeEvent::PeerConnected(_, _) => {}
NetworkBridgeEvent::PeerDisconnected(_) => {}
}
Ok(())
}
/// Issues a chunk request to the validator we've been waiting for to connect to us.
async fn issue_chunk_request(
state: &mut State,
ctx: &mut impl SubsystemContext<Message = AvailabilityRecoveryMessage>,
peer_id: PeerId,
awaited_chunk: AwaitedChunk,
) -> error::Result<()> {
let request_id = state.next_request_id;
state.next_request_id += 1;
let wire_message = protocol_v1::AvailabilityRecoveryMessage::RequestChunk(
request_id,
awaited_chunk.candidate_hash,
awaited_chunk.validator_index,
);
ctx.send_message(AllMessages::NetworkBridge(
NetworkBridgeMessage::SendValidationMessage(
vec![peer_id.clone()],
protocol_v1::ValidationProtocol::AvailabilityRecovery(wire_message),
),
)).await;
state.live_chunk_requests.insert(request_id, (peer_id, awaited_chunk));
Ok(())
}
/// Handles a newly connected validator in the context of some relay leaf.
async fn handle_validator_connected(
state: &mut State,
ctx: &mut impl SubsystemContext<Message = AvailabilityRecoveryMessage>,
authority_id: AuthorityDiscoveryId,
peer_id: PeerId,
) -> error::Result<()> {
if let Some(discovering) = state.discovering_validators.remove(&authority_id) {
for chunk in discovering {
issue_chunk_request(state, ctx, peer_id.clone(), chunk).await?;
}
}
Ok(())
}
/// Awaited chunks info that `State` holds has to be cleaned up
/// periodically since there is no way `Interaction` can communicate
/// a timedout request.
fn cleanup_awaited_chunks(state: &mut State) {
let mut removed_tokens = Vec::new();
for (_, v) in state.discovering_validators.iter_mut() {
v.retain(|e| if !e.response.is_canceled() {
removed_tokens.push(e.token);
false
} else {
true
});
}
for token in removed_tokens {
Pin::new(&mut state.connecting_validators).remove(token);
}
state.discovering_validators.retain(|_, v| !v.is_empty());
state.live_chunk_requests.retain(|_, v| !v.1.response.is_canceled());
}
impl AvailabilityRecoverySubsystem {
/// Create a new instance of `AvailabilityRecoverySubsystem`.
pub fn new() -> Self {
Self
}
async fn run(
self,
mut ctx: impl SubsystemContext<Message = AvailabilityRecoveryMessage>,
) -> SubsystemResult<()> {
let mut state = State::default();
let awaited_chunk_cleanup_interval = futures::stream::repeat(()).then(|_| async move {
Delay::new(AWAITED_CHUNKS_CLEANUP_INTERVAL).await;
});
futures::pin_mut!(awaited_chunk_cleanup_interval);
loop {
futures::select_biased! {
_v = awaited_chunk_cleanup_interval.next() => {
cleanup_awaited_chunks(&mut state);
}
v = state.connecting_validators.next() => {
if let Some((v, token)) = v {
match v {
StreamYield::Item(v) => {
if let Err(e) = handle_validator_connected(
&mut state,
&mut ctx,
v.0,
v.1,
).await {
tracing::warn!(
target: LOG_TARGET,
err = ?e,
"Failed to handle a newly connected validator",
);
}
}
StreamYield::Finished(_) => {
Pin::new(&mut state.connecting_validators).remove(token);
}
}
}
}
v = ctx.recv().fuse() => {
match v? {
FromOverseer::Signal(signal) => if handle_signal(
&mut state,
signal,
).await? {
return Ok(());
}
FromOverseer::Communication { msg } => {
match msg {
AvailabilityRecoveryMessage::RecoverAvailableData(
receipt,
session_index,
response_sender,
) => {
if let Err(e) = handle_recover(
&mut state,
&mut ctx,
receipt,
session_index,
response_sender,
).await {
tracing::warn!(
target: LOG_TARGET,
err = ?e,
"Error handling a recovery request",
);
}
}
AvailabilityRecoveryMessage::NetworkBridgeUpdateV1(event) => {
if let Err(e) = handle_network_update(
&mut state,
&mut ctx,
event,
).await {
tracing::warn!(
target: LOG_TARGET,
err = ?e,
"Error handling a network bridge update",
);
}
}
}
}
}
}
from_interaction = state.from_interaction_rx.next() => {
if let Some(from_interaction) = from_interaction {
if let Err(e) = handle_from_interaction(
&mut state,
&mut ctx,
from_interaction,
).await {
tracing::warn!(
target: LOG_TARGET,
err = ?e,
"Error handling message from interaction",
);
}
}
}
}
}
}
}
@@ -0,0 +1,565 @@
// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use std::time::Duration;
use std::sync::Arc;
use futures::{executor, future};
use futures_timer::Delay;
use assert_matches::assert_matches;
use smallvec::smallvec;
use super::*;
use polkadot_primitives::v1::{
AuthorityDiscoveryId, PersistedValidationData, PoV, BlockData, HeadData,
};
use polkadot_erasure_coding::{branches, obtain_chunks_v1 as obtain_chunks};
use polkadot_node_subsystem_util::TimeoutExt;
use polkadot_subsystem_testhelpers as test_helpers;
use polkadot_subsystem::{messages::{RuntimeApiMessage, RuntimeApiRequest}, JaegerSpan};
type VirtualOverseer = test_helpers::TestSubsystemContextHandle<AvailabilityRecoveryMessage>;
struct TestHarness {
virtual_overseer: VirtualOverseer,
}
fn test_harness<T: Future<Output = ()>>(
test: impl FnOnce(TestHarness) -> T,
) {
let _ = env_logger::builder()
.is_test(true)
.filter(
Some("polkadot_availability_recovery"),
log::LevelFilter::Trace,
)
.try_init();
let pool = sp_core::testing::TaskExecutor::new();
let (context, virtual_overseer) = test_helpers::make_subsystem_context(pool.clone());
let subsystem = AvailabilityRecoverySubsystem::new();
let subsystem = subsystem.run(context);
let test_fut = test(TestHarness { virtual_overseer });
futures::pin_mut!(test_fut);
futures::pin_mut!(subsystem);
executor::block_on(future::select(test_fut, subsystem));
}
const TIMEOUT: Duration = Duration::from_millis(100);
macro_rules! delay {
($delay:expr) => {
Delay::new(Duration::from_millis($delay)).await;
};
}
async fn overseer_signal(
overseer: &mut test_helpers::TestSubsystemContextHandle<AvailabilityRecoveryMessage>,
signal: OverseerSignal,
) {
delay!(50);
overseer
.send(FromOverseer::Signal(signal))
.timeout(TIMEOUT)
.await
.expect("10ms is more than enough for sending signals.");
}
async fn overseer_send(
overseer: &mut test_helpers::TestSubsystemContextHandle<AvailabilityRecoveryMessage>,
msg: AvailabilityRecoveryMessage,
) {
tracing::trace!(msg = ?msg, "sending message");
overseer
.send(FromOverseer::Communication { msg })
.timeout(TIMEOUT)
.await
.expect("10ms is more than enough for sending messages.");
}
async fn overseer_recv(
overseer: &mut test_helpers::TestSubsystemContextHandle<AvailabilityRecoveryMessage>,
) -> AllMessages {
tracing::trace!("waiting for message ...");
let msg = overseer
.recv()
.timeout(TIMEOUT)
.await
.expect("TIMEOUT is enough to recv.");
tracing::trace!(msg = ?msg, "received message");
msg
}
use sp_keyring::Sr25519Keyring;
#[derive(Clone)]
struct TestState {
validators: Vec<Sr25519Keyring>,
validator_public: Vec<ValidatorId>,
validator_authority_id: Vec<AuthorityDiscoveryId>,
validator_peer_id: Vec<PeerId>,
current: Hash,
candidate: CandidateReceipt,
session_index: SessionIndex,
persisted_validation_data: PersistedValidationData,
available_data: AvailableData,
chunks: Vec<ErasureChunk>,
}
impl TestState {
async fn test_runtime_api(
&self,
virtual_overseer: &mut VirtualOverseer,
) {
assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
relay_parent,
RuntimeApiRequest::SessionInfo(
session_index,
tx,
)
)) => {
assert_eq!(relay_parent, self.current);
assert_eq!(session_index, self.session_index);
tx.send(Ok(Some(SessionInfo {
validators: self.validator_public.clone(),
discovery_keys: self.validator_authority_id.clone(),
..Default::default()
}))).unwrap();
}
);
}
async fn test_connect_to_validators(
&self,
virtual_overseer: &mut VirtualOverseer,
) {
// Channels by AuthorityDiscoveryId to send results to.
// Gather them here and send in batch after the loop not to race.
let mut results = HashMap::new();
for _ in 0..self.validator_public.len() {
// Connect to shuffled validators one by one.
assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::ConnectToValidators {
validator_ids,
connected,
..
}
) => {
for validator_id in validator_ids {
let idx = self.validator_authority_id
.iter()
.position(|x| *x == validator_id)
.unwrap();
results.insert(
(
self.validator_authority_id[idx].clone(),
self.validator_peer_id[idx].clone(),
),
connected.clone(),
);
}
}
);
}
for (k, mut v) in results.into_iter() {
v.send(k).await.unwrap();
}
}
async fn test_chunk_requests(
&self,
candidate_hash: CandidateHash,
virtual_overseer: &mut VirtualOverseer,
) {
for _ in 0..self.validator_public.len() {
// Receive a request for a chunk.
assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::SendValidationMessage(
_peers,
protocol_v1::ValidationProtocol::AvailabilityRecovery(wire_message),
)
) => {
let (request_id, validator_index) = assert_matches!(
wire_message,
protocol_v1::AvailabilityRecoveryMessage::RequestChunk(
request_id,
candidate_hash_recvd,
validator_index,
) => {
assert_eq!(candidate_hash_recvd, candidate_hash);
(request_id, validator_index)
}
);
overseer_send(
virtual_overseer,
AvailabilityRecoveryMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerMessage(
self.validator_peer_id[validator_index as usize].clone(),
protocol_v1::AvailabilityRecoveryMessage::Chunk(
request_id,
Some(self.chunks[validator_index as usize].clone()),
)
)
)
).await;
}
);
}
}
async fn test_faulty_chunk_requests(
&self,
candidate_hash: CandidateHash,
virtual_overseer: &mut VirtualOverseer,
faulty: &[bool],
) {
for _ in 0..self.validator_public.len() {
// Receive a request for a chunk.
assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::SendValidationMessage(
_peers,
protocol_v1::ValidationProtocol::AvailabilityRecovery(wire_message),
)
) => {
let (request_id, validator_index) = assert_matches!(
wire_message,
protocol_v1::AvailabilityRecoveryMessage::RequestChunk(
request_id,
candidate_hash_recvd,
validator_index,
) => {
assert_eq!(candidate_hash_recvd, candidate_hash);
(request_id, validator_index)
}
);
overseer_send(
virtual_overseer,
AvailabilityRecoveryMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerMessage(
self.validator_peer_id[validator_index as usize].clone(),
protocol_v1::AvailabilityRecoveryMessage::Chunk(
request_id,
Some(self.chunks[validator_index as usize].clone()),
)
)
)
).await;
}
);
}
for i in 0..self.validator_public.len() {
if faulty[i] {
assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::ReportPeer(
peer,
rep,
)
) => {
assert_eq!(rep, COST_MERKLE_PROOF_INVALID);
// These may arrive in any order since the interaction implementation
// uses `FuturesUnordered`.
assert!(self.validator_peer_id.iter().find(|p| **p == peer).is_some());
}
);
}
}
}
}
fn validator_pubkeys(val_ids: &[Sr25519Keyring]) -> Vec<ValidatorId> {
val_ids.iter().map(|v| v.public().into()).collect()
}
fn validator_authority_id(val_ids: &[Sr25519Keyring]) -> Vec<AuthorityDiscoveryId> {
val_ids.iter().map(|v| v.public().into()).collect()
}
fn derive_erasure_chunks_with_proofs_and_root(
n_validators: usize,
available_data: &AvailableData,
) -> (Vec<ErasureChunk>, Hash) {
let chunks: Vec<Vec<u8>> = obtain_chunks(n_validators, available_data).unwrap();
// create proofs for each erasure chunk
let branches = branches(chunks.as_ref());
let root = branches.root();
let erasure_chunks = branches
.enumerate()
.map(|(index, (proof, chunk))| ErasureChunk {
chunk: chunk.to_vec(),
index: index as _,
proof,
})
.collect::<Vec<ErasureChunk>>();
(erasure_chunks, root)
}
impl Default for TestState {
fn default() -> Self {
let validators = vec![
Sr25519Keyring::Ferdie, // <- this node, role: validator
Sr25519Keyring::Alice,
Sr25519Keyring::Bob,
Sr25519Keyring::Charlie,
Sr25519Keyring::Dave,
];
let validator_public = validator_pubkeys(&validators);
let validator_authority_id = validator_authority_id(&validators);
let validator_peer_id = std::iter::repeat_with(|| PeerId::random())
.take(validator_public.len())
.collect();
let current = Hash::repeat_byte(1);
let mut candidate = CandidateReceipt::default();
let session_index = 10;
let persisted_validation_data = PersistedValidationData {
parent_head: HeadData(vec![7, 8, 9]),
block_number: Default::default(),
hrmp_mqc_heads: Vec::new(),
dmq_mqc_head: Default::default(),
max_pov_size: 1024,
relay_storage_root: Default::default(),
};
let pov = PoV {
block_data: BlockData(vec![42; 64]),
};
let available_data = AvailableData {
validation_data: persisted_validation_data.clone(),
pov: Arc::new(pov),
};
let (chunks, erasure_root) = derive_erasure_chunks_with_proofs_and_root(
validators.len(),
&available_data,
);
candidate.descriptor.erasure_root = erasure_root;
candidate.descriptor.relay_parent = Hash::repeat_byte(10);
Self {
validators,
validator_public,
validator_authority_id,
validator_peer_id,
current,
candidate,
session_index,
persisted_validation_data,
available_data,
chunks,
}
}
}
#[test]
fn availability_is_recovered() {
let test_state = TestState::default();
test_harness(|test_harness| async move {
let TestHarness { mut virtual_overseer } = test_harness;
overseer_signal(
&mut virtual_overseer,
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
activated: smallvec![(test_state.current.clone(), Arc::new(JaegerSpan::Disabled))],
deactivated: smallvec![],
}),
).await;
let (tx, rx) = oneshot::channel();
overseer_send(
&mut virtual_overseer,
AvailabilityRecoveryMessage::RecoverAvailableData(
test_state.candidate.clone(),
test_state.session_index,
tx,
)
).await;
test_state.test_runtime_api(&mut virtual_overseer).await;
test_state.test_connect_to_validators(&mut virtual_overseer).await;
let candidate_hash = test_state.candidate.hash();
test_state.test_chunk_requests(candidate_hash, &mut virtual_overseer).await;
// Recovered data should match the original one.
assert_eq!(rx.await.unwrap().unwrap(), test_state.available_data);
let (tx, rx) = oneshot::channel();
// Test another candidate, send no chunks.
let mut new_candidate = CandidateReceipt::default();
new_candidate.descriptor.relay_parent = test_state.candidate.descriptor.relay_parent;
overseer_send(
&mut virtual_overseer,
AvailabilityRecoveryMessage::RecoverAvailableData(
new_candidate,
test_state.session_index,
tx,
)
).await;
test_state.test_runtime_api(&mut virtual_overseer).await;
test_state.test_connect_to_validators(&mut virtual_overseer).await;
// A request times out with `Unavailable` error.
assert_eq!(rx.await.unwrap().unwrap_err(), RecoveryError::Unavailable);
});
}
#[test]
fn a_faulty_chunk_leads_to_recovery_error() {
let mut test_state = TestState::default();
test_harness(|test_harness| async move {
let TestHarness { mut virtual_overseer } = test_harness;
overseer_signal(
&mut virtual_overseer,
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
activated: smallvec![(test_state.current.clone(), Arc::new(JaegerSpan::Disabled))],
deactivated: smallvec![],
}),
).await;
let (tx, rx) = oneshot::channel();
overseer_send(
&mut virtual_overseer,
AvailabilityRecoveryMessage::RecoverAvailableData(
test_state.candidate.clone(),
test_state.session_index,
tx,
)
).await;
test_state.test_runtime_api(&mut virtual_overseer).await;
test_state.test_connect_to_validators(&mut virtual_overseer).await;
let candidate_hash = test_state.candidate.hash();
// Create some faulty chunks.
test_state.chunks[0].chunk = vec![1; 32];
test_state.chunks[1].chunk = vec![2; 32];
let mut faulty = vec![false; test_state.chunks.len()];
faulty[0] = true;
faulty[1] = true;
test_state.test_faulty_chunk_requests(
candidate_hash,
&mut virtual_overseer,
&faulty,
).await;
// A request times out with `Unavailable` error.
assert_eq!(rx.await.unwrap().unwrap_err(), RecoveryError::Invalid);
});
}
#[test]
fn a_wrong_chunk_leads_to_recovery_error() {
let mut test_state = TestState::default();
test_harness(|test_harness| async move {
let TestHarness { mut virtual_overseer } = test_harness;
overseer_signal(
&mut virtual_overseer,
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
activated: smallvec![(test_state.current.clone(), Arc::new(JaegerSpan::Disabled))],
deactivated: smallvec![],
}),
).await;
let (tx, rx) = oneshot::channel();
overseer_send(
&mut virtual_overseer,
AvailabilityRecoveryMessage::RecoverAvailableData(
test_state.candidate.clone(),
test_state.session_index,
tx,
)
).await;
test_state.test_runtime_api(&mut virtual_overseer).await;
test_state.test_connect_to_validators(&mut virtual_overseer).await;
let candidate_hash = test_state.candidate.hash();
// Send a wrong chunk so it passes proof check but fails to reconstruct.
test_state.chunks[1] = test_state.chunks[0].clone();
test_state.chunks[2] = test_state.chunks[0].clone();
test_state.chunks[3] = test_state.chunks[0].clone();
test_state.chunks[4] = test_state.chunks[0].clone();
let faulty = vec![false; test_state.chunks.len()];
test_state.test_faulty_chunk_requests(
candidate_hash,
&mut virtual_overseer,
&faulty,
).await;
// A request times out with `Unavailable` error.
assert_eq!(rx.await.unwrap().unwrap_err(), RecoveryError::Invalid);
});
}
+14 -1
View File
@@ -282,7 +282,7 @@ impl View {
pub mod v1 {
use polkadot_primitives::v1::{
Hash, CollatorId, Id as ParaId, ErasureChunk, CandidateReceipt,
SignedAvailabilityBitfield, PoV, CandidateHash,
SignedAvailabilityBitfield, PoV, CandidateHash, ValidatorIndex,
};
use polkadot_node_primitives::SignedFullStatement;
use parity_scale_codec::{Encode, Decode};
@@ -297,6 +297,16 @@ pub mod v1 {
Chunk(CandidateHash, ErasureChunk),
}
/// Network messages used by the availability recovery subsystem.
#[derive(Debug, Clone, Encode, Decode, PartialEq)]
pub enum AvailabilityRecoveryMessage {
/// Request a chunk for a given candidate hash and validator index.
RequestChunk(RequestId, CandidateHash, ValidatorIndex),
/// Respond with chunk for a given candidate hash and validator index.
/// The response may be `None` if the requestee does not have the chunk.
Chunk(RequestId, Option<ErasureChunk>),
}
/// Network messages used by the bitfield distribution subsystem.
#[derive(Debug, Clone, Encode, Decode, PartialEq)]
pub enum BitfieldDistributionMessage {
@@ -359,6 +369,9 @@ pub mod v1 {
/// Statement distribution messages
#[codec(index = "3")]
StatementDistribution(StatementDistributionMessage),
/// Availability recovery messages
#[codec(index = "4")]
AvailabilityRecovery(AvailabilityRecoveryMessage),
}
impl_try_from!(ValidationProtocol, AvailabilityDistribution, AvailabilityDistributionMessage);
+97 -22
View File
@@ -85,6 +85,7 @@ use polkadot_subsystem::messages::{
AvailabilityDistributionMessage, BitfieldSigningMessage, BitfieldDistributionMessage,
ProvisionerMessage, PoVDistributionMessage, RuntimeApiMessage,
AvailabilityStoreMessage, NetworkBridgeMessage, AllMessages, CollationGenerationMessage, CollatorProtocolMessage,
AvailabilityRecoveryMessage,
};
pub use polkadot_subsystem::{
Subsystem, SubsystemContext, OverseerSignal, FromOverseer, SubsystemError, SubsystemResult,
@@ -524,6 +525,9 @@ pub struct Overseer<S> {
/// An availability distribution subsystem.
availability_distribution_subsystem: OverseenSubsystem<AvailabilityDistributionMessage>,
/// An availability recovery subsystem.
availability_recovery_subsystem: OverseenSubsystem<AvailabilityRecoveryMessage>,
/// A bitfield signing subsystem.
bitfield_signing_subsystem: OverseenSubsystem<BitfieldSigningMessage>,
@@ -593,8 +597,8 @@ pub struct Overseer<S> {
/// message type that is specific to this [`Subsystem`]. At the moment not all
/// subsystems are implemented and the rest can be mocked with the [`DummySubsystem`].
pub struct AllSubsystems<
CV = (), CB = (), CS = (), SD = (), AD = (), BS = (), BD = (), P = (),
PoVD = (), RA = (), AS = (), NB = (), CA = (), CG = (), CP = ()
CV = (), CB = (), CS = (), SD = (), AD = (), AR = (), BS = (), BD = (), P = (),
PoVD = (), RA = (), AS = (), NB = (), CA = (), CG = (), CP = (),
> {
/// A candidate validation subsystem.
pub candidate_validation: CV,
@@ -606,6 +610,8 @@ pub struct AllSubsystems<
pub statement_distribution: SD,
/// An availability distribution subsystem.
pub availability_distribution: AD,
/// An availability recovery subsystem.
pub availability_recovery: AR,
/// A bitfield signing subsystem.
pub bitfield_signing: BS,
/// A bitfield distribution subsystem.
@@ -628,8 +634,8 @@ pub struct AllSubsystems<
pub collator_protocol: CP,
}
impl<CV, CB, CS, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP>
AllSubsystems<CV, CB, CS, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP>
impl<CV, CB, CS, SD, AD, AR, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP>
AllSubsystems<CV, CB, CS, SD, AD, AR, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP>
{
/// Create a new instance of [`AllSubsystems`].
///
@@ -658,6 +664,7 @@ impl<CV, CB, CS, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP>
DummySubsystem,
DummySubsystem,
DummySubsystem,
DummySubsystem,
DummySubsystem
> {
AllSubsystems {
@@ -666,6 +673,7 @@ impl<CV, CB, CS, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP>
candidate_selection: DummySubsystem,
statement_distribution: DummySubsystem,
availability_distribution: DummySubsystem,
availability_recovery: DummySubsystem,
bitfield_signing: DummySubsystem,
bitfield_distribution: DummySubsystem,
provisioner: DummySubsystem,
@@ -683,13 +691,14 @@ impl<CV, CB, CS, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP>
pub fn replace_candidate_validation<NEW>(
self,
candidate_validation: NEW,
) -> AllSubsystems<NEW, CB, CS, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP> {
) -> AllSubsystems<NEW, CB, CS, SD, AD, AR, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP> {
AllSubsystems {
candidate_validation,
candidate_backing: self.candidate_backing,
candidate_selection: self.candidate_selection,
statement_distribution: self.statement_distribution,
availability_distribution: self.availability_distribution,
availability_recovery: self.availability_recovery,
bitfield_signing: self.bitfield_signing,
bitfield_distribution: self.bitfield_distribution,
provisioner: self.provisioner,
@@ -707,13 +716,14 @@ impl<CV, CB, CS, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP>
pub fn replace_candidate_backing<NEW>(
self,
candidate_backing: NEW,
) -> AllSubsystems<CV, NEW, CS, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP> {
) -> AllSubsystems<CV, NEW, CS, SD, AD, AR, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP> {
AllSubsystems {
candidate_validation: self.candidate_validation,
candidate_backing,
candidate_selection: self.candidate_selection,
statement_distribution: self.statement_distribution,
availability_distribution: self.availability_distribution,
availability_recovery: self.availability_recovery,
bitfield_signing: self.bitfield_signing,
bitfield_distribution: self.bitfield_distribution,
provisioner: self.provisioner,
@@ -731,13 +741,14 @@ impl<CV, CB, CS, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP>
pub fn replace_candidate_selection<NEW>(
self,
candidate_selection: NEW,
) -> AllSubsystems<CV, CB, NEW, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP> {
) -> AllSubsystems<CV, CB, NEW, SD, AD, AR, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP> {
AllSubsystems {
candidate_validation: self.candidate_validation,
candidate_backing: self.candidate_backing,
candidate_selection,
statement_distribution: self.statement_distribution,
availability_distribution: self.availability_distribution,
availability_recovery: self.availability_recovery,
bitfield_signing: self.bitfield_signing,
bitfield_distribution: self.bitfield_distribution,
provisioner: self.provisioner,
@@ -755,13 +766,14 @@ impl<CV, CB, CS, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP>
pub fn replace_statement_distribution<NEW>(
self,
statement_distribution: NEW,
) -> AllSubsystems<CV, CB, CS, NEW, AD, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP> {
) -> AllSubsystems<CV, CB, CS, NEW, AD, AR, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP> {
AllSubsystems {
candidate_validation: self.candidate_validation,
candidate_backing: self.candidate_backing,
candidate_selection: self.candidate_selection,
statement_distribution,
availability_distribution: self.availability_distribution,
availability_recovery: self.availability_recovery,
bitfield_signing: self.bitfield_signing,
bitfield_distribution: self.bitfield_distribution,
provisioner: self.provisioner,
@@ -779,13 +791,39 @@ impl<CV, CB, CS, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP>
pub fn replace_availability_distribution<NEW>(
self,
availability_distribution: NEW,
) -> AllSubsystems<CV, CB, CS, SD, NEW, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP> {
) -> AllSubsystems<CV, CB, CS, SD, NEW, AR, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP> {
AllSubsystems {
candidate_validation: self.candidate_validation,
candidate_backing: self.candidate_backing,
candidate_selection: self.candidate_selection,
statement_distribution: self.statement_distribution,
availability_distribution,
availability_recovery: self.availability_recovery,
bitfield_signing: self.bitfield_signing,
bitfield_distribution: self.bitfield_distribution,
provisioner: self.provisioner,
pov_distribution: self.pov_distribution,
runtime_api: self.runtime_api,
availability_store: self.availability_store,
network_bridge: self.network_bridge,
chain_api: self.chain_api,
collation_generation: self.collation_generation,
collator_protocol: self.collator_protocol,
}
}
/// Replace the `availability_recovery` instance in `self`.
pub fn replace_availability_recovery<NEW>(
self,
availability_recovery: NEW,
) -> AllSubsystems<CV, CB, CS, SD, AD, NEW, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP> {
AllSubsystems {
candidate_validation: self.candidate_validation,
candidate_backing: self.candidate_backing,
candidate_selection: self.candidate_selection,
statement_distribution: self.statement_distribution,
availability_distribution: self.availability_distribution,
availability_recovery,
bitfield_signing: self.bitfield_signing,
bitfield_distribution: self.bitfield_distribution,
provisioner: self.provisioner,
@@ -803,13 +841,14 @@ impl<CV, CB, CS, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP>
pub fn replace_bitfield_signing<NEW>(
self,
bitfield_signing: NEW,
) -> AllSubsystems<CV, CB, CS, SD, AD, NEW, BD, P, PoVD, RA, AS, NB, CA, CG, CP> {
) -> AllSubsystems<CV, CB, CS, SD, AD, AR, NEW, BD, P, PoVD, RA, AS, NB, CA, CG, CP> {
AllSubsystems {
candidate_validation: self.candidate_validation,
candidate_backing: self.candidate_backing,
candidate_selection: self.candidate_selection,
statement_distribution: self.statement_distribution,
availability_distribution: self.availability_distribution,
availability_recovery: self.availability_recovery,
bitfield_signing,
bitfield_distribution: self.bitfield_distribution,
provisioner: self.provisioner,
@@ -827,13 +866,14 @@ impl<CV, CB, CS, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP>
pub fn replace_bitfield_distribution<NEW>(
self,
bitfield_distribution: NEW,
) -> AllSubsystems<CV, CB, CS, SD, AD, BS, NEW, P, PoVD, RA, AS, NB, CA, CG, CP> {
) -> AllSubsystems<CV, CB, CS, SD, AD, AR, BS, NEW, P, PoVD, RA, AS, NB, CA, CG, CP> {
AllSubsystems {
candidate_validation: self.candidate_validation,
candidate_backing: self.candidate_backing,
candidate_selection: self.candidate_selection,
statement_distribution: self.statement_distribution,
availability_distribution: self.availability_distribution,
availability_recovery: self.availability_recovery,
bitfield_signing: self.bitfield_signing,
bitfield_distribution,
provisioner: self.provisioner,
@@ -851,13 +891,14 @@ impl<CV, CB, CS, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP>
pub fn replace_provisioner<NEW>(
self,
provisioner: NEW,
) -> AllSubsystems<CV, CB, CS, SD, AD, BS, BD, NEW, PoVD, RA, AS, NB, CA, CG, CP> {
) -> AllSubsystems<CV, CB, CS, SD, AD, AR, BS, BD, NEW, PoVD, RA, AS, NB, CA, CG, CP> {
AllSubsystems {
candidate_validation: self.candidate_validation,
candidate_backing: self.candidate_backing,
candidate_selection: self.candidate_selection,
statement_distribution: self.statement_distribution,
availability_distribution: self.availability_distribution,
availability_recovery: self.availability_recovery,
bitfield_signing: self.bitfield_signing,
bitfield_distribution: self.bitfield_distribution,
provisioner,
@@ -875,13 +916,14 @@ impl<CV, CB, CS, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP>
pub fn replace_pov_distribution<NEW>(
self,
pov_distribution: NEW,
) -> AllSubsystems<CV, CB, CS, SD, AD, BS, BD, P, NEW, RA, AS, NB, CA, CG, CP> {
) -> AllSubsystems<CV, CB, CS, SD, AD, AR, BS, BD, P, NEW, RA, AS, NB, CA, CG, CP> {
AllSubsystems {
candidate_validation: self.candidate_validation,
candidate_backing: self.candidate_backing,
candidate_selection: self.candidate_selection,
statement_distribution: self.statement_distribution,
availability_distribution: self.availability_distribution,
availability_recovery: self.availability_recovery,
bitfield_signing: self.bitfield_signing,
bitfield_distribution: self.bitfield_distribution,
provisioner: self.provisioner,
@@ -899,13 +941,14 @@ impl<CV, CB, CS, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP>
pub fn replace_runtime_api<NEW>(
self,
runtime_api: NEW,
) -> AllSubsystems<CV, CB, CS, SD, AD, BS, BD, P, PoVD, NEW, AS, NB, CA, CG, CP> {
) -> AllSubsystems<CV, CB, CS, SD, AD, AR, BS, BD, P, PoVD, NEW, AS, NB, CA, CG, CP> {
AllSubsystems {
candidate_validation: self.candidate_validation,
candidate_backing: self.candidate_backing,
candidate_selection: self.candidate_selection,
statement_distribution: self.statement_distribution,
availability_distribution: self.availability_distribution,
availability_recovery: self.availability_recovery,
bitfield_signing: self.bitfield_signing,
bitfield_distribution: self.bitfield_distribution,
provisioner: self.provisioner,
@@ -923,13 +966,14 @@ impl<CV, CB, CS, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP>
pub fn replace_availability_store<NEW>(
self,
availability_store: NEW,
) -> AllSubsystems<CV, CB, CS, SD, AD, BS, BD, P, PoVD, RA, NEW, NB, CA, CG, CP> {
) -> AllSubsystems<CV, CB, CS, SD, AD, AR, BS, BD, P, PoVD, RA, NEW, NB, CA, CG, CP> {
AllSubsystems {
candidate_validation: self.candidate_validation,
candidate_backing: self.candidate_backing,
candidate_selection: self.candidate_selection,
statement_distribution: self.statement_distribution,
availability_distribution: self.availability_distribution,
availability_recovery: self.availability_recovery,
bitfield_signing: self.bitfield_signing,
bitfield_distribution: self.bitfield_distribution,
provisioner: self.provisioner,
@@ -947,13 +991,14 @@ impl<CV, CB, CS, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP>
pub fn replace_network_bridge<NEW>(
self,
network_bridge: NEW,
) -> AllSubsystems<CV, CB, CS, SD, AD, BS, BD, P, PoVD, RA, AS, NEW, CA, CG, CP> {
) -> AllSubsystems<CV, CB, CS, SD, AD, AR, BS, BD, P, PoVD, RA, AS, NEW, CA, CG, CP> {
AllSubsystems {
candidate_validation: self.candidate_validation,
candidate_backing: self.candidate_backing,
candidate_selection: self.candidate_selection,
statement_distribution: self.statement_distribution,
availability_distribution: self.availability_distribution,
availability_recovery: self.availability_recovery,
bitfield_signing: self.bitfield_signing,
bitfield_distribution: self.bitfield_distribution,
provisioner: self.provisioner,
@@ -971,13 +1016,14 @@ impl<CV, CB, CS, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP>
pub fn replace_chain_api<NEW>(
self,
chain_api: NEW,
) -> AllSubsystems<CV, CB, CS, SD, AD, BS, BD, P, PoVD, RA, AS, NB, NEW, CG, CP> {
) -> AllSubsystems<CV, CB, CS, SD, AD, AR, BS, BD, P, PoVD, RA, AS, NB, NEW, CG, CP> {
AllSubsystems {
candidate_validation: self.candidate_validation,
candidate_backing: self.candidate_backing,
candidate_selection: self.candidate_selection,
statement_distribution: self.statement_distribution,
availability_distribution: self.availability_distribution,
availability_recovery: self.availability_recovery,
bitfield_signing: self.bitfield_signing,
bitfield_distribution: self.bitfield_distribution,
provisioner: self.provisioner,
@@ -995,13 +1041,14 @@ impl<CV, CB, CS, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP>
pub fn replace_collation_generation<NEW>(
self,
collation_generation: NEW,
) -> AllSubsystems<CV, CB, CS, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA, NEW, CP> {
) -> AllSubsystems<CV, CB, CS, SD, AD, AR, BS, BD, P, PoVD, RA, AS, NB, CA, NEW, CP> {
AllSubsystems {
candidate_validation: self.candidate_validation,
candidate_backing: self.candidate_backing,
candidate_selection: self.candidate_selection,
statement_distribution: self.statement_distribution,
availability_distribution: self.availability_distribution,
availability_recovery: self.availability_recovery,
bitfield_signing: self.bitfield_signing,
bitfield_distribution: self.bitfield_distribution,
provisioner: self.provisioner,
@@ -1019,13 +1066,14 @@ impl<CV, CB, CS, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP>
pub fn replace_collator_protocol<NEW>(
self,
collator_protocol: NEW,
) -> AllSubsystems<CV, CB, CS, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA, CG, NEW> {
) -> AllSubsystems<CV, CB, CS, SD, AD, AR, BS, BD, P, PoVD, RA, AS, NB, CA, CG, NEW> {
AllSubsystems {
candidate_validation: self.candidate_validation,
candidate_backing: self.candidate_backing,
candidate_selection: self.candidate_selection,
statement_distribution: self.statement_distribution,
availability_distribution: self.availability_distribution,
availability_recovery: self.availability_recovery,
bitfield_signing: self.bitfield_signing,
bitfield_distribution: self.bitfield_distribution,
provisioner: self.provisioner,
@@ -1260,9 +1308,9 @@ where
/// #
/// # }); }
/// ```
pub fn new<CV, CB, CS, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP>(
pub fn new<CV, CB, CS, SD, AD, AR, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP>(
leaves: impl IntoIterator<Item = BlockInfo>,
all_subsystems: AllSubsystems<CV, CB, CS, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP>,
all_subsystems: AllSubsystems<CV, CB, CS, SD, AD, AR, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP>,
prometheus_registry: Option<&prometheus::Registry>,
mut s: S,
) -> SubsystemResult<(Self, OverseerHandler)>
@@ -1272,6 +1320,7 @@ where
CS: Subsystem<OverseerSubsystemContext<CandidateSelectionMessage>> + Send,
SD: Subsystem<OverseerSubsystemContext<StatementDistributionMessage>> + Send,
AD: Subsystem<OverseerSubsystemContext<AvailabilityDistributionMessage>> + Send,
AR: Subsystem<OverseerSubsystemContext<AvailabilityRecoveryMessage>> + Send,
BS: Subsystem<OverseerSubsystemContext<BitfieldSigningMessage>> + Send,
BD: Subsystem<OverseerSubsystemContext<BitfieldDistributionMessage>> + Send,
P: Subsystem<OverseerSubsystemContext<ProvisionerMessage>> + Send,
@@ -1357,6 +1406,15 @@ where
&mut seed,
)?;
let availability_recovery_subsystem = spawn(
&mut s,
&mut running_subsystems,
metered::UnboundedMeteredSender::<_>::clone(&to_overseer_tx),
all_subsystems.availability_recovery,
&metrics,
&mut seed,
)?;
let bitfield_signing_subsystem = spawn(
&mut s,
&mut running_subsystems,
@@ -1462,6 +1520,7 @@ where
candidate_selection_subsystem,
statement_distribution_subsystem,
availability_distribution_subsystem,
availability_recovery_subsystem,
bitfield_signing_subsystem,
bitfield_distribution_subsystem,
provisioner_subsystem,
@@ -1493,6 +1552,7 @@ where
let _ = self.candidate_selection_subsystem.send_signal(OverseerSignal::Conclude).await;
let _ = self.statement_distribution_subsystem.send_signal(OverseerSignal::Conclude).await;
let _ = self.availability_distribution_subsystem.send_signal(OverseerSignal::Conclude).await;
let _ = self.availability_recovery_subsystem.send_signal(OverseerSignal::Conclude).await;
let _ = self.bitfield_signing_subsystem.send_signal(OverseerSignal::Conclude).await;
let _ = self.bitfield_distribution_subsystem.send_signal(OverseerSignal::Conclude).await;
let _ = self.provisioner_subsystem.send_signal(OverseerSignal::Conclude).await;
@@ -1660,6 +1720,7 @@ where
self.candidate_selection_subsystem.send_signal(signal.clone()).await?;
self.statement_distribution_subsystem.send_signal(signal.clone()).await?;
self.availability_distribution_subsystem.send_signal(signal.clone()).await?;
self.availability_recovery_subsystem.send_signal(signal.clone()).await?;
self.bitfield_signing_subsystem.send_signal(signal.clone()).await?;
self.bitfield_distribution_subsystem.send_signal(signal.clone()).await?;
self.provisioner_subsystem.send_signal(signal.clone()).await?;
@@ -1694,6 +1755,9 @@ where
AllMessages::AvailabilityDistribution(msg) => {
self.availability_distribution_subsystem.send_message(msg).await?;
},
AllMessages::AvailabilityRecovery(msg) => {
let _ = self.availability_recovery_subsystem.send_message(msg).await;
},
AllMessages::BitfieldDistribution(msg) => {
self.bitfield_distribution_subsystem.send_message(msg).await?;
},
@@ -2538,6 +2602,15 @@ mod tests {
AvailabilityDistributionMessage::NetworkBridgeUpdateV1(test_network_bridge_event())
}
fn test_availability_recovery_msg() -> AvailabilityRecoveryMessage {
let (sender, _) = oneshot::channel();
AvailabilityRecoveryMessage::RecoverAvailableData(
Default::default(),
Default::default(),
sender,
)
}
fn test_bitfield_distribution_msg() -> BitfieldDistributionMessage {
BitfieldDistributionMessage::NetworkBridgeUpdateV1(test_network_bridge_event())
}
@@ -2589,6 +2662,7 @@ mod tests {
collator_protocol: subsystem.clone(),
statement_distribution: subsystem.clone(),
availability_distribution: subsystem.clone(),
availability_recovery: subsystem.clone(),
bitfield_signing: subsystem.clone(),
bitfield_distribution: subsystem.clone(),
provisioner: subsystem.clone(),
@@ -2624,6 +2698,7 @@ mod tests {
handler.send_msg(AllMessages::CollatorProtocol(test_collator_protocol_msg())).await;
handler.send_msg(AllMessages::StatementDistribution(test_statement_distribution_msg())).await;
handler.send_msg(AllMessages::AvailabilityDistribution(test_availability_distribution_msg())).await;
handler.send_msg(AllMessages::AvailabilityRecovery(test_availability_recovery_msg())).await;
// handler.send_msg(AllMessages::BitfieldSigning(test_bitfield_signing_msg())).await;
handler.send_msg(AllMessages::BitfieldDistribution(test_bitfield_distribution_msg())).await;
handler.send_msg(AllMessages::Provisioner(test_provisioner_msg())).await;
@@ -2638,7 +2713,7 @@ mod tests {
select! {
res = overseer_fut => {
const NUM_SUBSYSTEMS: usize = 15;
const NUM_SUBSYSTEMS: usize = 16;
assert_eq!(stop_signals_received.load(atomic::Ordering::SeqCst), NUM_SUBSYSTEMS);
// x2 because of broadcast_signal on startup
+2
View File
@@ -79,6 +79,7 @@ rococo-runtime = { path = "../../runtime/rococo" }
# Polkadot Subsystems
polkadot-availability-bitfield-distribution = { path = "../network/bitfield-distribution", optional = true }
polkadot-availability-distribution = { path = "../network/availability-distribution", optional = true }
polkadot-availability-recovery = { path = "../network/availability-recovery", optional = true }
polkadot-collator-protocol = { path = "../network/collator-protocol", optional = true }
polkadot-network-bridge = { path = "../network/bridge", optional = true }
polkadot-node-collation-generation = { path = "../collation-generation", optional = true }
@@ -114,6 +115,7 @@ runtime-benchmarks = [
real-overseer = [
"polkadot-availability-bitfield-distribution",
"polkadot-availability-distribution",
"polkadot-availability-recovery",
"polkadot-collator-protocol",
"polkadot-network-bridge",
"polkadot-node-collation-generation",
+3
View File
@@ -397,12 +397,15 @@ where
use polkadot_node_core_provisioner::ProvisioningSubsystem as ProvisionerSubsystem;
use polkadot_node_core_runtime_api::RuntimeApiSubsystem;
use polkadot_statement_distribution::StatementDistribution as StatementDistributionSubsystem;
use polkadot_availability_recovery::AvailabilityRecoverySubsystem;
let all_subsystems = AllSubsystems {
availability_distribution: AvailabilityDistributionSubsystem::new(
keystore.clone(),
Metrics::register(registry)?,
),
availability_recovery: AvailabilityRecoverySubsystem::new(
),
availability_store: AvailabilityStoreSubsystem::new_on_disk(
availability_config,
Metrics::register(registry)?,
+18
View File
@@ -59,3 +59,21 @@ impl core::fmt::Display for ChainApiError {
}
impl std::error::Error for ChainApiError {}
/// An error that may happen during Availability Recovery process.
#[derive(PartialEq, Debug, Clone)]
pub enum RecoveryError {
/// A chunk is recovered but is invalid.
Invalid,
/// A requested chunk is unavailable.
Unavailable,
}
impl std::fmt::Display for RecoveryError {
fn fmt(&self, f: &mut core::fmt::Formatter) -> Result<(), core::fmt::Error> {
write!(f, "{}", self)
}
}
impl std::error::Error for RecoveryError {}
+15
View File
@@ -245,6 +245,19 @@ pub enum AvailabilityDistributionMessage {
NetworkBridgeUpdateV1(NetworkBridgeEvent<protocol_v1::AvailabilityDistributionMessage>),
}
/// Availability Recovery Message.
#[derive(Debug)]
pub enum AvailabilityRecoveryMessage {
/// Recover available data from validators on the network.
RecoverAvailableData(
CandidateReceipt,
SessionIndex,
oneshot::Sender<Result<AvailableData, crate::errors::RecoveryError>>,
),
/// Event from the network bridge.
NetworkBridgeUpdateV1(NetworkBridgeEvent<protocol_v1::AvailabilityRecoveryMessage>),
}
impl AvailabilityDistributionMessage {
/// If the current variant contains the relay parent hash, return it.
pub fn relay_parent(&self) -> Option<Hash> {
@@ -596,6 +609,8 @@ pub enum AllMessages {
StatementDistribution(StatementDistributionMessage),
/// Message for the availability distribution subsystem.
AvailabilityDistribution(AvailabilityDistributionMessage),
/// Message for the availability recovery subsystem.
AvailabilityRecovery(AvailabilityRecoveryMessage),
/// Message for the bitfield distribution subsystem.
BitfieldDistribution(BitfieldDistributionMessage),
/// Message for the bitfield signing subsystem.