mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-09 20:11:09 +00:00
babe: ancient epoch tree pruning (#3746)
* babe: prune epoch tree when importing a new epoch change * fork-tree: fix tree pruning * babe: actually prune epoch change fork tree * Fix typos * babe: add test for epoch tree pruning * fork-tree: fix pruning of stale forks
This commit is contained in:
@@ -190,23 +190,35 @@ impl<Hash, Number> EpochChanges<Hash, Number> where
|
||||
EpochChanges { inner: ForkTree::new() }
|
||||
}
|
||||
|
||||
/// Prune out finalized epochs, except for the ancestor of the finalized block.
|
||||
/// Prune out finalized epochs, except for the ancestor of the finalized
|
||||
/// block. The given slot should be the slot number at which the finalized
|
||||
/// block was authored.
|
||||
pub fn prune_finalized<D: IsDescendentOfBuilder<Hash>>(
|
||||
&mut self,
|
||||
descendent_of_builder: D,
|
||||
_hash: &Hash,
|
||||
_number: Number,
|
||||
hash: &Hash,
|
||||
number: Number,
|
||||
slot: SlotNumber,
|
||||
) -> Result<(), fork_tree::Error<D::Error>> {
|
||||
let _is_descendent_of = descendent_of_builder
|
||||
let is_descendent_of = descendent_of_builder
|
||||
.build_is_descendent_of(None);
|
||||
|
||||
// TODO:
|
||||
// https://github.com/paritytech/substrate/issues/3651
|
||||
//
|
||||
let predicate = |epoch: &PersistedEpoch| match *epoch {
|
||||
PersistedEpoch::Genesis(_, ref epoch_1) =>
|
||||
slot >= epoch_1.end_slot(),
|
||||
PersistedEpoch::Regular(ref epoch_n) =>
|
||||
slot >= epoch_n.end_slot(),
|
||||
};
|
||||
|
||||
// prune any epochs which could not be _live_ as of the children of the
|
||||
// finalized block.
|
||||
// i.e. re-root the fork tree to the oldest ancestor of (hash, number)
|
||||
// where epoch.end_slot() >= slot(hash)
|
||||
// finalized block, i.e. re-root the fork tree to the oldest ancestor of
|
||||
// (hash, number) where epoch.end_slot() >= finalized_slot
|
||||
self.inner.prune(
|
||||
hash,
|
||||
&number,
|
||||
&is_descendent_of,
|
||||
&predicate,
|
||||
)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -300,6 +312,12 @@ impl<Hash, Number> EpochChanges<Hash, Number> where
|
||||
Err(e) => Err(e),
|
||||
}
|
||||
}
|
||||
|
||||
/// Return the inner fork tree, useful for testing purposes.
|
||||
#[cfg(test)]
|
||||
pub fn tree(&self) -> &ForkTree<Hash, Number, PersistedEpoch> {
|
||||
&self.inner
|
||||
}
|
||||
}
|
||||
|
||||
/// Type alias to produce the epoch-changes tree from a block type.
|
||||
|
||||
@@ -254,24 +254,6 @@ pub fn start_babe<B, C, SC, E, I, SO, Error>(BabeParams {
|
||||
&inherent_data_providers,
|
||||
)?;
|
||||
|
||||
let epoch_changes = babe_link.epoch_changes.clone();
|
||||
let pruning_task = client.finality_notification_stream()
|
||||
.for_each(move |notification| {
|
||||
// TODO: supply is-descendent-of and maybe write to disk _now_
|
||||
// as opposed to waiting for the next epoch?
|
||||
let res = epoch_changes.lock().prune_finalized(
|
||||
descendent_query(&*client),
|
||||
¬ification.hash,
|
||||
*notification.header.number(),
|
||||
);
|
||||
|
||||
if let Err(e) = res {
|
||||
babe_err!("Could not prune expired epoch changes: {:?}", e);
|
||||
}
|
||||
|
||||
future::ready(())
|
||||
});
|
||||
|
||||
babe_info!("Starting BABE Authorship worker");
|
||||
let slot_worker = slots::start_slot_worker(
|
||||
config.0,
|
||||
@@ -280,9 +262,9 @@ pub fn start_babe<B, C, SC, E, I, SO, Error>(BabeParams {
|
||||
sync_oracle,
|
||||
inherent_data_providers,
|
||||
babe_link.time_source,
|
||||
).map(|_| ());
|
||||
);
|
||||
|
||||
Ok(future::select(slot_worker, pruning_task).map(|_| Ok::<(), ()>(())).compat())
|
||||
Ok(slot_worker.map(|_| Ok::<(), ()>(())).compat())
|
||||
}
|
||||
|
||||
struct BabeWorker<B: BlockT, C, E, I, SO> {
|
||||
@@ -889,6 +871,8 @@ impl<B, E, Block, I, RA, PRA> BlockImport<Block> for BabeBlockImport<B, E, Block
|
||||
// this way we can revert it if there's any error
|
||||
let mut old_epoch_changes = None;
|
||||
|
||||
let info = self.client.info().chain;
|
||||
|
||||
if let Some(next_epoch_descriptor) = next_epoch_digest {
|
||||
let next_epoch = epoch.increment(next_epoch_descriptor);
|
||||
|
||||
@@ -898,21 +882,48 @@ impl<B, E, Block, I, RA, PRA> BlockImport<Block> for BabeBlockImport<B, E, Block
|
||||
epoch.as_ref().epoch_index, hash, slot_number, epoch.as_ref().start_slot);
|
||||
babe_info!("Next epoch starts at slot {}", next_epoch.as_ref().start_slot);
|
||||
|
||||
// track the epoch change in the fork tree
|
||||
let res = epoch_changes.import(
|
||||
descendent_query(&*self.client),
|
||||
hash,
|
||||
number,
|
||||
*block.header.parent_hash(),
|
||||
next_epoch,
|
||||
);
|
||||
// prune the tree of epochs not part of the finalized chain or
|
||||
// that are not live anymore, and then track the given epoch change
|
||||
// in the tree.
|
||||
// NOTE: it is important that these operations are done in this
|
||||
// order, otherwise if pruning after import the `is_descendent_of`
|
||||
// used by pruning may not know about the block that is being
|
||||
// imported.
|
||||
let prune_and_import = || {
|
||||
let finalized_slot = {
|
||||
let finalized_header = self.client.header(&BlockId::Hash(info.finalized_hash))
|
||||
.map_err(|e| ConsensusError::ClientImport(format!("{:?}", e)))?
|
||||
.expect("best finalized hash was given by client; \
|
||||
finalized headers must exist in db; qed");
|
||||
|
||||
find_pre_digest::<Block>(&finalized_header)
|
||||
.expect("finalized header must be valid; \
|
||||
valid blocks have a pre-digest; qed")
|
||||
.slot_number()
|
||||
};
|
||||
|
||||
if let Err(e) = res {
|
||||
let err = ConsensusError::ClientImport(format!("{:?}", e));
|
||||
epoch_changes.prune_finalized(
|
||||
descendent_query(&*self.client),
|
||||
&info.finalized_hash,
|
||||
info.finalized_number,
|
||||
finalized_slot,
|
||||
).map_err(|e| ConsensusError::ClientImport(format!("{:?}", e)))?;
|
||||
|
||||
epoch_changes.import(
|
||||
descendent_query(&*self.client),
|
||||
hash,
|
||||
number,
|
||||
*block.header.parent_hash(),
|
||||
next_epoch,
|
||||
).map_err(|e| ConsensusError::ClientImport(format!("{:?}", e)))?;
|
||||
|
||||
Ok(())
|
||||
};
|
||||
|
||||
if let Err(e) = prune_and_import() {
|
||||
babe_err!("Failed to launch next epoch: {:?}", e);
|
||||
*epoch_changes = old_epoch_changes.expect("set `Some` above and not taken; qed");
|
||||
return Err(err);
|
||||
return Err(e);
|
||||
}
|
||||
|
||||
crate::aux_schema::write_epoch_changes::<Block, _, _>(
|
||||
@@ -935,10 +946,7 @@ impl<B, E, Block, I, RA, PRA> BlockImport<Block> for BabeBlockImport<B, E, Block
|
||||
// more primary blocks), if there's a tie we go with the longest
|
||||
// chain.
|
||||
block.fork_choice = {
|
||||
let (last_best, last_best_number) = {
|
||||
let info = self.client.info().chain;
|
||||
(info.best_hash, info.best_number)
|
||||
};
|
||||
let (last_best, last_best_number) = (info.best_hash, info.best_number);
|
||||
|
||||
let last_best_weight = if &last_best == block.header.parent_hash() {
|
||||
// the parent=genesis case is already covered for loading parent weight,
|
||||
|
||||
@@ -103,6 +103,7 @@ impl DummyProposer {
|
||||
&BlockId::Hash(self.parent_hash),
|
||||
pre_digests,
|
||||
).unwrap();
|
||||
|
||||
let mut block = match block_builder.bake().map_err(|e| e.into()) {
|
||||
Ok(b) => b,
|
||||
Err(e) => return future::ready(Err(e)),
|
||||
@@ -597,3 +598,163 @@ fn importing_block_one_sets_genesis_epoch() {
|
||||
).unwrap().unwrap().into_inner();
|
||||
assert_eq!(epoch_for_second_block, genesis_epoch);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn importing_epoch_change_block_prunes_tree() {
|
||||
use client::backend::Finalizer;
|
||||
|
||||
let mut net = BabeTestNet::new(1);
|
||||
|
||||
let peer = net.peer(0);
|
||||
let data = peer.data.as_ref().expect("babe link set up during initialization");
|
||||
|
||||
let client = peer.client().as_full().expect("Only full clients are used in tests").clone();
|
||||
let mut block_import = data.block_import.lock().take().expect("import set up during init");
|
||||
let epoch_changes = data.link.epoch_changes.clone();
|
||||
|
||||
// This is just boilerplate code for proposing and importing a valid BABE
|
||||
// block that's built on top of the given parent. The proposer takes care
|
||||
// of producing epoch change digests according to the epoch duration (which
|
||||
// is set to 6 slots in the test runtime).
|
||||
let mut propose_and_import_block = |parent_header| {
|
||||
let mut environ = DummyFactory {
|
||||
client: client.clone(),
|
||||
config: data.link.config.clone(),
|
||||
epoch_changes: data.link.epoch_changes.clone(),
|
||||
mutator: Arc::new(|_, _| ()),
|
||||
};
|
||||
|
||||
let mut proposer = environ.init(&parent_header).unwrap();
|
||||
let parent_pre_digest = find_pre_digest::<Block>(&parent_header).unwrap();
|
||||
|
||||
let pre_digest = sr_primitives::generic::Digest {
|
||||
logs: vec![
|
||||
Item::babe_pre_digest(
|
||||
BabePreDigest::Secondary {
|
||||
authority_index: 0,
|
||||
slot_number: parent_pre_digest.slot_number() + 1,
|
||||
},
|
||||
),
|
||||
],
|
||||
};
|
||||
|
||||
let mut block = futures::executor::block_on(proposer.propose_with(pre_digest)).unwrap();
|
||||
|
||||
let seal = {
|
||||
// sign the pre-sealed hash of the block and then
|
||||
// add it to a digest item.
|
||||
let pair = AuthorityPair::from_seed(&[1; 32]);
|
||||
let pre_hash = block.header.hash();
|
||||
let signature = pair.sign(pre_hash.as_ref());
|
||||
Item::babe_seal(signature)
|
||||
};
|
||||
|
||||
let post_hash = {
|
||||
block.header.digest_mut().push(seal.clone());
|
||||
let h = block.header.hash();
|
||||
block.header.digest_mut().pop();
|
||||
h
|
||||
};
|
||||
|
||||
let next_epoch_digest =
|
||||
find_next_epoch_digest::<Block>(&block.header).unwrap();
|
||||
|
||||
let import_result = block_import.import_block(
|
||||
BlockImportParams {
|
||||
origin: BlockOrigin::Own,
|
||||
header: block.header,
|
||||
justification: None,
|
||||
post_digests: vec![seal],
|
||||
body: Some(block.extrinsics),
|
||||
finalized: false,
|
||||
auxiliary: Vec::new(),
|
||||
fork_choice: ForkChoiceStrategy::LongestChain,
|
||||
},
|
||||
Default::default(),
|
||||
).unwrap();
|
||||
|
||||
match import_result {
|
||||
ImportResult::Imported(_) => {},
|
||||
_ => panic!("expected block to be imported"),
|
||||
}
|
||||
|
||||
(post_hash, next_epoch_digest)
|
||||
};
|
||||
|
||||
let mut propose_and_import_blocks = |parent_id, n| {
|
||||
let mut hashes = Vec::new();
|
||||
let mut parent_header = client.header(&parent_id).unwrap().unwrap();
|
||||
|
||||
for _ in 0..n {
|
||||
let (block_hash, _) = propose_and_import_block(parent_header);
|
||||
hashes.push(block_hash);
|
||||
parent_header = client.header(&BlockId::Hash(block_hash)).unwrap().unwrap();
|
||||
}
|
||||
|
||||
hashes
|
||||
};
|
||||
|
||||
// This is the block tree that we're going to use in this test. Each node
|
||||
// represents an epoch change block, the epoch duration is 6 slots.
|
||||
//
|
||||
// *---- F (#7)
|
||||
// / *------ G (#19) - H (#25)
|
||||
// / /
|
||||
// A (#1) - B (#7) - C (#13) - D (#19) - E (#25)
|
||||
// \
|
||||
// *------ I (#25)
|
||||
|
||||
// Create and import the canon chain and keep track of fork blocks (A, C, D)
|
||||
// from the diagram above.
|
||||
let canon_hashes = propose_and_import_blocks(BlockId::Number(0), 30);
|
||||
|
||||
// Create the forks
|
||||
let fork_1 = propose_and_import_blocks(BlockId::Hash(canon_hashes[0]), 10);
|
||||
let fork_2 = propose_and_import_blocks(BlockId::Hash(canon_hashes[12]), 15);
|
||||
let fork_3 = propose_and_import_blocks(BlockId::Hash(canon_hashes[18]), 10);
|
||||
|
||||
// We should be tracking a total of 9 epochs in the fork tree
|
||||
assert_eq!(
|
||||
epoch_changes.lock().tree().iter().count(),
|
||||
9,
|
||||
);
|
||||
|
||||
// And only one root
|
||||
assert_eq!(
|
||||
epoch_changes.lock().tree().roots().count(),
|
||||
1,
|
||||
);
|
||||
|
||||
// We finalize block #13 from the canon chain, so on the next epoch
|
||||
// change the tree should be pruned, to not contain F (#7).
|
||||
client.finalize_block(BlockId::Hash(canon_hashes[12]), None, false).unwrap();
|
||||
propose_and_import_blocks(BlockId::Hash(client.info().chain.best_hash), 7);
|
||||
|
||||
// at this point no hashes from the first fork must exist on the tree
|
||||
assert!(
|
||||
!epoch_changes.lock().tree().iter().map(|(h, _, _)| h).any(|h| fork_1.contains(h)),
|
||||
);
|
||||
|
||||
// but the epoch changes from the other forks must still exist
|
||||
assert!(
|
||||
epoch_changes.lock().tree().iter().map(|(h, _, _)| h).any(|h| fork_2.contains(h))
|
||||
);
|
||||
|
||||
assert!(
|
||||
epoch_changes.lock().tree().iter().map(|(h, _, _)| h).any(|h| fork_3.contains(h)),
|
||||
);
|
||||
|
||||
// finalizing block #25 from the canon chain should prune out the second fork
|
||||
client.finalize_block(BlockId::Hash(canon_hashes[24]), None, false).unwrap();
|
||||
propose_and_import_blocks(BlockId::Hash(client.info().chain.best_hash), 8);
|
||||
|
||||
// at this point no hashes from the second fork must exist on the tree
|
||||
assert!(
|
||||
!epoch_changes.lock().tree().iter().map(|(h, _, _)| h).any(|h| fork_2.contains(h)),
|
||||
);
|
||||
|
||||
// while epoch changes from the last fork should still exist
|
||||
assert!(
|
||||
epoch_changes.lock().tree().iter().map(|(h, _, _)| h).any(|h| fork_3.contains(h)),
|
||||
);
|
||||
}
|
||||
|
||||
@@ -86,55 +86,45 @@ impl<H, N, V> ForkTree<H, N, V> where
|
||||
N: Ord + Clone,
|
||||
V: Clone,
|
||||
{
|
||||
/// Prune all nodes that are not descendents of `hash` according to
|
||||
/// `is_descendent_of`. The given function `is_descendent_of` should return
|
||||
/// `true` if the second hash (target) is a descendent of the first hash
|
||||
/// (base). After pruning the tree it should have one or zero roots. The
|
||||
/// number and order of calls to `is_descendent_of` is unspecified and
|
||||
/// subject to change.
|
||||
pub fn prune<F, E>(
|
||||
/// Prune the tree, removing all non-canonical nodes. We find the node in the
|
||||
/// tree that is the deepest ancestor of the given hash and that passes the
|
||||
/// given predicate. If such a node exists, we re-root the tree to this
|
||||
/// node. Otherwise the tree remains unchanged. The given function
|
||||
/// `is_descendent_of` should return `true` if the second hash (target) is a
|
||||
/// descendent of the first hash (base).
|
||||
pub fn prune<F, E, P>(
|
||||
&mut self,
|
||||
hash: &H,
|
||||
number: N,
|
||||
is_descendent_of: &F
|
||||
number: &N,
|
||||
is_descendent_of: &F,
|
||||
predicate: &P,
|
||||
) -> Result<(), Error<E>>
|
||||
where E: std::error::Error,
|
||||
F: Fn(&H, &H) -> Result<bool, E>
|
||||
F: Fn(&H, &H) -> Result<bool, E>,
|
||||
P: Fn(&V) -> bool,
|
||||
{
|
||||
let mut new_root = None;
|
||||
for node in self.node_iter() {
|
||||
// if the node has a lower number than the one being finalized then
|
||||
// we only keep if it has no children and the finalized block is a
|
||||
// descendent of this node
|
||||
if node.number < number {
|
||||
if !node.children.is_empty() || !is_descendent_of(&node.hash, hash)? {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
// if the node has the same number as the finalized block then it
|
||||
// must have the same hash
|
||||
if node.number == number && node.hash != *hash {
|
||||
continue;
|
||||
}
|
||||
|
||||
// if the node has a higher number then we keep it if it is a
|
||||
// descendent of the finalized block
|
||||
if node.number > number && !is_descendent_of(hash, &node.hash)? {
|
||||
continue;
|
||||
}
|
||||
|
||||
new_root = Some(node);
|
||||
break;
|
||||
}
|
||||
let new_root = self.find_node_where(
|
||||
hash,
|
||||
number,
|
||||
is_descendent_of,
|
||||
predicate,
|
||||
)?;
|
||||
|
||||
if let Some(root) = new_root {
|
||||
self.roots = vec![root.clone()];
|
||||
let mut root = root.clone();
|
||||
|
||||
// we found the deepest ancestor of the finalized block, so we prune
|
||||
// out any children that don't include the finalized block.
|
||||
root.children.retain(|node| {
|
||||
node.number == *number && node.hash == *hash ||
|
||||
node.number < *number && is_descendent_of(&node.hash, hash).unwrap_or(false)
|
||||
});
|
||||
|
||||
self.roots = vec![root];
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
impl<H, N, V> ForkTree<H, N, V> where
|
||||
@@ -1203,18 +1193,36 @@ mod test {
|
||||
|
||||
tree.prune(
|
||||
&"C",
|
||||
3,
|
||||
&3,
|
||||
&is_descendent_of,
|
||||
&|_| true,
|
||||
).unwrap();
|
||||
|
||||
assert_eq!(
|
||||
tree.roots.iter().map(|node| node.hash).collect::<Vec<_>>(),
|
||||
vec!["C"],
|
||||
vec!["B"],
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
tree.iter().map(|(hash, _, _)| *hash).collect::<Vec<_>>(),
|
||||
vec!["C", "D", "E"],
|
||||
vec!["B", "C", "D", "E"],
|
||||
);
|
||||
|
||||
tree.prune(
|
||||
&"E",
|
||||
&5,
|
||||
&is_descendent_of,
|
||||
&|_| true,
|
||||
).unwrap();
|
||||
|
||||
assert_eq!(
|
||||
tree.roots.iter().map(|node| node.hash).collect::<Vec<_>>(),
|
||||
vec!["D"],
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
tree.iter().map(|(hash, _, _)| *hash).collect::<Vec<_>>(),
|
||||
vec!["D", "E"],
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user