mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-05-31 11:01:01 +00:00
Remove pov-recovery race condition/Improve zombienet test (#2526)
The test was a bit flaky on CI. There was a race condition in the pov-recovery system: if the timing is bad, a block can end up waiting for a parent that is already queued for import. The check for whether a block has children waiting happens only when we insert into the import queue, so we need to do an additional check once we receive the import notification for the parent block. The second issue is that `alice` was missing `--in-peers 0` and `--out-peers 0`, so alice was sometimes still fetching blocks via sync and the assertion on the logs in zombienet would fail. There is another potential issue that I saw once locally. We have a failing pov-recovery queue that deliberately fails from time to time, so that we can check that the retry mechanism does what it should. We now make sure that the same candidate is never failed twice, so the tests become more predictable.
This commit is contained in:
@@ -410,6 +410,7 @@ where
|
|||||||
?block_hash,
|
?block_hash,
|
||||||
parent_hash = ?parent,
|
parent_hash = ?parent,
|
||||||
parent_scheduled_for_recovery,
|
parent_scheduled_for_recovery,
|
||||||
|
waiting_blocks = self.waiting_for_parent.len(),
|
||||||
"Waiting for recovery of parent.",
|
"Waiting for recovery of parent.",
|
||||||
);
|
);
|
||||||
|
|
||||||
@@ -442,13 +443,13 @@ where
|
|||||||
_ => (),
|
_ => (),
|
||||||
}
|
}
|
||||||
|
|
||||||
self.import_block(block).await;
|
self.import_block(block);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Import the given `block`.
|
/// Import the given `block`.
|
||||||
///
|
///
|
||||||
/// This will also recursively drain `waiting_for_parent` and import them as well.
|
/// This will also recursively drain `waiting_for_parent` and import them as well.
|
||||||
async fn import_block(&mut self, block: Block) {
|
fn import_block(&mut self, block: Block) {
|
||||||
let mut blocks = VecDeque::new();
|
let mut blocks = VecDeque::new();
|
||||||
|
|
||||||
tracing::debug!(target: LOG_TARGET, block_hash = ?block.hash(), "Importing block retrieved using pov_recovery");
|
tracing::debug!(target: LOG_TARGET, block_hash = ?block.hash(), "Importing block retrieved using pov_recovery");
|
||||||
@@ -551,7 +552,6 @@ where
|
|||||||
};
|
};
|
||||||
|
|
||||||
futures::pin_mut!(pending_candidates);
|
futures::pin_mut!(pending_candidates);
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
select! {
|
select! {
|
||||||
pending_candidate = pending_candidates.next() => {
|
pending_candidate = pending_candidates.next() => {
|
||||||
@@ -573,6 +573,17 @@ where
|
|||||||
imported = imported_blocks.next() => {
|
imported = imported_blocks.next() => {
|
||||||
if let Some(imported) = imported {
|
if let Some(imported) = imported {
|
||||||
self.clear_waiting_recovery(&imported.hash);
|
self.clear_waiting_recovery(&imported.hash);
|
||||||
|
|
||||||
|
// We need to double check that no blocks are waiting for this block.
|
||||||
|
// Can happen when a waiting child block is queued to wait for parent while the parent block is still
|
||||||
|
// in the import queue.
|
||||||
|
if let Some(waiting_blocks) = self.waiting_for_parent.remove(&imported.hash) {
|
||||||
|
for block in waiting_blocks {
|
||||||
|
tracing::debug!(target: LOG_TARGET, block_hash = ?block.hash(), resolved_parent = ?imported.hash, "Found new waiting child block during import, queuing.");
|
||||||
|
self.import_block(block);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
tracing::debug!(target: LOG_TARGET, "Imported blocks stream ended");
|
tracing::debug!(target: LOG_TARGET, "Imported blocks stream ended");
|
||||||
return;
|
return;
|
||||||
|
|||||||
@@ -27,6 +27,7 @@ mod genesis;
|
|||||||
use runtime::AccountId;
|
use runtime::AccountId;
|
||||||
use sc_executor::{HeapAllocStrategy, WasmExecutor, DEFAULT_HEAP_ALLOC_STRATEGY};
|
use sc_executor::{HeapAllocStrategy, WasmExecutor, DEFAULT_HEAP_ALLOC_STRATEGY};
|
||||||
use std::{
|
use std::{
|
||||||
|
collections::HashSet,
|
||||||
future::Future,
|
future::Future,
|
||||||
net::{IpAddr, Ipv4Addr, SocketAddr},
|
net::{IpAddr, Ipv4Addr, SocketAddr},
|
||||||
time::Duration,
|
time::Duration,
|
||||||
@@ -57,7 +58,7 @@ use cumulus_test_runtime::{Hash, Header, NodeBlock as Block, RuntimeApi};
|
|||||||
use frame_system_rpc_runtime_api::AccountNonceApi;
|
use frame_system_rpc_runtime_api::AccountNonceApi;
|
||||||
use polkadot_node_subsystem::{errors::RecoveryError, messages::AvailabilityRecoveryMessage};
|
use polkadot_node_subsystem::{errors::RecoveryError, messages::AvailabilityRecoveryMessage};
|
||||||
use polkadot_overseer::Handle as OverseerHandle;
|
use polkadot_overseer::Handle as OverseerHandle;
|
||||||
use polkadot_primitives::{CollatorPair, Hash as PHash, PersistedValidationData};
|
use polkadot_primitives::{CandidateHash, CollatorPair, Hash as PHash, PersistedValidationData};
|
||||||
use polkadot_service::ProvideRuntimeApi;
|
use polkadot_service::ProvideRuntimeApi;
|
||||||
use sc_consensus::ImportQueue;
|
use sc_consensus::ImportQueue;
|
||||||
use sc_network::{
|
use sc_network::{
|
||||||
@@ -144,12 +145,13 @@ pub type TransactionPool = Arc<sc_transaction_pool::FullPool<Block, Client>>;
|
|||||||
pub struct FailingRecoveryHandle {
|
pub struct FailingRecoveryHandle {
|
||||||
overseer_handle: OverseerHandle,
|
overseer_handle: OverseerHandle,
|
||||||
counter: u32,
|
counter: u32,
|
||||||
|
failed_hashes: HashSet<CandidateHash>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl FailingRecoveryHandle {
|
impl FailingRecoveryHandle {
|
||||||
/// Create a new FailingRecoveryHandle
|
/// Create a new FailingRecoveryHandle
|
||||||
pub fn new(overseer_handle: OverseerHandle) -> Self {
|
pub fn new(overseer_handle: OverseerHandle) -> Self {
|
||||||
Self { overseer_handle, counter: 0 }
|
Self { overseer_handle, counter: 0, failed_hashes: Default::default() }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -160,11 +162,15 @@ impl RecoveryHandle for FailingRecoveryHandle {
|
|||||||
message: AvailabilityRecoveryMessage,
|
message: AvailabilityRecoveryMessage,
|
||||||
origin: &'static str,
|
origin: &'static str,
|
||||||
) {
|
) {
|
||||||
// For every 5th block we immediately signal unavailability to trigger
|
let AvailabilityRecoveryMessage::RecoverAvailableData(ref receipt, _, _, _) = message;
|
||||||
// a retry.
|
let candidate_hash = receipt.hash();
|
||||||
if self.counter % 5 == 0 {
|
|
||||||
|
// For every 3rd block we immediately signal unavailability to trigger
|
||||||
|
// a retry. The same candidate is never failed multiple times to ensure progress.
|
||||||
|
if self.counter % 3 == 0 && self.failed_hashes.insert(candidate_hash) {
|
||||||
|
tracing::info!(target: LOG_TARGET, ?candidate_hash, "Failing pov recovery.");
|
||||||
|
|
||||||
let AvailabilityRecoveryMessage::RecoverAvailableData(_, _, _, back_sender) = message;
|
let AvailabilityRecoveryMessage::RecoverAvailableData(_, _, _, back_sender) = message;
|
||||||
tracing::info!(target: LOG_TARGET, "Failing pov recovery.");
|
|
||||||
back_sender
|
back_sender
|
||||||
.send(Err(RecoveryError::Unavailable))
|
.send(Err(RecoveryError::Unavailable))
|
||||||
.expect("Return channel should work here.");
|
.expect("Return channel should work here.");
|
||||||
|
|||||||
@@ -34,13 +34,12 @@ add_to_genesis = false
|
|||||||
args = ["--disable-block-announcements"]
|
args = ["--disable-block-announcements"]
|
||||||
|
|
||||||
# run 'alice' as a parachain collator who does not produce blocks
|
# run 'alice' as a parachain collator who does not produce blocks
|
||||||
# 'alice' is a bootnode for 'bob' and 'charlie'
|
|
||||||
[[parachains.collators]]
|
[[parachains.collators]]
|
||||||
name = "alice"
|
name = "alice"
|
||||||
validator = true # collator
|
validator = true # collator
|
||||||
image = "{{COL_IMAGE}}"
|
image = "{{COL_IMAGE}}"
|
||||||
command = "test-parachain"
|
command = "test-parachain"
|
||||||
args = ["-lparachain::availability=trace,sync=debug,parachain=debug,cumulus-pov-recovery=debug,cumulus-consensus=debug", "--use-null-consensus", "--disable-block-announcements", "--bootnodes {{'bob'|zombie('multiAddress')}}", "--", "--reserved-only", "--reserved-nodes {{'ferdie'|zombie('multiAddress')}}"]
|
args = ["-lparachain::availability=trace,sync=debug,parachain=debug,cumulus-pov-recovery=debug,cumulus-consensus=debug", "--use-null-consensus", "--disable-block-announcements", "--bootnodes {{'bob'|zombie('multiAddress')}}", "--in-peers 0", "--out-peers 0", "--", "--reserved-only", "--reserved-nodes {{'ferdie'|zombie('multiAddress')}}"]
|
||||||
|
|
||||||
# run 'charlie' as a parachain full node
|
# run 'charlie' as a parachain full node
|
||||||
[[parachains.collators]]
|
[[parachains.collators]]
|
||||||
|
|||||||
Reference in New Issue
Block a user