Limit number of blocks per level (2nd attempt) (#1559)

Prevents the StateDbError::TooManySiblingBlocks error from being triggered by eagerly removing 
stale blocks from the backend on block import and before the error condition is met.

Introduces a just-in-time recovery mechanism, driven by explicit pov-recovery requests,
for blocks that were wrongly removed.

Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>
This commit is contained in:
Davide Galassi
2022-12-20 12:13:49 +01:00
committed by GitHub
parent f621351332
commit 73837c384e
17 changed files with 1096 additions and 184 deletions
@@ -0,0 +1,378 @@
// Copyright 2022 Parity Technologies (UK) Ltd.
// This file is part of Cumulus.
// Cumulus is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Cumulus is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Cumulus. If not, see <http://www.gnu.org/licenses/>.
use sc_client_api::{blockchain::Backend as _, Backend, HeaderBackend as _};
use sp_blockchain::{HashAndNumber, TreeRoute};
use sp_runtime::traits::{Block as BlockT, NumberFor, One, Saturating, UniqueSaturatedInto, Zero};
use std::{
collections::{HashMap, HashSet},
sync::Arc,
};
/// Default upper bound for the number of leaves kept at each level.
///
/// Value good enough to be used with parachains using the current backend implementation
/// that ships with Substrate. This value may change in the future.
pub const MAX_LEAVES_PER_LEVEL_SENSIBLE_DEFAULT: usize = 32;
// Number of levels that must lie between the lowest cached level and the last finalized
// block before `LevelMonitor::block_imported` purges the obsolete cache entries.
const CLEANUP_THRESHOLD: u32 = 32;
/// Upper bound to the number of leaves allowed for each level of the blockchain.
///
/// If the limit is set and more leaves are detected on block import, then the older ones are
/// dropped to make space for the fresh blocks.
///
/// In environments where block confirmations from the relay chain may be "slow", setting an
/// upper bound helps keep the chain healthy by dropping old (presumably stale) leaves and
/// prevents discarding new blocks because the backend maximum has been reached.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LevelLimit {
    /// Limit set to [`MAX_LEAVES_PER_LEVEL_SENSIBLE_DEFAULT`].
    Default,
    /// No explicit limit, however a limit may be implicitly imposed by the backend implementation.
    None,
    /// Custom value.
    Some(usize),
}
/// Support structure to constrain the number of leaves at each level.
///
/// Keeps an in-memory cache of the known blocks grouped by height, together with a
/// "freshness" value for each of them, and relies on the backend to drop blocks once a
/// level is found to be full.
pub struct LevelMonitor<Block: BlockT, BE> {
// Max number of leaves for each level.
level_limit: usize,
// Monotonic counter used to keep track of block freshness.
// It is incremented every time a block is registered via `block_imported`.
pub(crate) import_counter: NumberFor<Block>,
// Map between blocks hashes and freshness (value of `import_counter` at registration).
pub(crate) freshness: HashMap<Block::Hash, NumberFor<Block>>,
// Blockchain levels cache: block number => hashes of the known blocks at that height.
pub(crate) levels: HashMap<NumberFor<Block>, HashSet<Block::Hash>>,
// Lowest level number tracked by the `levels` map (moved forward on cleanup).
lowest_level: NumberFor<Block>,
// Backend reference to remove blocks on level saturation.
backend: Arc<BE>,
}
/// Contains information about the target scheduled for removal.
///
/// The target block itself is the common block of `freshest_route`.
struct TargetInfo<Block: BlockT> {
/// Index, within the freshness-ordered leaves array, of the freshest leaf descending from
/// the target.
freshest_leaf_idx: usize,
/// Route from target to its freshest leaf.
freshest_route: TreeRoute<Block>,
}
impl<Block, BE> LevelMonitor<Block, BE>
where
Block: BlockT,
BE: Backend<Block>,
{
/// Instance a new monitor structure.
pub fn new(level_limit: usize, backend: Arc<BE>) -> Self {
let mut monitor = LevelMonitor {
level_limit,
import_counter: Zero::zero(),
freshness: HashMap::new(),
levels: HashMap::new(),
lowest_level: Zero::zero(),
backend,
};
monitor.restore();
monitor
}
/// Restore the structure using the backend.
///
/// Starting from the last finalized block, every block found on the route towards one of
/// the backend leaves is registered in the monitor.
///
/// Blocks freshness values are inferred from the height and not from the effective import
/// moment. This is not accurate but it is a "good-enough" best effort solution.
///
/// Leaves that cannot be reached from the last finalized block, either because the route
/// lookup fails or because they do not descend from it, are skipped. A failing lookup is
/// logged and never aborts the restore procedure.
///
/// Level limits are not enforced during this phase.
fn restore(&mut self) {
    let info = self.backend.blockchain().info();
    log::debug!(
        target: "parachain",
        "Restoring chain level monitor from last finalized block: {} {}",
        info.finalized_number, info.finalized_hash
    );
    self.lowest_level = info.finalized_number;
    self.import_counter = info.finalized_number;
    self.block_imported(info.finalized_number, info.finalized_hash);
    let mut counter_max = info.finalized_number;
    for leaf in self.backend.blockchain().leaves().unwrap_or_default() {
        let route = match sp_blockchain::tree_route(
            self.backend.blockchain(),
            info.finalized_hash,
            leaf,
        ) {
            Ok(route) => route,
            Err(err) => {
                // The monitor is a best effort cache: a leaf whose route can't be computed
                // is simply not tracked, this must not prevent the node from starting.
                log::warn!(
                    target: "parachain",
                    "(Restore) Unable getting route from {:?} to {:?}: {}",
                    info.finalized_hash, leaf, err,
                );
                continue
            },
        };
        // The leaf belongs to a fork that does not descend from the finalized block.
        if !route.retracted().is_empty() {
            continue
        }
        for elem in route.enacted() {
            if !self.freshness.contains_key(&elem.hash) {
                // Use the block height value as the freshness.
                self.import_counter = elem.number;
                self.block_imported(elem.number, elem.hash);
            }
        }
        counter_max = std::cmp::max(self.import_counter, counter_max);
    }
    log::debug!(target: "parachain", "Restored chain level monitor up to height {}", counter_max);
    self.import_counter = counter_max;
}
/// Make room at height `number` for one more block, if the level is full.
///
/// Nothing happens while the number of known blocks at `number` is below the limit passed
/// to the constructor. Once the level holds a number of blocks greater than or equal to the
/// limit, enough blocks are selected for removal to bring the level back below the limit.
///
/// The removal strategy is driven by the block freshness: a block is as fresh as the
/// freshest leaf descending from it, and the least fresh blocks are the ones removed
/// (together with all their descendants).
///
/// The procedure stops early as soon as no further removal target can be found.
pub fn enforce_limit(&mut self, number: NumberFor<Block>) {
    let level_len = self.levels.get(&number).map_or(0, |level| level.len());
    if level_len < self.level_limit {
        return
    }

    // Leaves are ordered by freshness only once (less fresh first); the ones invalidated
    // by a removal are tracked by index instead of re-sorting.
    let mut leaves = self.backend.blockchain().leaves().unwrap_or_default();
    leaves.sort_unstable_by(|lhs, rhs| {
        let lhs_freshness = self.freshness.get(lhs);
        let rhs_freshness = self.freshness.get(rhs);
        lhs_freshness.cmp(&rhs_freshness)
    });
    let mut invalidated_leaves = HashSet::new();

    // Removing the targets one by one is not the most efficient strategy when there are
    // **multiple** entries to drop, but it is the simplest. Under "normal" conditions the
    // number of blocks to remove is 0 or 1. Multiple removals (2+) may be triggered, for
    // instance, by restarting the node over an existing db with a smaller limit than the
    // one previously used.
    let remove_count = level_len - self.level_limit + 1;
    log::debug!(
        target: "parachain",
        "Detected leaves overflow at height {number}, removing {remove_count} obsolete blocks",
    );

    for _ in 0..remove_count {
        match self.find_target(number, &leaves, &invalidated_leaves) {
            Some(target) => self.remove_target(target, number, &leaves, &mut invalidated_leaves),
            None => break,
        }
    }
}
// Helper function to find the best candidate to be removed.
//
// Given the set of blocks with height equal to `number` (potential candidates):
// 1. For each candidate look for the freshest leaf that is descending from it.
// 2. Set the candidate freshness equal to the freshness of that leaf.
// 3. The target is set as the candidate that is less fresh.
//
// The current best block is never a candidate, and a candidate whose route to its freshest
// leaf contains the best block is never selected as the target.
//
// Input `leaves` are assumed to be already ordered by "freshness" (less fresh first).
// Leaves whose index is in `invalidated_leaves` are ignored.
//
// Returns the index of the target freshest leaf within `leaves` and the route from target to
// such leaf, or `None` if the level is unknown or no eligible candidate was found.
fn find_target(
&self,
number: NumberFor<Block>,
leaves: &[Block::Hash],
invalidated_leaves: &HashSet<usize>,
) -> Option<TargetInfo<Block>> {
let mut target_info: Option<TargetInfo<Block>> = None;
let blockchain = self.backend.blockchain();
let best_hash = blockchain.info().best_hash;
// Leaves that were already assigned to some node and thus can be skipped
// during the search.
let mut assigned_leaves = HashSet::new();
let level = self.levels.get(&number)?;
for blk_hash in level.iter().filter(|hash| **hash != best_hash) {
// Search for the freshest leaf information for this block.
// Leaves are visited in reverse order, i.e. from the freshest to the least fresh,
// and the first one found to descend from the block is taken.
let candidate_info = leaves
.iter()
.enumerate()
.filter(|(leaf_idx, _)| {
!assigned_leaves.contains(leaf_idx) && !invalidated_leaves.contains(leaf_idx)
})
.rev()
.find_map(|(leaf_idx, leaf_hash)| {
if blk_hash == leaf_hash {
// The candidate is itself a leaf: build a route made of the block alone.
let entry = HashAndNumber { number, hash: *blk_hash };
TreeRoute::new(vec![entry], 0).ok().map(|freshest_route| TargetInfo {
freshest_leaf_idx: leaf_idx,
freshest_route,
})
} else {
match sp_blockchain::tree_route(blockchain, *blk_hash, *leaf_hash) {
// Nothing to retract means that the leaf descends from the candidate.
Ok(route) if route.retracted().is_empty() => Some(TargetInfo {
freshest_leaf_idx: leaf_idx,
freshest_route: route,
}),
Err(err) => {
log::warn!(
target: "parachain",
"(Lookup) Unable getting route from {:?} to {:?}: {}",
blk_hash, leaf_hash, err,
);
None
},
// The leaf is on a different branch.
_ => None,
}
}
});
let candidate_info = match candidate_info {
Some(candidate_info) => {
assigned_leaves.insert(candidate_info.freshest_leaf_idx);
candidate_info
},
None => {
// This should never happen
log::error!(
target: "parachain",
"Unable getting route to any leaf from {:?} (this is a bug)",
blk_hash,
);
continue
},
};
// Found freshest leaf for this candidate.
// This candidate is set as the new target if:
// 1. its freshest leaf is less fresh than the previous target freshest leaf AND
// 2. best block is not in its route
let is_less_fresh = || {
target_info
.as_ref()
.map(|ti| candidate_info.freshest_leaf_idx < ti.freshest_leaf_idx)
.unwrap_or(true)
};
let not_contains_best = || {
candidate_info
.freshest_route
.enacted()
.iter()
.all(|entry| entry.hash != best_hash)
};
if is_less_fresh() && not_contains_best() {
let early_stop = candidate_info.freshest_leaf_idx == 0;
target_info = Some(candidate_info);
if early_stop {
// Index 0 is the least fresh leaf: no other candidate can have a freshest
// leaf that is less fresh than this one.
break
}
}
}
target_info
}
// Remove the target block and all its descendants.
//
// Blocks are removed from the backend and, on success, from the `levels` and `freshness`
// caches as well. The index of every leaf found to descend from the target is added to
// `invalidated_leaves`.
//
// `number` is the height of the target block.
//
// Leaves should have already been ordered by "freshness" (less fresh first).
fn remove_target(
&mut self,
target: TargetInfo<Block>,
number: NumberFor<Block>,
leaves: &[Block::Hash],
invalidated_leaves: &mut HashSet<usize>,
) {
// Removes a single block from the backend and from the monitor caches.
// Returns `false` if the backend refused to remove it.
let mut remove_leaf = |number, hash| {
log::debug!(target: "parachain", "Removing block (@{}) {:?}", number, hash);
if let Err(err) = self.backend.remove_leaf_block(hash) {
log::debug!(target: "parachain", "Remove not possible for {}: {}", hash, err);
return false
}
self.levels.get_mut(&number).map(|level| level.remove(&hash));
self.freshness.remove(&hash);
true
};
invalidated_leaves.insert(target.freshest_leaf_idx);
// Takes care of route removal. Starts from the leaf and stops as soon as an error is
// encountered. In this case an error is interpreted as the block being not a leaf
// and it will be removed while removing another route from the same block but to a
// different leaf.
let mut remove_route = |route: TreeRoute<Block>| {
route.enacted().iter().rev().all(|elem| remove_leaf(elem.number, elem.hash));
};
// The target is the block the cached route starts from.
let target_hash = target.freshest_route.common_block().hash;
debug_assert_eq!(
target.freshest_route.common_block().number,
number,
"This is a bug in LevelMonitor::find_target() or the Backend is corrupted"
);
// Remove freshest (cached) route first.
remove_route(target.freshest_route);
// Don't bother trying with leaves we already found to not be our descendants.
// Only the leaves that are less fresh than the target freshest leaf are visited.
let to_skip = leaves.len() - target.freshest_leaf_idx;
leaves.iter().enumerate().rev().skip(to_skip).for_each(|(leaf_idx, leaf_hash)| {
if invalidated_leaves.contains(&leaf_idx) {
return
}
match sp_blockchain::tree_route(self.backend.blockchain(), target_hash, *leaf_hash) {
// Nothing to retract means that the leaf descends from the target.
Ok(route) if route.retracted().is_empty() => {
invalidated_leaves.insert(leaf_idx);
remove_route(route);
},
Err(err) => {
log::warn!(
target: "parachain",
"(Removal) unable getting route from {:?} to {:?}: {}",
target_hash, leaf_hash, err,
);
},
_ => (),
};
});
// All the descendants were processed, the target itself can now be removed.
remove_leaf(number, target_hash);
}
/// Register a newly imported block, identified by `number` and `hash`, within the monitor.
///
/// The block is assigned the current import counter value as its freshness and the counter
/// is then incremented.
///
/// Once the last finalized block is far enough from the lowest cached level, the cache
/// entries for the levels below the finalized one are dropped.
pub fn block_imported(&mut self, number: NumberFor<Block>, hash: Block::Hash) {
    self.freshness.insert(hash, self.import_counter);
    self.levels.entry(number).or_default().insert(hash);
    self.import_counter += One::one();

    // Do cleanup once in a while, we are allowed to have some obsolete information.
    let finalized_num = self.backend.blockchain().info().finalized_number;
    let delta: u32 = finalized_num.saturating_sub(self.lowest_level).unique_saturated_into();
    if delta < CLEANUP_THRESHOLD {
        return
    }

    for offset in 0..delta {
        let stale_number = self.lowest_level + offset.unique_saturated_into();
        if let Some(stale_level) = self.levels.remove(&stale_number) {
            for stale_hash in &stale_level {
                self.freshness.remove(stale_hash);
            }
        }
    }
    self.lowest_level = finalized_num;
}
}
+72 -17
View File
@@ -15,14 +15,23 @@
// along with Cumulus. If not, see <http://www.gnu.org/licenses/>.
use polkadot_primitives::v2::{Hash as PHash, PersistedValidationData};
use sc_consensus::BlockImport;
use sp_runtime::traits::Block as BlockT;
use sc_client_api::Backend;
use sc_consensus::{shared_data::SharedData, BlockImport, ImportResult};
use sp_runtime::traits::{Block as BlockT, Header as HeaderT};
use std::sync::Arc;
mod level_monitor;
mod parachain_consensus;
#[cfg(test)]
mod tests;
pub use parachain_consensus::run_parachain_consensus;
use level_monitor::LevelMonitor;
pub use level_monitor::{LevelLimit, MAX_LEAVES_PER_LEVEL_SENSIBLE_DEFAULT};
/// The result of [`ParachainConsensus::produce_candidate`].
pub struct ParachainCandidate<B> {
/// The block that was built for this candidate.
@@ -74,47 +83,93 @@ impl<B: BlockT> ParachainConsensus<B> for Box<dyn ParachainConsensus<B> + Send +
/// This is used to set `block_import_params.fork_choice` to `false` as long as the block origin is
/// not `NetworkInitialSync`. The best block for parachains is determined by the relay chain. Meaning
/// we will update the best block, as it is included by the relay-chain.
pub struct ParachainBlockImport<I>(I);
pub struct ParachainBlockImport<Block: BlockT, BI, BE> {
inner: BI,
monitor: Option<SharedData<LevelMonitor<Block, BE>>>,
}
impl<I> ParachainBlockImport<I> {
impl<Block: BlockT, BI, BE: Backend<Block>> ParachainBlockImport<Block, BI, BE> {
/// Create a new instance.
pub fn new(inner: I) -> Self {
Self(inner)
///
/// The number of leaves per level limit is set to `LevelLimit::Default`.
pub fn new(inner: BI, backend: Arc<BE>) -> Self {
Self::new_with_limit(inner, backend, LevelLimit::Default)
}
/// Create a new instance with an explicit limit to the number of leaves per level.
///
/// This function alone doesn't enforce the limit on levels for old imported blocks,
/// the limit is eventually enforced only when new blocks are imported.
pub fn new_with_limit(inner: BI, backend: Arc<BE>, level_leaves_max: LevelLimit) -> Self {
let level_limit = match level_leaves_max {
LevelLimit::None => None,
LevelLimit::Some(limit) => Some(limit),
LevelLimit::Default => Some(MAX_LEAVES_PER_LEVEL_SENSIBLE_DEFAULT),
};
let monitor =
level_limit.map(|level_limit| SharedData::new(LevelMonitor::new(level_limit, backend)));
Self { inner, monitor }
}
}
impl<I: Clone> Clone for ParachainBlockImport<I> {
impl<Block: BlockT, I: Clone, BE> Clone for ParachainBlockImport<Block, I, BE> {
fn clone(&self) -> Self {
ParachainBlockImport(self.0.clone())
ParachainBlockImport { inner: self.inner.clone(), monitor: self.monitor.clone() }
}
}
#[async_trait::async_trait]
impl<Block, I> BlockImport<Block> for ParachainBlockImport<I>
impl<Block, BI, BE> BlockImport<Block> for ParachainBlockImport<Block, BI, BE>
where
Block: BlockT,
I: BlockImport<Block> + Send,
BI: BlockImport<Block> + Send,
BE: Backend<Block>,
{
type Error = I::Error;
type Transaction = I::Transaction;
type Error = BI::Error;
type Transaction = BI::Transaction;
async fn check_block(
&mut self,
block: sc_consensus::BlockCheckParams<Block>,
) -> Result<sc_consensus::ImportResult, Self::Error> {
self.0.check_block(block).await
self.inner.check_block(block).await
}
async fn import_block(
&mut self,
mut block_import_params: sc_consensus::BlockImportParams<Block, Self::Transaction>,
mut params: sc_consensus::BlockImportParams<Block, Self::Transaction>,
cache: std::collections::HashMap<sp_consensus::CacheKeyId, Vec<u8>>,
) -> Result<sc_consensus::ImportResult, Self::Error> {
// Blocks are stored within the backend by using POST hash.
let hash = params.post_hash();
let number = *params.header.number();
// Best block is determined by the relay chain, or if we are doing the initial sync
// we import all blocks as new best.
block_import_params.fork_choice = Some(sc_consensus::ForkChoiceStrategy::Custom(
block_import_params.origin == sp_consensus::BlockOrigin::NetworkInitialSync,
params.fork_choice = Some(sc_consensus::ForkChoiceStrategy::Custom(
params.origin == sp_consensus::BlockOrigin::NetworkInitialSync,
));
self.0.import_block(block_import_params, cache).await
let maybe_lock = self.monitor.as_ref().map(|monitor_lock| {
let mut monitor = monitor_lock.shared_data_locked();
monitor.enforce_limit(number);
monitor.release_mutex()
});
let res = self.inner.import_block(params, cache).await?;
if let (Some(mut monitor_lock), ImportResult::Imported(_)) = (maybe_lock, &res) {
let mut monitor = monitor_lock.upgrade();
monitor.block_imported(number, hash);
}
Ok(res)
}
}
/// Marker trait denoting a block import type that fits the parachain requirements.
///
/// The trait declares no items; it is implemented for every [`ParachainBlockImport`].
pub trait ParachainBlockImportMarker {}
impl<B: BlockT, BI, BE> ParachainBlockImportMarker for ParachainBlockImport<B, BI, BE> {}
@@ -15,7 +15,6 @@
// along with Cumulus. If not, see <http://www.gnu.org/licenses/>.
use async_trait::async_trait;
use cumulus_relay_chain_interface::{RelayChainInterface, RelayChainResult};
use sc_client_api::{
Backend, BlockBackend, BlockImportNotification, BlockchainEvents, Finalizer, UsageProvider,
};
@@ -27,15 +26,25 @@ use sp_runtime::{
traits::{Block as BlockT, Header as HeaderT},
};
use cumulus_client_pov_recovery::{RecoveryDelay, RecoveryKind, RecoveryRequest};
use cumulus_relay_chain_interface::{RelayChainInterface, RelayChainResult};
use polkadot_primitives::v2::{Hash as PHash, Id as ParaId, OccupiedCoreAssumption};
use codec::Decode;
use futures::{select, FutureExt, Stream, StreamExt};
use futures::{channel::mpsc::Sender, select, FutureExt, Stream, StreamExt};
use std::{pin::Pin, sync::Arc};
use std::{pin::Pin, sync::Arc, time::Duration};
const LOG_TARGET: &str = "cumulus-consensus";
// Delay range to trigger explicit requests.
// The chosen value doesn't have any special meaning: a random delay within the order of
// seconds should, in practice, be good enough to allow a quick recovery without DOSing
// the relay chain.
const RECOVERY_DELAY: RecoveryDelay =
RecoveryDelay { min: Duration::ZERO, max: Duration::from_secs(30) };
/// Helper for the relay chain client. This is expected to be a lightweight handle like an `Arc`.
#[async_trait]
pub trait RelaychainClient: Clone + 'static {
@@ -82,7 +91,7 @@ where
let finalized_head = if let Some(h) = finalized_heads.next().await {
h
} else {
tracing::debug!(target: "cumulus-consensus", "Stopping following finalized head.");
tracing::debug!(target: LOG_TARGET, "Stopping following finalized head.");
return
};
@@ -90,7 +99,7 @@ where
Ok(header) => header,
Err(err) => {
tracing::debug!(
target: "cumulus-consensus",
target: LOG_TARGET,
error = ?err,
"Could not decode parachain header while following finalized heads.",
);
@@ -105,12 +114,12 @@ where
if let Err(e) = parachain.finalize_block(hash, None, true) {
match e {
ClientError::UnknownBlock(_) => tracing::debug!(
target: "cumulus-consensus",
target: LOG_TARGET,
block_hash = ?hash,
"Could not finalize block because it is unknown.",
),
_ => tracing::warn!(
target: "cumulus-consensus",
target: LOG_TARGET,
error = ?e,
block_hash = ?hash,
"Failed to finalize block",
@@ -136,6 +145,7 @@ pub async fn run_parachain_consensus<P, R, Block, B>(
parachain: Arc<P>,
relay_chain: R,
announce_block: Arc<dyn Fn(Block::Hash, Option<Vec<u8>>) + Send + Sync>,
recovery_chan_tx: Option<Sender<RecoveryRequest<Block>>>,
) where
Block: BlockT,
P: Finalizer<Block, B>
@@ -148,8 +158,13 @@ pub async fn run_parachain_consensus<P, R, Block, B>(
R: RelaychainClient,
B: Backend<Block>,
{
let follow_new_best =
follow_new_best(para_id, parachain.clone(), relay_chain.clone(), announce_block);
let follow_new_best = follow_new_best(
para_id,
parachain.clone(),
relay_chain.clone(),
announce_block,
recovery_chan_tx,
);
let follow_finalized_head = follow_finalized_head(para_id, parachain, relay_chain);
select! {
_ = follow_new_best.fuse() => {},
@@ -163,6 +178,7 @@ async fn follow_new_best<P, R, Block, B>(
parachain: Arc<P>,
relay_chain: R,
announce_block: Arc<dyn Fn(Block::Hash, Option<Vec<u8>>) + Send + Sync>,
recovery_chan_tx: Option<Sender<RecoveryRequest<Block>>>,
) where
Block: BlockT,
P: Finalizer<Block, B>
@@ -197,10 +213,11 @@ async fn follow_new_best<P, R, Block, B>(
h,
&*parachain,
&mut unset_best_header,
recovery_chan_tx.clone(),
).await,
None => {
tracing::debug!(
target: "cumulus-consensus",
target: LOG_TARGET,
"Stopping following new best.",
);
return
@@ -217,7 +234,7 @@ async fn follow_new_best<P, R, Block, B>(
).await,
None => {
tracing::debug!(
target: "cumulus-consensus",
target: LOG_TARGET,
"Stopping following imported blocks.",
);
return
@@ -276,7 +293,7 @@ async fn handle_new_block_imported<Block, P>(
import_block_as_new_best(unset_hash, unset_best_header, parachain).await;
},
state => tracing::debug!(
target: "cumulus-consensus",
target: LOG_TARGET,
?unset_best_header,
?notification.header,
?state,
@@ -290,6 +307,7 @@ async fn handle_new_best_parachain_head<Block, P>(
head: Vec<u8>,
parachain: &P,
unset_best_header: &mut Option<Block::Header>,
mut recovery_chan_tx: Option<Sender<RecoveryRequest<Block>>>,
) where
Block: BlockT,
P: UsageProvider<Block> + Send + Sync + BlockBackend<Block>,
@@ -299,7 +317,7 @@ async fn handle_new_best_parachain_head<Block, P>(
Ok(header) => header,
Err(err) => {
tracing::debug!(
target: "cumulus-consensus",
target: LOG_TARGET,
error = ?err,
"Could not decode Parachain header while following best heads.",
);
@@ -311,7 +329,7 @@ async fn handle_new_best_parachain_head<Block, P>(
if parachain.usage_info().chain.best_hash == hash {
tracing::debug!(
target: "cumulus-consensus",
target: LOG_TARGET,
block_hash = ?hash,
"Skipping set new best block, because block is already the best.",
)
@@ -325,7 +343,7 @@ async fn handle_new_best_parachain_head<Block, P>(
},
Ok(BlockStatus::InChainPruned) => {
tracing::error!(
target: "cumulus-collator",
target: LOG_TARGET,
block_hash = ?hash,
"Trying to set pruned block as new best!",
);
@@ -334,14 +352,30 @@ async fn handle_new_best_parachain_head<Block, P>(
*unset_best_header = Some(parachain_head);
tracing::debug!(
target: "cumulus-collator",
target: LOG_TARGET,
block_hash = ?hash,
"Parachain block not yet imported, waiting for import to enact as best block.",
);
if let Some(ref mut recovery_chan_tx) = recovery_chan_tx {
// Best effort channel to actively encourage block recovery.
// An error here is not fatal; the relay chain continuously re-announces
// the best block, thus we will have other opportunities to retry.
let req =
RecoveryRequest { hash, delay: RECOVERY_DELAY, kind: RecoveryKind::Full };
if let Err(err) = recovery_chan_tx.try_send(req) {
tracing::warn!(
target: LOG_TARGET,
block_hash = ?hash,
error = ?err,
"Unable to notify block recovery subsystem"
)
}
}
},
Err(e) => {
tracing::error!(
target: "cumulus-collator",
target: LOG_TARGET,
block_hash = ?hash,
error = ?e,
"Failed to get block status of block.",
@@ -361,7 +395,7 @@ where
let best_number = parachain.usage_info().chain.best_number;
if *header.number() < best_number {
tracing::debug!(
target: "cumulus-consensus",
target: LOG_TARGET,
%best_number,
block_number = %header.number(),
"Skipping importing block as new best block, because there already exists a \
@@ -377,7 +411,7 @@ where
if let Err(err) = (&*parachain).import_block(block_import_params, Default::default()).await {
tracing::warn!(
target: "cumulus-consensus",
target: LOG_TARGET,
block_hash = ?hash,
error = ?err,
"Failed to set new best block.",
+353 -16
View File
@@ -18,6 +18,7 @@ use crate::*;
use async_trait::async_trait;
use codec::Encode;
use cumulus_client_pov_recovery::RecoveryKind;
use cumulus_relay_chain_interface::RelayChainResult;
use cumulus_test_client::{
runtime::{Block, Header},
@@ -26,10 +27,10 @@ use cumulus_test_client::{
use futures::{channel::mpsc, executor::block_on, select, FutureExt, Stream, StreamExt};
use futures_timer::Delay;
use polkadot_primitives::v2::Id as ParaId;
use sc_client_api::UsageProvider;
use sc_client_api::{blockchain::Backend as _, Backend as _, UsageProvider};
use sc_consensus::{BlockImport, BlockImportParams, ForkChoiceStrategy};
use sp_blockchain::Error as ClientError;
use sp_consensus::BlockOrigin;
use sp_consensus::{BlockOrigin, BlockStatus};
use sp_runtime::generic::BlockId;
use std::{
sync::{Arc, Mutex},
@@ -103,21 +104,82 @@ impl crate::parachain_consensus::RelaychainClient for Relaychain {
}
}
fn build_and_import_block(mut client: Arc<Client>, import_as_best: bool) -> Block {
let builder = client.init_block_builder(None, Default::default());
fn build_block<B: InitBlockBuilder>(
builder: &B,
at: Option<BlockId<Block>>,
timestamp: Option<u64>,
) -> Block {
let builder = match at {
Some(at) => match timestamp {
Some(ts) =>
builder.init_block_builder_with_timestamp(&at, None, Default::default(), ts),
None => builder.init_block_builder_at(&at, None, Default::default()),
},
None => builder.init_block_builder(None, Default::default()),
};
let block = builder.build().unwrap().block;
let (header, body) = block.clone().deconstruct();
let mut block = builder.build().unwrap().block;
let mut block_import_params = BlockImportParams::new(BlockOrigin::Own, header);
block_import_params.fork_choice = Some(ForkChoiceStrategy::Custom(import_as_best));
block_import_params.body = Some(body);
block_on(client.import_block(block_import_params, Default::default())).unwrap();
// Simulate some form of post activity (like a Seal or Other generic things).
// This is mostly used to exercise the `LevelMonitor` correct behavior.
// (in practice we want that header post-hash != pre-hash)
block.header.digest.push(sp_runtime::DigestItem::Other(vec![1, 2, 3]));
block
}
/// Import `block` through `importer`, panicking if the import fails.
///
/// The last digest item of the block header is detached and handed over as a post digest,
/// the block is imported with the given `origin` and becomes the new best block only if
/// `import_as_best` is set.
async fn import_block<I: BlockImport<Block>>(
    importer: &mut I,
    block: Block,
    origin: BlockOrigin,
    import_as_best: bool,
) {
    let (mut header, body) = block.deconstruct();
    let seal = header
        .digest
        .pop()
        .expect("post digested is present in manually crafted block");

    let fork_choice = ForkChoiceStrategy::Custom(import_as_best);
    let mut params = BlockImportParams::new(origin, header);
    params.post_digests.push(seal);
    params.body = Some(body);
    params.fork_choice = Some(fork_choice);

    let outcome = importer.import_block(params, Default::default()).await;
    outcome.unwrap();
}
/// Blocking flavor of `import_block`: drives the import future to completion on the
/// current thread.
fn import_block_sync<I: BlockImport<Block>>(
importer: &mut I,
block: Block,
origin: BlockOrigin,
import_as_best: bool,
) {
block_on(import_block(importer, block, origin, import_as_best));
}
fn build_and_import_block_ext<B: InitBlockBuilder, I: BlockImport<Block>>(
builder: &B,
origin: BlockOrigin,
import_as_best: bool,
importer: &mut I,
at: Option<BlockId<Block>>,
timestamp: Option<u64>,
) -> Block {
let block = build_block(builder, at, timestamp);
import_block_sync(importer, block.clone(), origin, import_as_best);
block
}
/// Build a block and import it, with origin `BlockOrigin::Own`, using `client` both as
/// the block builder and as the block importer.
///
/// No parent block and no timestamp are explicitly requested to the builder.
fn build_and_import_block(mut client: Arc<Client>, import_as_best: bool) -> Block {
build_and_import_block_ext(
&*client.clone(),
BlockOrigin::Own,
import_as_best,
&mut client,
None,
None,
)
}
#[test]
fn follow_new_best_works() {
sp_tracing::try_init_simple();
@@ -129,7 +191,7 @@ fn follow_new_best_works() {
let new_best_heads_sender = relay_chain.inner.lock().unwrap().new_best_heads_sender.clone();
let consensus =
run_parachain_consensus(100.into(), client.clone(), relay_chain, Arc::new(|_, _| {}));
run_parachain_consensus(100.into(), client.clone(), relay_chain, Arc::new(|_, _| {}), None);
let work = async move {
new_best_heads_sender.unbounded_send(block.header().clone()).unwrap();
@@ -152,6 +214,68 @@ fn follow_new_best_works() {
});
}
// Checks that a best head announced for a block that was never imported triggers a request
// on the recovery channel, and that the block is set as best once a (dummy) recovery
// subsystem imports it.
#[test]
fn follow_new_best_with_dummy_recovery_works() {
sp_tracing::try_init_simple();
let client = Arc::new(TestClientBuilder::default().build());
let relay_chain = Relaychain::new();
let new_best_heads_sender = relay_chain.inner.lock().unwrap().new_best_heads_sender.clone();
// Channel used by the consensus task to ask for the recovery of missing blocks.
let (recovery_chan_tx, mut recovery_chan_rx) = futures::channel::mpsc::channel(3);
let consensus = run_parachain_consensus(
100.into(),
client.clone(),
relay_chain,
Arc::new(|_, _| {}),
Some(recovery_chan_tx),
);
// The block is built but NOT imported here.
let block = build_block(&*client.clone(), None, None);
let block_clone = block.clone();
let client_clone = client.clone();
let work = async move {
// Announce the not yet imported block as the new best head.
new_best_heads_sender.unbounded_send(block.header().clone()).unwrap();
loop {
Delay::new(Duration::from_millis(100)).await;
// Poll until the block becomes known to the client.
match client.block_status(&BlockId::Hash(block.hash())).unwrap() {
BlockStatus::Unknown => {},
status => {
assert_eq!(block.hash(), client.usage_info().chain.best_hash);
assert_eq!(status, BlockStatus::InChainWithState);
break
},
}
}
};
// Stand-in for the recovery subsystem: checks every received request and then imports
// the requested block as best after a short delay.
let dummy_block_recovery = async move {
loop {
if let Some(req) = recovery_chan_rx.next().await {
assert_eq!(req.hash, block_clone.hash());
assert_eq!(req.kind, RecoveryKind::Full);
Delay::new(Duration::from_millis(500)).await;
import_block(&mut &*client_clone, block_clone.clone(), BlockOrigin::Own, true)
.await;
}
}
};
block_on(async move {
futures::pin_mut!(consensus);
futures::pin_mut!(work);
// The test is over as soon as `work` completes; the consensus task ending is a failure.
select! {
r = consensus.fuse() => panic!("Consensus should not end: {:?}", r),
_ = dummy_block_recovery.fuse() => {},
_ = work.fuse() => {},
}
});
}
#[test]
fn follow_finalized_works() {
sp_tracing::try_init_simple();
@@ -163,7 +287,7 @@ fn follow_finalized_works() {
let finalized_sender = relay_chain.inner.lock().unwrap().finalized_heads_sender.clone();
let consensus =
run_parachain_consensus(100.into(), client.clone(), relay_chain, Arc::new(|_, _| {}));
run_parachain_consensus(100.into(), client.clone(), relay_chain, Arc::new(|_, _| {}), None);
let work = async move {
finalized_sender.unbounded_send(block.header().clone()).unwrap();
@@ -204,7 +328,7 @@ fn follow_finalized_does_not_stop_on_unknown_block() {
let finalized_sender = relay_chain.inner.lock().unwrap().finalized_heads_sender.clone();
let consensus =
run_parachain_consensus(100.into(), client.clone(), relay_chain, Arc::new(|_, _| {}));
run_parachain_consensus(100.into(), client.clone(), relay_chain, Arc::new(|_, _| {}), None);
let work = async move {
for _ in 0..3usize {
@@ -254,7 +378,7 @@ fn follow_new_best_sets_best_after_it_is_imported() {
let new_best_heads_sender = relay_chain.inner.lock().unwrap().new_best_heads_sender.clone();
let consensus =
run_parachain_consensus(100.into(), client.clone(), relay_chain, Arc::new(|_, _| {}));
run_parachain_consensus(100.into(), client.clone(), relay_chain, Arc::new(|_, _| {}), None);
let work = async move {
new_best_heads_sender.unbounded_send(block.header().clone()).unwrap();
@@ -331,7 +455,7 @@ fn do_not_set_best_block_to_older_block() {
let new_best_heads_sender = relay_chain.inner.lock().unwrap().new_best_heads_sender.clone();
let consensus =
run_parachain_consensus(100.into(), client.clone(), relay_chain, Arc::new(|_, _| {}));
run_parachain_consensus(100.into(), client.clone(), relay_chain, Arc::new(|_, _| {}), None);
let client2 = client.clone();
let work = async move {
@@ -355,3 +479,216 @@ fn do_not_set_best_block_to_older_block() {
// Build and import a new best block.
build_and_import_block(client2.clone(), true);
}
/// Checks that importing more blocks at one height than `LEVEL_LIMIT` allows makes the
/// block import replace a block already stored at that height.
///
/// The outcome is observed through the leaves reported by the backend after each import:
/// first a childless sibling disappears, then a sibling together with its two children.
#[test]
fn prune_blocks_on_level_overflow() {
	// Here we are using the timestamp value to generate blocks with different hashes.
	const LEVEL_LIMIT: usize = 3;
	const TIMESTAMP_MULTIPLIER: u64 = 60000;

	let backend = Arc::new(Backend::new_test(1000, 3));
	let client = Arc::new(TestClientBuilder::with_backend(backend.clone()).build());
	let mut para_import = ParachainBlockImport::new_with_limit(
		client.clone(),
		backend.clone(),
		LevelLimit::Some(LEVEL_LIMIT),
	);

	// Common ancestor of every block built below.
	let block0 = build_and_import_block_ext(
		&*client,
		BlockOrigin::NetworkInitialSync,
		true,
		&mut para_import,
		None,
		None,
	);
	let id0 = BlockId::Hash(block0.header.hash());

	// Fill the level right above B0 up to the limit: B10, B11, B12.
	let blocks1 = (0..LEVEL_LIMIT)
		.map(|i| {
			// Only the block built at index 1 (B11) is imported as the new best block.
			let is_best = i == 1;
			let origin = if is_best { BlockOrigin::NetworkInitialSync } else { BlockOrigin::Own };
			build_and_import_block_ext(
				&*client,
				origin,
				is_best,
				&mut para_import,
				Some(id0),
				Some(i as u64 * TIMESTAMP_MULTIPLIER),
			)
		})
		.collect::<Vec<_>>();
	let id10 = BlockId::Hash(blocks1[0].header.hash());

	// Two children of B10: B20 and B21.
	let blocks2 = (0..2u64)
		.map(|i| {
			build_and_import_block_ext(
				&*client,
				BlockOrigin::Own,
				false,
				&mut para_import,
				Some(id10),
				Some(i * TIMESTAMP_MULTIPLIER),
			)
		})
		.collect::<Vec<_>>();

	// Initial scenario (with B11 imported as best)
	//
	//   B0 --+-- B10 --+-- B20
	//        +-- B11   +-- B21
	//        +-- B12

	let leaves = backend.blockchain().leaves().unwrap();
	let expected = vec![
		blocks2[0].header.hash(),
		blocks2[1].header.hash(),
		blocks1[1].header.hash(),
		blocks1[2].header.hash(),
	];
	assert_eq!(leaves, expected);
	let best = client.usage_info().chain.best_hash;
	assert_eq!(best, blocks1[1].header.hash());

	// One block more than the limit on the level of B10, B11 and B12.
	let block13 = build_and_import_block_ext(
		&*client,
		BlockOrigin::Own,
		false,
		&mut para_import,
		Some(id0),
		Some(LEVEL_LIMIT as u64 * TIMESTAMP_MULTIPLIER),
	);

	// Expected scenario
	//
	//   B0 --+-- B10 --+-- B20
	//        +-- B11   +-- B21
	//        +--(B13)              <-- B12 has been replaced

	let leaves = backend.blockchain().leaves().unwrap();
	let expected = vec![
		blocks2[0].header.hash(),
		blocks2[1].header.hash(),
		blocks1[1].header.hash(),
		block13.header.hash(),
	];
	assert_eq!(leaves, expected);

	// Overflow the same level once more.
	let block14 = build_and_import_block_ext(
		&*client,
		BlockOrigin::Own,
		false,
		&mut para_import,
		Some(id0),
		Some(2 * LEVEL_LIMIT as u64 * TIMESTAMP_MULTIPLIER),
	);

	// Expected scenario
	//
	//   B0 --+--(B14)              <-- B10 has been replaced
	//        +-- B11
	//        +--(B13)

	// B20 and B21 are no longer reported as leaves; B14 is reported last.
	let leaves = backend.blockchain().leaves().unwrap();
	let expected =
		vec![blocks1[1].header.hash(), block13.header.hash(), block14.header.hash()];
	assert_eq!(leaves, expected);
}
/// Checks that a freshly created block import picks up the blocks already stored in the
/// backend and enforces its level limit on them.
///
/// The chain is first populated through a block import using `LevelLimit::Some(usize::MAX)`,
/// then a second block import with `LevelLimit::Some(LEVEL_LIMIT)` is created over the same
/// backend and used to import one more block on an already overfull level.
#[test]
fn restore_limit_monitor() {
	// Here we are using the timestamp value to generate blocks with different hashes.
	const LEVEL_LIMIT: usize = 2;
	const TIMESTAMP_MULTIPLIER: u64 = 60000;

	let backend = Arc::new(Backend::new_test(1000, 3));
	let client = Arc::new(TestClientBuilder::with_backend(backend.clone()).build());

	// Start with a block import not enforcing any limit...
	let mut para_import = ParachainBlockImport::new_with_limit(
		client.clone(),
		backend.clone(),
		LevelLimit::Some(usize::MAX),
	);

	// Common ancestor of every block built below.
	let block00 = build_and_import_block_ext(
		&*client,
		BlockOrigin::NetworkInitialSync,
		true,
		&mut para_import,
		None,
		None,
	);
	let id00 = BlockId::Hash(block00.header.hash());

	// One sibling more than `LEVEL_LIMIT` on the level above B00: B10, B11, B12.
	let blocks1 = (0..=LEVEL_LIMIT)
		.map(|i| {
			// Only the block built at index 1 (B11) is imported as the new best block.
			let is_best = i == 1;
			let origin = if is_best { BlockOrigin::NetworkInitialSync } else { BlockOrigin::Own };
			build_and_import_block_ext(
				&*client,
				origin,
				is_best,
				&mut para_import,
				Some(id00),
				Some(i as u64 * TIMESTAMP_MULTIPLIER),
			)
		})
		.collect::<Vec<_>>();
	let id10 = BlockId::Hash(blocks1[0].header.hash());

	// Children of B10 (B20, B21). The built blocks themselves are not needed afterwards.
	for i in 0..LEVEL_LIMIT {
		build_and_import_block_ext(
			&*client,
			BlockOrigin::Own,
			false,
			&mut para_import,
			Some(id10),
			Some(i as u64 * TIMESTAMP_MULTIPLIER),
		);
	}

	// Scenario before limit application (with B11 imported as best)
	// Import order (freshness): B00, B10, B11, B12, B20, B21
	//
	//   B00 --+-- B10 --+-- B20
	//         |         +-- B21
	//         +-- B11
	//         |
	//         +-- B12

	// Simulate a restart by forcing a new monitor structure instance
	let mut para_import = ParachainBlockImport::new_with_limit(
		client.clone(),
		backend.clone(),
		LevelLimit::Some(LEVEL_LIMIT),
	);

	let block13 = build_and_import_block_ext(
		&*client,
		BlockOrigin::Own,
		false,
		&mut para_import,
		Some(id00),
		Some(LEVEL_LIMIT as u64 * TIMESTAMP_MULTIPLIER),
	);
	let hash13 = block13.header.hash();

	// Expected scenario
	//
	//   B00 --+-- B11
	//         +--(B13)

	let leaves = backend.blockchain().leaves().unwrap();
	let expected = vec![blocks1[1].header.hash(), hash13];
	assert_eq!(leaves, expected);

	let monitor = para_import.monitor.unwrap();
	let monitor = monitor.shared_data();
	assert_eq!(monitor.import_counter, 5);
	// Every tracked block other than B13 must have a freshness equal to the number of the
	// level it is stored under.
	assert!(monitor.levels.iter().all(|(number, hashes)| {
		hashes
			.iter()
			.filter(|hash| **hash != hash13)
			.all(|hash| *number == *monitor.freshness.get(hash).unwrap())
	}));
	// B13 is the only block imported through the new instance: its freshness must be the
	// counter value that preceded the current one.
	assert_eq!(*monitor.freshness.get(&hash13).unwrap(), monitor.import_counter - 1);
}