A fast-path for requesting AvailableData from backing validators (#2453)

* guide changes for a fast-path requesting from backing validators

* add backing group to availability recovery message

* add new phase to interaction

* typos

* add full data messages

* handle new network messages

* dispatch full data requests

* cleanup

* check chunk index

* test for invalid recovery

* tests

* Typos.

* fix some grumbles

* be more explicit about error handling and control flow

* fast-path param

* use with_chunks_only in Service

Co-authored-by: Robert Klotzner <robert.klotzner@gmx.at>
This commit is contained in:
Robert Habermeier
2021-02-17 13:51:50 -06:00
committed by GitHub
parent 4a5e5f13ae
commit b7aac51341
11 changed files with 950 additions and 196 deletions
@@ -28,6 +28,9 @@ pub enum Error {
#[error("failed to query a chunk from store")]
CanceledQueryChunk(#[source] oneshot::Canceled),
#[error("failed to query full data from store")]
CanceledQueryFullData(#[source] oneshot::Canceled),
#[error("failed to query session info")]
CanceledSessionInfo(#[source] oneshot::Canceled),
@@ -25,13 +25,13 @@ use std::pin::Pin;
use futures::{channel::{oneshot, mpsc}, prelude::*, stream::FuturesUnordered};
use futures_timer::Delay;
use lru::LruCache;
use rand::{seq::SliceRandom, thread_rng};
use rand::seq::SliceRandom;
use streamunordered::{StreamUnordered, StreamYield};
use polkadot_primitives::v1::{
AuthorityDiscoveryId, AvailableData, CandidateReceipt, CandidateHash,
Hash, ErasureChunk, ValidatorId, ValidatorIndex,
SessionInfo, SessionIndex, BlakeTwo256, HashT,
SessionInfo, SessionIndex, BlakeTwo256, HashT, GroupIndex,
};
use polkadot_subsystem::{
SubsystemContext, SubsystemResult, SubsystemError, Subsystem, SpawnedSubsystem, FromOverseer,
@@ -59,6 +59,7 @@ const LOG_TARGET: &str = "availability_recovery";
const COST_MERKLE_PROOF_INVALID: Rep = Rep::CostMinor("Merkle proof was invalid");
const COST_UNEXPECTED_CHUNK: Rep = Rep::CostMinor("Peer has sent an unexpected chunk");
const COST_INVALID_AVAILABLE_DATA: Rep = Rep::CostMinor("Peer provided invalid available data");
// How many parallel requests interaction should have going at once.
const N_PARALLEL: usize = 50;
@@ -67,18 +68,54 @@ const N_PARALLEL: usize = 50;
const LRU_SIZE: usize = 16;
// A timeout for a chunk request.
#[cfg(not(test))]
const CHUNK_REQUEST_TIMEOUT: Duration = Duration::from_secs(3);
// A period to poll and clean AwaitedChunks.
const AWAITED_CHUNKS_CLEANUP_INTERVAL: Duration = Duration::from_secs(1);
#[cfg(test)]
const CHUNK_REQUEST_TIMEOUT: Duration = Duration::from_millis(100);
// A timeout for a full data request.
#[cfg(not(test))]
const FULL_DATA_REQUEST_TIMEOUT: Duration = Duration::from_secs(5);
#[cfg(test)]
const FULL_DATA_REQUEST_TIMEOUT: Duration = Duration::from_millis(100);
// A period to poll and clean awaited data.
const AWAITED_CLEANUP_INTERVAL: Duration = Duration::from_secs(1);
/// The Availability Recovery Subsystem.
pub struct AvailabilityRecoverySubsystem;
pub struct AvailabilityRecoverySubsystem {
fast_path: bool,
}
type ChunkResponse = Result<(PeerId, ErasureChunk), RecoveryError>;
type DataResponse<T> = (PeerId, ValidatorIndex, T);
/// Data we keep around for every chunk that we are awaiting.
struct AwaitedChunk {
/// Awaited data from the network.
enum Awaited {
Chunk(AwaitedData<ErasureChunk>),
FullData(AwaitedData<AvailableData>),
}
impl Awaited {
fn is_canceled(&self) -> bool {
match *self {
Awaited::Chunk(ref c) => c.response.is_canceled(),
Awaited::FullData(ref fd) => fd.response.is_canceled(),
}
}
/// Token to cancel the connection request to the validator.
fn token(&self) -> usize {
match *self {
Awaited::Chunk(ref c) => c.token,
Awaited::FullData(ref fd) => fd.token,
}
}
}
/// Data we keep around for network data that we are awaiting.
struct AwaitedData<T> {
/// Index of the validator we have requested this chunk from.
validator_index: ValidatorIndex,
@@ -89,7 +126,7 @@ struct AwaitedChunk {
token: usize,
/// Result sender.
response: oneshot::Sender<ChunkResponse>,
response: oneshot::Sender<DataResponse<T>>,
}
/// Accumulate all awaiting sides for some particular `AvailableData`.
@@ -104,11 +141,19 @@ enum FromInteraction {
Concluded(CandidateHash, Result<AvailableData, RecoveryError>),
/// Make a request of a particular chunk from a particular validator.
MakeRequest(
MakeChunkRequest(
AuthorityDiscoveryId,
CandidateHash,
ValidatorIndex,
oneshot::Sender<ChunkResponse>,
oneshot::Sender<DataResponse<ErasureChunk>>,
),
/// Make a request of the full data from a particular validator.
MakeFullDataRequest(
AuthorityDiscoveryId,
CandidateHash,
ValidatorIndex,
oneshot::Sender<DataResponse<AvailableData>>,
),
/// Report a peer.
@@ -118,21 +163,27 @@ enum FromInteraction {
),
}
/// A state of a single interaction reconstructing an available data.
struct Interaction {
/// A communication channel with the `State`.
to_state: mpsc::Sender<FromInteraction>,
struct RequestFromBackersPhase {
// a random shuffling of the validators from the backing group which indicates the order
// in which we connect to them and request the chunk.
shuffled_backers: Vec<ValidatorIndex>,
}
struct RequestChunksPhase {
// a random shuffling of the validators which indicates the order in which we connect to the validators and
// request the chunk from them.
shuffling: Vec<ValidatorIndex>,
received_chunks: HashMap<ValidatorIndex, ErasureChunk>,
requesting_chunks: FuturesUnordered<Timeout<oneshot::Receiver<DataResponse<ErasureChunk>>>>,
}
struct InteractionParams {
/// Discovery ids of `validators`.
validator_authority_keys: Vec<AuthorityDiscoveryId>,
/// Validators relevant to this `Interaction`.
validators: Vec<ValidatorId>,
/// A random shuffling of the validators which indicates the order in which we connect
/// to the validators and request the chunk from them.
shuffling: Vec<ValidatorIndex>,
/// The number of pieces needed.
threshold: usize,
@@ -141,35 +192,118 @@ struct Interaction {
/// The root of the erasure encoding of the para block.
erasure_root: Hash,
/// The chunks that we have received from peers.
received_chunks: HashMap<PeerId, ErasureChunk>,
/// The chunk requests that are waiting to complete.
requesting_chunks: FuturesUnordered<Timeout<oneshot::Receiver<ChunkResponse>>>,
}
const fn is_unavailable(
received_chunks: usize,
requesting_chunks: usize,
n_validators: usize,
threshold: usize,
) -> bool {
received_chunks + requesting_chunks + n_validators < threshold
enum InteractionPhase {
RequestFromBackers(RequestFromBackersPhase),
RequestChunks(RequestChunksPhase),
}
impl Interaction {
async fn launch_parallel_requests(&mut self) -> error::Result<()> {
/// A state of a single interaction reconstructing an available data.
struct Interaction {
/// A communication channel with the `State`.
to_state: mpsc::Sender<FromInteraction>,
/// The parameters of the interaction.
params: InteractionParams,
/// The phase of the interaction.
phase: InteractionPhase,
}
impl RequestFromBackersPhase {
fn new(mut backers: Vec<ValidatorIndex>) -> Self {
backers.shuffle(&mut rand::thread_rng());
RequestFromBackersPhase {
shuffled_backers: backers,
}
}
// Run this phase to completion, returning `true` if data was successfully recovered and
// false otherwise.
async fn run(
&mut self,
params: &InteractionParams,
to_state: &mut mpsc::Sender<FromInteraction>
) -> Result<bool, mpsc::SendError> {
loop {
// Pop the next backer, and proceed to next phase if we're out.
let validator_index = match self.shuffled_backers.pop() {
None => return Ok(false),
Some(i) => i,
};
let (tx, rx) = oneshot::channel();
// Request data.
to_state.send(FromInteraction::MakeFullDataRequest(
params.validator_authority_keys[validator_index as usize].clone(),
params.candidate_hash.clone(),
validator_index,
tx,
)).await?;
match rx.timeout(FULL_DATA_REQUEST_TIMEOUT).await {
Some(Ok((peer_id, _validator_index, data))) => {
if reconstructed_data_matches_root(params.validators.len(), &params.erasure_root, &data) {
to_state.send(
FromInteraction::Concluded(params.candidate_hash.clone(), Ok(data))
).await?;
return Ok(true);
} else {
to_state.send(FromInteraction::ReportPeer(
peer_id.clone(),
COST_INVALID_AVAILABLE_DATA,
)).await?;
}
}
Some(Err(e)) => {
tracing::debug!(
target: LOG_TARGET,
err = ?e,
"A response channel was cancelled while waiting for full data",
);
}
None => {
tracing::debug!(
target: LOG_TARGET,
"A full data request has timed out",
);
}
}
}
}
}
impl RequestChunksPhase {
fn new(n_validators: ValidatorIndex) -> Self {
let mut shuffling: Vec<_> = (0..n_validators).collect();
shuffling.shuffle(&mut rand::thread_rng());
RequestChunksPhase {
shuffling,
received_chunks: HashMap::new(),
requesting_chunks: FuturesUnordered::new(),
}
}
async fn launch_parallel_requests(
&mut self,
params: &InteractionParams,
to_state: &mut mpsc::Sender<FromInteraction>,
) -> Result<(), mpsc::SendError> {
while self.requesting_chunks.len() < N_PARALLEL {
if let Some(validator_index) = self.shuffling.pop() {
let (tx, rx) = oneshot::channel();
self.to_state.send(FromInteraction::MakeRequest(
self.validator_authority_keys[validator_index as usize].clone(),
self.candidate_hash.clone(),
to_state.send(FromInteraction::MakeChunkRequest(
params.validator_authority_keys[validator_index as usize].clone(),
params.candidate_hash.clone(),
validator_index,
tx,
)).await.map_err(error::Error::ClosedToState)?;
)).await?;
self.requesting_chunks.push(rx.timeout(CHUNK_REQUEST_TIMEOUT));
} else {
@@ -180,7 +314,11 @@ impl Interaction {
Ok(())
}
async fn wait_for_chunks(&mut self) -> error::Result<()> {
async fn wait_for_chunks(
&mut self,
params: &InteractionParams,
to_state: &mut mpsc::Sender<FromInteraction>,
) -> Result<(), mpsc::SendError> {
// Check if the requesting chunks is not empty not to poll to completion.
if self.requesting_chunks.is_empty() {
return Ok(());
@@ -189,43 +327,49 @@ impl Interaction {
// Poll for new updates from requesting_chunks.
while let Some(request_result) = self.requesting_chunks.next().await {
match request_result {
Some(Ok(Ok((peer_id, chunk)))) => {
Some(Ok((peer_id, validator_index, chunk))) => {
// Check merkle proofs of any received chunks, and any failures should
// lead to issuance of a FromInteraction::ReportPeer message.
// We need to check that the validator index matches the chunk index and
// not blindly trust the data from an untrusted peer.
if validator_index != chunk.index {
to_state.send(FromInteraction::ReportPeer(
peer_id.clone(),
COST_MERKLE_PROOF_INVALID,
)).await?;
continue;
}
if let Ok(anticipated_hash) = branch_hash(
&self.erasure_root,
&params.erasure_root,
&chunk.proof,
chunk.index as usize,
) {
let erasure_chunk_hash = BlakeTwo256::hash(&chunk.chunk);
if erasure_chunk_hash != anticipated_hash {
self.to_state.send(FromInteraction::ReportPeer(
peer_id.clone(),
COST_MERKLE_PROOF_INVALID,
)).await.map_err(error::Error::ClosedToState)?;
}
} else {
self.to_state.send(FromInteraction::ReportPeer(
to_state.send(FromInteraction::ReportPeer(
peer_id.clone(),
COST_MERKLE_PROOF_INVALID,
)).await.map_err(error::Error::ClosedToState)?;
)).await?;
} else {
self.received_chunks.insert(validator_index, chunk);
}
} else {
to_state.send(FromInteraction::ReportPeer(
peer_id.clone(),
COST_MERKLE_PROOF_INVALID,
)).await?;
}
self.received_chunks.insert(peer_id, chunk);
}
Some(Err(e)) => {
tracing::debug!(
target: LOG_TARGET,
err = ?e,
"A response channel was cacelled while waiting for a chunk",
);
}
Some(Ok(Err(e))) => {
tracing::debug!(
target: LOG_TARGET,
err = ?e,
"A chunk request ended with an error",
"A response channel was cancelled while waiting for a chunk",
);
}
None => {
@@ -233,8 +377,6 @@ impl Interaction {
target: LOG_TARGET,
"A chunk request has timed out",
);
// we break here to launch another request.
break;
}
}
}
@@ -242,58 +384,70 @@ impl Interaction {
Ok(())
}
async fn run(mut self) -> error::Result<()> {
async fn run(
&mut self,
params: &InteractionParams,
to_state: &mut mpsc::Sender<FromInteraction>,
) -> Result<(), mpsc::SendError> {
loop {
if is_unavailable(
self.received_chunks.len(),
self.requesting_chunks.len(),
self.shuffling.len(),
self.threshold,
params.threshold,
) {
self.to_state.send(FromInteraction::Concluded(
self.candidate_hash,
to_state.send(FromInteraction::Concluded(
params.candidate_hash,
Err(RecoveryError::Unavailable),
)).await.map_err(error::Error::ClosedToState)?;
)).await?;
return Ok(());
}
self.launch_parallel_requests().await?;
self.wait_for_chunks().await?;
self.launch_parallel_requests(params, to_state).await?;
self.wait_for_chunks(params, to_state).await?;
// If received_chunks has more than threshold entries, attempt to recover the data.
// If that fails, or a re-encoding of it doesn't match the expected erasure root,
// break and issue a FromInteraction::Concluded(RecoveryError::Invalid).
// Otherwise, issue a FromInteraction::Concluded(Ok(())).
if self.received_chunks.len() >= self.threshold {
if self.received_chunks.len() >= params.threshold {
let concluded = match polkadot_erasure_coding::reconstruct_v1(
self.validators.len(),
params.validators.len(),
self.received_chunks.values().map(|c| (&c.chunk[..], c.index as usize)),
) {
Ok(data) => {
if reconstructed_data_matches_root(self.validators.len(), &self.erasure_root, &data) {
FromInteraction::Concluded(self.candidate_hash.clone(), Ok(data))
if reconstructed_data_matches_root(params.validators.len(), &params.erasure_root, &data) {
FromInteraction::Concluded(params.candidate_hash.clone(), Ok(data))
} else {
FromInteraction::Concluded(
self.candidate_hash.clone(),
params.candidate_hash.clone(),
Err(RecoveryError::Invalid),
)
}
}
Err(_) => FromInteraction::Concluded(
self.candidate_hash.clone(),
params.candidate_hash.clone(),
Err(RecoveryError::Invalid),
),
};
self.to_state.send(concluded).await.map_err(error::Error::ClosedToState)?;
to_state.send(concluded).await?;
return Ok(());
}
}
}
}
const fn is_unavailable(
received_chunks: usize,
requesting_chunks: usize,
n_validators: usize,
threshold: usize,
) -> bool {
received_chunks + requesting_chunks + n_validators < threshold
}
fn reconstructed_data_matches_root(
n_validators: usize,
expected_root: &Hash,
@@ -316,6 +470,32 @@ fn reconstructed_data_matches_root(
branches.root() == *expected_root
}
impl Interaction {
async fn run(mut self) -> error::Result<()> {
loop {
// These only fail if we cannot reach the underlying subsystem, which case there is nothing
// meaningful we can do.
match self.phase {
InteractionPhase::RequestFromBackers(ref mut from_backers) => {
if from_backers.run(&self.params, &mut self.to_state).await
.map_err(error::Error::ClosedToState)?
{
break Ok(())
} else {
self.phase = InteractionPhase::RequestChunks(
RequestChunksPhase::new(self.params.validators.len() as _)
);
}
}
InteractionPhase::RequestChunks(ref mut from_all) => {
break from_all.run(&self.params, &mut self.to_state).await
.map_err(error::Error::ClosedToState)
}
}
}
}
}
struct State {
/// Each interaction is implemented as its own async task,
/// and these handles are for communicating with them.
@@ -325,12 +505,12 @@ struct State {
live_block_hash: Hash,
/// We are waiting for these validators to connect and as soon as they
/// do to request the needed chunks we are awaitinf for.
discovering_validators: HashMap<AuthorityDiscoveryId, Vec<AwaitedChunk>>,
/// do, request the needed data we are waiting for.
discovering_validators: HashMap<AuthorityDiscoveryId, Vec<Awaited>>,
/// Requests that we have issued to the already connected validators
/// about the chunks we are interested in.
live_chunk_requests: HashMap<RequestId, (PeerId, AwaitedChunk)>,
/// about the data we are interested in.
live_requests: HashMap<RequestId, (PeerId, Awaited)>,
/// Derive request ids from this.
next_request_id: RequestId,
@@ -357,7 +537,7 @@ impl Default for State {
interactions: HashMap::new(),
live_block_hash: Hash::default(),
discovering_validators: HashMap::new(),
live_chunk_requests: HashMap::new(),
live_requests: HashMap::new(),
next_request_id: 0,
connecting_validators: StreamUnordered::new(),
availability_lru: LruCache::new(LRU_SIZE),
@@ -394,7 +574,7 @@ async fn handle_signal(
Ok(false)
}
OverseerSignal::BlockFinalized(_, _) => Ok(false)
OverseerSignal::BlockFinalized(_, _) => Ok(false)
}
}
@@ -415,16 +595,12 @@ async fn launch_interaction(
session_index: SessionIndex,
session_info: SessionInfo,
receipt: CandidateReceipt,
backing_group: Option<GroupIndex>,
response_sender: oneshot::Sender<Result<AvailableData, RecoveryError>>,
) -> error::Result<()> {
let threshold = recovery_threshold(session_info.validators.len())?;
let to_state = state.from_interaction_tx.clone();
let candidate_hash = receipt.hash();
let erasure_root = receipt.descriptor.erasure_root;
let validators = session_info.validators.clone();
let validator_authority_keys = session_info.discovery_keys.clone();
let mut shuffling: Vec<_> = (0..validators.len() as ValidatorIndex).collect();
let candidate_hash = receipt.hash();
state.interactions.insert(
candidate_hash.clone(),
InteractionHandle {
@@ -432,22 +608,27 @@ async fn launch_interaction(
}
);
{
// make borrow checker happy.
let mut rng = thread_rng();
shuffling.shuffle(&mut rng);
}
let params = InteractionParams {
validator_authority_keys: session_info.discovery_keys.clone(),
validators: session_info.validators.clone(),
threshold: recovery_threshold(session_info.validators.len())?,
candidate_hash,
erasure_root: receipt.descriptor.erasure_root,
};
let phase = backing_group
.and_then(|g| session_info.validator_groups.get(g.0 as usize))
.map(|group| InteractionPhase::RequestFromBackers(
RequestFromBackersPhase::new(group.clone())
))
.unwrap_or_else(|| InteractionPhase::RequestChunks(
RequestChunksPhase::new(params.validators.len() as _)
));
let interaction = Interaction {
to_state,
validator_authority_keys,
validators,
shuffling,
threshold,
candidate_hash,
erasure_root,
received_chunks: HashMap::new(),
requesting_chunks: FuturesUnordered::new(),
params,
phase,
};
let future = async move {
@@ -478,6 +659,7 @@ async fn handle_recover(
ctx: &mut impl SubsystemContext<Message = AvailabilityRecoveryMessage>,
receipt: CandidateReceipt,
session_index: SessionIndex,
backing_group: Option<GroupIndex>,
response_sender: oneshot::Sender<Result<AvailableData, RecoveryError>>,
) -> error::Result<()> {
let candidate_hash = receipt.hash();
@@ -512,6 +694,7 @@ async fn handle_recover(
session_index,
session_info,
receipt,
backing_group,
response_sender,
).await
}
@@ -543,6 +726,20 @@ async fn query_chunk(
Ok(rx.await.map_err(error::Error::CanceledQueryChunk)?)
}
/// Queries a chunk from av-store.
#[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))]
async fn query_full_data(
ctx: &mut impl SubsystemContext<Message = AvailabilityRecoveryMessage>,
candidate_hash: CandidateHash,
) -> error::Result<Option<AvailableData>> {
let (tx, rx) = oneshot::channel();
ctx.send_message(AllMessages::AvailabilityStore(
AvailabilityStoreMessage::QueryAvailableData(candidate_hash, tx),
)).await;
Ok(rx.await.map_err(error::Error::CanceledQueryFullData)?)
}
/// Handles message from interaction.
#[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))]
async fn handle_from_interaction(
@@ -574,7 +771,7 @@ async fn handle_from_interaction(
state.availability_lru.put(candidate_hash, result);
}
FromInteraction::MakeRequest(id, candidate_hash, validator_index, response) => {
FromInteraction::MakeChunkRequest(id, candidate_hash, validator_index, response) => {
let (tx, rx) = mpsc::channel(2);
let message = NetworkBridgeMessage::ConnectToValidators {
@@ -587,12 +784,33 @@ async fn handle_from_interaction(
let token = state.connecting_validators.push(rx);
state.discovering_validators.entry(id).or_default().push(AwaitedChunk {
state.discovering_validators.entry(id).or_default().push(Awaited::Chunk(AwaitedData {
validator_index,
candidate_hash,
token,
response,
});
}));
}
FromInteraction::MakeFullDataRequest(id, candidate_hash, validator_index, response) => {
let (tx, rx) = mpsc::channel(2);
let message = NetworkBridgeMessage::ConnectToValidators {
validator_ids: vec![id.clone()],
peer_set: PeerSet::Validation,
connected: tx,
};
ctx.send_message(AllMessages::NetworkBridge(message)).await;
let token = state.connecting_validators.push(rx);
println!("pushing full data request");
state.discovering_validators.entry(id).or_default().push(Awaited::FullData(AwaitedData {
validator_index,
candidate_hash,
token,
response,
}));
}
FromInteraction::ReportPeer(peer_id, rep) => {
report_peer(ctx, peer_id, rep).await;
@@ -637,16 +855,18 @@ async fn handle_network_update(
)).await;
}
protocol_v1::AvailabilityRecoveryMessage::Chunk(request_id, chunk) => {
match state.live_chunk_requests.remove(&request_id) {
match state.live_requests.remove(&request_id) {
None => {
// If there doesn't exist one, report the peer and return.
report_peer(ctx, peer, COST_UNEXPECTED_CHUNK).await;
}
Some((peer_id, awaited_chunk)) if peer_id == peer => {
Some((peer_id, Awaited::Chunk(awaited_chunk))) if peer_id == peer => {
// If there exists an entry under r_id, remove it.
// Send the chunk response on the awaited_chunk for the interaction to handle.
if let Some(chunk) = chunk {
if awaited_chunk.response.send(Ok((peer_id, chunk))).is_err() {
if awaited_chunk.response.send(
(peer_id, awaited_chunk.validator_index, chunk)
).is_err() {
tracing::debug!(
target: LOG_TARGET,
"A sending side of the recovery request is closed",
@@ -657,14 +877,59 @@ async fn handle_network_update(
Some(a) => {
// If the peer in the entry doesn't match the sending peer,
// reinstate the entry, report the peer, and return
state.live_chunk_requests.insert(request_id, a);
state.live_requests.insert(request_id, a);
report_peer(ctx, peer, COST_UNEXPECTED_CHUNK).await;
}
}
}
protocol_v1::AvailabilityRecoveryMessage::RequestFullData(_, _) |
protocol_v1::AvailabilityRecoveryMessage::FullData(_, _) => {
// handled in https://github.com/paritytech/polkadot/pull/2453
protocol_v1::AvailabilityRecoveryMessage::RequestFullData(
request_id,
candidate_hash,
) => {
// Issue a
// AvailabilityStore::QueryAvailableData(candidate-hash, response)
// message.
let full_data = query_full_data(ctx, candidate_hash).await?;
// Whatever the result, issue an
// AvailabilityRecoveryV1Message::FullData(r_id, response) message.
let wire_message = protocol_v1::AvailabilityRecoveryMessage::FullData(
request_id,
full_data,
);
ctx.send_message(AllMessages::NetworkBridge(
NetworkBridgeMessage::SendValidationMessage(
vec![peer],
protocol_v1::ValidationProtocol::AvailabilityRecovery(wire_message),
),
)).await;
}
protocol_v1::AvailabilityRecoveryMessage::FullData(request_id, data) => {
match state.live_requests.remove(&request_id) {
None => {
// If there doesn't exist one, report the peer and return.
report_peer(ctx, peer, COST_UNEXPECTED_CHUNK).await;
}
Some((peer_id, Awaited::FullData(awaited))) if peer_id == peer => {
// If there exists an entry under r_id, remove it.
// Send the response on the awaited for the interaction to handle.
if let Some(data) = data {
if awaited.response.send((peer_id, awaited.validator_index, data)).is_err() {
tracing::debug!(
target: LOG_TARGET,
"A sending side of the recovery request is closed",
);
}
}
}
Some(a) => {
// If the peer in the entry doesn't match the sending peer,
// reinstate the entry, report the peer, and return
state.live_requests.insert(request_id, a);
report_peer(ctx, peer, COST_UNEXPECTED_CHUNK).await;
}
}
}
}
}
@@ -680,21 +945,27 @@ async fn handle_network_update(
Ok(())
}
/// Issues a chunk request to the validator we've been waiting for to connect to us.
async fn issue_chunk_request(
/// Issues a request to the validator we've been waiting for to connect to us.
async fn issue_request(
state: &mut State,
ctx: &mut impl SubsystemContext<Message = AvailabilityRecoveryMessage>,
peer_id: PeerId,
awaited_chunk: AwaitedChunk,
awaited: Awaited,
) -> error::Result<()> {
let request_id = state.next_request_id;
state.next_request_id += 1;
let wire_message = protocol_v1::AvailabilityRecoveryMessage::RequestChunk(
request_id,
awaited_chunk.candidate_hash,
awaited_chunk.validator_index,
);
let wire_message = match awaited {
Awaited::Chunk(ref awaited_chunk) => protocol_v1::AvailabilityRecoveryMessage::RequestChunk(
request_id,
awaited_chunk.candidate_hash,
awaited_chunk.validator_index,
),
Awaited::FullData(ref awaited_data) => protocol_v1::AvailabilityRecoveryMessage::RequestFullData(
request_id,
awaited_data.candidate_hash,
),
};
ctx.send_message(AllMessages::NetworkBridge(
NetworkBridgeMessage::SendValidationMessage(
@@ -703,7 +974,7 @@ async fn issue_chunk_request(
),
)).await;
state.live_chunk_requests.insert(request_id, (peer_id, awaited_chunk));
state.live_requests.insert(request_id, (peer_id, awaited));
Ok(())
}
@@ -716,23 +987,23 @@ async fn handle_validator_connected(
peer_id: PeerId,
) -> error::Result<()> {
if let Some(discovering) = state.discovering_validators.remove(&authority_id) {
for chunk in discovering {
issue_chunk_request(state, ctx, peer_id.clone(), chunk).await?;
for awaited in discovering {
issue_request(state, ctx, peer_id.clone(), awaited).await?;
}
}
Ok(())
}
/// Awaited chunks info that `State` holds has to be cleaned up
/// Awaited info that `State` holds has to be cleaned up
/// periodically since there is no way `Interaction` can communicate
/// a timedout request.
fn cleanup_awaited_chunks(state: &mut State) {
fn cleanup_awaited(state: &mut State) {
let mut removed_tokens = Vec::new();
for (_, v) in state.discovering_validators.iter_mut() {
v.retain(|e| if e.response.is_canceled() {
removed_tokens.push(e.token);
v.retain(|e| if e.is_canceled() {
removed_tokens.push(e.token());
false
} else {
true
@@ -744,13 +1015,18 @@ fn cleanup_awaited_chunks(state: &mut State) {
}
state.discovering_validators.retain(|_, v| !v.is_empty());
state.live_chunk_requests.retain(|_, v| !v.1.response.is_canceled());
state.live_requests.retain(|_, v| !v.1.is_canceled());
}
impl AvailabilityRecoverySubsystem {
/// Create a new instance of `AvailabilityRecoverySubsystem`.
pub fn new() -> Self {
Self
/// Create a new instance of `AvailabilityRecoverySubsystem` which starts with a fast path to request data from backers.
pub fn with_fast_path() -> Self {
Self { fast_path: true }
}
/// Create a new instance of `AvailabilityRecoverySubsystem` which requests only chunks
pub fn with_chunks_only() -> Self {
Self { fast_path: false }
}
async fn run(
@@ -759,16 +1035,16 @@ impl AvailabilityRecoverySubsystem {
) -> SubsystemResult<()> {
let mut state = State::default();
let awaited_chunk_cleanup_interval = futures::stream::repeat(()).then(|_| async move {
Delay::new(AWAITED_CHUNKS_CLEANUP_INTERVAL).await;
let awaited_cleanup_interval = futures::stream::repeat(()).then(|_| async move {
Delay::new(AWAITED_CLEANUP_INTERVAL).await;
});
futures::pin_mut!(awaited_chunk_cleanup_interval);
futures::pin_mut!(awaited_cleanup_interval);
loop {
futures::select_biased! {
_v = awaited_chunk_cleanup_interval.next() => {
cleanup_awaited_chunks(&mut state);
_v = awaited_cleanup_interval.next() => {
cleanup_awaited(&mut state);
}
v = state.connecting_validators.next() => {
if let Some((v, token)) = v {
@@ -806,6 +1082,7 @@ impl AvailabilityRecoverySubsystem {
AvailabilityRecoveryMessage::RecoverAvailableData(
receipt,
session_index,
maybe_backing_group,
response_sender,
) => {
if let Err(e) = handle_recover(
@@ -813,6 +1090,7 @@ impl AvailabilityRecoverySubsystem {
&mut ctx,
receipt,
session_index,
maybe_backing_group.filter(|_| self.fast_path),
response_sender,
).await {
tracing::warn!(
@@ -38,7 +38,7 @@ struct TestHarness {
virtual_overseer: VirtualOverseer,
}
fn test_harness<T: Future<Output = ()>>(
fn test_harness_fast_path<T: Future<Output = ()>>(
test: impl FnOnce(TestHarness) -> T,
) {
let _ = env_logger::builder()
@@ -53,7 +53,33 @@ fn test_harness<T: Future<Output = ()>>(
let (context, virtual_overseer) = test_helpers::make_subsystem_context(pool.clone());
let subsystem = AvailabilityRecoverySubsystem::new();
let subsystem = AvailabilityRecoverySubsystem::with_fast_path();
let subsystem = subsystem.run(context);
let test_fut = test(TestHarness { virtual_overseer });
futures::pin_mut!(test_fut);
futures::pin_mut!(subsystem);
executor::block_on(future::select(test_fut, subsystem));
}
fn test_harness_chunks_only<T: Future<Output = ()>>(
test: impl FnOnce(TestHarness) -> T,
) {
let _ = env_logger::builder()
.is_test(true)
.filter(
Some("polkadot_availability_recovery"),
log::LevelFilter::Trace,
)
.try_init();
let pool = sp_core::testing::TaskExecutor::new();
let (context, virtual_overseer) = test_helpers::make_subsystem_context(pool.clone());
let subsystem = AvailabilityRecoverySubsystem::with_chunks_only();
let subsystem = subsystem.run(context);
let test_fut = test(TestHarness { virtual_overseer });
@@ -112,6 +138,14 @@ async fn overseer_recv(
use sp_keyring::Sr25519Keyring;
#[derive(Debug, Clone)]
enum HasAvailableData {
No,
Yes,
Timeout,
Other(AvailableData),
}
#[derive(Clone)]
struct TestState {
validators: Vec<Sr25519Keyring>,
@@ -149,21 +183,31 @@ impl TestState {
tx.send(Ok(Some(SessionInfo {
validators: self.validator_public.clone(),
discovery_keys: self.validator_authority_id.clone(),
// all validators in the same group.
validator_groups: vec![(0..self.validators.len()).map(|i| i as ValidatorIndex).collect()],
..Default::default()
}))).unwrap();
}
);
}
async fn test_connect_to_all_validators(
&self,
virtual_overseer: &mut VirtualOverseer,
) {
self.test_connect_to_validators(virtual_overseer, self.validator_public.len()).await;
}
async fn test_connect_to_validators(
&self,
virtual_overseer: &mut VirtualOverseer,
n: usize,
) {
// Channels by AuthorityDiscoveryId to send results to.
// Gather them here and send in batch after the loop not to race.
let mut results = HashMap::new();
for _ in 0..self.validator_public.len() {
for _ in 0..n {
// Connect to shuffled validators one by one.
assert_matches!(
overseer_recv(virtual_overseer).await,
@@ -305,6 +349,84 @@ impl TestState {
}
}
}
async fn test_full_data_requests(
&self,
candidate_hash: CandidateHash,
virtual_overseer: &mut VirtualOverseer,
who_has: &[HasAvailableData],
) {
for _ in 0..self.validator_public.len() {
self.test_connect_to_validators(virtual_overseer, 1).await;
// Receive a request for a chunk.
assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::SendValidationMessage(
peers,
protocol_v1::ValidationProtocol::AvailabilityRecovery(wire_message),
)
) => {
let (request_id, validator_index) = assert_matches!(
wire_message,
protocol_v1::AvailabilityRecoveryMessage::RequestFullData(
request_id,
candidate_hash_recvd,
) => {
assert_eq!(candidate_hash_recvd, candidate_hash);
assert_eq!(peers.len(), 1);
let validator_index = self.validator_peer_id.iter().position(|p| p == &peers[0]).unwrap();
(request_id, validator_index)
}
);
let available_data = match who_has[validator_index] {
HasAvailableData::No => Some(None),
HasAvailableData::Yes => Some(Some(self.available_data.clone())),
HasAvailableData::Timeout => None,
HasAvailableData::Other(ref other) => Some(Some(other.clone())),
};
if let Some(maybe_data) = available_data {
overseer_send(
virtual_overseer,
AvailabilityRecoveryMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerMessage(
self.validator_peer_id[validator_index].clone(),
protocol_v1::AvailabilityRecoveryMessage::FullData(
request_id,
maybe_data,
)
)
)
).await;
}
match who_has[validator_index] {
HasAvailableData::Yes => break, // done
HasAvailableData::No => {}
HasAvailableData::Timeout => { Delay::new(FULL_DATA_REQUEST_TIMEOUT).await }
HasAvailableData::Other(_) => {
assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::ReportPeer(
p,
rep,
)
) => {
assert_eq!(p, self.validator_peer_id[validator_index]);
assert_eq!(rep, COST_INVALID_AVAILABLE_DATA);
}
);
}
}
}
);
}
}
}
@@ -319,8 +441,13 @@ fn validator_authority_id(val_ids: &[Sr25519Keyring]) -> Vec<AuthorityDiscoveryI
fn derive_erasure_chunks_with_proofs_and_root(
n_validators: usize,
available_data: &AvailableData,
alter_chunk: impl Fn(usize, &mut Vec<u8>),
) -> (Vec<ErasureChunk>, Hash) {
let chunks: Vec<Vec<u8>> = obtain_chunks(n_validators, available_data).unwrap();
let mut chunks: Vec<Vec<u8>> = obtain_chunks(n_validators, available_data).unwrap();
for (i, chunk) in chunks.iter_mut().enumerate() {
alter_chunk(i, chunk)
}
// create proofs for each erasure chunk
let branches = branches(chunks.as_ref());
@@ -379,6 +506,7 @@ impl Default for TestState {
let (chunks, erasure_root) = derive_erasure_chunks_with_proofs_and_root(
validators.len(),
&available_data,
|_, _| {},
);
candidate.descriptor.erasure_root = erasure_root;
@@ -400,10 +528,10 @@ impl Default for TestState {
}
#[test]
fn availability_is_recovered() {
fn availability_is_recovered_from_chunks_if_no_group_provided() {
let test_state = TestState::default();
test_harness(|test_harness| async move {
test_harness_fast_path(|test_harness| async move {
let TestHarness { mut virtual_overseer } = test_harness;
overseer_signal(
@@ -421,13 +549,14 @@ fn availability_is_recovered() {
AvailabilityRecoveryMessage::RecoverAvailableData(
test_state.candidate.clone(),
test_state.session_index,
None,
tx,
)
).await;
test_state.test_runtime_api(&mut virtual_overseer).await;
test_state.test_connect_to_validators(&mut virtual_overseer).await;
test_state.test_connect_to_all_validators(&mut virtual_overseer).await;
let candidate_hash = test_state.candidate.hash();
@@ -448,13 +577,14 @@ fn availability_is_recovered() {
AvailabilityRecoveryMessage::RecoverAvailableData(
new_candidate,
test_state.session_index,
None,
tx,
)
).await;
test_state.test_runtime_api(&mut virtual_overseer).await;
test_state.test_connect_to_validators(&mut virtual_overseer).await;
test_state.test_connect_to_all_validators(&mut virtual_overseer).await;
// A request times out with `Unavailable` error.
assert_eq!(rx.await.unwrap().unwrap_err(), RecoveryError::Unavailable);
@@ -462,10 +592,10 @@ fn availability_is_recovered() {
}
#[test]
fn a_faulty_chunk_leads_to_recovery_error() {
let mut test_state = TestState::default();
fn availability_is_recovered_from_chunks_even_if_backing_group_supplied_if_chunks_only() {
let test_state = TestState::default();
test_harness(|test_harness| async move {
test_harness_chunks_only(|test_harness| async move {
let TestHarness { mut virtual_overseer } = test_harness;
overseer_signal(
@@ -483,22 +613,90 @@ fn a_faulty_chunk_leads_to_recovery_error() {
AvailabilityRecoveryMessage::RecoverAvailableData(
test_state.candidate.clone(),
test_state.session_index,
Some(GroupIndex(0)),
tx,
)
).await;
test_state.test_runtime_api(&mut virtual_overseer).await;
test_state.test_connect_to_all_validators(&mut virtual_overseer).await;
let candidate_hash = test_state.candidate.hash();
test_state.test_chunk_requests(candidate_hash, &mut virtual_overseer).await;
// Recovered data should match the original one.
assert_eq!(rx.await.unwrap().unwrap(), test_state.available_data);
let (tx, rx) = oneshot::channel();
// Test another candidate, send no chunks.
let mut new_candidate = CandidateReceipt::default();
new_candidate.descriptor.relay_parent = test_state.candidate.descriptor.relay_parent;
overseer_send(
&mut virtual_overseer,
AvailabilityRecoveryMessage::RecoverAvailableData(
new_candidate,
test_state.session_index,
None,
tx,
)
).await;
test_state.test_runtime_api(&mut virtual_overseer).await;
test_state.test_connect_to_validators(&mut virtual_overseer).await;
test_state.test_connect_to_all_validators(&mut virtual_overseer).await;
// A request times out with `Unavailable` error.
assert_eq!(rx.await.unwrap().unwrap_err(), RecoveryError::Unavailable);
});
}
#[test]
fn bad_merkle_path_leads_to_recovery_error() {
let mut test_state = TestState::default();
test_harness_fast_path(|test_harness| async move {
let TestHarness { mut virtual_overseer } = test_harness;
overseer_signal(
&mut virtual_overseer,
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
activated: smallvec![(test_state.current.clone(), Arc::new(JaegerSpan::Disabled))],
deactivated: smallvec![],
}),
).await;
let (tx, rx) = oneshot::channel();
overseer_send(
&mut virtual_overseer,
AvailabilityRecoveryMessage::RecoverAvailableData(
test_state.candidate.clone(),
test_state.session_index,
None,
tx,
)
).await;
test_state.test_runtime_api(&mut virtual_overseer).await;
test_state.test_connect_to_all_validators(&mut virtual_overseer).await;
let candidate_hash = test_state.candidate.hash();
// Create some faulty chunks.
test_state.chunks[0].chunk = vec![1; 32];
test_state.chunks[1].chunk = vec![2; 32];
test_state.chunks[0].chunk = vec![0; 32];
test_state.chunks[1].chunk = vec![1; 32];
test_state.chunks[2].chunk = vec![2; 32];
test_state.chunks[3].chunk = vec![3; 32];
let mut faulty = vec![false; test_state.chunks.len()];
faulty[0] = true;
faulty[1] = true;
faulty[2] = true;
faulty[3] = true;
test_state.test_faulty_chunk_requests(
candidate_hash,
@@ -507,15 +705,15 @@ fn a_faulty_chunk_leads_to_recovery_error() {
).await;
// A request times out with `Unavailable` error.
assert_eq!(rx.await.unwrap().unwrap_err(), RecoveryError::Invalid);
assert_eq!(rx.await.unwrap().unwrap_err(), RecoveryError::Unavailable);
});
}
#[test]
fn a_wrong_chunk_leads_to_recovery_error() {
fn wrong_chunk_index_leads_to_recovery_error() {
let mut test_state = TestState::default();
test_harness(|test_harness| async move {
test_harness_fast_path(|test_harness| async move {
let TestHarness { mut virtual_overseer } = test_harness;
overseer_signal(
@@ -533,23 +731,25 @@ fn a_wrong_chunk_leads_to_recovery_error() {
AvailabilityRecoveryMessage::RecoverAvailableData(
test_state.candidate.clone(),
test_state.session_index,
None,
tx,
)
).await;
test_state.test_runtime_api(&mut virtual_overseer).await;
test_state.test_connect_to_validators(&mut virtual_overseer).await;
test_state.test_connect_to_all_validators(&mut virtual_overseer).await;
let candidate_hash = test_state.candidate.hash();
// Send a wrong chunk so it passes proof check but fails to reconstruct.
// These chunks should fail the index check as they don't have the correct index for validator.
test_state.chunks[1] = test_state.chunks[0].clone();
test_state.chunks[2] = test_state.chunks[0].clone();
test_state.chunks[3] = test_state.chunks[0].clone();
test_state.chunks[4] = test_state.chunks[0].clone();
let faulty = vec![false; test_state.chunks.len()];
let mut faulty = vec![true; test_state.chunks.len()];
faulty[0] = false;
test_state.test_faulty_chunk_requests(
candidate_hash,
@@ -557,7 +757,206 @@ fn a_wrong_chunk_leads_to_recovery_error() {
&faulty,
).await;
// A request times out with `Unavailable` error.
// A request times out with `Unavailable` error as there are no good peers.
assert_eq!(rx.await.unwrap().unwrap_err(), RecoveryError::Unavailable);
});
}
#[test]
fn invalid_erasure_coding_leads_to_invalid_error() {
let mut test_state = TestState::default();
test_harness_fast_path(|test_harness| async move {
let TestHarness { mut virtual_overseer } = test_harness;
let pov = PoV {
block_data: BlockData(vec![69; 64]),
};
let (bad_chunks, bad_erasure_root) = derive_erasure_chunks_with_proofs_and_root(
test_state.chunks.len(),
&AvailableData {
validation_data: test_state.persisted_validation_data.clone(),
pov: Arc::new(pov),
},
|i, chunk| *chunk = vec![i as u8; 32],
);
test_state.chunks = bad_chunks;
test_state.candidate.descriptor.erasure_root = bad_erasure_root;
let candidate_hash = test_state.candidate.hash();
overseer_signal(
&mut virtual_overseer,
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
activated: smallvec![(test_state.current.clone(), Arc::new(JaegerSpan::Disabled))],
deactivated: smallvec![],
}),
).await;
let (tx, rx) = oneshot::channel();
overseer_send(
&mut virtual_overseer,
AvailabilityRecoveryMessage::RecoverAvailableData(
test_state.candidate.clone(),
test_state.session_index,
None,
tx,
)
).await;
test_state.test_runtime_api(&mut virtual_overseer).await;
test_state.test_connect_to_all_validators(&mut virtual_overseer).await;
test_state.test_chunk_requests(
candidate_hash,
&mut virtual_overseer,
).await;
// A request times out with `Unavailable` error as there are no good peers.
assert_eq!(rx.await.unwrap().unwrap_err(), RecoveryError::Invalid);
});
}
#[test]
fn fast_path_backing_group_recovers() {
let test_state = TestState::default();
test_harness_fast_path(|test_harness| async move {
let TestHarness { mut virtual_overseer } = test_harness;
overseer_signal(
&mut virtual_overseer,
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
activated: smallvec![(test_state.current.clone(), Arc::new(JaegerSpan::Disabled))],
deactivated: smallvec![],
}),
).await;
let (tx, rx) = oneshot::channel();
overseer_send(
&mut virtual_overseer,
AvailabilityRecoveryMessage::RecoverAvailableData(
test_state.candidate.clone(),
test_state.session_index,
Some(GroupIndex(0)),
tx,
)
).await;
test_state.test_runtime_api(&mut virtual_overseer).await;
let candidate_hash = test_state.candidate.hash();
let mut who_has: Vec<_> = (0..test_state.validators.len()).map(|_| HasAvailableData::No).collect();
who_has[3] = HasAvailableData::Yes;
test_state.test_full_data_requests(
candidate_hash,
&mut virtual_overseer,
&who_has,
).await;
// Recovered data should match the original one.
assert_eq!(rx.await.unwrap().unwrap(), test_state.available_data);
});
}
#[test]
fn wrong_data_from_fast_path_peer_leads_to_punishment() {
let test_state = TestState::default();
test_harness_fast_path(|test_harness| async move {
let TestHarness { mut virtual_overseer } = test_harness;
overseer_signal(
&mut virtual_overseer,
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
activated: smallvec![(test_state.current.clone(), Arc::new(JaegerSpan::Disabled))],
deactivated: smallvec![],
}),
).await;
let (tx, _rx) = oneshot::channel();
overseer_send(
&mut virtual_overseer,
AvailabilityRecoveryMessage::RecoverAvailableData(
test_state.candidate.clone(),
test_state.session_index,
Some(GroupIndex(0)),
tx,
)
).await;
test_state.test_runtime_api(&mut virtual_overseer).await;
let candidate_hash = test_state.candidate.hash();
let mut a = test_state.available_data.clone();
a.pov = Arc::new(PoV { block_data: BlockData(vec![69; 420]) });
let who_has: Vec<_> = (0..test_state.validators.len()).map(|_| HasAvailableData::Other(a.clone())).collect();
// This function implicitly punishes.
test_state.test_full_data_requests(
candidate_hash,
&mut virtual_overseer,
&who_has,
).await;
});
}
#[test]
fn no_answers_in_fast_path_causes_chunk_requests() {
let test_state = TestState::default();
test_harness_fast_path(|test_harness| async move {
let TestHarness { mut virtual_overseer } = test_harness;
overseer_signal(
&mut virtual_overseer,
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
activated: smallvec![(test_state.current.clone(), Arc::new(JaegerSpan::Disabled))],
deactivated: smallvec![],
}),
).await;
let (tx, rx) = oneshot::channel();
overseer_send(
&mut virtual_overseer,
AvailabilityRecoveryMessage::RecoverAvailableData(
test_state.candidate.clone(),
test_state.session_index,
Some(GroupIndex(0)),
tx,
)
).await;
test_state.test_runtime_api(&mut virtual_overseer).await;
let candidate_hash = test_state.candidate.hash();
// mix of timeout and no.
let mut who_has: Vec<_> = (0..test_state.validators.len()).map(|_| HasAvailableData::Timeout).collect();
who_has[0] = HasAvailableData::No;
who_has[3] = HasAvailableData::No;
test_state.test_full_data_requests(
candidate_hash,
&mut virtual_overseer,
&who_has,
).await;
test_state.test_connect_to_all_validators(&mut virtual_overseer).await;
test_state.test_chunk_requests(candidate_hash, &mut virtual_overseer).await;
// Recovered data should match the original one.
assert_eq!(rx.await.unwrap().unwrap(), test_state.available_data);
});
}