Limit number of blocks per level (2nd attempt) (#1559)

Prevents the StateDbError::TooManySiblingBlocks error from being triggered by eagerly removing 
stale blocks from the backend on block import and before the error condition is met.

Introduces a just-in-time block recovery mechanism, exposed via an explicit
pov-recovery method, for blocks that were wrongly removed.

Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>
This commit is contained in:
Davide Galassi
2022-12-20 12:13:49 +01:00
committed by GitHub
parent 79d8c5c3b8
commit 030ba80ba0
17 changed files with 1096 additions and 184 deletions
@@ -42,19 +42,19 @@ impl<Block: BlockT> ActiveCandidateRecovery<Block> {
Self { recoveries: Default::default(), candidates: Default::default(), overseer_handle }
}
/// Recover the given `pending_candidate`.
/// Recover the given `candidate`.
pub async fn recover_candidate(
&mut self,
block_hash: Block::Hash,
pending_candidate: crate::PendingCandidate<Block>,
candidate: &crate::Candidate<Block>,
) {
let (tx, rx) = oneshot::channel();
self.overseer_handle
.send_msg(
AvailabilityRecoveryMessage::RecoverAvailableData(
pending_candidate.receipt,
pending_candidate.session_index,
candidate.receipt.clone(),
candidate.session_index,
None,
tx,
),
+153 -85
View File
@@ -59,7 +59,9 @@ use cumulus_primitives_core::ParachainBlockData;
use cumulus_relay_chain_interface::{RelayChainInterface, RelayChainResult};
use codec::Decode;
use futures::{select, stream::FuturesUnordered, Future, FutureExt, Stream, StreamExt};
use futures::{
channel::mpsc::Receiver, select, stream::FuturesUnordered, Future, FutureExt, Stream, StreamExt,
};
use futures_timer::Delay;
use rand::{thread_rng, Rng};
@@ -75,38 +77,52 @@ use active_candidate_recovery::ActiveCandidateRecovery;
const LOG_TARGET: &str = "cumulus-pov-recovery";
/// Represents a pending candidate.
struct PendingCandidate<Block: BlockT> {
/// Type of recovery to trigger.
///
/// Selects how far `PoVRecovery::recover` walks back from the requested block.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RecoveryKind {
    /// Single block recovery: only the requested block is considered.
    Simple,
    /// Full ancestry recovery: the requested block and its ancestors are considered,
    /// following the parent hashes until a block that does not need recovery is found.
    Full,
}
/// Structure used to trigger an explicit recovery request via `PoVRecovery`.
///
/// A request is consumed by `PoVRecovery::recover`, either directly or after being
/// received from the explicit recovery requests channel.
pub struct RecoveryRequest<Block: BlockT> {
/// Hash of the last block to recover.
///
/// For a `RecoveryKind::Full` request this is the block the ancestry walk starts from.
pub hash: Block::Hash,
/// Recovery delay range. Randomizing the start of the recovery within this interval
/// can be used to prevent self-DOSing if the recovery request is part of a
/// distributed protocol and there is the possibility that multiple actors
/// need to perform the recovery action at approximately the same time.
pub delay: RecoveryDelay,
/// Recovery type: single block or full ancestry.
pub kind: RecoveryKind,
}
/// The delay between observing an unknown block and triggering the recovery of a block.
///
/// `PoVRecovery::recover` picks the effective delay at random, starting at `min` and
/// adding a random fraction of `max - min`. The subtraction saturates, so a `max` that
/// is smaller than `min` results in a delay of exactly `min`.
#[derive(Debug, Clone, Copy)]
pub struct RecoveryDelay {
    /// Start recovering after `min` delay.
    pub min: Duration,
    /// Start recovering before `max` delay.
    pub max: Duration,
}
/// Represents an outstanding block candidate.
struct Candidate<Block: BlockT> {
receipt: CandidateReceipt,
session_index: SessionIndex,
block_number: NumberFor<Block>,
}
/// The delay between observing an unknown block and recovering this block.
#[derive(Clone, Copy)]
pub enum RecoveryDelay {
/// Start recovering the block in maximum of the given delay.
WithMax { max: Duration },
/// Start recovering the block after at least `min` delay and in maximum `max` delay.
WithMinAndMax { min: Duration, max: Duration },
}
impl RecoveryDelay {
/// Return as [`Delay`].
fn as_delay(self) -> Delay {
match self {
Self::WithMax { max } => Delay::new(max.mul_f64(thread_rng().gen())),
Self::WithMinAndMax { min, max } =>
Delay::new(min + max.saturating_sub(min).mul_f64(thread_rng().gen())),
}
}
parent_hash: Block::Hash,
// Lazy recovery has been submitted.
waiting_recovery: bool,
}
/// Encapsulates the logic of the pov recovery.
pub struct PoVRecovery<Block: BlockT, PC, RC> {
/// All the pending candidates that we are waiting for to be imported or that need to be
/// recovered when `next_candidate_to_recover` tells us to do so.
pending_candidates: HashMap<Block::Hash, PendingCandidate<Block>>,
candidates: HashMap<Block::Hash, Candidate<Block>>,
/// A stream of futures that resolve to hashes of candidates that need to be recovered.
///
/// The candidates to the hashes are stored in `pending_candidates`. If a candidate is not
@@ -122,6 +138,8 @@ pub struct PoVRecovery<Block: BlockT, PC, RC> {
parachain_import_queue: Box<dyn ImportQueueService<Block>>,
relay_chain_interface: RC,
para_id: ParaId,
/// Explicit block recovery requests channel.
recovery_chan_rx: Receiver<RecoveryRequest<Block>>,
}
impl<Block: BlockT, PC, RCInterface> PoVRecovery<Block, PC, RCInterface>
@@ -137,9 +155,10 @@ where
parachain_import_queue: Box<dyn ImportQueueService<Block>>,
relay_chain_interface: RCInterface,
para_id: ParaId,
recovery_chan_rx: Receiver<RecoveryRequest<Block>>,
) -> Self {
Self {
pending_candidates: HashMap::new(),
candidates: HashMap::new(),
next_candidate_to_recover: Default::default(),
active_candidate_recovery: ActiveCandidateRecovery::new(overseer_handle),
recovery_delay,
@@ -148,6 +167,7 @@ where
parachain_import_queue,
relay_chain_interface,
para_id,
recovery_chan_rx,
}
}
@@ -174,69 +194,54 @@ where
}
let hash = header.hash();
match self.parachain_client.block_status(&BlockId::Hash(hash)) {
Ok(BlockStatus::Unknown) => (),
// Any other state means, we should ignore it.
Ok(_) => return,
Err(e) => {
tracing::debug!(
target: LOG_TARGET,
error = ?e,
block_hash = ?hash,
"Failed to get block status",
);
return
},
}
tracing::debug!(target: LOG_TARGET, ?hash, "Adding pending candidate");
if self
.pending_candidates
.insert(
hash,
PendingCandidate {
block_number: *header.number(),
receipt: receipt.to_plain(),
session_index,
},
)
.is_some()
{
if self.candidates.contains_key(&hash) {
return
}
// Delay the recovery by some random time to not spam the relay chain.
let delay = self.recovery_delay.as_delay();
self.next_candidate_to_recover.push(
async move {
delay.await;
hash
}
.boxed(),
tracing::debug!(target: LOG_TARGET, block_hash = ?hash, "Adding outstanding candidate");
self.candidates.insert(
hash,
Candidate {
block_number: *header.number(),
receipt: receipt.to_plain(),
session_index,
parent_hash: *header.parent_hash(),
waiting_recovery: false,
},
);
// If required, triggers a lazy recovery request that will eventually be blocked
// if in the meantime the block is imported.
self.recover(RecoveryRequest {
hash,
delay: self.recovery_delay,
kind: RecoveryKind::Simple,
});
}
/// Handle an imported block.
fn handle_block_imported(&mut self, hash: &Block::Hash) {
self.pending_candidates.remove(hash);
fn handle_block_imported(&mut self, block_hash: &Block::Hash) {
    // The candidate entry is kept; only its flag is cleared. `recover_candidate` checks
    // `waiting_recovery` before issuing a request, so clearing it here prevents
    // triggering an already enqueued recovery request for a block that got imported.
    if let Some(candidate) = self.candidates.get_mut(block_hash) {
        candidate.waiting_recovery = false;
    }
}
/// Handle a finalized block with the given `block_number`.
fn handle_block_finalized(&mut self, block_number: NumberFor<Block>) {
self.pending_candidates.retain(|_, pc| pc.block_number > block_number);
self.candidates.retain(|_, pc| pc.block_number > block_number);
}
/// Recover the candidate for the given `block_hash`.
async fn recover_candidate(&mut self, block_hash: Block::Hash) {
let pending_candidate = match self.pending_candidates.remove(&block_hash) {
Some(pending_candidate) => pending_candidate,
None => return,
};
tracing::debug!(target: LOG_TARGET, ?block_hash, "Issuing recovery request");
self.active_candidate_recovery
.recover_candidate(block_hash, pending_candidate)
.await;
match self.candidates.get(&block_hash) {
Some(candidate) if candidate.waiting_recovery => {
tracing::debug!(target: LOG_TARGET, ?block_hash, "Issuing recovery request");
self.active_candidate_recovery.recover_candidate(block_hash, candidate).await;
},
_ => (),
}
}
/// Clear `waiting_for_parent` from the given `hash` and do this recursively for all child
@@ -348,7 +353,7 @@ where
async fn import_block(&mut self, block: Block) {
let mut blocks = VecDeque::new();
tracing::debug!(target: LOG_TARGET, hash = ?block.hash(), "Importing block retrieved using pov_recovery");
tracing::debug!(target: LOG_TARGET, block_hash = ?block.hash(), "Importing block retrieved using pov_recovery");
blocks.push_back(block);
let mut incoming_blocks = Vec::new();
@@ -379,6 +384,70 @@ where
.import_blocks(BlockOrigin::ConsensusBroadcast, incoming_blocks);
}
/// Attempts an explicit recovery of one or more blocks.
///
/// Starting from `req.hash`, every block that is a known candidate, has an unknown
/// status in the parachain client and is not already awaiting recovery gets flagged as
/// `waiting_recovery` and collected. With [`RecoveryKind::Simple`] only the requested
/// block is considered; with [`RecoveryKind::Full`] the walk continues through the
/// parent hashes until a block that does not need to be collected is reached.
///
/// The collected blocks are then enqueued oldest ancestor first, each one behind its
/// own random delay drawn from `req.delay`.
///
/// Nothing is enqueued if the walk reaches a block that was never announced as
/// candidate, or if the block status cannot be read. In that case the
/// `waiting_recovery` flags set during the walk are cleared again, so that a later
/// request is able to pick those blocks up.
pub fn recover(&mut self, req: RecoveryRequest<Block>) {
    let RecoveryRequest { mut hash, delay, kind } = req;
    let mut to_recover = Vec::new();

    let do_recover = loop {
        let candidate = match self.candidates.get_mut(&hash) {
            Some(candidate) => candidate,
            None => {
                tracing::debug!(
                    target: LOG_TARGET,
                    block_hash = ?hash,
                    "Could not recover. Block was never announced as candidate"
                );
                break false
            },
        };

        match self.parachain_client.block_status(&BlockId::Hash(hash)) {
            Ok(BlockStatus::Unknown) if !candidate.waiting_recovery => {
                candidate.waiting_recovery = true;
                to_recover.push(hash);
            },
            // Either the client already knows the block or a recovery is already
            // pending for it: stop walking and recover what has been collected so far.
            Ok(_) => break true,
            Err(e) => {
                tracing::error!(
                    target: LOG_TARGET,
                    error = ?e,
                    block_hash = ?hash,
                    "Failed to get block status",
                );
                break false
            },
        }

        if kind == RecoveryKind::Simple {
            break true
        }

        hash = candidate.parent_hash;
    };

    if !do_recover {
        // The walk was aborted and nothing gets enqueued. Roll back the flags set along
        // the way, otherwise these candidates would look like they are awaiting a
        // recovery that never fires and every later request would skip them.
        for hash in to_recover {
            if let Some(candidate) = self.candidates.get_mut(&hash) {
                candidate.waiting_recovery = false;
            }
        }
        return
    }

    // Blocks were collected from the requested one towards its ancestors; enqueue them
    // in the opposite order.
    for hash in to_recover.into_iter().rev() {
        // `saturating_sub` keeps the wait at `min` when `max` is smaller than `min`.
        let wait = delay.min + delay.max.saturating_sub(delay.min).mul_f64(thread_rng().gen());
        tracing::debug!(
            target: LOG_TARGET,
            block_hash = ?hash,
            "Starting {:?} block recovery in {:?} sec",
            kind,
            wait.as_secs(),
        );
        self.next_candidate_to_recover.push(
            async move {
                Delay::new(wait).await;
                hash
            }
            .boxed(),
        );
    }
}
/// Run the pov-recovery.
pub async fn run(mut self) {
let mut imported_blocks = self.parachain_client.import_notification_stream().fuse();
@@ -400,10 +469,15 @@ where
if let Some((receipt, session_index)) = pending_candidate {
self.handle_pending_candidate(receipt, session_index);
} else {
tracing::debug!(
target: LOG_TARGET,
"Pending candidates stream ended",
);
tracing::debug!(target: LOG_TARGET, "Pending candidates stream ended");
return;
}
},
recovery_req = self.recovery_chan_rx.next() => {
if let Some(req) = recovery_req {
self.recover(req);
} else {
tracing::debug!(target: LOG_TARGET, "Recovery channel stream ended");
return;
}
},
@@ -411,10 +485,7 @@ where
if let Some(imported) = imported {
self.handle_block_imported(&imported.hash);
} else {
tracing::debug!(
target: LOG_TARGET,
"Imported blocks stream ended",
);
tracing::debug!(target: LOG_TARGET, "Imported blocks stream ended");
return;
}
},
@@ -422,10 +493,7 @@ where
if let Some(finalized) = finalized {
self.handle_block_finalized(*finalized.header.number());
} else {
tracing::debug!(
target: LOG_TARGET,
"Finalized blocks stream ended",
);
tracing::debug!(target: LOG_TARGET, "Finalized blocks stream ended");
return;
}
},