availability-distribution: Retry failed fetches on next block. (#2762)
* availability-distribution: Retry on fail on next block.

  Retry failed fetches on next block when still pending availability.

* Update node/network/availability-distribution/src/requester/fetch_task/mod.rs

  Co-authored-by: Andronik Ordian <write@reusable.software>

* Fix existing tests.

* Add test for trying all validators.

* Add test for testing retries.

Co-authored-by: Andronik Ordian <write@reusable.software>
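The whole change reduces to one rule: a fetch task that runs out of validators in the backing group now reports `Failed(candidate_hash)` instead of silently concluding, and the requester drops that candidate from its `fetches` map, so the next active leaf on which the candidate is still pending availability spawns a fresh fetch. A minimal, self-contained sketch of that flow, using simplified stand-in types rather than the subsystem's real `FetchTask`/`CandidateHash`:

use std::collections::HashSet;

type CandidateHash = u64;

/// Simplified stand-in for the requester's book-keeping of live fetch tasks.
struct Requester {
	fetches: HashSet<CandidateHash>,
}

impl Requester {
	/// On every new leaf, start a fetch for each candidate still pending
	/// availability that we are not already fetching; returns the new ones.
	fn on_new_leaf(&mut self, pending_availability: &[CandidateHash]) -> Vec<CandidateHash> {
		pending_availability
			.iter()
			.copied()
			.filter(|c| self.fetches.insert(*c))
			.collect()
	}

	/// A task reported `Failed`: drop it, so the next leaf retries the fetch.
	fn on_fetch_failed(&mut self, candidate: CandidateHash) {
		self.fetches.remove(&candidate);
	}
}

fn main() {
	let mut requester = Requester { fetches: HashSet::new() };
	// Block N: candidate 1 becomes pending availability, a fetch is started.
	assert_eq!(requester.on_new_leaf(&[1]), vec![1]);
	// Block N+1, still pending while the task runs: no duplicate fetch.
	assert!(requester.on_new_leaf(&[1]).is_empty());
	// The task fails (no validator in the backing group served the chunk).
	requester.on_fetch_failed(1);
	// Block N+2, still pending availability: the fetch is retried.
	assert_eq!(requester.on_new_leaf(&[1]), vec![1]);
}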
@@ -26,10 +26,7 @@ use polkadot_node_network_protocol::request_response::{
 	request::{OutgoingRequest, RequestError, Requests, Recipient},
 	v1::{ChunkFetchingRequest, ChunkFetchingResponse},
 };
-use polkadot_primitives::v1::{
-	AuthorityDiscoveryId, BlakeTwo256, GroupIndex, Hash, HashT, OccupiedCore,
-	SessionIndex,
-};
+use polkadot_primitives::v1::{AuthorityDiscoveryId, BlakeTwo256, CandidateHash, GroupIndex, Hash, HashT, OccupiedCore, SessionIndex};
 use polkadot_node_primitives::ErasureChunk;
 use polkadot_subsystem::messages::{
 	AllMessages, AvailabilityStoreMessage, NetworkBridgeMessage, IfDisconnected,
@@ -89,6 +86,9 @@ pub enum FromFetchTask {
 	/// In case of `None` everything was fine, in case of `Some`, some validators in the group
 	/// did not serve us our chunk as expected.
 	Concluded(Option<BadValidators>),
+
+	/// We were not able to fetch the desired chunk for the given `CandidateHash`.
+	Failed(CandidateHash),
 }
 
 /// Information a running task needs.
@@ -262,7 +262,7 @@ impl RunningTask {
 	/// Try validators in backing group in order.
 	async fn run_inner(mut self) {
 		let mut bad_validators = Vec::new();
-		let mut label = FAILED;
+		let mut succeeded = false;
 		let mut count: u32 = 0;
 		let mut _span = self.span.child("fetch-task")
 			.with_chunk_index(self.request.index.0)
@@ -315,13 +315,18 @@ impl RunningTask {
 
 			// Ok, let's store it and be happy:
 			self.store_chunk(chunk).await;
-			label = SUCCEEDED;
+			succeeded = true;
 			_span.add_string_tag("success", "true");
 			break;
 		}
 		_span.add_int_tag("tries", count as _);
-		self.metrics.on_fetch(label);
-		self.conclude(bad_validators).await;
+		if succeeded {
+			self.metrics.on_fetch(SUCCEEDED);
+			self.conclude(bad_validators).await;
+		} else {
+			self.metrics.on_fetch(FAILED);
+			self.conclude_fail().await
+		}
 	}
 
 	/// Do request and return response, if successful.
@@ -434,4 +439,14 @@ impl RunningTask {
 			);
 		}
 	}
+
+	async fn conclude_fail(&mut self) {
+		if let Err(err) = self.sender.send(FromFetchTask::Failed(self.request.candidate_hash)).await {
+			tracing::warn!(
+				target: LOG_TARGET,
+				?err,
+				"Sending `Failed` message for task failed"
+			);
+		}
+	}
 }
@@ -228,6 +228,7 @@ impl TestRun {
 			);
 			match msg {
 				FromFetchTask::Concluded(_) => break,
+				FromFetchTask::Failed(_) => break,
 				FromFetchTask::Message(msg) =>
 					end_ok = self.handle_message(msg).await,
 			}
@@ -54,6 +54,8 @@ pub struct Requester {
 	///
 	/// We keep those around as long as a candidate is pending availability on some leaf, so we
 	/// won't fetch chunks multiple times.
+	///
+	/// We remove them on failure, so we get retries on the next block still pending availability.
 	fetches: HashMap<CandidateHash, FetchTask>,
 
 	/// Localized information about sessions we are currently interested in.
@@ -76,10 +78,7 @@ impl Requester {
 	/// by advancing the stream.
 	#[tracing::instrument(level = "trace", skip(keystore, metrics), fields(subsystem = LOG_TARGET))]
 	pub fn new(keystore: SyncCryptoStorePtr, metrics: Metrics) -> Self {
-		// All we do is forwarding messages, no need to make this big.
-		// Each sender will get one slot, see
-		// [here](https://docs.rs/futures/0.3.13/futures/channel/mpsc/fn.channel.html).
-		let (tx, rx) = mpsc::channel(0);
+		let (tx, rx) = mpsc::channel(1);
 		Requester {
 			fetches: HashMap::new(),
 			session_cache: SessionCache::new(keystore),
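A note on the `mpsc::channel(0)` → `mpsc::channel(1)` change above: in `futures`' bounded mpsc, the effective capacity is the requested buffer plus one guaranteed slot per sender, so the channel stays tiny either way; `channel(1)` merely adds a single shared slot. A minimal usage sketch of that API (illustration only, not the subsystem's code):

use futures::{channel::mpsc, executor::block_on, SinkExt, StreamExt};

fn main() {
	// futures' bounded mpsc: capacity = buffer + number of senders, so even
	// `channel(0)` lets each sender queue one message; `channel(1)` adds one
	// shared "first come, first served" slot on top of that.
	let (mut tx, mut rx) = mpsc::channel::<u32>(1);
	block_on(async {
		tx.send(42).await.expect("receiver still alive");
		assert_eq!(rx.next().await, Some(42));
	});
}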
@@ -214,6 +213,10 @@ impl Stream for Requester {
 				}
 				Poll::Ready(Some(FromFetchTask::Concluded(None))) =>
 					continue,
+				Poll::Ready(Some(FromFetchTask::Failed(candidate_hash))) => {
+					// Make sure we retry on next block still pending availability.
+					self.fetches.remove(&candidate_hash);
+				}
 				Poll::Ready(None) =>
 					return Poll::Ready(None),
 				Poll::Pending =>
@@ -139,6 +139,7 @@ impl TestCandidateBuilder {
 	}
 }
 
+// Get chunk for index 0
 pub fn get_valid_chunk_data(pov: PoV) -> (Hash, ErasureChunk) {
 	let fake_validator_count = 10;
 	let persisted = PersistedValidationData {
@@ -14,8 +14,11 @@
 // You should have received a copy of the GNU General Public License
 // along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
 
+use std::collections::HashSet;
+
 use futures::{executor, future, Future};
 
+use polkadot_primitives::v1::CoreState;
 use sp_keystore::SyncCryptoStorePtr;
 
 use polkadot_subsystem_testhelpers as test_helpers;
@@ -61,3 +64,64 @@ fn check_basic() {
 		state.run(harness)
 	});
 }
+
+/// Check whether requester tries all validators in group.
+#[test]
+fn check_fetch_tries_all() {
+	let mut state = TestState::default();
+	for (_, v) in state.chunks.iter_mut() {
+		// 4 validators in group, so this should still succeed:
+		v.push(None);
+		v.push(None);
+		v.push(None);
+	}
+	test_harness(state.keystore.clone(), move |harness| {
+		state.run(harness)
+	});
+}
+
+/// Check whether requester tries all validators in group
+///
+/// Check that requester will retry the fetch on error on the next block still pending
+/// availability.
+#[test]
+fn check_fetch_retry() {
+	let mut state = TestState::default();
+	state.cores.insert(
+		state.relay_chain[2],
+		state.cores.get(&state.relay_chain[1]).unwrap().clone(),
+	);
+	// We only care about the first three blocks.
+	// 1. scheduled
+	// 2. occupied
+	// 3. still occupied
+	state.relay_chain.truncate(3);
+
+	// Get rid of unused valid chunks:
+	let valid_candidate_hashes: HashSet<_> = state.cores
+		.get(&state.relay_chain[1])
+		.iter()
+		.map(|v| v.iter())
+		.flatten()
+		.filter_map(|c| {
+			match c {
+				CoreState::Occupied(core) => Some(core.candidate_hash),
+				_ => None,
+			}
+		})
+		.collect();
+	state.valid_chunks.retain(|(ch, _)| valid_candidate_hashes.contains(ch));
+
+
+	for (_, v) in state.chunks.iter_mut() {
+		// This should still succeed as cores are still pending availability on next block.
+		v.push(None);
+		v.push(None);
+		v.push(None);
+		v.push(None);
+		v.push(None);
+	}
+	test_harness(state.keystore.clone(), move |harness| {
+		state.run(harness)
+	});
+}
@@ -63,9 +63,12 @@ pub struct TestHarness {
 /// `valid_chunks`.
 #[derive(Clone)]
 pub struct TestState {
-	// Simulated relay chain heads:
+	/// Simulated relay chain heads:
 	pub relay_chain: Vec<Hash>,
-	pub chunks: HashMap<(CandidateHash, ValidatorIndex), ErasureChunk>,
+	/// Whenever the subsystem tries to fetch an erasure chunk one item of the given vec will be
+	/// popped. So you can experiment with serving invalid chunks or no chunks on request and see
+	/// whether the subsystem still succeeds with its goal.
+	pub chunks: HashMap<(CandidateHash, ValidatorIndex), Vec<Option<ErasureChunk>>>,
 	/// All chunks that are valid and should be accepted.
 	pub valid_chunks: HashSet<(CandidateHash, ValidatorIndex)>,
 	pub session_info: SessionInfo,
@@ -125,7 +128,7 @@ impl Default for TestState {
 				let mut chunks_other_groups = p_chunks.into_iter();
 				chunks_other_groups.next();
 				for (validator_index, chunk) in chunks_other_groups {
-					chunks.insert((validator_index, chunk.index), chunk);
+					chunks.insert((validator_index, chunk.index), vec![Some(chunk)]);
 				}
 			}
 			(cores, chunks)
@@ -158,7 +161,7 @@ impl TestState {
 	///
 	/// We try to be as agnostic about details as possible, how the subsystem achieves those goals
 	/// should not be a matter to this test suite.
-	async fn run_inner(self, executor: TaskExecutor, virtual_overseer: TestSubsystemContextHandle<AvailabilityDistributionMessage>) {
+	async fn run_inner(mut self, executor: TaskExecutor, virtual_overseer: TestSubsystemContextHandle<AvailabilityDistributionMessage>) {
 		// We skip genesis here (in reality ActiveLeavesUpdate can also skip a block:
 		let updates = {
 			let mut advanced = self.relay_chain.iter();
@@ -217,8 +220,8 @@ impl TestState {
 					}
 				}
 				AllMessages::AvailabilityStore(AvailabilityStoreMessage::QueryChunk(candidate_hash, validator_index, tx)) => {
-					let chunk = self.chunks.get(&(candidate_hash, validator_index));
-					tx.send(chunk.map(Clone::clone))
+					let chunk = self.chunks.get_mut(&(candidate_hash, validator_index)).map(Vec::pop).flatten().flatten();
+					tx.send(chunk)
 						.expect("Receiver is expected to be alive");
 				}
 				AllMessages::AvailabilityStore(AvailabilityStoreMessage::StoreChunk{candidate_hash, chunk, tx, ..}) => {
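The reworked test mock above answers each `QueryChunk` request by popping the next scripted response off a per-`(CandidateHash, ValidatorIndex)` vec, so a test can queue several `None`s ("validator has no chunk") ahead of the real chunk. A tiny standalone sketch of that pop pattern, using plain types in place of the real `CandidateHash`/`ErasureChunk`:

use std::collections::HashMap;

fn main() {
	// Script: two "no chunk" answers, then the real chunk (popped from the back).
	let mut chunks: HashMap<u32, Vec<Option<&str>>> = HashMap::new();
	chunks.insert(7, vec![Some("chunk-7"), None, None]);

	// Same expression shape as the mock's QueryChunk handler:
	let mut next = |key: u32| chunks.get_mut(&key).map(Vec::pop).flatten().flatten();

	assert_eq!(next(7), None);            // first request: validator serves nothing
	assert_eq!(next(7), None);            // second request: still nothing
	assert_eq!(next(7), Some("chunk-7")); // third request: the valid chunk arrives
	assert_eq!(next(7), None);            // script exhausted
}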