diff --git a/polkadot/node/network/availability-recovery/Cargo.toml b/polkadot/node/network/availability-recovery/Cargo.toml
index 2d3c68753d..a28611dc67 100644
--- a/polkadot/node/network/availability-recovery/Cargo.toml
+++ b/polkadot/node/network/availability-recovery/Cargo.toml
@@ -18,6 +18,7 @@ polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsys
polkadot-node-subsystem-util = { path = "../../subsystem-util" }
polkadot-node-network-protocol = { path = "../../network/protocol" }
parity-scale-codec = { version = "2.0.0", default-features = false, features = ["derive"] }
+sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" }
[dev-dependencies]
assert_matches = "1.4.0"
diff --git a/polkadot/node/network/availability-recovery/src/futures_undead.rs b/polkadot/node/network/availability-recovery/src/futures_undead.rs
new file mode 100644
index 0000000000..9715916590
--- /dev/null
+++ b/polkadot/node/network/availability-recovery/src/futures_undead.rs
@@ -0,0 +1,237 @@
+// Copyright 2021 Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot. If not, see .
+
+//! FuturesUndead: A `FuturesUnordered` with support for semi canceled futures. Those undead
+//! futures will still get polled, but will not count towards length. So length will only count
+//! futures, which are still considered live.
+//!
+//! Usecase: If futures take longer than we would like them too, we maybe able to request the data
+//! from somewhere else as well. We don't really want to cancel the old future, because maybe it
+//! was almost done, thus we would have wasted time with our impatience. By simply making them
+//! not count towards length, we can make sure to have enough "live" requests ongoing, while at the
+//! same time taking advantage of some maybe "late" response from the undead.
+//!
+
+use std::{
+ pin::Pin,
+ task::{Context, Poll},
+ time::Duration,
+};
+
+use futures::{future::BoxFuture, stream::FuturesUnordered, Future, Stream, StreamExt};
+use polkadot_node_subsystem_util::TimeoutExt;
+
+/// FuturesUndead - `FuturesUnordered` with semi canceled (undead) futures.
+///
+/// Limitations: Keeps track of undead futures by means of a counter, which is limited to 64
+/// bits, so after `1.8*10^19` pushed futures, this implementation will panic.
+pub struct FuturesUndead {
+ /// Actual `FuturesUnordered`.
+ inner: FuturesUnordered>,
+ /// Next sequence number to assign to the next future that gets pushed.
+ next_sequence: SequenceNumber,
+ /// Sequence number of first future considered live.
+ first_live: Option,
+ /// How many undead are there right now.
+ undead: usize,
+}
+
+/// All futures get a number, to determine which are live.
+#[derive(Eq, PartialEq, Copy, Clone, Debug, PartialOrd)]
+struct SequenceNumber(usize);
+
+struct Undead {
+ inner: BoxFuture<'static, Output>,
+ our_sequence: SequenceNumber,
+}
+
+impl FuturesUndead {
+ pub fn new() -> Self {
+ Self {
+ inner: FuturesUnordered::new(),
+ next_sequence: SequenceNumber(0),
+ first_live: None,
+ undead: 0,
+ }
+ }
+
+ pub fn push(&mut self, f: BoxFuture<'static, Output>) {
+ self.inner.push(Undead { inner: f, our_sequence: self.next_sequence });
+ self.next_sequence.inc();
+ }
+
+ /// Make all contained futures undead.
+ ///
+ /// They will no longer be counted on a call to `len`.
+ pub fn soft_cancel(&mut self) {
+ self.undead = self.inner.len();
+ self.first_live = Some(self.next_sequence);
+ }
+
+ /// Number of contained futures minus undead.
+ pub fn len(&self) -> usize {
+ self.inner.len() - self.undead
+ }
+
+ /// Total number of futures, including undead.
+ pub fn total_len(&self) -> usize {
+ self.inner.len()
+ }
+
+ /// Wait for next future to return with timeout.
+ ///
+ /// When timeout passes, return `None` and make all currently contained futures undead.
+ pub async fn next_with_timeout(&mut self, timeout: Duration) -> Option {
+ match self.next().timeout(timeout).await {
+ // Timeout:
+ None => {
+ self.soft_cancel();
+ None
+ },
+ Some(inner) => inner,
+ }
+ }
+}
+
+impl Stream for FuturesUndead {
+ type Item = Output;
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> {
+ match self.inner.poll_next_unpin(cx) {
+ Poll::Pending => Poll::Pending,
+ Poll::Ready(None) => Poll::Ready(None),
+ Poll::Ready(Some((sequence, v))) => {
+ // Cleanup in case we became completely empty:
+ if self.inner.len() == 0 {
+ *self = Self::new();
+ return Poll::Ready(Some(v))
+ }
+
+ let first_live = match self.first_live {
+ None => return Poll::Ready(Some(v)),
+ Some(first_live) => first_live,
+ };
+ // An undead came back:
+ if sequence < first_live {
+ self.undead = self.undead.saturating_sub(1);
+ }
+ Poll::Ready(Some(v))
+ },
+ }
+ }
+}
+
+impl SequenceNumber {
+ pub fn inc(&mut self) {
+ self.0 = self.0.checked_add(1).expect(
+ "We don't expect an `UndeadFuture` to live long enough for 2^64 entries ever getting inserted."
+ );
+ }
+}
+
+impl Future for Undead {
+ type Output = (SequenceNumber, T);
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll {
+ match self.inner.as_mut().poll(cx) {
+ Poll::Pending => Poll::Pending,
+ Poll::Ready(v) => Poll::Ready((self.our_sequence, v)),
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use futures::{executor, pending, FutureExt};
+
+ #[test]
+ fn cancel_sets_len_to_zero() {
+ let mut undead = FuturesUndead::new();
+ undead.push((async { () }).boxed());
+ assert_eq!(undead.len(), 1);
+ undead.soft_cancel();
+ assert_eq!(undead.len(), 0);
+ }
+
+ #[test]
+ fn finished_undead_does_not_change_len() {
+ executor::block_on(async {
+ let mut undead = FuturesUndead::new();
+ undead.push(async { 1_i32 }.boxed());
+ undead.push(async { 2_i32 }.boxed());
+ assert_eq!(undead.len(), 2);
+ undead.soft_cancel();
+ assert_eq!(undead.len(), 0);
+ undead.push(
+ async {
+ pending!();
+ 0_i32
+ }
+ .boxed(),
+ );
+ undead.next().await;
+ assert_eq!(undead.len(), 1);
+ undead.push(async { 9_i32 }.boxed());
+ undead.soft_cancel();
+ assert_eq!(undead.len(), 0);
+ });
+ }
+
+ #[test]
+ fn len_stays_correct_when_live_future_ends() {
+ executor::block_on(async {
+ let mut undead = FuturesUndead::new();
+ undead.push(
+ async {
+ pending!();
+ 1_i32
+ }
+ .boxed(),
+ );
+ undead.push(
+ async {
+ pending!();
+ 2_i32
+ }
+ .boxed(),
+ );
+ assert_eq!(undead.len(), 2);
+ undead.soft_cancel();
+ assert_eq!(undead.len(), 0);
+ undead.push(async { 0_i32 }.boxed());
+ undead.push(async { 1_i32 }.boxed());
+ undead.next().await;
+ assert_eq!(undead.len(), 1);
+ undead.next().await;
+ assert_eq!(undead.len(), 0);
+ undead.push(async { 9_i32 }.boxed());
+ assert_eq!(undead.len(), 1);
+ });
+ }
+
+ #[test]
+ fn cleanup_works() {
+ executor::block_on(async {
+ let mut undead = FuturesUndead::new();
+ undead.push(async { 1_i32 }.boxed());
+ undead.soft_cancel();
+ undead.push(async { 2_i32 }.boxed());
+ undead.next().await;
+ undead.next().await;
+ assert_eq!(undead.first_live, None);
+ });
+ }
+}
diff --git a/polkadot/node/network/availability-recovery/src/lib.rs b/polkadot/node/network/availability-recovery/src/lib.rs
index 732bb373f1..1e1f6af113 100644
--- a/polkadot/node/network/availability-recovery/src/lib.rs
+++ b/polkadot/node/network/availability-recovery/src/lib.rs
@@ -26,7 +26,7 @@ use std::{
use futures::{
channel::oneshot,
- future::{BoxFuture, FutureExt, RemoteHandle},
+ future::{FutureExt, RemoteHandle},
pin_mut,
prelude::*,
stream::FuturesUnordered,
@@ -36,6 +36,8 @@ use lru::LruCache;
use rand::seq::SliceRandom;
use polkadot_erasure_coding::{branch_hash, branches, obtain_chunks_v1, recovery_threshold};
+#[cfg(not(test))]
+use polkadot_node_network_protocol::request_response::CHUNK_REQUEST_TIMEOUT;
use polkadot_node_network_protocol::{
request_response::{
self as req_res, incoming, outgoing::RequestError, v1 as request_v1,
@@ -44,7 +46,7 @@ use polkadot_node_network_protocol::{
IfDisconnected, UnifiedReputationChange as Rep,
};
use polkadot_node_primitives::{AvailableData, ErasureChunk};
-use polkadot_node_subsystem_util::{request_session_info, TimeoutExt};
+use polkadot_node_subsystem_util::request_session_info;
use polkadot_primitives::v1::{
AuthorityDiscoveryId, BlakeTwo256, BlockNumber, CandidateHash, CandidateReceipt, GroupIndex,
Hash, HashT, SessionIndex, SessionInfo, ValidatorId, ValidatorIndex,
@@ -59,6 +61,12 @@ use polkadot_subsystem::{
};
mod error;
+mod futures_undead;
+mod metrics;
+use metrics::Metrics;
+
+use futures_undead::FuturesUndead;
+use sc_network::{OutboundFailure, RequestFailure};
#[cfg(test)]
mod tests;
@@ -73,15 +81,27 @@ const LRU_SIZE: usize = 16;
const COST_INVALID_REQUEST: Rep = Rep::CostMajor("Peer sent unparsable request");
-/// Max time we want to wait for responses, before calling `launch_parallel_requests` again to fill
-/// up slots.
-const MAX_CHUNK_WAIT: Duration = Duration::from_secs(1);
+/// Time after which we consider a request to have failed
+///
+/// and we should try more peers. Note in theory the request times out at the network level,
+/// measurements have shown, that in practice requests might actually take longer to fail in
+/// certain occasions. (The very least, authority discovery is not part of the timeout.)
+///
+/// For the time being this value is the same as the timeout on the networking layer, but as this
+/// timeout is more soft than the networking one, it might make sense to pick different values as
+/// well.
+#[cfg(not(test))]
+const TIMEOUT_START_NEW_REQUESTS: Duration = CHUNK_REQUEST_TIMEOUT;
+#[cfg(test)]
+const TIMEOUT_START_NEW_REQUESTS: Duration = Duration::from_millis(4);
/// The Availability Recovery Subsystem.
pub struct AvailabilityRecoverySubsystem {
fast_path: bool,
/// Receiver for available data requests.
req_receiver: IncomingRequestReceiver,
+ /// Metrics for this subsystem.
+ metrics: Metrics,
}
struct RequestFromBackersPhase {
@@ -91,13 +111,18 @@ struct RequestFromBackersPhase {
}
struct RequestChunksPhase {
- // a random shuffling of the validators which indicates the order in which we connect to the validators and
- // request the chunk from them.
+ /// How many request have been unsuccessful so far.
+ error_count: usize,
+ /// Total number of responses that have been received.
+ ///
+ /// including failed ones.
+ total_received_responses: usize,
+ /// a random shuffling of the validators which indicates the order in which we connect to the validators and
+ /// request the chunk from them.
shuffling: VecDeque,
received_chunks: HashMap,
- requesting_chunks: FuturesUnordered<
- BoxFuture<'static, Result, (ValidatorIndex, RequestError)>>,
- >,
+ /// Pending chunk requests with soft timeout.
+ requesting_chunks: FuturesUndead, (ValidatorIndex, RequestError)>>,
}
struct InteractionParams {
@@ -115,6 +140,9 @@ struct InteractionParams {
/// The root of the erasure encoding of the para block.
erasure_root: Hash,
+
+ /// Metrics to report
+ metrics: Metrics,
}
enum InteractionPhase {
@@ -219,16 +247,18 @@ impl RequestChunksPhase {
shuffling.shuffle(&mut rand::thread_rng());
RequestChunksPhase {
+ error_count: 0,
+ total_received_responses: 0,
shuffling: shuffling.into(),
received_chunks: HashMap::new(),
- requesting_chunks: FuturesUnordered::new(),
+ requesting_chunks: FuturesUndead::new(),
}
}
fn is_unavailable(&self, params: &InteractionParams) -> bool {
is_unavailable(
self.received_chunks.len(),
- self.requesting_chunks.len(),
+ self.requesting_chunks.total_len(),
self.shuffling.len(),
params.threshold,
)
@@ -238,13 +268,40 @@ impl RequestChunksPhase {
self.received_chunks.len() >= params.threshold || self.is_unavailable(params)
}
+ /// Desired number of parallel requests.
+ ///
+ /// For the given threshold (total required number of chunks) get the desired number of
+ /// requests we want to have running in parallel at this time.
+ fn get_desired_request_count(&self, threshold: usize) -> usize {
+ // Upper bound for parallel requests.
+ // We want to limit this, so requests can be processed within the timeout and we limit the
+ // following feedback loop:
+ // 1. Requests fail due to timeout
+ // 2. We request more chunks to make up for it
+ // 3. Bandwidth is spread out even more, so we get even more timeouts
+ // 4. We request more chunks to make up for it ...
+ let max_requests_boundary = std::cmp::min(N_PARALLEL, threshold);
+ // How many chunks are still needed?
+ let remaining_chunks = threshold.saturating_sub(self.received_chunks.len());
+ // What is the current error rate, so we can make up for it?
+ let inv_error_rate =
+ self.total_received_responses.checked_div(self.error_count).unwrap_or(0);
+ // Actual number of requests we want to have in flight in parallel:
+ std::cmp::min(
+ max_requests_boundary,
+ remaining_chunks + remaining_chunks.checked_div(inv_error_rate).unwrap_or(0),
+ )
+ }
+
async fn launch_parallel_requests(
&mut self,
params: &InteractionParams,
sender: &mut impl SubsystemSender,
) {
- let max_requests = std::cmp::min(N_PARALLEL, params.threshold);
- while self.requesting_chunks.len() < max_requests {
+ let num_requests = self.get_desired_request_count(params.threshold);
+ let mut requests = Vec::with_capacity(num_requests - self.requesting_chunks.len());
+
+ while self.requesting_chunks.len() < num_requests {
if let Some(validator_index) = self.shuffling.pop_back() {
let validator = params.validator_authority_keys[validator_index.0 as usize].clone();
tracing::trace!(
@@ -263,18 +320,13 @@ impl RequestChunksPhase {
let (req, res) =
OutgoingRequest::new(Recipient::Authority(validator), raw_request.clone());
+ requests.push(Requests::ChunkFetching(req));
- sender
- .send_message(
- NetworkBridgeMessage::SendRequests(
- vec![Requests::ChunkFetching(req)],
- IfDisconnected::TryConnect,
- )
- .into(),
- )
- .await;
+ params.metrics.on_chunk_request_issued();
+ let timer = params.metrics.time_chunk_request();
self.requesting_chunks.push(Box::pin(async move {
+ let _timer = timer;
match res.await {
Ok(req_res::v1::ChunkFetchingResponse::Chunk(chunk)) =>
Ok(Some(chunk.recombine_into_chunk(&raw_request))),
@@ -286,15 +338,25 @@ impl RequestChunksPhase {
break
}
}
+
+ sender
+ .send_message(
+ NetworkBridgeMessage::SendRequests(requests, IfDisconnected::TryConnect).into(),
+ )
+ .await;
}
async fn wait_for_chunks(&mut self, params: &InteractionParams) {
+ let metrics = ¶ms.metrics;
+
// Wait for all current requests to conclude or time-out, or until we reach enough chunks.
- // We will also stop, if there has not been a response for `MAX_CHUNK_WAIT`, so
- // `launch_parallel_requests` cann fill up slots again.
+ // We also declare requests undead, once `TIMEOUT_START_NEW_REQUESTS` is reached and will
+ // return in that case for `launch_parallel_requests` to fill up slots again.
while let Some(request_result) =
- self.requesting_chunks.next().timeout(MAX_CHUNK_WAIT).await.flatten()
+ self.requesting_chunks.next_with_timeout(TIMEOUT_START_NEW_REQUESTS).await
{
+ self.total_received_responses += 1;
+
match request_result {
Ok(Some(chunk)) => {
// Check merkle proofs of any received chunks.
@@ -309,6 +371,9 @@ impl RequestChunksPhase {
let erasure_chunk_hash = BlakeTwo256::hash(&chunk.chunk);
if erasure_chunk_hash != anticipated_hash {
+ metrics.on_chunk_request_invalid();
+ self.error_count += 1;
+
tracing::debug!(
target: LOG_TARGET,
candidate_hash = ?params.candidate_hash,
@@ -316,6 +381,8 @@ impl RequestChunksPhase {
"Merkle proof mismatch",
);
} else {
+ metrics.on_chunk_request_succeeded();
+
tracing::trace!(
target: LOG_TARGET,
candidate_hash = ?params.candidate_hash,
@@ -325,6 +392,9 @@ impl RequestChunksPhase {
self.received_chunks.insert(validator_index, chunk);
}
} else {
+ metrics.on_chunk_request_invalid();
+ self.error_count += 1;
+
tracing::debug!(
target: LOG_TARGET,
candidate_hash = ?params.candidate_hash,
@@ -333,8 +403,13 @@ impl RequestChunksPhase {
);
}
},
- Ok(None) => {},
+ Ok(None) => {
+ metrics.on_chunk_request_no_such_chunk();
+ self.error_count += 1;
+ },
Err((validator_index, e)) => {
+ self.error_count += 1;
+
tracing::debug!(
target: LOG_TARGET,
candidate_hash= ?params.candidate_hash,
@@ -344,8 +419,21 @@ impl RequestChunksPhase {
);
match e {
- RequestError::InvalidResponse(_) => {},
- RequestError::NetworkError(_) | RequestError::Canceled(_) => {
+ RequestError::InvalidResponse(_) => {
+ metrics.on_chunk_request_invalid();
+ },
+ RequestError::NetworkError(err) => {
+ if let RequestFailure::Network(OutboundFailure::Timeout) = err {
+ metrics.on_chunk_request_timeout();
+ } else {
+ metrics.on_chunk_request_error();
+ }
+
+ self.shuffling.push_front(validator_index);
+ },
+ RequestError::Canceled(_) => {
+ metrics.on_chunk_request_error();
+
self.shuffling.push_front(validator_index);
},
}
@@ -403,6 +491,7 @@ impl RequestChunksPhase {
erasure_root = ?params.erasure_root,
received = %self.received_chunks.len(),
requesting = %self.requesting_chunks.len(),
+ total_requesting = %self.requesting_chunks.total_len(),
n_validators = %params.validators.len(),
"Data recovery is not possible",
);
@@ -653,6 +742,7 @@ async fn launch_interaction(
receipt: CandidateReceipt,
backing_group: Option,
response_sender: oneshot::Sender>,
+ metrics: &Metrics,
) -> error::Result<()>
where
Context: SubsystemContext,
@@ -666,6 +756,7 @@ where
threshold: recovery_threshold(session_info.validators.len())?,
candidate_hash,
erasure_root: receipt.descriptor.erasure_root,
+ metrics: metrics.clone(),
};
let phase = backing_group
@@ -706,6 +797,7 @@ async fn handle_recover(
session_index: SessionIndex,
backing_group: Option,
response_sender: oneshot::Sender>,
+ metrics: &Metrics,
) -> error::Result<()>
where
Context: SubsystemContext,
@@ -741,8 +833,16 @@ where
let _span = span.child("session-info-ctx-received");
match session_info {
Some(session_info) =>
- launch_interaction(state, ctx, session_info, receipt, backing_group, response_sender)
- .await,
+ launch_interaction(
+ state,
+ ctx,
+ session_info,
+ receipt,
+ backing_group,
+ response_sender,
+ metrics,
+ )
+ .await,
None => {
tracing::warn!(target: LOG_TARGET, "SessionInfo is `None` at {:?}", state.live_block);
response_sender
@@ -770,18 +870,21 @@ where
}
impl AvailabilityRecoverySubsystem {
- /// Create a new instance of `AvailabilityRecoverySubsystem` which starts with a fast path to request data from backers.
+ /// Create a new instance of `AvailabilityRecoverySubsystem` which starts with a fast path to
+ /// request data from backers.
pub fn with_fast_path(
req_receiver: IncomingRequestReceiver,
+ metrics: Metrics,
) -> Self {
- Self { fast_path: true, req_receiver }
+ Self { fast_path: true, req_receiver, metrics }
}
/// Create a new instance of `AvailabilityRecoverySubsystem` which requests only chunks
pub fn with_chunks_only(
req_receiver: IncomingRequestReceiver,
+ metrics: Metrics,
) -> Self {
- Self { fast_path: false, req_receiver }
+ Self { fast_path: false, req_receiver, metrics }
}
async fn run(self, mut ctx: Context) -> SubsystemResult<()>
@@ -790,7 +893,7 @@ impl AvailabilityRecoverySubsystem {
Context: overseer::SubsystemContext,
{
let mut state = State::default();
- let Self { fast_path, mut req_receiver } = self;
+ let Self { fast_path, mut req_receiver, metrics } = self;
loop {
let recv_req = req_receiver.recv(|| vec![COST_INVALID_REQUEST]).fuse();
@@ -819,6 +922,7 @@ impl AvailabilityRecoverySubsystem {
session_index,
maybe_backing_group.filter(|_| fast_path),
response_sender,
+ &metrics,
).await {
tracing::warn!(
target: LOG_TARGET,
diff --git a/polkadot/node/network/availability-recovery/src/metrics.rs b/polkadot/node/network/availability-recovery/src/metrics.rs
new file mode 100644
index 0000000000..2461d6cff7
--- /dev/null
+++ b/polkadot/node/network/availability-recovery/src/metrics.rs
@@ -0,0 +1,132 @@
+// Copyright 2021 Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot. If not, see .
+
+use polkadot_node_subsystem_util::{
+ metrics,
+ metrics::{
+ prometheus,
+ prometheus::{Counter, CounterVec, Opts, PrometheusError, Registry, U64},
+ },
+};
+
+/// Availability Distribution metrics.
+#[derive(Clone, Default)]
+pub struct Metrics(Option);
+
+#[derive(Clone)]
+struct MetricsInner {
+ /// Number of sent chunk requests.
+ ///
+ /// Gets incremented on each sent chunk requests.
+ chunk_requests_issued: Counter,
+
+ /// A counter for finished chunk requests.
+ ///
+ /// Split by result:
+ /// - `no_such_chunk` ... peer did not have the requested chunk
+ /// - `timeout` ... request timed out.
+ /// - `network_error` ... Some networking issue except timeout
+ /// - `invalid` ... Chunk was received, but not valid.
+ /// - `success`
+ chunk_requests_finished: CounterVec,
+ /// The duration of request to response.
+ time_chunk_request: prometheus::Histogram,
+}
+
+impl Metrics {
+ /// Create new dummy metrics, not reporting anything.
+ pub fn new_dummy() -> Self {
+ Metrics(None)
+ }
+
+ /// Increment counter on fetched labels.
+ pub fn on_chunk_request_issued(&self) {
+ if let Some(metrics) = &self.0 {
+ metrics.chunk_requests_issued.inc()
+ }
+ }
+
+ /// A chunk request timed out.
+ pub fn on_chunk_request_timeout(&self) {
+ if let Some(metrics) = &self.0 {
+ metrics.chunk_requests_finished.with_label_values(&["timeout"]).inc()
+ }
+ }
+
+ /// A chunk request failed because validator did not have its chunk.
+ pub fn on_chunk_request_no_such_chunk(&self) {
+ if let Some(metrics) = &self.0 {
+ metrics.chunk_requests_finished.with_label_values(&["no_such_chunk"]).inc()
+ }
+ }
+
+ /// A chunk request failed for some non timeout related network error.
+ pub fn on_chunk_request_error(&self) {
+ if let Some(metrics) = &self.0 {
+ metrics.chunk_requests_finished.with_label_values(&["error"]).inc()
+ }
+ }
+
+ /// A chunk request succeeded, but was not valid.
+ pub fn on_chunk_request_invalid(&self) {
+ if let Some(metrics) = &self.0 {
+ metrics.chunk_requests_finished.with_label_values(&["invalid"]).inc()
+ }
+ }
+
+ /// A chunk request succeeded.
+ pub fn on_chunk_request_succeeded(&self) {
+ if let Some(metrics) = &self.0 {
+ metrics.chunk_requests_finished.with_label_values(&["success"]).inc()
+ }
+ }
+ /// Get a timer to time request/response duration.
+ pub fn time_chunk_request(&self) -> Option {
+ self.0.as_ref().map(|metrics| metrics.time_chunk_request.start_timer())
+ }
+}
+
+impl metrics::Metrics for Metrics {
+ fn try_register(registry: &Registry) -> Result {
+ let metrics = MetricsInner {
+ chunk_requests_issued: prometheus::register(
+ Counter::new(
+ "parachain_availability_recovery_chunk_requests_issued",
+ "Total number of issued chunk requests.",
+ )?,
+ registry,
+ )?,
+ chunk_requests_finished: prometheus::register(
+ CounterVec::new(
+ Opts::new(
+ "parachain_availability_recovery_chunk_requests_finished",
+ "Total number of chunk requests finished.",
+ ),
+ &["result"],
+ )?,
+ registry,
+ )?,
+ time_chunk_request: prometheus::register(
+ prometheus::Histogram::with_opts(prometheus::HistogramOpts::new(
+ "parachain_availability_recovery_time_chunk_request",
+ "Time spent waiting for a response to a chunk request",
+ ))?,
+ registry,
+ )?,
+ };
+ Ok(Metrics(Some(metrics)))
+ }
+}
diff --git a/polkadot/node/network/availability-recovery/src/tests.rs b/polkadot/node/network/availability-recovery/src/tests.rs
index ed9b5b7eba..e31a4f4360 100644
--- a/polkadot/node/network/availability-recovery/src/tests.rs
+++ b/polkadot/node/network/availability-recovery/src/tests.rs
@@ -53,7 +53,8 @@ fn test_harness_fast_path Has,
- ) {
+ ) -> Vec, RequestFailure>>> {
// arbitrary order.
- for _ in 0..n {
+ let mut i = 0;
+ let mut senders = Vec::new();
+ while i < n {
// Receive a request for a chunk.
assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::SendRequests(
- mut requests,
+ requests,
IfDisconnected::TryConnect,
)
) => {
- assert_eq!(requests.len(), 1);
+ for req in requests {
+ i += 1;
+ assert_matches!(
+ req,
+ Requests::ChunkFetching(req) => {
+ assert_eq!(req.payload.candidate_hash, candidate_hash);
- assert_matches!(
- requests.pop().unwrap(),
- Requests::ChunkFetching(req) => {
- assert_eq!(req.payload.candidate_hash, candidate_hash);
+ let validator_index = req.payload.index.0 as usize;
+ let available_data = match who_has(validator_index) {
+ Has::No => Ok(None),
+ Has::Yes => Ok(Some(self.chunks[validator_index].clone().into())),
+ Has::NetworkError(e) => Err(e),
+ Has::DoesNotReturn => {
+ senders.push(req.pending_response);
+ continue
+ }
+ };
- let validator_index = req.payload.index.0 as usize;
- let available_data = match who_has(validator_index) {
- Has::No => Ok(None),
- Has::Yes => Ok(Some(self.chunks[validator_index].clone().into())),
- Has::NetworkError(e) => Err(e),
- };
-
- let _ = req.pending_response.send(
- available_data.map(|r|
- req_res::v1::ChunkFetchingResponse::from(r).encode()
- )
- );
- }
- )
+ let _ = req.pending_response.send(
+ available_data.map(|r|
+ req_res::v1::ChunkFetchingResponse::from(r).encode()
+ )
+ );
+ }
+ )
+ }
}
);
}
+ senders
}
async fn test_full_data_requests(
@@ -298,7 +315,8 @@ impl TestState {
candidate_hash: CandidateHash,
virtual_overseer: &mut VirtualOverseer,
who_has: impl Fn(usize) -> Has,
- ) {
+ ) -> Vec, sc_network::RequestFailure>>> {
+ let mut senders = Vec::new();
for _ in 0..self.validators.len() {
// Receive a request for a chunk.
assert_matches!(
@@ -324,6 +342,10 @@ impl TestState {
Has::No => Ok(None),
Has::Yes => Ok(Some(self.available_data.clone())),
Has::NetworkError(e) => Err(e),
+ Has::DoesNotReturn => {
+ senders.push(req.pending_response);
+ continue
+ }
};
let done = available_data.as_ref().ok().map_or(false, |x| x.is_some());
@@ -340,6 +362,7 @@ impl TestState {
}
);
}
+ senders
}
}
@@ -980,13 +1003,12 @@ fn chunks_retry_until_all_nodes_respond() {
.test_chunk_requests(
candidate_hash,
&mut virtual_overseer,
- test_state.validators.len(),
+ test_state.validators.len() - test_state.threshold(),
|_| Has::timeout(),
)
.await;
// we get to go another round!
-
test_state
.test_chunk_requests(
candidate_hash,
@@ -1002,6 +1024,155 @@ fn chunks_retry_until_all_nodes_respond() {
});
}
+#[test]
+fn not_returning_requests_wont_stall_retrieval() {
+ let test_state = TestState::default();
+
+ test_harness_chunks_only(|mut virtual_overseer, req_cfg| async move {
+ overseer_signal(
+ &mut virtual_overseer,
+ OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(ActivatedLeaf {
+ hash: test_state.current.clone(),
+ number: 1,
+ status: LeafStatus::Fresh,
+ span: Arc::new(jaeger::Span::Disabled),
+ })),
+ )
+ .await;
+
+ let (tx, rx) = oneshot::channel();
+
+ overseer_send(
+ &mut virtual_overseer,
+ AvailabilityRecoveryMessage::RecoverAvailableData(
+ test_state.candidate.clone(),
+ test_state.session_index,
+ Some(GroupIndex(0)),
+ tx,
+ ),
+ )
+ .await;
+
+ test_state.test_runtime_api(&mut virtual_overseer).await;
+
+ let candidate_hash = test_state.candidate.hash();
+
+ test_state.respond_to_available_data_query(&mut virtual_overseer, false).await;
+ test_state.respond_to_query_all_request(&mut virtual_overseer, |_| false).await;
+
+ // How many validators should not respond at all:
+ let not_returning_count = 1;
+
+ // Not returning senders won't cause the retrieval to stall:
+ let _senders = test_state
+ .test_chunk_requests(candidate_hash, &mut virtual_overseer, not_returning_count, |_| {
+ Has::DoesNotReturn
+ })
+ .await;
+
+ test_state
+ .test_chunk_requests(
+ candidate_hash,
+ &mut virtual_overseer,
+ // Should start over:
+ test_state.validators.len() + 3,
+ |_| Has::timeout(),
+ )
+ .await;
+
+ // we get to go another round!
+ test_state
+ .test_chunk_requests(
+ candidate_hash,
+ &mut virtual_overseer,
+ test_state.threshold(),
+ |_| Has::Yes,
+ )
+ .await;
+
+ // Recovered data should match the original one:
+ assert_eq!(rx.await.unwrap().unwrap(), test_state.available_data);
+ (virtual_overseer, req_cfg)
+ });
+}
+
+#[test]
+fn all_not_returning_requests_still_recovers_on_return() {
+ let test_state = TestState::default();
+
+ test_harness_chunks_only(|mut virtual_overseer, req_cfg| async move {
+ overseer_signal(
+ &mut virtual_overseer,
+ OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(ActivatedLeaf {
+ hash: test_state.current.clone(),
+ number: 1,
+ status: LeafStatus::Fresh,
+ span: Arc::new(jaeger::Span::Disabled),
+ })),
+ )
+ .await;
+
+ let (tx, rx) = oneshot::channel();
+
+ overseer_send(
+ &mut virtual_overseer,
+ AvailabilityRecoveryMessage::RecoverAvailableData(
+ test_state.candidate.clone(),
+ test_state.session_index,
+ Some(GroupIndex(0)),
+ tx,
+ ),
+ )
+ .await;
+
+ test_state.test_runtime_api(&mut virtual_overseer).await;
+
+ let candidate_hash = test_state.candidate.hash();
+
+ test_state.respond_to_available_data_query(&mut virtual_overseer, false).await;
+ test_state.respond_to_query_all_request(&mut virtual_overseer, |_| false).await;
+
+ let senders = test_state
+ .test_chunk_requests(
+ candidate_hash,
+ &mut virtual_overseer,
+ test_state.validators.len(),
+ |_| Has::DoesNotReturn,
+ )
+ .await;
+
+ future::join(
+ async {
+ Delay::new(Duration::from_millis(10)).await;
+ // Now retrieval should be able to recover.
+ std::mem::drop(senders);
+ },
+ test_state.test_chunk_requests(
+ candidate_hash,
+ &mut virtual_overseer,
+ // Should start over:
+ test_state.validators.len() + 3,
+ |_| Has::timeout(),
+ ),
+ )
+ .await;
+
+ // we get to go another round!
+ test_state
+ .test_chunk_requests(
+ candidate_hash,
+ &mut virtual_overseer,
+ test_state.threshold(),
+ |_| Has::Yes,
+ )
+ .await;
+
+ // Recovered data should match the original one:
+ assert_eq!(rx.await.unwrap().unwrap(), test_state.available_data);
+ (virtual_overseer, req_cfg)
+ });
+}
+
#[test]
fn returns_early_if_we_have_the_data() {
let test_state = TestState::default();
@@ -1097,3 +1268,34 @@ fn does_not_query_local_validator() {
(virtual_overseer, req_cfg)
});
}
+
+#[test]
+fn parallel_request_calculation_works_as_expected() {
+ let num_validators = 100;
+ let threshold = recovery_threshold(num_validators).unwrap();
+ let mut phase = RequestChunksPhase::new(100);
+ assert_eq!(phase.get_desired_request_count(threshold), threshold);
+ phase.error_count = 1;
+ phase.total_received_responses = 1;
+ // We saturate at threshold (34):
+ assert_eq!(phase.get_desired_request_count(threshold), threshold);
+
+ let dummy_chunk =
+ ErasureChunk { chunk: Vec::new(), index: ValidatorIndex(0), proof: Proof::dummy_proof() };
+ phase.received_chunks.insert(ValidatorIndex(0), dummy_chunk.clone());
+ phase.total_received_responses = 2;
+ // With given error rate - still saturating:
+ assert_eq!(phase.get_desired_request_count(threshold), threshold);
+ for i in 1..9 {
+ phase.received_chunks.insert(ValidatorIndex(i), dummy_chunk.clone());
+ }
+ phase.total_received_responses += 8;
+ // error rate: 1/10
+ // remaining chunks needed: threshold (34) - 9
+ // expected: 24 * (1+ 1/10) = (next greater integer) = 27
+ assert_eq!(phase.get_desired_request_count(threshold), 27);
+ phase.received_chunks.insert(ValidatorIndex(9), dummy_chunk.clone());
+ phase.error_count = 0;
+ // With error count zero - we should fetch exactly as needed:
+ assert_eq!(phase.get_desired_request_count(threshold), threshold - phase.received_chunks.len());
+}
diff --git a/polkadot/node/network/protocol/src/request_response/mod.rs b/polkadot/node/network/protocol/src/request_response/mod.rs
index 4f922f5539..3d5e445e26 100644
--- a/polkadot/node/network/protocol/src/request_response/mod.rs
+++ b/polkadot/node/network/protocol/src/request_response/mod.rs
@@ -89,6 +89,9 @@ const DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_secs(3);
/// peer set as well).
const DEFAULT_REQUEST_TIMEOUT_CONNECTED: Duration = Duration::from_secs(1);
+/// Timeout for requesting availability chunks.
+pub const CHUNK_REQUEST_TIMEOUT: Duration = DEFAULT_REQUEST_TIMEOUT_CONNECTED;
+
/// This timeout is based on what seems sensible from a time budget perspective, considering 6
/// second block time. This is going to be tough, if we have multiple forks and large PoVs, but we
/// only have so much time.
@@ -132,7 +135,7 @@ impl Protocol {
max_request_size: 1_000,
max_response_size: POV_RESPONSE_SIZE as u64 / 10,
// We are connected to all validators:
- request_timeout: DEFAULT_REQUEST_TIMEOUT_CONNECTED,
+ request_timeout: CHUNK_REQUEST_TIMEOUT,
inbound_queue: Some(tx),
},
Protocol::CollationFetching => RequestResponseConfig {
diff --git a/polkadot/node/primitives/src/lib.rs b/polkadot/node/primitives/src/lib.rs
index 6aabb93d76..7503874796 100644
--- a/polkadot/node/primitives/src/lib.rs
+++ b/polkadot/node/primitives/src/lib.rs
@@ -304,6 +304,13 @@ impl Proof {
pub fn as_vec(&self) -> Vec> {
self.0.as_vec().iter().map(|v| v.as_vec().clone()).collect()
}
+
+ /// Construct an invalid dummy proof
+ ///
+ /// Useful for testing, should absolutely not be used in production.
+ pub fn dummy_proof() -> Proof {
+ Proof(BoundedVec::from_vec(vec![BoundedVec::from_vec(vec![0]).unwrap()]).unwrap())
+ }
}
#[derive(thiserror::Error, Debug)]
diff --git a/polkadot/node/service/src/overseer.rs b/polkadot/node/service/src/overseer.rs
index fc82b7cf30..280187f977 100644
--- a/polkadot/node/service/src/overseer.rs
+++ b/polkadot/node/service/src/overseer.rs
@@ -170,6 +170,7 @@ where
),
availability_recovery: AvailabilityRecoverySubsystem::with_chunks_only(
available_data_req_receiver,
+ Metrics::register(registry)?,
),
availability_store: AvailabilityStoreSubsystem::new(
parachains_db.clone(),