Add retry mechanism for pov-recovery, fix full-node pov-recovery (#2164)

* Increase delay for pov-recovery

* Update client/service/src/lib.rs

Co-authored-by: Bastian Köcher <git@kchr.de>

* Comment

* FMT

* Clear waiting_recovery when block is recovered or recovery failed

* Introduce recovery queue that preserved insertion order

* Better error logs

* Decrease slot duration

* Style improvements

* Add option to use unordered queue

* Maintain cache of finalized blocks

* Wait for one relay chain slot before recovery

* Make retries testable

* fmt

* Improve docs

* Improve docs

* Simplify RecoveryQueue

* Remove unwanted changes

* Adjust to comments

* Apply suggestions from code review

Co-authored-by: Bastian Köcher <git@kchr.de>

* Move recovery delay into the queue

* Check for finalized number

* Clean up

* Use timer

Co-authored-by: Bastian Köcher <git@kchr.de>

* Simplify implementation

* Revert "Use timer"

This reverts commit 3809eed840d3a09d54212f99486782ff80cdc1c9.

* Properly clear `to_recover` flag

---------

Co-authored-by: Bastian Köcher <git@kchr.de>
This commit is contained in:
Sebastian Kunert
2023-02-09 14:18:55 +01:00
committed by GitHub
parent b3d68426a2
commit 588bdad7f6
15 changed files with 419 additions and 161 deletions
+1
View File
@@ -28,6 +28,7 @@ polkadot-primitives = { git = "https://github.com/paritytech/polkadot", branch =
# Cumulus
cumulus-primitives-core = { path = "../../primitives/core" }
cumulus-relay-chain-interface = {path = "../relay-chain-interface"}
async-trait = "0.1.64"
[dev-dependencies]
tokio = { version = "1.25.0", features = ["macros"] }
@@ -18,12 +18,13 @@ use sp_runtime::traits::Block as BlockT;
use polkadot_node_primitives::AvailableData;
use polkadot_node_subsystem::messages::AvailabilityRecoveryMessage;
use polkadot_overseer::Handle as OverseerHandle;
use futures::{channel::oneshot, stream::FuturesUnordered, Future, FutureExt, StreamExt};
use std::{collections::HashSet, pin::Pin};
use crate::RecoveryHandle;
/// The active candidate recovery.
///
/// This handles the candidate recovery and tracks the activate recoveries.
@@ -34,12 +35,12 @@ pub(crate) struct ActiveCandidateRecovery<Block: BlockT> {
>,
/// The block hashes of the candidates currently being recovered.
candidates: HashSet<Block::Hash>,
overseer_handle: OverseerHandle,
recovery_handle: Box<dyn RecoveryHandle>,
}
impl<Block: BlockT> ActiveCandidateRecovery<Block> {
pub fn new(overseer_handle: OverseerHandle) -> Self {
Self { recoveries: Default::default(), candidates: Default::default(), overseer_handle }
pub fn new(recovery_handle: Box<dyn RecoveryHandle>) -> Self {
Self { recoveries: Default::default(), candidates: Default::default(), recovery_handle }
}
/// Recover the given `candidate`.
@@ -50,8 +51,8 @@ impl<Block: BlockT> ActiveCandidateRecovery<Block> {
) {
let (tx, rx) = oneshot::channel();
self.overseer_handle
.send_msg(
self.recovery_handle
.send_recovery_msg(
AvailabilityRecoveryMessage::RecoverAvailableData(
candidate.receipt.clone(),
candidate.session_index,
@@ -90,11 +91,6 @@ impl<Block: BlockT> ActiveCandidateRecovery<Block> {
);
}
/// Returns if the given `candidate` is being recovered.
pub fn is_being_recovered(&self, candidate: &Block::Hash) -> bool {
self.candidates.contains(candidate)
}
/// Waits for the next recovery.
///
/// If the returned [`AvailableData`] is `None`, it means that the recovery failed.
+161 -74
View File
@@ -29,14 +29,18 @@
//!
//! 1. For every included relay chain block we note the backed candidate of our parachain. If the
//! block belonging to the PoV is already known, we do nothing. Otherwise we start
//! a timer that waits a random time between 0..relay_chain_slot_length before starting to recover
//! a timer that waits for a randomized time inside a specified interval before starting to recover
//! the PoV.
//!
//! 2. If between starting and firing the timer the block is imported, we skip the recovery of the
//! PoV.
//!
//! 3. If the timer fired we recover the PoV using the relay chain PoV recovery protocol. After it
//! is recovered, we restore the block and import it.
//! 3. If the timer fired we recover the PoV using the relay chain PoV recovery protocol.
//!
//! 4a. After it is recovered, we restore the block and import it.
//!
//! 4b. Since we are trying to recover pending candidates, availability is not guaranteed. If the block
//! PoV is not yet available, we retry.
//!
//! If we need to recover multiple PoV blocks (which should hopefully not happen in real life), we
//! make sure that the blocks are imported in the correct order.
@@ -47,6 +51,7 @@ use sp_consensus::{BlockOrigin, BlockStatus};
use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor};
use polkadot_node_primitives::{AvailableData, POV_BOMB_LIMIT};
use polkadot_node_subsystem::messages::AvailabilityRecoveryMessage;
use polkadot_overseer::Handle as OverseerHandle;
use polkadot_primitives::{
CandidateReceipt, CommittedCandidateReceipt, Id as ParaId, SessionIndex,
@@ -60,10 +65,10 @@ use futures::{
channel::mpsc::Receiver, select, stream::FuturesUnordered, Future, FutureExt, Stream, StreamExt,
};
use futures_timer::Delay;
use rand::{thread_rng, Rng};
use rand::{distributions::Uniform, prelude::Distribution, thread_rng};
use std::{
collections::{HashMap, VecDeque},
collections::{HashMap, HashSet, VecDeque},
pin::Pin,
sync::Arc,
time::Duration,
@@ -74,6 +79,28 @@ use active_candidate_recovery::ActiveCandidateRecovery;
const LOG_TARGET: &str = "cumulus-pov-recovery";
/// Test-friendly wrapper trait for the overseer handle.
/// Can be used to simulate failing recovery requests.
#[async_trait::async_trait]
pub trait RecoveryHandle: Send {
async fn send_recovery_msg(
&mut self,
message: AvailabilityRecoveryMessage,
origin: &'static str,
);
}
#[async_trait::async_trait]
impl RecoveryHandle for OverseerHandle {
async fn send_recovery_msg(
&mut self,
message: AvailabilityRecoveryMessage,
origin: &'static str,
) {
self.send_msg(message, origin).await;
}
}
/// Type of recovery to trigger.
#[derive(Debug, PartialEq)]
pub enum RecoveryKind {
@@ -87,24 +114,30 @@ pub enum RecoveryKind {
pub struct RecoveryRequest<Block: BlockT> {
/// Hash of the last block to recover.
pub hash: Block::Hash,
/// Recovery delay range. Randomizing the start of the recovery within this interval
/// can be used to prevent self-DOSing if the recovery request is part of a
/// distributed protocol and there is the possibility that multiple actors are
/// requiring to perform the recovery action at approximately the same time.
pub delay: RecoveryDelay,
/// Recovery type.
pub kind: RecoveryKind,
}
/// The delay between observing an unknown block and triggering the recovery of a block.
/// Randomizing the start of the recovery within this interval
/// can be used to prevent self-DOSing if the recovery request is part of a
/// distributed protocol and there is the possibility that multiple actors are
/// requiring to perform the recovery action at approximately the same time.
#[derive(Clone, Copy)]
pub struct RecoveryDelay {
pub struct RecoveryDelayRange {
/// Start recovering after `min` delay.
pub min: Duration,
/// Start recovering before `max` delay.
pub max: Duration,
}
impl RecoveryDelayRange {
/// Produce a randomized duration between `min` and `max`.
fn duration(&self) -> Duration {
Uniform::from(self.min..=self.max).sample(&mut thread_rng())
}
}
/// Represents an outstanding block candidate.
struct Candidate<Block: BlockT> {
receipt: CandidateReceipt,
@@ -112,9 +145,66 @@ struct Candidate<Block: BlockT> {
block_number: NumberFor<Block>,
parent_hash: Block::Hash,
// Lazy recovery has been submitted.
// Should be true iff a block is either queued to be recovered or
// recovery is currently in progress.
waiting_recovery: bool,
}
/// Queue that is used to decide when to start PoV-recovery operations.
struct RecoveryQueue<Block: BlockT> {
recovery_delay_range: RecoveryDelayRange,
// Queue that keeps the hashes of blocks to be recovered.
recovery_queue: VecDeque<Block::Hash>,
// Futures that resolve when a new recovery should be started.
signaling_queue: FuturesUnordered<Pin<Box<dyn Future<Output = ()> + Send>>>,
}
impl<Block: BlockT> RecoveryQueue<Block> {
pub fn new(recovery_delay_range: RecoveryDelayRange) -> Self {
Self {
recovery_delay_range,
recovery_queue: Default::default(),
signaling_queue: Default::default(),
}
}
/// Add hash of a block that should go to the end of the recovery queue.
/// A new recovery will be signaled after `delay` has passed.
pub fn push_recovery(&mut self, hash: Block::Hash) {
let delay = self.recovery_delay_range.duration();
tracing::debug!(
target: LOG_TARGET,
block_hash = ?hash,
"Adding block to queue and adding new recovery slot in {:?} sec",
delay.as_secs(),
);
self.recovery_queue.push_back(hash);
self.signaling_queue.push(
async move {
Delay::new(delay).await;
}
.boxed(),
);
}
/// Get the next hash for block recovery.
pub async fn next_recovery(&mut self) -> Block::Hash {
loop {
if let Some(_) = self.signaling_queue.next().await {
if let Some(hash) = self.recovery_queue.pop_front() {
return hash
} else {
tracing::error!(
target: LOG_TARGET,
"Recovery was signaled, but no candidate hash available. This is a bug."
);
};
}
futures::pending!()
}
}
}
/// Encapsulates the logic of the pov recovery.
pub struct PoVRecovery<Block: BlockT, PC, RC> {
/// All the pending candidates that we are waiting for to be imported or that need to be
@@ -122,21 +212,22 @@ pub struct PoVRecovery<Block: BlockT, PC, RC> {
candidates: HashMap<Block::Hash, Candidate<Block>>,
/// A stream of futures that resolve to hashes of candidates that need to be recovered.
///
/// The candidates to the hashes are stored in `pending_candidates`. If a candidate is not
/// The candidates to the hashes are stored in `candidates`. If a candidate is not
/// available anymore in this map, it means that it was already imported.
next_candidate_to_recover: FuturesUnordered<Pin<Box<dyn Future<Output = Block::Hash> + Send>>>,
candidate_recovery_queue: RecoveryQueue<Block>,
active_candidate_recovery: ActiveCandidateRecovery<Block>,
/// Blocks that wait that the parent is imported.
///
/// Uses parent -> blocks mapping.
waiting_for_parent: HashMap<Block::Hash, Vec<Block>>,
recovery_delay: RecoveryDelay,
parachain_client: Arc<PC>,
parachain_import_queue: Box<dyn ImportQueueService<Block>>,
relay_chain_interface: RC,
para_id: ParaId,
/// Explicit block recovery requests channel.
recovery_chan_rx: Receiver<RecoveryRequest<Block>>,
/// Blocks that we are retrying currently
candidates_in_retry: HashSet<Block::Hash>,
}
impl<Block: BlockT, PC, RCInterface> PoVRecovery<Block, PC, RCInterface>
@@ -146,8 +237,8 @@ where
{
/// Create a new instance.
pub fn new(
overseer_handle: OverseerHandle,
recovery_delay: RecoveryDelay,
recovery_handle: Box<dyn RecoveryHandle>,
recovery_delay_range: RecoveryDelayRange,
parachain_client: Arc<PC>,
parachain_import_queue: Box<dyn ImportQueueService<Block>>,
relay_chain_interface: RCInterface,
@@ -156,14 +247,14 @@ where
) -> Self {
Self {
candidates: HashMap::new(),
next_candidate_to_recover: Default::default(),
active_candidate_recovery: ActiveCandidateRecovery::new(overseer_handle),
recovery_delay,
candidate_recovery_queue: RecoveryQueue::new(recovery_delay_range),
active_candidate_recovery: ActiveCandidateRecovery::new(recovery_handle),
waiting_for_parent: HashMap::new(),
parachain_client,
parachain_import_queue,
relay_chain_interface,
para_id,
candidates_in_retry: HashSet::new(),
recovery_chan_rx,
}
}
@@ -210,15 +301,11 @@ where
// If required, triggers a lazy recovery request that will eventually be blocked
// if in the meantime the block is imported.
self.recover(RecoveryRequest {
hash,
delay: self.recovery_delay,
kind: RecoveryKind::Simple,
});
self.recover(RecoveryRequest { hash, kind: RecoveryKind::Simple });
}
/// Handle an imported block.
fn handle_block_imported(&mut self, block_hash: &Block::Hash) {
/// Block is no longer waiting for recovery
fn clear_waiting_recovery(&mut self, block_hash: &Block::Hash) {
self.candidates.get_mut(block_hash).map(|candidate| {
// Prevents triggering an already enqueued recovery request
candidate.waiting_recovery = false;
@@ -241,9 +328,9 @@ where
}
}
/// Clear `waiting_for_parent` from the given `hash` and do this recursively for all child
/// blocks.
fn clear_waiting_for_parent(&mut self, hash: Block::Hash) {
/// Clear `waiting_for_parent` and `waiting_recovery` for the candidate with `hash`.
/// Also clears children blocks waiting for this parent.
fn reset_candidate(&mut self, hash: Block::Hash) {
let mut blocks_to_delete = vec![hash];
while let Some(delete) = blocks_to_delete.pop() {
@@ -251,6 +338,7 @@ where
blocks_to_delete.extend(childs.iter().map(BlockT::hash));
}
}
self.clear_waiting_recovery(&hash);
}
/// Handle a recovered candidate.
@@ -260,11 +348,25 @@ where
available_data: Option<AvailableData>,
) {
let available_data = match available_data {
Some(data) => data,
None => {
self.clear_waiting_for_parent(block_hash);
return
Some(data) => {
self.candidates_in_retry.remove(&block_hash);
data
},
None =>
if self.candidates_in_retry.insert(block_hash) {
tracing::debug!(target: LOG_TARGET, ?block_hash, "Recovery failed, retrying.");
self.candidate_recovery_queue.push_recovery(block_hash);
return
} else {
tracing::warn!(
target: LOG_TARGET,
?block_hash,
"Unable to recover block after retry.",
);
self.candidates_in_retry.remove(&block_hash);
self.reset_candidate(block_hash);
return
},
};
let raw_block_data = match sp_maybe_compressed_blob::decompress(
@@ -275,8 +377,7 @@ where
Err(error) => {
tracing::debug!(target: LOG_TARGET, ?error, "Failed to decompress PoV");
self.clear_waiting_for_parent(block_hash);
self.reset_candidate(block_hash);
return
},
};
@@ -290,8 +391,7 @@ where
"Failed to decode parachain block data from recovered PoV",
);
self.clear_waiting_for_parent(block_hash);
self.reset_candidate(block_hash);
return
},
};
@@ -302,12 +402,17 @@ where
match self.parachain_client.block_status(parent) {
Ok(BlockStatus::Unknown) => {
if self.active_candidate_recovery.is_being_recovered(&parent) {
// If the parent block is currently being recovered or is scheduled to be recovered,
// we want to wait for the parent.
let parent_scheduled_for_recovery =
self.candidates.get(&parent).map_or(false, |parent| parent.waiting_recovery);
if parent_scheduled_for_recovery {
tracing::debug!(
target: LOG_TARGET,
?block_hash,
parent_hash = ?parent,
"Parent is still being recovered, waiting.",
parent_scheduled_for_recovery,
"Waiting for recovery of parent.",
);
self.waiting_for_parent.entry(parent).or_default().push(block);
@@ -320,8 +425,7 @@ where
"Parent not found while trying to import recovered block.",
);
self.clear_waiting_for_parent(block_hash);
self.reset_candidate(block_hash);
return
}
},
@@ -333,8 +437,7 @@ where
"Error while checking block status",
);
self.clear_waiting_for_parent(block_hash);
self.reset_candidate(block_hash);
return
},
// Any other status is fine to "ignore/accept"
@@ -383,10 +486,10 @@ where
/// Attempts an explicit recovery of one or more blocks.
pub fn recover(&mut self, req: RecoveryRequest<Block>) {
let RecoveryRequest { mut hash, delay, kind } = req;
let RecoveryRequest { mut hash, kind } = req;
let mut to_recover = Vec::new();
let do_recover = loop {
loop {
let candidate = match self.candidates.get_mut(&hash) {
Some(candidate) => candidate,
None => {
@@ -395,7 +498,7 @@ where
block_hash = ?hash,
"Cound not recover. Block was never announced as candidate"
);
break false
return
},
};
@@ -404,7 +507,7 @@ where
candidate.waiting_recovery = true;
to_recover.push(hash);
},
Ok(_) => break true,
Ok(_) => break,
Err(e) => {
tracing::error!(
target: LOG_TARGET,
@@ -412,36 +515,22 @@ where
block_hash = ?hash,
"Failed to get block status",
);
break false
for hash in to_recover {
self.clear_waiting_recovery(&hash);
}
return
},
}
if kind == RecoveryKind::Simple {
break true
break
}
hash = candidate.parent_hash;
};
}
if do_recover {
for hash in to_recover.into_iter().rev() {
let delay =
delay.min + delay.max.saturating_sub(delay.min).mul_f64(thread_rng().gen());
tracing::debug!(
target: LOG_TARGET,
block_hash = ?hash,
"Starting {:?} block recovery in {:?} sec",
kind,
delay.as_secs(),
);
self.next_candidate_to_recover.push(
async move {
Delay::new(delay).await;
hash
}
.boxed(),
);
}
for hash in to_recover.into_iter().rev() {
self.candidate_recovery_queue.push_recovery(hash);
}
}
@@ -480,7 +569,7 @@ where
},
imported = imported_blocks.next() => {
if let Some(imported) = imported {
self.handle_block_imported(&imported.hash);
self.clear_waiting_recovery(&imported.hash);
} else {
tracing::debug!(target: LOG_TARGET, "Imported blocks stream ended");
return;
@@ -494,10 +583,8 @@ where
return;
}
},
next_to_recover = self.next_candidate_to_recover.next() => {
if let Some(block_hash) = next_to_recover {
self.recover_candidate(block_hash).await;
}
next_to_recover = self.candidate_recovery_queue.next_recovery().fuse() => {
self.recover_candidate(next_to_recover).await;
},
(block_hash, available_data) =
self.active_candidate_recovery.wait_for_recovery().fuse() =>