Port availability recovery to use req/res (#2694)

* add AvailableDataFetchingRequest

* rename AvailabilityFetchingRequest to ChunkFetchingRequest

* rename AvailabilityFetchingResponse to Chunk_

* add AvailableDataFetching request

* add available data fetching request to availability recovery message

* remove availability recovery message

* fix

* update network bridge

* port availability recovery to request/response

* use validators.len(), not shuffling

* fix availability recovery tests

* update guide

* Update node/network/availability-recovery/src/lib.rs

Co-authored-by: Bernhard Schuster <bernhard@ahoi.io>

* Update node/network/availability-recovery/src/lib.rs

Co-authored-by: Arkadiy Paronyan <arkady.paronyan@gmail.com>

* remove println

Co-authored-by: Bernhard Schuster <bernhard@ahoi.io>
Co-authored-by: Arkadiy Paronyan <arkady.paronyan@gmail.com>
This commit is contained in:
Robert Habermeier
2021-03-25 15:34:24 +01:00
committed by GitHub
parent 349879df6b
commit 8a396c678f
19 changed files with 379 additions and 1067 deletions
@@ -25,9 +25,6 @@ pub enum Error {
#[error(transparent)]
Subsystem(#[from] polkadot_subsystem::SubsystemError),
#[error("failed to query a chunk from store")]
CanceledQueryChunk(#[source] oneshot::Canceled),
#[error("failed to query full data from store")]
CanceledQueryFullData(#[source] oneshot::Canceled),
@@ -19,14 +19,11 @@
#![warn(missing_docs)]
use std::collections::HashMap;
use std::time::Duration;
use std::pin::Pin;
use futures::{channel::{oneshot, mpsc}, prelude::*, stream::FuturesUnordered};
use futures_timer::Delay;
use futures::future::BoxFuture;
use lru::LruCache;
use rand::seq::SliceRandom;
use streamunordered::{StreamUnordered, StreamYield};
use polkadot_primitives::v1::{
AuthorityDiscoveryId, AvailableData, CandidateReceipt, CandidateHash,
@@ -40,16 +37,16 @@ use polkadot_subsystem::{
jaeger,
messages::{
AvailabilityStoreMessage, AvailabilityRecoveryMessage, AllMessages, NetworkBridgeMessage,
NetworkBridgeEvent,
},
};
use polkadot_node_network_protocol::{
peer_set::PeerSet, v1 as protocol_v1, PeerId, RequestId, UnifiedReputationChange as Rep,
};
use polkadot_node_subsystem_util::{
Timeout, TimeoutExt,
request_session_info_ctx,
IfDisconnected,
request_response::{
self as req_res, OutgoingRequest, Recipient, Requests,
request::RequestError,
},
};
use polkadot_node_subsystem_util::request_session_info_ctx;
use polkadot_erasure_coding::{branches, branch_hash, recovery_threshold, obtain_chunks_v1};
mod error;
@@ -58,78 +55,17 @@ mod tests;
const LOG_TARGET: &str = "parachain::availability-recovery";
const COST_MERKLE_PROOF_INVALID: Rep = Rep::CostMinor("Merkle proof was invalid");
const COST_UNEXPECTED_CHUNK: Rep = Rep::CostMinor("Peer has sent an unexpected chunk");
const COST_INVALID_AVAILABLE_DATA: Rep = Rep::CostMinor("Peer provided invalid available data");
// How many parallel requests an interaction should have going at once.
const N_PARALLEL: usize = 50;
// Size of the LRU cache where we keep recovered data.
const LRU_SIZE: usize = 16;
// A timeout for a chunk request.
#[cfg(not(test))]
const CHUNK_REQUEST_TIMEOUT: Duration = Duration::from_secs(3);
#[cfg(test)]
const CHUNK_REQUEST_TIMEOUT: Duration = Duration::from_millis(100);
// A timeout for a full data request.
#[cfg(not(test))]
const FULL_DATA_REQUEST_TIMEOUT: Duration = Duration::from_secs(5);
#[cfg(test)]
const FULL_DATA_REQUEST_TIMEOUT: Duration = Duration::from_millis(100);
// A period to poll and clean awaited data.
const AWAITED_CLEANUP_INTERVAL: Duration = Duration::from_secs(1);
/// The Availability Recovery Subsystem.
pub struct AvailabilityRecoverySubsystem {
/// Presumably selects whether recovery first tries to request the full data
/// from backers (see `with_fast_path`) — confirm against the constructors.
fast_path: bool,
}
type DataResponse<T> = (PeerId, ValidatorIndex, T);
/// Awaited data from the network.
///
/// Each variant wraps the bookkeeping (`AwaitedData`) for one outstanding
/// request, parameterised by the payload type that is expected back.
enum Awaited {
/// We are waiting for a single erasure chunk.
Chunk(AwaitedData<ErasureChunk>),
/// We are waiting for the full available data.
FullData(AwaitedData<AvailableData>),
}
impl Awaited {
    /// Reports whether the response sender of the wrapped request is canceled,
    /// regardless of which kind of data is awaited.
    fn is_canceled(&self) -> bool {
        match self {
            Awaited::Chunk(chunk) => chunk.response.is_canceled(),
            Awaited::FullData(full_data) => full_data.response.is_canceled(),
        }
    }

    /// Token to cancel the connection request to the validator.
    fn token(&self) -> usize {
        match self {
            Awaited::Chunk(chunk) => chunk.token,
            Awaited::FullData(full_data) => full_data.token,
        }
    }
}
/// Data we keep around for network data that we are awaiting.
///
/// `T` is the type of the payload that is delivered through `response`.
struct AwaitedData<T> {
/// Index of the validator we have requested this data from.
validator_index: ValidatorIndex,
/// The hash of the candidate the requested data belongs to.
candidate_hash: CandidateHash,
/// Token to cancel the connection request to the validator.
token: usize,
/// Result sender.
response: oneshot::Sender<DataResponse<T>>,
}
/// Accumulate all awaiting sides for some particular `AvailableData`.
struct InteractionHandle {
awaiting: Vec<oneshot::Sender<Result<AvailableData, RecoveryError>>>,
@@ -141,27 +77,8 @@ enum FromInteraction {
/// An interaction concluded.
Concluded(CandidateHash, Result<AvailableData, RecoveryError>),
/// Make a request of a particular chunk from a particular validator.
MakeChunkRequest(
AuthorityDiscoveryId,
CandidateHash,
ValidatorIndex,
oneshot::Sender<DataResponse<ErasureChunk>>,
),
/// Make a request of the full data from a particular validator.
MakeFullDataRequest(
AuthorityDiscoveryId,
CandidateHash,
ValidatorIndex,
oneshot::Sender<DataResponse<AvailableData>>,
),
/// Report a peer.
ReportPeer(
PeerId,
Rep,
),
/// Send a request on the network service.
NetworkRequest(Requests),
}
struct RequestFromBackersPhase {
@@ -175,7 +92,10 @@ struct RequestChunksPhase {
// request the chunk from them.
shuffling: Vec<ValidatorIndex>,
received_chunks: HashMap<ValidatorIndex, ErasureChunk>,
requesting_chunks: FuturesUnordered<Timeout<oneshot::Receiver<DataResponse<ErasureChunk>>>>,
requesting_chunks: FuturesUnordered<BoxFuture<
'static,
Result<Option<ErasureChunk>, RequestError>>,
>,
}
struct InteractionParams {
@@ -241,18 +161,16 @@ impl RequestFromBackersPhase {
Some(i) => i,
};
let (tx, rx) = oneshot::channel();
// Request data.
to_state.send(FromInteraction::MakeFullDataRequest(
params.validator_authority_keys[validator_index.0 as usize].clone(),
params.candidate_hash.clone(),
validator_index,
tx,
)).await?;
let (req, res) = OutgoingRequest::new(
Recipient::Authority(params.validator_authority_keys[validator_index.0 as usize].clone()),
req_res::v1::AvailableDataFetchingRequest { candidate_hash: params.candidate_hash },
);
match rx.timeout(FULL_DATA_REQUEST_TIMEOUT).await {
Some(Ok((peer_id, _validator_index, data))) => {
to_state.send(FromInteraction::NetworkRequest(Requests::AvailableDataFetching(req))).await?;
match res.await {
Ok(req_res::v1::AvailableDataFetchingResponse::AvailableData(data)) => {
if reconstructed_data_matches_root(params.validators.len(), &params.erasure_root, &data) {
to_state.send(
FromInteraction::Concluded(params.candidate_hash.clone(), Ok(data))
@@ -268,30 +186,21 @@ impl RequestFromBackersPhase {
tracing::debug!(
target: LOG_TARGET,
candidate_hash = ?params.candidate_hash,
validator = ?peer_id,
?validator_index,
"Invalid data response",
);
to_state.send(FromInteraction::ReportPeer(
peer_id.clone(),
COST_INVALID_AVAILABLE_DATA,
)).await?;
// it doesn't help to report the peer with req/res.
}
}
Some(Err(e)) => {
tracing::debug!(
target: LOG_TARGET,
err = ?e,
validator = ?params.validator_authority_keys[validator_index.0 as usize],
"A response channel was cancelled while waiting for full data",
);
}
None => {
tracing::debug!(
target: LOG_TARGET,
validator = ?params.validator_authority_keys[validator_index.0 as usize],
"A full data request has timed out",
);
}
Ok(req_res::v1::AvailableDataFetchingResponse::NoSuchData) => {}
Err(e) => tracing::debug!(
target: LOG_TARGET,
candidate_hash = ?params.candidate_hash,
?validator_index,
err = ?e,
"Error fetching full available data."
),
}
}
}
@@ -314,24 +223,39 @@ impl RequestChunksPhase {
params: &InteractionParams,
to_state: &mut mpsc::Sender<FromInteraction>,
) -> Result<(), mpsc::SendError> {
while self.requesting_chunks.len() < N_PARALLEL {
let max_requests = std::cmp::min(N_PARALLEL, params.threshold);
while self.requesting_chunks.len() < max_requests {
if let Some(validator_index) = self.shuffling.pop() {
let (tx, rx) = oneshot::channel();
let validator = params.validator_authority_keys[validator_index.0 as usize].clone();
tracing::trace!(
target: LOG_TARGET,
validator = ?params.validator_authority_keys[validator_index.0 as usize],
?validator,
?validator_index,
candidate_hash = ?params.candidate_hash,
"Requesting chunk",
);
to_state.send(FromInteraction::MakeChunkRequest(
params.validator_authority_keys[validator_index.0 as usize].clone(),
params.candidate_hash.clone(),
validator_index,
tx,
)).await?;
self.requesting_chunks.push(rx.timeout(CHUNK_REQUEST_TIMEOUT));
// Request data.
let raw_request = req_res::v1::ChunkFetchingRequest {
candidate_hash: params.candidate_hash,
index: validator_index,
};
let (req, res) = OutgoingRequest::new(
Recipient::Authority(validator),
raw_request.clone(),
);
to_state.send(FromInteraction::NetworkRequest(Requests::ChunkFetching(req))).await?;
self.requesting_chunks.push(Box::pin(async move {
match res.await {
Ok(req_res::v1::ChunkFetchingResponse::Chunk(chunk))
=> Ok(Some(chunk.recombine_into_chunk(&raw_request))),
Ok(req_res::v1::ChunkFetchingResponse::NoSuchChunk) => Ok(None),
Err(e) => Err(e),
}
}));
} else {
break;
}
@@ -343,7 +267,6 @@ impl RequestChunksPhase {
async fn wait_for_chunks(
&mut self,
params: &InteractionParams,
to_state: &mut mpsc::Sender<FromInteraction>,
) -> Result<(), mpsc::SendError> {
// Check whether `requesting_chunks` is empty, so that we do not poll it to completion.
if self.requesting_chunks.is_empty() {
@@ -353,28 +276,11 @@ impl RequestChunksPhase {
// Poll for new updates from requesting_chunks.
while let Some(request_result) = self.requesting_chunks.next().await {
match request_result {
Some(Ok((peer_id, validator_index, chunk))) => {
Ok(Some(chunk)) => {
// Check merkle proofs of any received chunks, and any failures should
// lead to issuance of a FromInteraction::ReportPeer message.
// We need to check that the validator index matches the chunk index and
// not blindly trust the data from an untrusted peer.
if validator_index != chunk.index {
tracing::debug!(
target: LOG_TARGET,
validator = ?peer_id,
?validator_index,
chunk_index = ?chunk.index,
"Index mismatch",
);
to_state.send(FromInteraction::ReportPeer(
peer_id.clone(),
COST_MERKLE_PROOF_INVALID,
)).await?;
continue;
}
let validator_index = chunk.index;
if let Ok(anticipated_hash) = branch_hash(
&params.erasure_root,
@@ -386,47 +292,31 @@ impl RequestChunksPhase {
if erasure_chunk_hash != anticipated_hash {
tracing::debug!(
target: LOG_TARGET,
validator = ?peer_id,
?validator_index,
"Merkle proof mismatch",
);
to_state.send(FromInteraction::ReportPeer(
peer_id.clone(),
COST_MERKLE_PROOF_INVALID,
)).await?;
} else {
tracing::trace!(
target: LOG_TARGET,
validator = ?peer_id,
?validator_index,
"Received valid Merkle proof",
"Received valid chunk.",
);
self.received_chunks.insert(validator_index, chunk);
}
} else {
tracing::debug!(
target: LOG_TARGET,
validator = ?peer_id,
?validator_index,
"Invalid Merkle proof",
);
to_state.send(FromInteraction::ReportPeer(
peer_id.clone(),
COST_MERKLE_PROOF_INVALID,
)).await?;
}
}
Some(Err(e)) => {
Ok(None) => {}
Err(e) => {
tracing::debug!(
target: LOG_TARGET,
err = ?e,
"A response channel was cancelled while waiting for a chunk",
);
}
None => {
tracing::debug!(
target: LOG_TARGET,
"A chunk request has timed out",
"Failure requesting chunk",
);
}
}
@@ -453,7 +343,7 @@ impl RequestChunksPhase {
erasure_root = ?params.erasure_root,
received = %self.received_chunks.len(),
requesting = %self.requesting_chunks.len(),
n_validators = %self.shuffling.len(),
n_validators = %params.validators.len(),
"Data recovery is not possible",
);
to_state.send(FromInteraction::Concluded(
@@ -465,7 +355,7 @@ impl RequestChunksPhase {
}
self.launch_parallel_requests(params, to_state).await?;
self.wait_for_chunks(params, to_state).await?;
self.wait_for_chunks(params).await?;
// If received_chunks has more than threshold entries, attempt to recover the data.
// If that fails, or a re-encoding of it doesn't match the expected erasure root,
@@ -523,10 +413,10 @@ impl RequestChunksPhase {
/// Returns `true` when the chunks already received, the chunks currently being
/// requested and the validators that could still be asked together number
/// fewer than `threshold`.
const fn is_unavailable(
received_chunks: usize,
requesting_chunks: usize,
n_validators: usize,
unrequested_validators: usize,
threshold: usize,
) -> bool {
received_chunks + requesting_chunks + n_validators < threshold
received_chunks + requesting_chunks + unrequested_validators < threshold
}
fn reconstructed_data_matches_root(
@@ -585,19 +475,6 @@ struct State {
/// A recent block hash for which state should be available.
live_block_hash: Hash,
/// We are waiting for these validators to connect and as soon as they
/// do, request the needed data we are waiting for.
discovering_validators: HashMap<AuthorityDiscoveryId, Vec<Awaited>>,
/// Requests that we have issued to the already connected validators
/// about the data we are interested in.
live_requests: HashMap<RequestId, (PeerId, Awaited)>,
/// Derive request ids from this.
next_request_id: RequestId,
connecting_validators: StreamUnordered<mpsc::Receiver<(AuthorityDiscoveryId, PeerId)>>,
/// interaction communication. This is cloned and given to interactions that are spun up.
from_interaction_tx: mpsc::Sender<FromInteraction>,
@@ -613,14 +490,10 @@ impl Default for State {
let (from_interaction_tx, from_interaction_rx) = mpsc::channel(16);
Self {
from_interaction_tx,
from_interaction_rx,
interactions: HashMap::new(),
live_block_hash: Hash::default(),
discovering_validators: HashMap::new(),
live_requests: HashMap::new(),
next_request_id: 0,
connecting_validators: StreamUnordered::new(),
from_interaction_tx,
from_interaction_rx,
availability_lru: LruCache::new(LRU_SIZE),
}
}
@@ -659,15 +532,6 @@ async fn handle_signal(
}
}
/// Forwards a reputation change `rep` for `peer` to the network bridge.
async fn report_peer(
    ctx: &mut impl SubsystemContext<Message = AvailabilityRecoveryMessage>,
    peer: PeerId,
    rep: Rep,
) {
    let report = NetworkBridgeMessage::ReportPeer(peer, rep);
    ctx.send_message(AllMessages::NetworkBridge(report)).await;
}
/// Machinery around launching interactions into the background.
#[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))]
async fn launch_interaction(
@@ -797,21 +661,6 @@ async fn handle_recover(
}
}
/// Queries a chunk from av-store.
#[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))]
async fn query_chunk(
ctx: &mut impl SubsystemContext<Message = AvailabilityRecoveryMessage>,
candidate_hash: CandidateHash,
validator_index: ValidatorIndex,
) -> error::Result<Option<ErasureChunk>> {
let (tx, rx) = oneshot::channel();
ctx.send_message(AllMessages::AvailabilityStore(
AvailabilityStoreMessage::QueryChunk(candidate_hash, validator_index, tx),
)).await;
Ok(rx.await.map_err(error::Error::CanceledQueryChunk)?)
}
/// Queries the full available data from av-store.
#[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))]
async fn query_full_data(
@@ -857,336 +706,17 @@ async fn handle_from_interaction(
state.availability_lru.put(candidate_hash, result);
}
FromInteraction::MakeChunkRequest(id, candidate_hash, validator_index, response) => {
let (tx, rx) = mpsc::channel(2);
let message = NetworkBridgeMessage::ConnectToValidators {
validator_ids: vec![id.clone()],
peer_set: PeerSet::Validation,
connected: tx,
};
ctx.send_message(AllMessages::NetworkBridge(message)).await;
let token = state.connecting_validators.push(rx);
state.discovering_validators.entry(id).or_default().push(Awaited::Chunk(AwaitedData {
validator_index,
candidate_hash,
token,
response,
}));
}
FromInteraction::MakeFullDataRequest(id, candidate_hash, validator_index, response) => {
let (tx, rx) = mpsc::channel(2);
let message = NetworkBridgeMessage::ConnectToValidators {
validator_ids: vec![id.clone()],
peer_set: PeerSet::Validation,
connected: tx,
};
ctx.send_message(AllMessages::NetworkBridge(message)).await;
let token = state.connecting_validators.push(rx);
state.discovering_validators.entry(id).or_default().push(Awaited::FullData(AwaitedData {
validator_index,
candidate_hash,
token,
response,
}));
}
FromInteraction::ReportPeer(peer_id, rep) => {
report_peer(ctx, peer_id, rep).await;
FromInteraction::NetworkRequest(request) => {
ctx.send_message(NetworkBridgeMessage::SendRequests(
vec![request],
IfDisconnected::TryConnect,
).into()).await;
}
}
Ok(())
}
/// Handles a network bridge update.
///
/// Only `PeerMessage` events carry work; view changes and peer
/// connect/disconnect events are ignored. For peer messages:
///
/// * `RequestChunk` / `RequestFullData` are answered from the availability
///   store and the answer is sent back to the requesting peer.
/// * `Chunk` / `FullData` are matched against `state.live_requests` by request
///   id and forwarded to the waiting interaction; unexpected responses get the
///   sending peer reported.
///
/// Returns an error if querying the availability store fails.
#[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))]
async fn handle_network_update(
state: &mut State,
ctx: &mut impl SubsystemContext<Message = AvailabilityRecoveryMessage>,
update: NetworkBridgeEvent<protocol_v1::AvailabilityRecoveryMessage>,
) -> error::Result<()> {
match update {
NetworkBridgeEvent::PeerMessage(peer, message) => {
match message {
protocol_v1::AvailabilityRecoveryMessage::RequestChunk(
request_id,
candidate_hash,
validator_index,
) => {
// Issue a
// AvailabilityStore::QueryChunk(candidate-hash, validator_index, response)
// message.
let chunk = query_chunk(ctx, candidate_hash, validator_index).await?;
tracing::trace!(
target: LOG_TARGET,
data_set = %chunk.is_some(),
%request_id,
?candidate_hash,
validator_index = validator_index.0,
"Responding to chunk request",
);
// Whatever the result, issue an
// AvailabilityRecoveryV1Message::Chunk(r_id, response) message.
let wire_message = protocol_v1::AvailabilityRecoveryMessage::Chunk(
request_id,
chunk,
);
ctx.send_message(AllMessages::NetworkBridge(
NetworkBridgeMessage::SendValidationMessage(
vec![peer],
protocol_v1::ValidationProtocol::AvailabilityRecovery(wire_message),
),
)).await;
}
protocol_v1::AvailabilityRecoveryMessage::Chunk(request_id, chunk) => {
match state.live_requests.remove(&request_id) {
None => {
tracing::debug!(
target: LOG_TARGET,
?peer,
"Received unexpected chunk response",
);
// If there doesn't exist one, report the peer and return.
report_peer(ctx, peer, COST_UNEXPECTED_CHUNK).await;
}
Some((peer_id, Awaited::Chunk(awaited_chunk))) if peer_id == peer => {
tracing::trace!(
target: LOG_TARGET,
data_set = %chunk.is_some(),
%request_id,
candidate_hash = ?awaited_chunk.candidate_hash,
validator_index = awaited_chunk.validator_index.0,
"Received chunk response",
);
// If there exists an entry under r_id, remove it.
// Send the chunk response on the awaited_chunk for the interaction to handle.
// When the peer answered with `None`, nothing is sent on the response channel.
if let Some(chunk) = chunk {
if awaited_chunk.response.send(
(peer_id, awaited_chunk.validator_index, chunk)
).is_err() {
tracing::debug!(
target: LOG_TARGET,
"A sending side of the recovery request is closed",
);
}
}
}
Some(a) => {
tracing::debug!(
target: LOG_TARGET,
?peer,
"Received unexpected chunk response",
);
// If the peer in the entry doesn't match the sending peer,
// reinstate the entry, report the peer, and return
state.live_requests.insert(request_id, a);
report_peer(ctx, peer, COST_UNEXPECTED_CHUNK).await;
}
}
}
protocol_v1::AvailabilityRecoveryMessage::RequestFullData(
request_id,
candidate_hash,
) => {
// Issue a
// AvailabilityStore::QueryAvailableData(candidate-hash, response)
// message.
let full_data = query_full_data(ctx, candidate_hash).await?;
tracing::trace!(
target: LOG_TARGET,
data_set = full_data.is_some(),
%request_id,
?candidate_hash,
"Responding to full data request",
);
// Whatever the result, issue an
// AvailabilityRecoveryV1Message::FullData(r_id, response) message.
let wire_message = protocol_v1::AvailabilityRecoveryMessage::FullData(
request_id,
full_data,
);
ctx.send_message(AllMessages::NetworkBridge(
NetworkBridgeMessage::SendValidationMessage(
vec![peer],
protocol_v1::ValidationProtocol::AvailabilityRecovery(wire_message),
),
)).await;
}
protocol_v1::AvailabilityRecoveryMessage::FullData(request_id, data) => {
match state.live_requests.remove(&request_id) {
None => {
// If there doesn't exist one, report the peer and return.
tracing::debug!(
target: LOG_TARGET,
?peer,
"Received unexpected full data response",
);
// NOTE(review): the chunk cost constant is also used for full data responses.
report_peer(ctx, peer, COST_UNEXPECTED_CHUNK).await;
}
Some((peer_id, Awaited::FullData(awaited))) if peer_id == peer => {
tracing::trace!(
target: LOG_TARGET,
%request_id,
candidate_hash = ?awaited.candidate_hash,
data_set = %data.is_some(),
"Received full data response",
);
// If there exists an entry under r_id, remove it.
// Send the response on the awaited for the interaction to handle.
// When the peer answered with `None`, nothing is sent on the response channel.
if let Some(data) = data {
if awaited.response.send((peer_id, awaited.validator_index, data)).is_err() {
tracing::debug!(
target: LOG_TARGET,
"A sending side of the recovery request is closed",
);
}
}
}
Some(a) => {
// If the peer in the entry doesn't match the sending peer,
// reinstate the entry, report the peer, and return
tracing::debug!(
target: LOG_TARGET,
?peer,
"Received unexpected full data response",
);
state.live_requests.insert(request_id, a);
report_peer(ctx, peer, COST_UNEXPECTED_CHUNK).await;
}
}
}
}
}
// We do not really need to track the peers' views in this subsystem
// since the peers are _required_ to have the data we are interested in.
NetworkBridgeEvent::PeerViewChange(_, _) => {}
NetworkBridgeEvent::OurViewChange(_) => {}
// All peer connections are handled via validator discovery API.
NetworkBridgeEvent::PeerConnected(_, _) => {}
NetworkBridgeEvent::PeerDisconnected(_) => {}
}
Ok(())
}
/// Sends the request described by `awaited` to the freshly connected `peer_id`
/// and records it in `state.live_requests`.
///
/// The request id is taken from `state.next_request_id`, which is incremented
/// for the next caller.
async fn issue_request(
    state: &mut State,
    ctx: &mut impl SubsystemContext<Message = AvailabilityRecoveryMessage>,
    peer_id: PeerId,
    awaited: Awaited,
) -> error::Result<()> {
    // Reserve the id up front: both the trace output and the wire message use it.
    let request_id = state.next_request_id;
    state.next_request_id += 1;

    let wire_message = match &awaited {
        Awaited::Chunk(chunk_request) => {
            tracing::trace!(
                target: LOG_TARGET,
                %request_id,
                %peer_id,
                candidate_hash = ?chunk_request.candidate_hash,
                validator_index = %chunk_request.validator_index.0,
                "Requesting chunk",
            );

            protocol_v1::AvailabilityRecoveryMessage::RequestChunk(
                request_id,
                chunk_request.candidate_hash,
                chunk_request.validator_index,
            )
        }
        Awaited::FullData(data_request) => {
            tracing::trace!(
                target: LOG_TARGET,
                %request_id,
                %peer_id,
                candidate_hash = ?data_request.candidate_hash,
                validator_index = %data_request.validator_index.0,
                "Requesting full data",
            );

            protocol_v1::AvailabilityRecoveryMessage::RequestFullData(
                request_id,
                data_request.candidate_hash,
            )
        }
    };

    let recipients = vec![peer_id.clone()];
    let bridge_message = NetworkBridgeMessage::SendValidationMessage(
        recipients,
        protocol_v1::ValidationProtocol::AvailabilityRecovery(wire_message),
    );
    ctx.send_message(AllMessages::NetworkBridge(bridge_message)).await;

    // Remember who we asked so that the response can be matched later.
    state.live_requests.insert(request_id, (peer_id, awaited));

    Ok(())
}
/// Reacts to a validator becoming reachable: every request that was parked
/// under `authority_id` in `state.discovering_validators` is issued to
/// `peer_id`.
///
/// Stops at, and returns, the first error produced by `issue_request`.
async fn handle_validator_connected(
    state: &mut State,
    ctx: &mut impl SubsystemContext<Message = AvailabilityRecoveryMessage>,
    authority_id: AuthorityDiscoveryId,
    peer_id: PeerId,
) -> error::Result<()> {
    tracing::trace!(
        target: LOG_TARGET,
        ?peer_id,
        ?authority_id,
        "Validator connected",
    );

    // Taking the entry out of the map yields nothing when no request was waiting.
    let pending = state.discovering_validators.remove(&authority_id);
    for awaited in pending.into_iter().flatten() {
        issue_request(state, ctx, peer_id.clone(), awaited).await?;
    }

    Ok(())
}
/// Drops bookkeeping for requests whose response channel has been canceled.
///
/// `State` has to be swept periodically because an `Interaction` has no way of
/// telling it that a request timed out.
fn cleanup_awaited(state: &mut State) {
    // Tokens of connection requests that belong to canceled entries.
    let mut stale_tokens = Vec::new();

    for waiting in state.discovering_validators.values_mut() {
        waiting.retain(|entry| {
            let keep = !entry.is_canceled();
            if !keep {
                stale_tokens.push(entry.token());
            }
            keep
        });
    }

    for token in stale_tokens {
        Pin::new(&mut state.connecting_validators).remove(token);
    }

    // Forget validators nobody is waiting on any more, then canceled live requests.
    state.discovering_validators.retain(|_, waiting| !waiting.is_empty());
    state.live_requests.retain(|_, (_, awaited)| !awaited.is_canceled());
}
impl AvailabilityRecoverySubsystem {
/// Create a new instance of `AvailabilityRecoverySubsystem` which starts with a fast path to request data from backers.
pub fn with_fast_path() -> Self {
@@ -1204,40 +734,8 @@ impl AvailabilityRecoverySubsystem {
) -> SubsystemResult<()> {
let mut state = State::default();
let awaited_cleanup_interval = futures::stream::repeat(()).then(|_| async move {
Delay::new(AWAITED_CLEANUP_INTERVAL).await;
});
futures::pin_mut!(awaited_cleanup_interval);
loop {
futures::select_biased! {
_v = awaited_cleanup_interval.next() => {
cleanup_awaited(&mut state);
}
v = state.connecting_validators.next() => {
if let Some((v, token)) = v {
match v {
StreamYield::Item(v) => {
if let Err(e) = handle_validator_connected(
&mut state,
&mut ctx,
v.0,
v.1,
).await {
tracing::warn!(
target: LOG_TARGET,
err = ?e,
"Failed to handle a newly connected validator",
);
}
}
StreamYield::Finished(_) => {
Pin::new(&mut state.connecting_validators).remove(token);
}
}
}
}
futures::select! {
v = ctx.recv().fuse() => {
match v? {
FromOverseer::Signal(signal) => if handle_signal(
@@ -1269,17 +767,20 @@ impl AvailabilityRecoverySubsystem {
);
}
}
AvailabilityRecoveryMessage::NetworkBridgeUpdateV1(event) => {
if let Err(e) = handle_network_update(
&mut state,
&mut ctx,
event,
).await {
tracing::warn!(
target: LOG_TARGET,
err = ?e,
"Error handling a network bridge update",
);
AvailabilityRecoveryMessage::AvailableDataFetchingRequest(req) => {
match query_full_data(&mut ctx, req.payload.candidate_hash).await {
Ok(res) => {
let _ = req.send_response(res.into());
}
Err(e) => {
tracing::debug!(
target: LOG_TARGET,
err = ?e,
"Failed to query available data.",
);
let _ = req.send_response(None.into());
}
}
}
}
@@ -22,6 +22,8 @@ use futures_timer::Delay;
use assert_matches::assert_matches;
use smallvec::smallvec;
use parity_scale_codec::Encode;
use super::*;
use polkadot_primitives::v1::{
@@ -30,7 +32,7 @@ use polkadot_primitives::v1::{
use polkadot_erasure_coding::{branches, obtain_chunks_v1 as obtain_chunks};
use polkadot_node_subsystem_util::TimeoutExt;
use polkadot_subsystem_testhelpers as test_helpers;
use polkadot_subsystem::{messages::{RuntimeApiMessage, RuntimeApiRequest, NetworkBridgeEvent}, jaeger};
use polkadot_subsystem::{messages::{RuntimeApiMessage, RuntimeApiRequest}, jaeger};
type VirtualOverseer = test_helpers::TestSubsystemContextHandle<AvailabilityRecoveryMessage>;
@@ -139,11 +141,10 @@ async fn overseer_recv(
use sp_keyring::Sr25519Keyring;
#[derive(Debug, Clone)]
enum HasAvailableData {
enum Has {
No,
Yes,
Timeout,
Other(AvailableData),
}
#[derive(Clone)]
@@ -151,12 +152,10 @@ struct TestState {
validators: Vec<Sr25519Keyring>,
validator_public: Vec<ValidatorId>,
validator_authority_id: Vec<AuthorityDiscoveryId>,
validator_peer_id: Vec<PeerId>,
current: Hash,
candidate: CandidateReceipt,
session_index: SessionIndex,
persisted_validation_data: PersistedValidationData,
available_data: AvailableData,
@@ -164,6 +163,26 @@ struct TestState {
}
impl TestState {
/// Recovery threshold for the test's validator set, as computed by
/// `recovery_threshold`. Panics if that computation fails.
fn threshold(&self) -> usize {
    let n_validators = self.validators.len();
    recovery_threshold(n_validators).unwrap()
}
/// Number of validators minus the recovery threshold, plus one.
fn impossibility_threshold(&self) -> usize {
    let n_validators = self.validators.len();
    let threshold = self.threshold();
    n_validators - threshold + 1
}
/// One `Has::Yes` entry per validator.
fn all_have(&self) -> Vec<Has> {
    self.validators.iter().map(|_| Has::Yes).collect()
}
/// One `Has::No` entry per validator: nobody has the requested data.
///
/// This used to return `Has::Yes` for every validator, which made it an exact
/// duplicate of `all_have` and contradicted its name.
fn all_dont_have(&self) -> Vec<Has> {
    self.validators.iter().map(|_| Has::No).collect()
}
/// One `Has::Timeout` entry per validator.
fn all_timeout(&self) -> Vec<Has> {
    self.validators.iter().map(|_| Has::Timeout).collect()
}
async fn test_runtime_api(
&self,
virtual_overseer: &mut VirtualOverseer,
@@ -191,238 +210,104 @@ impl TestState {
);
}
/// Expects one connection request per entry in `validator_public` and answers
/// all of them.
async fn test_connect_to_all_validators(
    &self,
    virtual_overseer: &mut VirtualOverseer,
) {
    let validator_count = self.validator_public.len();
    self.test_connect_to_validators(virtual_overseer, validator_count).await;
}
/// Expects `n` `ConnectToValidators` messages from the subsystem and, once all
/// of them have been received, reports every requested validator as connected.
async fn test_connect_to_validators(
    &self,
    virtual_overseer: &mut VirtualOverseer,
    n: usize,
) {
    // Reply channels keyed by (authority id, peer id). Replies are collected
    // here and only sent after the loop, so that they do not race with the
    // remaining connection requests.
    let mut pending_replies = HashMap::new();

    for _ in 0..n {
        // Validators are connected to one by one, in shuffled order.
        assert_matches!(
            overseer_recv(virtual_overseer).await,
            AllMessages::NetworkBridge(
                NetworkBridgeMessage::ConnectToValidators {
                    validator_ids,
                    connected,
                    ..
                }
            ) => {
                for validator_id in validator_ids {
                    let position = self.validator_authority_id
                        .iter()
                        .position(|known| known == &validator_id)
                        .unwrap();

                    let key = (
                        self.validator_authority_id[position].clone(),
                        self.validator_peer_id[position].clone(),
                    );
                    pending_replies.insert(key, connected.clone());
                }
            }
        );
    }

    for (connected_pair, mut reply_tx) in pending_replies {
        reply_tx.send(connected_pair).await.unwrap();
    }
}
async fn test_chunk_requests(
&self,
candidate_hash: CandidateHash,
virtual_overseer: &mut VirtualOverseer,
n: usize,
who_has: &[Has],
) {
for _ in 0..self.validator_public.len() {
// arbitrary order.
for _ in 0..n {
// Receive a request for a chunk.
assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::SendValidationMessage(
_peers,
protocol_v1::ValidationProtocol::AvailabilityRecovery(wire_message),
NetworkBridgeMessage::SendRequests(
mut requests,
IfDisconnected::TryConnect,
)
) => {
let (request_id, validator_index) = assert_matches!(
wire_message,
protocol_v1::AvailabilityRecoveryMessage::RequestChunk(
request_id,
candidate_hash_recvd,
validator_index,
) => {
assert_eq!(candidate_hash_recvd, candidate_hash);
(request_id, validator_index)
}
);
assert_eq!(requests.len(), 1);
overseer_send(
virtual_overseer,
AvailabilityRecoveryMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerMessage(
self.validator_peer_id[validator_index.0 as usize].clone(),
protocol_v1::AvailabilityRecoveryMessage::Chunk(
request_id,
Some(self.chunks[validator_index.0 as usize].clone()),
assert_matches!(
requests.pop().unwrap(),
Requests::ChunkFetching(req) => {
assert_eq!(req.payload.candidate_hash, candidate_hash);
let validator_index = req.payload.index.0 as usize;
let available_data = match who_has[validator_index] {
Has::No => Ok(None),
Has::Yes => Ok(Some(self.chunks[validator_index].clone().into())),
Has::Timeout => {
Err(sc_network::RequestFailure::Network(
sc_network::OutboundFailure::Timeout
))
}
};
let _ = req.pending_response.send(
available_data.map(|r|
req_res::v1::ChunkFetchingResponse::from(r).encode()
)
)
)
).await;
}
);
}
}
async fn test_faulty_chunk_requests(
&self,
candidate_hash: CandidateHash,
virtual_overseer: &mut VirtualOverseer,
faulty: &[bool],
) {
for _ in 0..self.validator_public.len() {
// Receive a request for a chunk.
assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::SendValidationMessage(
_peers,
protocol_v1::ValidationProtocol::AvailabilityRecovery(wire_message),
);
}
)
) => {
let (request_id, validator_index) = assert_matches!(
wire_message,
protocol_v1::AvailabilityRecoveryMessage::RequestChunk(
request_id,
candidate_hash_recvd,
validator_index,
) => {
assert_eq!(candidate_hash_recvd, candidate_hash);
(request_id, validator_index)
}
);
overseer_send(
virtual_overseer,
AvailabilityRecoveryMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerMessage(
self.validator_peer_id[validator_index.0 as usize].clone(),
protocol_v1::AvailabilityRecoveryMessage::Chunk(
request_id,
Some(self.chunks[validator_index.0 as usize].clone()),
)
)
)
).await;
}
);
}
for i in 0..self.validator_public.len() {
if faulty[i] {
assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::ReportPeer(
peer,
rep,
)
) => {
assert_eq!(rep, COST_MERKLE_PROOF_INVALID);
// These may arrive in any order since the interaction implementation
// uses `FuturesUnordered`.
assert!(self.validator_peer_id.iter().find(|p| **p == peer).is_some());
}
);
}
}
}
async fn test_full_data_requests(
&self,
candidate_hash: CandidateHash,
virtual_overseer: &mut VirtualOverseer,
who_has: &[HasAvailableData],
who_has: &[Has],
) {
for _ in 0..self.validator_public.len() {
self.test_connect_to_validators(virtual_overseer, 1).await;
for _ in 0..self.validators.len() {
// Receive a request for the full available data.
assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::SendValidationMessage(
peers,
protocol_v1::ValidationProtocol::AvailabilityRecovery(wire_message),
NetworkBridgeMessage::SendRequests(
mut requests,
IfDisconnected::TryConnect,
)
) => {
let (request_id, validator_index) = assert_matches!(
wire_message,
protocol_v1::AvailabilityRecoveryMessage::RequestFullData(
request_id,
candidate_hash_recvd,
) => {
assert_eq!(candidate_hash_recvd, candidate_hash);
assert_eq!(peers.len(), 1);
assert_eq!(requests.len(), 1);
let validator_index = self.validator_peer_id.iter().position(|p| p == &peers[0]).unwrap();
(request_id, validator_index)
}
);
assert_matches!(
requests.pop().unwrap(),
Requests::AvailableDataFetching(req) => {
assert_eq!(req.payload.candidate_hash, candidate_hash);
let validator_index = self.validator_authority_id
.iter()
.position(|a| Recipient::Authority(a.clone()) == req.peer)
.unwrap();
let available_data = match who_has[validator_index] {
HasAvailableData::No => Some(None),
HasAvailableData::Yes => Some(Some(self.available_data.clone())),
HasAvailableData::Timeout => None,
HasAvailableData::Other(ref other) => Some(Some(other.clone())),
};
if let Some(maybe_data) = available_data {
overseer_send(
virtual_overseer,
AvailabilityRecoveryMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerMessage(
self.validator_peer_id[validator_index].clone(),
protocol_v1::AvailabilityRecoveryMessage::FullData(
request_id,
maybe_data,
)
)
)
).await;
}
match who_has[validator_index] {
HasAvailableData::Yes => break, // done
HasAvailableData::No => {}
HasAvailableData::Timeout => { Delay::new(FULL_DATA_REQUEST_TIMEOUT).await }
HasAvailableData::Other(_) => {
assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::ReportPeer(
p,
rep,
)
) => {
assert_eq!(p, self.validator_peer_id[validator_index]);
assert_eq!(rep, COST_INVALID_AVAILABLE_DATA);
let available_data = match who_has[validator_index] {
Has::No => Ok(None),
Has::Yes => Ok(Some(self.available_data.clone())),
Has::Timeout => {
Err(sc_network::RequestFailure::Network(
sc_network::OutboundFailure::Timeout
))
}
};
let _ = req.pending_response.send(
available_data.map(|r|
req_res::v1::AvailableDataFetchingResponse::from(r).encode()
)
);
match who_has[validator_index].clone() {
Has::Yes => break, // done
Has::No => {}
Has::Timeout => {}
}
}
}
)
}
);
}
@@ -477,9 +362,6 @@ impl Default for TestState {
let validator_public = validator_pubkeys(&validators);
let validator_authority_id = validator_authority_id(&validators);
let validator_peer_id = std::iter::repeat_with(|| PeerId::random())
.take(validator_public.len())
.collect();
let current = Hash::repeat_byte(1);
@@ -516,7 +398,6 @@ impl Default for TestState {
validators,
validator_public,
validator_authority_id,
validator_peer_id,
current,
candidate,
session_index,
@@ -556,11 +437,14 @@ fn availability_is_recovered_from_chunks_if_no_group_provided() {
test_state.test_runtime_api(&mut virtual_overseer).await;
test_state.test_connect_to_all_validators(&mut virtual_overseer).await;
let candidate_hash = test_state.candidate.hash();
test_state.test_chunk_requests(candidate_hash, &mut virtual_overseer).await;
test_state.test_chunk_requests(
candidate_hash,
&mut virtual_overseer,
test_state.threshold(),
&test_state.all_have(),
).await;
// Recovered data should match the original one.
assert_eq!(rx.await.unwrap().unwrap(), test_state.available_data);
@@ -575,7 +459,7 @@ fn availability_is_recovered_from_chunks_if_no_group_provided() {
overseer_send(
&mut virtual_overseer,
AvailabilityRecoveryMessage::RecoverAvailableData(
new_candidate,
new_candidate.clone(),
test_state.session_index,
None,
tx,
@@ -584,7 +468,12 @@ fn availability_is_recovered_from_chunks_if_no_group_provided() {
test_state.test_runtime_api(&mut virtual_overseer).await;
test_state.test_connect_to_all_validators(&mut virtual_overseer).await;
test_state.test_chunk_requests(
new_candidate.hash(),
&mut virtual_overseer,
test_state.impossibility_threshold(),
&test_state.all_dont_have(),
).await;
// A request times out with `Unavailable` error.
assert_eq!(rx.await.unwrap().unwrap_err(), RecoveryError::Unavailable);
@@ -619,11 +508,15 @@ fn availability_is_recovered_from_chunks_even_if_backing_group_supplied_if_chunk
).await;
test_state.test_runtime_api(&mut virtual_overseer).await;
test_state.test_connect_to_all_validators(&mut virtual_overseer).await;
let candidate_hash = test_state.candidate.hash();
test_state.test_chunk_requests(candidate_hash, &mut virtual_overseer).await;
test_state.test_chunk_requests(
candidate_hash,
&mut virtual_overseer,
test_state.threshold(),
&test_state.all_have(),
).await;
// Recovered data should match the original one.
assert_eq!(rx.await.unwrap().unwrap(), test_state.available_data);
@@ -638,7 +531,7 @@ fn availability_is_recovered_from_chunks_even_if_backing_group_supplied_if_chunk
overseer_send(
&mut virtual_overseer,
AvailabilityRecoveryMessage::RecoverAvailableData(
new_candidate,
new_candidate.clone(),
test_state.session_index,
None,
tx,
@@ -647,7 +540,12 @@ fn availability_is_recovered_from_chunks_even_if_backing_group_supplied_if_chunk
test_state.test_runtime_api(&mut virtual_overseer).await;
test_state.test_connect_to_all_validators(&mut virtual_overseer).await;
test_state.test_chunk_requests(
new_candidate.hash(),
&mut virtual_overseer,
test_state.impossibility_threshold(),
&test_state.all_dont_have(),
).await;
// A request times out with `Unavailable` error.
assert_eq!(rx.await.unwrap().unwrap_err(), RecoveryError::Unavailable);
@@ -682,7 +580,6 @@ fn bad_merkle_path_leads_to_recovery_error() {
).await;
test_state.test_runtime_api(&mut virtual_overseer).await;
test_state.test_connect_to_all_validators(&mut virtual_overseer).await;
let candidate_hash = test_state.candidate.hash();
@@ -691,17 +588,13 @@ fn bad_merkle_path_leads_to_recovery_error() {
test_state.chunks[1].chunk = vec![1; 32];
test_state.chunks[2].chunk = vec![2; 32];
test_state.chunks[3].chunk = vec![3; 32];
test_state.chunks[4].chunk = vec![4; 32];
let mut faulty = vec![false; test_state.chunks.len()];
faulty[0] = true;
faulty[1] = true;
faulty[2] = true;
faulty[3] = true;
test_state.test_faulty_chunk_requests(
test_state.test_chunk_requests(
candidate_hash,
&mut virtual_overseer,
&faulty,
test_state.impossibility_threshold(),
&test_state.all_have(),
).await;
// A request times out with `Unavailable` error.
@@ -738,8 +631,6 @@ fn wrong_chunk_index_leads_to_recovery_error() {
test_state.test_runtime_api(&mut virtual_overseer).await;
test_state.test_connect_to_all_validators(&mut virtual_overseer).await;
let candidate_hash = test_state.candidate.hash();
// These chunks should fail the index check as they don't have the correct index for validator.
@@ -748,13 +639,14 @@ fn wrong_chunk_index_leads_to_recovery_error() {
test_state.chunks[3] = test_state.chunks[0].clone();
test_state.chunks[4] = test_state.chunks[0].clone();
let mut faulty = vec![true; test_state.chunks.len()];
faulty[0] = false;
let mut have = test_state.all_dont_have();
have[0] = Has::No;
test_state.test_faulty_chunk_requests(
test_state.test_chunk_requests(
candidate_hash,
&mut virtual_overseer,
&faulty,
test_state.impossibility_threshold(),
&have,
).await;
// A request times out with `Unavailable` error as there are no good peers.
@@ -809,14 +701,14 @@ fn invalid_erasure_coding_leads_to_invalid_error() {
test_state.test_runtime_api(&mut virtual_overseer).await;
test_state.test_connect_to_all_validators(&mut virtual_overseer).await;
test_state.test_chunk_requests(
candidate_hash,
&mut virtual_overseer,
test_state.threshold(),
&test_state.all_have(),
).await;
// A request times out with `Unavailable` error as there are no good peers.
// f+1 'valid' chunks can't produce correct data.
assert_eq!(rx.await.unwrap().unwrap_err(), RecoveryError::Invalid);
});
}
@@ -852,8 +744,8 @@ fn fast_path_backing_group_recovers() {
let candidate_hash = test_state.candidate.hash();
let mut who_has: Vec<_> = (0..test_state.validators.len()).map(|_| HasAvailableData::No).collect();
who_has[3] = HasAvailableData::Yes;
let mut who_has = test_state.all_dont_have();
who_has[3] = Has::Yes;
test_state.test_full_data_requests(
candidate_hash,
@@ -866,51 +758,6 @@ fn fast_path_backing_group_recovers() {
});
}
/// Fast-path recovery test in which every entry of `who_has` is
/// `HasAvailableData::Other`, each carrying a copy of the real available data whose
/// PoV has been replaced with different block data.
///
/// The receiving half of the recovery channel is bound (`_rx`) but never awaited, so
/// the recovery result itself is not inspected here; the checks are the ones made
/// inside `test_full_data_requests`.
#[test]
fn wrong_data_from_fast_path_peer_leads_to_punishment() {
    let test_state = TestState::default();

    test_harness_fast_path(|harness| async move {
        let TestHarness { mut virtual_overseer } = harness;

        // Signal `test_state.current` as a newly activated leaf.
        let leaf_update = ActiveLeavesUpdate {
            activated: smallvec![(test_state.current.clone(), Arc::new(jaeger::Span::Disabled))],
            deactivated: smallvec![],
        };
        overseer_signal(&mut virtual_overseer, OverseerSignal::ActiveLeaves(leaf_update)).await;

        // Request recovery of the candidate, supplying `Some(GroupIndex(0))`.
        let (tx, _rx) = oneshot::channel();
        let recovery_request = AvailabilityRecoveryMessage::RecoverAvailableData(
            test_state.candidate.clone(),
            test_state.session_index,
            Some(GroupIndex(0)),
            tx,
        );
        overseer_send(&mut virtual_overseer, recovery_request).await;

        test_state.test_runtime_api(&mut virtual_overseer).await;

        let candidate_hash = test_state.candidate.hash();

        // Identical to the real available data except for the PoV.
        let mut bogus_data = test_state.available_data.clone();
        bogus_data.pov = Arc::new(PoV { block_data: BlockData(vec![69; 420]) });

        // One `Other(bogus_data)` entry per validator.
        let who_has: Vec<_> = std::iter::repeat_with(|| HasAvailableData::Other(bogus_data.clone()))
            .take(test_state.validators.len())
            .collect();

        // This function implicitly punishes.
        test_state
            .test_full_data_requests(candidate_hash, &mut virtual_overseer, &who_has)
            .await;
    });
}
#[test]
fn no_answers_in_fast_path_causes_chunk_requests() {
let test_state = TestState::default();
@@ -943,9 +790,9 @@ fn no_answers_in_fast_path_causes_chunk_requests() {
let candidate_hash = test_state.candidate.hash();
// mix of timeout and no.
let mut who_has: Vec<_> = (0..test_state.validators.len()).map(|_| HasAvailableData::Timeout).collect();
who_has[0] = HasAvailableData::No;
who_has[3] = HasAvailableData::No;
let mut who_has = test_state.all_timeout();
who_has[0] = Has::No;
who_has[3] = Has::No;
test_state.test_full_data_requests(
candidate_hash,
@@ -953,8 +800,12 @@ fn no_answers_in_fast_path_causes_chunk_requests() {
&who_has,
).await;
test_state.test_connect_to_all_validators(&mut virtual_overseer).await;
test_state.test_chunk_requests(candidate_hash, &mut virtual_overseer).await;
test_state.test_chunk_requests(
candidate_hash,
&mut virtual_overseer,
test_state.threshold(),
&test_state.all_have(),
).await;
// Recovered data should match the original one.
assert_eq!(rx.await.unwrap().unwrap(), test_state.available_data);