From 0bc42785b4355fd016ce3959a9f149287d76f68c Mon Sep 17 00:00:00 2001 From: Robert Klotzner Date: Tue, 30 Mar 2021 00:28:43 +0200 Subject: [PATCH] availability-distribution: Retry failed fetches on next block. (#2762) * availability-distribution: Retry on fail on next block. Retry failed fetches on next block when still pending availability. * Update node/network/availability-distribution/src/requester/fetch_task/mod.rs Co-authored-by: Andronik Ordian * Fix existing tests. * Add test for trying all validators. * Add test for testing retries. Co-authored-by: Andronik Ordian --- .../src/requester/fetch_task/mod.rs | 31 ++++++--- .../src/requester/fetch_task/tests.rs | 1 + .../src/requester/mod.rs | 11 ++-- .../src/tests/mock.rs | 1 + .../src/tests/mod.rs | 64 +++++++++++++++++++ .../src/tests/state.rs | 15 +++-- 6 files changed, 105 insertions(+), 18 deletions(-) diff --git a/polkadot/node/network/availability-distribution/src/requester/fetch_task/mod.rs b/polkadot/node/network/availability-distribution/src/requester/fetch_task/mod.rs index c0a09842fd..04b50a78ff 100644 --- a/polkadot/node/network/availability-distribution/src/requester/fetch_task/mod.rs +++ b/polkadot/node/network/availability-distribution/src/requester/fetch_task/mod.rs @@ -26,10 +26,7 @@ use polkadot_node_network_protocol::request_response::{ request::{OutgoingRequest, RequestError, Requests, Recipient}, v1::{ChunkFetchingRequest, ChunkFetchingResponse}, }; -use polkadot_primitives::v1::{ - AuthorityDiscoveryId, BlakeTwo256, GroupIndex, Hash, HashT, OccupiedCore, - SessionIndex, -}; +use polkadot_primitives::v1::{AuthorityDiscoveryId, BlakeTwo256, CandidateHash, GroupIndex, Hash, HashT, OccupiedCore, SessionIndex}; use polkadot_node_primitives::ErasureChunk; use polkadot_subsystem::messages::{ AllMessages, AvailabilityStoreMessage, NetworkBridgeMessage, IfDisconnected, @@ -89,6 +86,9 @@ pub enum FromFetchTask { /// In case of `None` everything was fine, in case of `Some`, some validators in the 
group /// did not serve us our chunk as expected. Concluded(Option), + + /// We were not able to fetch the desired chunk for the given `CandidateHash`. + Failed(CandidateHash), } /// Information a running task needs. @@ -262,7 +262,7 @@ impl RunningTask { /// Try validators in backing group in order. async fn run_inner(mut self) { let mut bad_validators = Vec::new(); - let mut label = FAILED; + let mut succeeded = false; let mut count: u32 = 0; let mut _span = self.span.child("fetch-task") .with_chunk_index(self.request.index.0) @@ -315,13 +315,18 @@ impl RunningTask { // Ok, let's store it and be happy: self.store_chunk(chunk).await; - label = SUCCEEDED; + succeeded = true; _span.add_string_tag("success", "true"); break; } _span.add_int_tag("tries", count as _); - self.metrics.on_fetch(label); - self.conclude(bad_validators).await; + if succeeded { + self.metrics.on_fetch(SUCCEEDED); + self.conclude(bad_validators).await; + } else { + self.metrics.on_fetch(FAILED); + self.conclude_fail().await + } } /// Do request and return response, if successful. 
@@ -434,4 +439,14 @@ impl RunningTask { ); } } + + async fn conclude_fail(&mut self) { + if let Err(err) = self.sender.send(FromFetchTask::Failed(self.request.candidate_hash)).await { + tracing::warn!( + target: LOG_TARGET, + ?err, + "Sending `Failed` message for task failed" + ); + } + } } diff --git a/polkadot/node/network/availability-distribution/src/requester/fetch_task/tests.rs b/polkadot/node/network/availability-distribution/src/requester/fetch_task/tests.rs index d84d2c646b..db8790435b 100644 --- a/polkadot/node/network/availability-distribution/src/requester/fetch_task/tests.rs +++ b/polkadot/node/network/availability-distribution/src/requester/fetch_task/tests.rs @@ -228,6 +228,7 @@ impl TestRun { ); match msg { FromFetchTask::Concluded(_) => break, + FromFetchTask::Failed(_) => break, FromFetchTask::Message(msg) => end_ok = self.handle_message(msg).await, } diff --git a/polkadot/node/network/availability-distribution/src/requester/mod.rs b/polkadot/node/network/availability-distribution/src/requester/mod.rs index 5516340df3..4881977355 100644 --- a/polkadot/node/network/availability-distribution/src/requester/mod.rs +++ b/polkadot/node/network/availability-distribution/src/requester/mod.rs @@ -54,6 +54,8 @@ pub struct Requester { /// /// We keep those around as long as a candidate is pending availability on some leaf, so we /// won't fetch chunks multiple times. + /// + /// We remove them on failure, so we get retries on the next block still pending availability. fetches: HashMap, /// Localized information about sessions we are currently interested in. @@ -76,10 +78,7 @@ impl Requester { /// by advancing the stream. #[tracing::instrument(level = "trace", skip(keystore, metrics), fields(subsystem = LOG_TARGET))] pub fn new(keystore: SyncCryptoStorePtr, metrics: Metrics) -> Self { - // All we do is forwarding messages, no need to make this big. 
- // Each sender will get one slot, see - // [here](https://docs.rs/futures/0.3.13/futures/channel/mpsc/fn.channel.html). - let (tx, rx) = mpsc::channel(0); + let (tx, rx) = mpsc::channel(1); Requester { fetches: HashMap::new(), session_cache: SessionCache::new(keystore), @@ -214,6 +213,10 @@ impl Stream for Requester { } Poll::Ready(Some(FromFetchTask::Concluded(None))) => continue, + Poll::Ready(Some(FromFetchTask::Failed(candidate_hash))) => { + // Make sure we retry on next block still pending availability. + self.fetches.remove(&candidate_hash); + } Poll::Ready(None) => return Poll::Ready(None), Poll::Pending => diff --git a/polkadot/node/network/availability-distribution/src/tests/mock.rs b/polkadot/node/network/availability-distribution/src/tests/mock.rs index 7c1b57b808..6ba0973400 100644 --- a/polkadot/node/network/availability-distribution/src/tests/mock.rs +++ b/polkadot/node/network/availability-distribution/src/tests/mock.rs @@ -139,6 +139,7 @@ impl TestCandidateBuilder { } } +// Get chunk for index 0 pub fn get_valid_chunk_data(pov: PoV) -> (Hash, ErasureChunk) { let fake_validator_count = 10; let persisted = PersistedValidationData { diff --git a/polkadot/node/network/availability-distribution/src/tests/mod.rs b/polkadot/node/network/availability-distribution/src/tests/mod.rs index be9d4c7d1e..6a2caaca5f 100644 --- a/polkadot/node/network/availability-distribution/src/tests/mod.rs +++ b/polkadot/node/network/availability-distribution/src/tests/mod.rs @@ -14,8 +14,11 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . +use std::collections::HashSet; + use futures::{executor, future, Future}; +use polkadot_primitives::v1::CoreState; use sp_keystore::SyncCryptoStorePtr; use polkadot_subsystem_testhelpers as test_helpers; @@ -61,3 +64,64 @@ fn check_basic() { state.run(harness) }); } + +/// Check whether requester tries all validators in group. 
+#[test] +fn check_fetch_tries_all() { + let mut state = TestState::default(); + for (_, v) in state.chunks.iter_mut() { + // 4 validators in group, so this should still succeed: + v.push(None); + v.push(None); + v.push(None); + } + test_harness(state.keystore.clone(), move |harness| { + state.run(harness) + }); +} + +/// Check that requester retries failed fetches on the next block. +/// +/// Check that requester will retry the fetch on error on the next block still pending +/// availability. +#[test] +fn check_fetch_retry() { + let mut state = TestState::default(); + state.cores.insert( + state.relay_chain[2], + state.cores.get(&state.relay_chain[1]).unwrap().clone(), + ); + // We only care about the first three blocks. + // 1. scheduled + // 2. occupied + // 3. still occupied + state.relay_chain.truncate(3); + + // Get rid of unused valid chunks: + let valid_candidate_hashes: HashSet<_> = state.cores + .get(&state.relay_chain[1]) + .iter() + .map(|v| v.iter()) + .flatten() + .filter_map(|c| { + match c { + CoreState::Occupied(core) => Some(core.candidate_hash), + _ => None, + } + }) + .collect(); + state.valid_chunks.retain(|(ch, _)| valid_candidate_hashes.contains(ch)); + + + for (_, v) in state.chunks.iter_mut() { + // This should still succeed as cores are still pending availability on next block. + v.push(None); + v.push(None); + v.push(None); + v.push(None); + v.push(None); + } + test_harness(state.keystore.clone(), move |harness| { + state.run(harness) + }); +} diff --git a/polkadot/node/network/availability-distribution/src/tests/state.rs b/polkadot/node/network/availability-distribution/src/tests/state.rs index dc3ce8d697..b1a1850024 100644 --- a/polkadot/node/network/availability-distribution/src/tests/state.rs +++ b/polkadot/node/network/availability-distribution/src/tests/state.rs @@ -63,9 +63,12 @@ pub struct TestHarness { /// `valid_chunks`. 
#[derive(Clone)] pub struct TestState { - // Simulated relay chain heads: + /// Simulated relay chain heads: pub relay_chain: Vec, - pub chunks: HashMap<(CandidateHash, ValidatorIndex), ErasureChunk>, + /// Whenever the subsystem tries to fetch an erasure chunk one item of the given vec will be + /// popped. So you can experiment with serving invalid chunks or no chunks on request and see + /// whether the subsystem still succeeds with its goal. + pub chunks: HashMap<(CandidateHash, ValidatorIndex), Vec>>, /// All chunks that are valid and should be accepted. pub valid_chunks: HashSet<(CandidateHash, ValidatorIndex)>, pub session_info: SessionInfo, @@ -125,7 +128,7 @@ impl Default for TestState { let mut chunks_other_groups = p_chunks.into_iter(); chunks_other_groups.next(); for (validator_index, chunk) in chunks_other_groups { - chunks.insert((validator_index, chunk.index), chunk); + chunks.insert((validator_index, chunk.index), vec![Some(chunk)]); } } (cores, chunks) @@ -158,7 +161,7 @@ impl TestState { /// /// We try to be as agnostic about details as possible, how the subsystem achieves those goals /// should not be a matter to this test suite. 
- async fn run_inner(self, executor: TaskExecutor, virtual_overseer: TestSubsystemContextHandle) { + async fn run_inner(mut self, executor: TaskExecutor, virtual_overseer: TestSubsystemContextHandle) { // We skip genesis here (in reality ActiveLeavesUpdate can also skip a block: let updates = { let mut advanced = self.relay_chain.iter(); @@ -217,8 +220,8 @@ impl TestState { } } AllMessages::AvailabilityStore(AvailabilityStoreMessage::QueryChunk(candidate_hash, validator_index, tx)) => { - let chunk = self.chunks.get(&(candidate_hash, validator_index)); - tx.send(chunk.map(Clone::clone)) + let chunk = self.chunks.get_mut(&(candidate_hash, validator_index)).map(Vec::pop).flatten().flatten(); + tx.send(chunk) .expect("Receiver is expected to be alive"); } AllMessages::AvailabilityStore(AvailabilityStoreMessage::StoreChunk{candidate_hash, chunk, tx, ..}) => {