Clean obsolete BABE's weight data (#10748)

* Clean obsolete BABE weight data
* Take out test assertion from check closure
* Optimize metadata access using `HeaderMetadata` trait
* Apply suggestions from code review
* Introduce finalize and import pre-commit synchronous actions
* Do not hold locks between internal methods calls
* Remove unused generic bound
* Apply suggestions from code review
* Register BABE's pre-commit actions on `block_import` instead of `start_babe`
* PreCommit actions should be `Fn` instead of `FnMut`
* More robust safenet in case of malformed finality notifications

Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>
Co-authored-by: André Silva <123550+andresilva@users.noreply.github.com>
This commit is contained in:
Davide Galassi
2022-02-23 19:58:33 +01:00
committed by GitHub
parent a2b80edf12
commit 26ec5d71c8
6 changed files with 330 additions and 95 deletions
+91 -14
View File
@@ -66,7 +66,16 @@
#![forbid(unsafe_code)]
#![warn(missing_docs)]
use std::{borrow::Cow, collections::HashMap, convert::TryInto, pin::Pin, sync::Arc, u64};
use std::{
borrow::Cow,
collections::{HashMap, HashSet},
convert::TryInto,
future::Future,
pin::Pin,
sync::Arc,
task::{Context, Poll},
time::Duration,
};
use codec::{Decode, Encode};
use futures::{
@@ -82,7 +91,10 @@ use prometheus_endpoint::Registry;
use retain_mut::RetainMut;
use schnorrkel::SignatureError;
use sc_client_api::{backend::AuxStore, BlockchainEvents, ProvideUncles, UsageProvider};
use sc_client_api::{
backend::AuxStore, AuxDataOperations, BlockchainEvents, FinalityNotification, PreCommitActions,
ProvideUncles, UsageProvider,
};
use sc_consensus::{
block_import::{
BlockCheckParams, BlockImport, BlockImportParams, ForkChoiceStrategy, ImportResult,
@@ -98,7 +110,7 @@ use sc_consensus_slots::{
SlotInfo, StorageChanges,
};
use sc_telemetry::{telemetry, TelemetryHandle, CONSENSUS_DEBUG, CONSENSUS_TRACE};
use sp_api::{ApiExt, NumberFor, ProvideRuntimeApi};
use sp_api::{ApiExt, ProvideRuntimeApi};
use sp_application_crypto::AppKey;
use sp_block_builder::BlockBuilder as BlockBuilderApi;
use sp_blockchain::{Error as ClientError, HeaderBackend, HeaderMetadata, Result as ClientResult};
@@ -113,7 +125,7 @@ use sp_inherents::{CreateInherentDataProviders, InherentData, InherentDataProvid
use sp_keystore::{SyncCryptoStore, SyncCryptoStorePtr};
use sp_runtime::{
generic::{BlockId, OpaqueDigestItemId},
traits::{Block as BlockT, Header, Zero},
traits::{Block as BlockT, Header, NumberFor, One, SaturatedConversion, Saturating, Zero},
DigestItem,
};
@@ -458,6 +470,7 @@ where
C: ProvideRuntimeApi<B>
+ ProvideUncles<B>
+ BlockchainEvents<B>
+ PreCommitActions<B>
+ HeaderBackend<B>
+ HeaderMetadata<B, Error = ClientError>
+ Send
@@ -501,7 +514,8 @@ where
};
info!(target: "babe", "👶 Starting BABE Authorship worker");
let inner = sc_consensus_slots::start_slot_worker(
let slot_worker = sc_consensus_slots::start_slot_worker(
babe_link.config.slot_duration(),
select_chain,
worker,
@@ -515,13 +529,69 @@ where
let answer_requests =
answer_requests(worker_rx, babe_link.config, client, babe_link.epoch_changes.clone());
let inner = future::select(Box::pin(slot_worker), Box::pin(answer_requests));
Ok(BabeWorker {
inner: Box::pin(future::join(inner, answer_requests).map(|_| ())),
inner: Box::pin(inner.map(|_| ())),
slot_notification_sinks,
handle: BabeWorkerHandle(worker_tx),
})
}
// Remove obsolete block's weight data by leveraging finality notifications.
// This includes data for all finalized blocks (excluding the most recent one)
// and all stale branches.
fn aux_storage_cleanup<C: HeaderMetadata<Block>, Block: BlockT>(
client: &C,
notification: &FinalityNotification<Block>,
) -> AuxDataOperations {
let mut aux_keys = HashSet::new();
// Cleans data for finalized block's ancestors down to, and including, the previously
// finalized one.
let first_new_finalized = notification.tree_route.get(0).unwrap_or(&notification.hash);
match client.header_metadata(*first_new_finalized) {
Ok(meta) => {
aux_keys.insert(aux_schema::block_weight_key(meta.parent));
},
Err(err) => {
warn!(target: "babe", "header lookup fail while cleaning data for block {}: {}", first_new_finalized.to_string(), err.to_string());
},
}
aux_keys.extend(notification.tree_route.iter().map(aux_schema::block_weight_key));
// Cleans data for stale branches.
// A safenet in case of malformed notification.
let height_limit = notification.header.number().saturating_sub(
notification.tree_route.len().saturated_into::<NumberFor<Block>>() + One::one(),
);
for head in notification.stale_heads.iter() {
let mut hash = *head;
// Insert stale blocks hashes until canonical chain is not reached.
// Soon or late we should hit an element already present within the `aux_keys` set.
while aux_keys.insert(aux_schema::block_weight_key(hash)) {
match client.header_metadata(hash) {
Ok(meta) => {
// This should never happen and must be considered a bug.
if meta.number <= height_limit {
warn!(target: "babe", "unexpected canonical chain state or malformed finality notification");
break
}
hash = meta.parent;
},
Err(err) => {
warn!(target: "babe", "header lookup fail while cleaning data for block {}: {}", head.to_string(), err.to_string());
break
},
}
}
}
aux_keys.into_iter().map(|val| (val, None)).collect()
}
async fn answer_requests<B: BlockT, C>(
mut request_rx: Receiver<BabeRequest<B>>,
config: Config,
@@ -604,7 +674,7 @@ impl<B: BlockT> BabeWorkerHandle<B> {
/// Worker for Babe which implements `Future<Output=()>`. This must be polled.
#[must_use]
pub struct BabeWorker<B: BlockT> {
inner: Pin<Box<dyn futures::Future<Output = ()> + Send + 'static>>,
inner: Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
slot_notification_sinks: SlotNotificationSinks<B>,
handle: BabeWorkerHandle<B>,
}
@@ -628,13 +698,10 @@ impl<B: BlockT> BabeWorker<B> {
}
}
impl<B: BlockT> futures::Future for BabeWorker<B> {
impl<B: BlockT> Future for BabeWorker<B> {
type Output = ();
fn poll(
mut self: Pin<&mut Self>,
cx: &mut futures::task::Context,
) -> futures::task::Poll<Self::Output> {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
self.inner.as_mut().poll(cx)
}
}
@@ -857,7 +924,7 @@ where
self.telemetry.clone()
}
fn proposing_remaining_duration(&self, slot_info: &SlotInfo<B>) -> std::time::Duration {
fn proposing_remaining_duration(&self, slot_info: &SlotInfo<B>) -> Duration {
let parent_slot = find_pre_digest::<B>(&slot_info.chain_head).ok().map(|d| d.slot());
sc_consensus_slots::proposing_remaining_duration(
@@ -1683,7 +1750,11 @@ pub fn block_import<Client, Block: BlockT, I>(
client: Arc<Client>,
) -> ClientResult<(BabeBlockImport<Block, Client, I>, BabeLink<Block>)>
where
Client: AuxStore + HeaderBackend<Block> + HeaderMetadata<Block, Error = sp_blockchain::Error>,
Client: AuxStore
+ HeaderBackend<Block>
+ HeaderMetadata<Block, Error = sp_blockchain::Error>
+ PreCommitActions<Block>
+ 'static,
{
let epoch_changes =
aux_schema::load_epoch_changes::<Block, _>(&*client, &config.genesis_config)?;
@@ -1694,6 +1765,12 @@ where
// startup rather than waiting until importing the next epoch change block.
prune_finalized(client.clone(), &mut epoch_changes.shared_data())?;
let client_clone = client.clone();
let on_finality = move |summary: &FinalityNotification<Block>| {
aux_storage_cleanup(client_clone.as_ref(), summary)
};
client.register_finality_action(Box::new(on_finality));
let import = BabeBlockImport::new(client, epoch_changes, wrapped_block_import, config);
Ok((import, link))
+100 -32
View File
@@ -28,7 +28,7 @@ use log::debug;
use rand::RngCore;
use rand_chacha::{rand_core::SeedableRng, ChaChaRng};
use sc_block_builder::{BlockBuilder, BlockBuilderProvider};
use sc_client_api::{backend::TransactionFor, BlockchainEvents};
use sc_client_api::{backend::TransactionFor, BlockchainEvents, Finalizer};
use sc_consensus::{BoxBlockImport, BoxJustificationImport};
use sc_consensus_slots::BackoffAuthoringOnFinalizedHeadLagging;
use sc_keystore::LocalKeystore;
@@ -608,8 +608,8 @@ fn propose_and_import_block<Transaction: Send + 'static>(
slot: Option<Slot>,
proposer_factory: &mut DummyFactory,
block_import: &mut BoxBlockImport<TestBlock, Transaction>,
) -> sp_core::H256 {
let mut proposer = futures::executor::block_on(proposer_factory.init(parent)).unwrap();
) -> Hash {
let mut proposer = block_on(proposer_factory.init(parent)).unwrap();
let slot = slot.unwrap_or_else(|| {
let parent_pre_digest = find_pre_digest::<TestBlock>(parent).unwrap();
@@ -625,7 +625,7 @@ fn propose_and_import_block<Transaction: Send + 'static>(
let parent_hash = parent.hash();
let mut block = futures::executor::block_on(proposer.propose_with(pre_digest)).unwrap().block;
let mut block = block_on(proposer.propose_with(pre_digest)).unwrap().block;
let epoch_descriptor = proposer_factory
.epoch_changes
@@ -673,6 +673,29 @@ fn propose_and_import_block<Transaction: Send + 'static>(
post_hash
}
// Propose and import n valid BABE blocks that are built on top of the given parent.
// The proposer takes care of producing epoch change digests according to the epoch
// duration (which is set to 6 slots in the test runtime).
fn propose_and_import_blocks<Transaction: Send + 'static>(
client: &PeersFullClient,
proposer_factory: &mut DummyFactory,
block_import: &mut BoxBlockImport<TestBlock, Transaction>,
parent_id: BlockId<TestBlock>,
n: usize,
) -> Vec<Hash> {
let mut hashes = Vec::with_capacity(n);
let mut parent_header = client.header(&parent_id).unwrap().unwrap();
for _ in 0..n {
let block_hash =
propose_and_import_block(&parent_header, None, proposer_factory, block_import);
hashes.push(block_hash);
parent_header = client.header(&BlockId::Hash(block_hash)).unwrap().unwrap();
}
hashes
}
#[test]
fn importing_block_one_sets_genesis_epoch() {
let mut net = BabeTestNet::new(1);
@@ -714,8 +737,6 @@ fn importing_block_one_sets_genesis_epoch() {
#[test]
fn importing_epoch_change_block_prunes_tree() {
use sc_client_api::Finalizer;
let mut net = BabeTestNet::new(1);
let peer = net.peer(0);
@@ -732,26 +753,8 @@ fn importing_epoch_change_block_prunes_tree() {
mutator: Arc::new(|_, _| ()),
};
// This is just boilerplate code for proposing and importing n valid BABE
// blocks that are built on top of the given parent. The proposer takes care
// of producing epoch change digests according to the epoch duration (which
// is set to 6 slots in the test runtime).
let mut propose_and_import_blocks = |parent_id, n| {
let mut hashes = Vec::new();
let mut parent_header = client.header(&parent_id).unwrap().unwrap();
for _ in 0..n {
let block_hash = propose_and_import_block(
&parent_header,
None,
&mut proposer_factory,
&mut block_import,
);
hashes.push(block_hash);
parent_header = client.header(&BlockId::Hash(block_hash)).unwrap().unwrap();
}
hashes
let mut propose_and_import_blocks_wrap = |parent_id, n| {
propose_and_import_blocks(&client, &mut proposer_factory, &mut block_import, parent_id, n)
};
// This is the block tree that we're going to use in this test. Each node
@@ -766,12 +769,12 @@ fn importing_epoch_change_block_prunes_tree() {
// Create and import the canon chain and keep track of fork blocks (A, C, D)
// from the diagram above.
let canon_hashes = propose_and_import_blocks(BlockId::Number(0), 30);
let canon_hashes = propose_and_import_blocks_wrap(BlockId::Number(0), 30);
// Create the forks
let fork_1 = propose_and_import_blocks(BlockId::Hash(canon_hashes[0]), 10);
let fork_2 = propose_and_import_blocks(BlockId::Hash(canon_hashes[12]), 15);
let fork_3 = propose_and_import_blocks(BlockId::Hash(canon_hashes[18]), 10);
let fork_1 = propose_and_import_blocks_wrap(BlockId::Hash(canon_hashes[0]), 10);
let fork_2 = propose_and_import_blocks_wrap(BlockId::Hash(canon_hashes[12]), 15);
let fork_3 = propose_and_import_blocks_wrap(BlockId::Hash(canon_hashes[18]), 10);
// We should be tracking a total of 9 epochs in the fork tree
assert_eq!(epoch_changes.shared_data().tree().iter().count(), 9);
@@ -782,7 +785,7 @@ fn importing_epoch_change_block_prunes_tree() {
// We finalize block #13 from the canon chain, so on the next epoch
// change the tree should be pruned, to not contain F (#7).
client.finalize_block(BlockId::Hash(canon_hashes[12]), None, false).unwrap();
propose_and_import_blocks(BlockId::Hash(client.chain_info().best_hash), 7);
propose_and_import_blocks_wrap(BlockId::Hash(client.chain_info().best_hash), 7);
// at this point no hashes from the first fork must exist on the tree
assert!(!epoch_changes
@@ -809,7 +812,7 @@ fn importing_epoch_change_block_prunes_tree() {
// finalizing block #25 from the canon chain should prune out the second fork
client.finalize_block(BlockId::Hash(canon_hashes[24]), None, false).unwrap();
propose_and_import_blocks(BlockId::Hash(client.chain_info().best_hash), 8);
propose_and_import_blocks_wrap(BlockId::Hash(client.chain_info().best_hash), 8);
// at this point no hashes from the second fork must exist on the tree
assert!(!epoch_changes
@@ -894,3 +897,68 @@ fn babe_transcript_generation_match() {
};
debug_assert!(test(orig_transcript) == test(transcript_from_data(new_transcript)));
}
#[test]
fn obsolete_blocks_aux_data_cleanup() {
let mut net = BabeTestNet::new(1);
let peer = net.peer(0);
let data = peer.data.as_ref().expect("babe link set up during initialization");
let client = peer.client().as_client();
// Register the handler (as done by `babe_start`)
let client_clone = client.clone();
let on_finality = move |summary: &FinalityNotification<TestBlock>| {
aux_storage_cleanup(client_clone.as_ref(), summary)
};
client.register_finality_action(Box::new(on_finality));
let mut proposer_factory = DummyFactory {
client: client.clone(),
config: data.link.config.clone(),
epoch_changes: data.link.epoch_changes.clone(),
mutator: Arc::new(|_, _| ()),
};
let mut block_import = data.block_import.lock().take().expect("import set up during init");
let mut propose_and_import_blocks_wrap = |parent_id, n| {
propose_and_import_blocks(&client, &mut proposer_factory, &mut block_import, parent_id, n)
};
let aux_data_check = |hashes: &[Hash], expected: bool| {
hashes.iter().all(|hash| {
aux_schema::load_block_weight(&*peer.client().as_backend(), hash)
.unwrap()
.is_some() == expected
})
};
// Create the following test scenario:
//
// /-----B3 --- B4 ( < fork2 )
// G --- A1 --- A2 --- A3 --- A4 ( < fork1 )
// \-----C4 --- C5 ( < fork3 )
let fork1_hashes = propose_and_import_blocks_wrap(BlockId::Number(0), 4);
let fork2_hashes = propose_and_import_blocks_wrap(BlockId::Number(2), 2);
let fork3_hashes = propose_and_import_blocks_wrap(BlockId::Number(3), 2);
// Check that aux data is present for all but the genesis block.
assert!(aux_data_check(&[client.chain_info().genesis_hash], false));
assert!(aux_data_check(&fork1_hashes, true));
assert!(aux_data_check(&fork2_hashes, true));
assert!(aux_data_check(&fork3_hashes, true));
// Finalize A3
client.finalize_block(BlockId::Number(3), None, true).unwrap();
// Wiped: A1, A2
assert!(aux_data_check(&fork1_hashes[..2], false));
// Present: A3, A4
assert!(aux_data_check(&fork1_hashes[2..], true));
// Wiped: B3, B4
assert!(aux_data_check(&fork2_hashes, false));
// Present C4, C5
assert!(aux_data_check(&fork3_hashes, true));
}