mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-07-04 04:37:23 +00:00
74baed8b39
* crate skeleton and type definitions * add ChainSelectionMessage * add error type * run loop * fix overseer * simplify determine_new_blocks API * write an overlay struct and fetch new blocks * add new function to overlay * more flow * add leaves to overlay and add a strong type around leaves-set * add is_parent_viable * implement block import, ignoring reversions * add stagnant-at to overlay * add stagnant * add revert consensus log * flow for reversions * extract and import block reversions * recursively update viability * remove redundant parameter from WriteBlockEntry * do some removal of viable leaves * address grumbles * refactor * address grumbles * add comment about non-monotonicity * extract backend to submodule * begin the hunt for viable leaves * viability pivots for updating the active leaves * remove LeafSearchFrontier * partially -> explicitly viable and untwist some booleans * extract tree to submodule * implement block finality update * Implement block approval routine * implement stagnant detection * ensure blocks pruned on finality are removed from the active leaves set * write down some planned test cases * floww * leaf loading * implement best_leaf_containing * write down a few more tests to do * remove dependence of tree on header * guide: ChainApiMessage::BlockWeight * node: BlockWeight ChainAPI * fix compile issue * note a few TODOs for the future * fetch block weight using new BlockWeight ChainAPI * implement unimplemented * sort leaves by block number after weight * remove warnings and add more TODOs * create test module * storage for test backend * wrap inner in mutex * add write waker query to test backend * Add OverseerSignal -> FromOverseer conversion * add test harnes * add no-op test * add some more test helpers * the first test * more progress on tests * test two subtrees * determine-new-blocks: cleaner genesis avoidance and tighter ancestry requests * don't make ancestry requests when asking for one block * add a couple more tests * add to AllMessages in guide * remove bad spaces from bridge * compact iterator * test import with gaps * more reversion tests * test finalization pruning subtrees * fixups * test clobbering and fix bug in overlay * exhaustive backend state after finalizaiton tested * more finality tests * leaf tests * test approval * test ChainSelectionMessage::Leaves thoroughly * remove TODO * avoid Ordering::is_ne so CI can build * comment algorithmic complexity * Update node/core/chain-selection/src/lib.rs Co-authored-by: Bernhard Schuster <bernhard@ahoi.io> Co-authored-by: Bernhard Schuster <bernhard@ahoi.io>
622 lines
15 KiB
Rust
622 lines
15 KiB
Rust
// Copyright 2021 Parity Technologies (UK) Ltd.
|
|
// This file is part of Polkadot.
|
|
|
|
// Polkadot is free software: you can redistribute it and/or modify
|
|
// it under the terms of the GNU General Public License as published by
|
|
// the Free Software Foundation, either version 3 of the License, or
|
|
// (at your option) any later version.
|
|
|
|
// Polkadot is distributed in the hope that it will be useful,
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
// GNU General Public License for more details.
|
|
|
|
// You should have received a copy of the GNU General Public License
|
|
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
//! A utility for fetching all unknown blocks based on a new chain-head hash.
|
|
|
|
use polkadot_node_subsystem::{
|
|
messages::ChainApiMessage, SubsystemSender,
|
|
};
|
|
use polkadot_primitives::v1::{Hash, Header, BlockNumber};
|
|
use futures::prelude::*;
|
|
use futures::channel::oneshot;
|
|
|
|
/// Given a new chain-head hash, this determines the hashes of all new blocks we should track
|
|
/// metadata for, given this head.
|
|
///
|
|
/// This is guaranteed to be a subset of the (inclusive) ancestry of `head` determined as all
|
|
/// blocks above the lower bound or above the highest known block, whichever is higher.
|
|
/// This is formatted in descending order by block height.
|
|
///
|
|
/// An implication of this is that if `head` itself is known or not above the lower bound,
|
|
/// then the returned list will be empty.
|
|
///
|
|
/// This may be somewhat expensive when first recovering from major sync.
|
|
pub async fn determine_new_blocks<E>(
|
|
ctx: &mut impl SubsystemSender,
|
|
is_known: impl Fn(&Hash) -> Result<bool, E>,
|
|
head: Hash,
|
|
header: &Header,
|
|
lower_bound_number: BlockNumber,
|
|
) -> Result<Vec<(Hash, Header)>, E> {
|
|
const ANCESTRY_STEP: usize = 4;
|
|
|
|
let min_block_needed = lower_bound_number + 1;
|
|
|
|
// Early exit if the block is in the DB or too early.
|
|
{
|
|
let already_known = is_known(&head)?;
|
|
|
|
let before_relevant = header.number < min_block_needed;
|
|
|
|
if already_known || before_relevant {
|
|
return Ok(Vec::new());
|
|
}
|
|
}
|
|
|
|
let mut ancestry = vec![(head, header.clone())];
|
|
|
|
// Early exit if the parent hash is in the DB or no further blocks
|
|
// are needed.
|
|
if is_known(&header.parent_hash)? || header.number == min_block_needed {
|
|
return Ok(ancestry);
|
|
}
|
|
|
|
'outer: loop {
|
|
let &(ref last_hash, ref last_header) = ancestry.last()
|
|
.expect("ancestry has length 1 at initialization and is only added to; qed");
|
|
|
|
assert!(
|
|
last_header.number > min_block_needed,
|
|
"Loop invariant: the last block in ancestry is checked to be \
|
|
above the minimum before the loop, and at the end of each iteration; \
|
|
qed"
|
|
);
|
|
|
|
let (tx, rx) = oneshot::channel();
|
|
|
|
// This is always non-zero as determined by the loop invariant
|
|
// above.
|
|
let ancestry_step = std::cmp::min(
|
|
ANCESTRY_STEP,
|
|
(last_header.number - min_block_needed) as usize,
|
|
);
|
|
|
|
let batch_hashes = if ancestry_step == 1 {
|
|
vec![last_header.parent_hash]
|
|
} else {
|
|
ctx.send_message(ChainApiMessage::Ancestors {
|
|
hash: *last_hash,
|
|
k: ancestry_step,
|
|
response_channel: tx,
|
|
}.into()).await;
|
|
|
|
// Continue past these errors.
|
|
match rx.await {
|
|
Err(_) | Ok(Err(_)) => break 'outer,
|
|
Ok(Ok(ancestors)) => ancestors,
|
|
}
|
|
};
|
|
|
|
let batch_headers = {
|
|
let (batch_senders, batch_receivers) = (0..batch_hashes.len())
|
|
.map(|_| oneshot::channel())
|
|
.unzip::<_, _, Vec<_>, Vec<_>>();
|
|
|
|
for (hash, sender) in batch_hashes.iter().cloned().zip(batch_senders) {
|
|
ctx.send_message(ChainApiMessage::BlockHeader(hash, sender).into()).await;
|
|
}
|
|
|
|
let mut requests = futures::stream::FuturesOrdered::new();
|
|
batch_receivers.into_iter().map(|rx| async move {
|
|
match rx.await {
|
|
Err(_) | Ok(Err(_)) => None,
|
|
Ok(Ok(h)) => h,
|
|
}
|
|
})
|
|
.for_each(|x| requests.push(x));
|
|
|
|
let batch_headers: Vec<_> = requests
|
|
.flat_map(|x: Option<Header>| stream::iter(x))
|
|
.collect()
|
|
.await;
|
|
|
|
// Any failed header fetch of the batch will yield a `None` result that will
|
|
// be skipped. Any failure at this stage means we'll just ignore those blocks
|
|
// as the chain DB has failed us.
|
|
if batch_headers.len() != batch_hashes.len() { break 'outer }
|
|
batch_headers
|
|
};
|
|
|
|
for (hash, header) in batch_hashes.into_iter().zip(batch_headers) {
|
|
let is_known = is_known(&hash)?;
|
|
|
|
let is_relevant = header.number >= min_block_needed;
|
|
let is_terminating = header.number == min_block_needed;
|
|
|
|
if is_known || !is_relevant {
|
|
break 'outer
|
|
}
|
|
|
|
ancestry.push((hash, header));
|
|
|
|
if is_terminating {
|
|
break 'outer
|
|
}
|
|
}
|
|
}
|
|
|
|
Ok(ancestry)
|
|
}
|
|
|
|
#[cfg(test)]
|
|
mod tests {
|
|
use super::*;
|
|
use std::collections::{HashSet, HashMap};
|
|
use sp_core::testing::TaskExecutor;
|
|
use polkadot_node_subsystem::{messages::AllMessages, SubsystemContext};
|
|
use polkadot_node_subsystem_test_helpers::make_subsystem_context;
|
|
use assert_matches::assert_matches;
|
|
|
|
#[derive(Default)]
|
|
struct TestKnownBlocks {
|
|
blocks: HashSet<Hash>,
|
|
}
|
|
|
|
impl TestKnownBlocks {
|
|
fn insert(&mut self, hash: Hash) {
|
|
self.blocks.insert(hash);
|
|
}
|
|
|
|
fn is_known(&self, hash: &Hash) -> Result<bool, ()> {
|
|
Ok(self.blocks.contains(hash))
|
|
}
|
|
}
|
|
|
|
#[derive(Clone)]
|
|
struct TestChain {
|
|
start_number: BlockNumber,
|
|
headers: Vec<Header>,
|
|
numbers: HashMap<Hash, BlockNumber>,
|
|
}
|
|
|
|
impl TestChain {
|
|
fn new(start: BlockNumber, len: usize) -> Self {
|
|
assert!(len > 0, "len must be at least 1");
|
|
|
|
let base = Header {
|
|
digest: Default::default(),
|
|
extrinsics_root: Default::default(),
|
|
number: start,
|
|
state_root: Default::default(),
|
|
parent_hash: Default::default(),
|
|
};
|
|
|
|
let base_hash = base.hash();
|
|
|
|
let mut chain = TestChain {
|
|
start_number: start,
|
|
headers: vec![base],
|
|
numbers: vec![(base_hash, start)].into_iter().collect(),
|
|
};
|
|
|
|
for _ in 1..len {
|
|
chain.grow()
|
|
}
|
|
|
|
chain
|
|
}
|
|
|
|
fn grow(&mut self) {
|
|
let next = {
|
|
let last = self.headers.last().unwrap();
|
|
Header {
|
|
digest: Default::default(),
|
|
extrinsics_root: Default::default(),
|
|
number: last.number + 1,
|
|
state_root: Default::default(),
|
|
parent_hash: last.hash(),
|
|
}
|
|
};
|
|
|
|
self.numbers.insert(next.hash(), next.number);
|
|
self.headers.push(next);
|
|
}
|
|
|
|
fn header_by_number(&self, number: BlockNumber) -> Option<&Header> {
|
|
if number < self.start_number {
|
|
None
|
|
} else {
|
|
self.headers.get((number - self.start_number) as usize)
|
|
}
|
|
}
|
|
|
|
fn header_by_hash(&self, hash: &Hash) -> Option<&Header> {
|
|
self.numbers.get(hash).and_then(|n| self.header_by_number(*n))
|
|
}
|
|
|
|
fn hash_by_number(&self, number: BlockNumber) -> Option<Hash> {
|
|
self.header_by_number(number).map(|h| h.hash())
|
|
}
|
|
|
|
fn ancestry(&self, hash: &Hash, k: BlockNumber) -> Vec<Hash> {
|
|
let n = match self.numbers.get(hash) {
|
|
None => return Vec::new(),
|
|
Some(&n) => n,
|
|
};
|
|
|
|
(0..k)
|
|
.map(|i| i + 1)
|
|
.filter_map(|i| self.header_by_number(n - i))
|
|
.map(|h| h.hash())
|
|
.collect()
|
|
}
|
|
}
|
|
|
|
#[test]
|
|
fn determine_new_blocks_back_to_lower_bound() {
|
|
let pool = TaskExecutor::new();
|
|
let (mut ctx, mut handle) = make_subsystem_context::<(), _>(pool.clone());
|
|
|
|
let known = TestKnownBlocks::default();
|
|
|
|
let chain = TestChain::new(10, 9);
|
|
|
|
let head = chain.header_by_number(18).unwrap().clone();
|
|
let head_hash = head.hash();
|
|
let lower_bound_number = 12;
|
|
|
|
// Finalized block should be omitted. The head provided to `determine_new_blocks`
|
|
// should be included.
|
|
let expected_ancestry = (13..=18)
|
|
.map(|n| chain.header_by_number(n).map(|h| (h.hash(), h.clone())).unwrap())
|
|
.rev()
|
|
.collect::<Vec<_>>();
|
|
|
|
let test_fut = Box::pin(async move {
|
|
let ancestry = determine_new_blocks(
|
|
ctx.sender(),
|
|
|h| known.is_known(h),
|
|
head_hash,
|
|
&head,
|
|
lower_bound_number,
|
|
).await.unwrap();
|
|
|
|
assert_eq!(
|
|
ancestry,
|
|
expected_ancestry,
|
|
);
|
|
});
|
|
|
|
let aux_fut = Box::pin(async move {
|
|
assert_matches!(
|
|
handle.recv().await,
|
|
AllMessages::ChainApi(ChainApiMessage::Ancestors {
|
|
hash: h,
|
|
k,
|
|
response_channel: tx,
|
|
}) => {
|
|
assert_eq!(h, head_hash);
|
|
assert_eq!(k, 4);
|
|
let _ = tx.send(Ok(chain.ancestry(&h, k as _)));
|
|
}
|
|
);
|
|
|
|
for _ in 0u32..4 {
|
|
assert_matches!(
|
|
handle.recv().await,
|
|
AllMessages::ChainApi(ChainApiMessage::BlockHeader(h, tx)) => {
|
|
let _ = tx.send(Ok(chain.header_by_hash(&h).map(|h| h.clone())));
|
|
}
|
|
);
|
|
}
|
|
|
|
assert_matches!(
|
|
handle.recv().await,
|
|
AllMessages::ChainApi(ChainApiMessage::BlockHeader(h, tx)) => {
|
|
assert_eq!(h, chain.hash_by_number(13).unwrap());
|
|
let _ = tx.send(Ok(chain.header_by_hash(&h).map(|h| h.clone())));
|
|
}
|
|
);
|
|
});
|
|
|
|
futures::executor::block_on(futures::future::join(test_fut, aux_fut));
|
|
}
|
|
|
|
#[test]
|
|
fn determine_new_blocks_back_to_known() {
|
|
let pool = TaskExecutor::new();
|
|
let (mut ctx, mut handle) = make_subsystem_context::<(), _>(pool.clone());
|
|
|
|
let mut known = TestKnownBlocks::default();
|
|
|
|
let chain = TestChain::new(10, 9);
|
|
|
|
let head = chain.header_by_number(18).unwrap().clone();
|
|
let head_hash = head.hash();
|
|
let lower_bound_number = 12;
|
|
let known_number = 15;
|
|
let known_hash = chain.hash_by_number(known_number).unwrap();
|
|
|
|
known.insert(known_hash);
|
|
|
|
// Known block should be omitted. The head provided to `determine_new_blocks`
|
|
// should be included.
|
|
let expected_ancestry = (16..=18)
|
|
.map(|n| chain.header_by_number(n).map(|h| (h.hash(), h.clone())).unwrap())
|
|
.rev()
|
|
.collect::<Vec<_>>();
|
|
|
|
let test_fut = Box::pin(async move {
|
|
let ancestry = determine_new_blocks(
|
|
ctx.sender(),
|
|
|h| known.is_known(h),
|
|
head_hash,
|
|
&head,
|
|
lower_bound_number,
|
|
).await.unwrap();
|
|
|
|
assert_eq!(
|
|
ancestry,
|
|
expected_ancestry,
|
|
);
|
|
});
|
|
|
|
let aux_fut = Box::pin(async move {
|
|
assert_matches!(
|
|
handle.recv().await,
|
|
AllMessages::ChainApi(ChainApiMessage::Ancestors {
|
|
hash: h,
|
|
k,
|
|
response_channel: tx,
|
|
}) => {
|
|
assert_eq!(h, head_hash);
|
|
assert_eq!(k, 4);
|
|
let _ = tx.send(Ok(chain.ancestry(&h, k as _)));
|
|
}
|
|
);
|
|
|
|
for _ in 0u32..4 {
|
|
assert_matches!(
|
|
handle.recv().await,
|
|
AllMessages::ChainApi(ChainApiMessage::BlockHeader(h, tx)) => {
|
|
let _ = tx.send(Ok(chain.header_by_hash(&h).map(|h| h.clone())));
|
|
}
|
|
);
|
|
}
|
|
});
|
|
|
|
futures::executor::block_on(futures::future::join(test_fut, aux_fut));
|
|
}
|
|
|
|
#[test]
|
|
fn determine_new_blocks_already_known_is_empty() {
|
|
let pool = TaskExecutor::new();
|
|
let (mut ctx, _handle) = make_subsystem_context::<(), _>(pool.clone());
|
|
|
|
let mut known = TestKnownBlocks::default();
|
|
|
|
let chain = TestChain::new(10, 9);
|
|
|
|
let head = chain.header_by_number(18).unwrap().clone();
|
|
let head_hash = head.hash();
|
|
let lower_bound_number = 0;
|
|
|
|
known.insert(head_hash);
|
|
|
|
// Known block should be omitted.
|
|
let expected_ancestry = Vec::new();
|
|
|
|
let test_fut = Box::pin(async move {
|
|
let ancestry = determine_new_blocks(
|
|
ctx.sender(),
|
|
|h| known.is_known(h),
|
|
head_hash,
|
|
&head,
|
|
lower_bound_number,
|
|
).await.unwrap();
|
|
|
|
assert_eq!(
|
|
ancestry,
|
|
expected_ancestry,
|
|
);
|
|
});
|
|
|
|
futures::executor::block_on(test_fut);
|
|
}
|
|
|
|
#[test]
|
|
fn determine_new_blocks_parent_known_is_fast() {
|
|
let pool = TaskExecutor::new();
|
|
let (mut ctx, _handle) = make_subsystem_context::<(), _>(pool.clone());
|
|
|
|
let mut known = TestKnownBlocks::default();
|
|
|
|
let chain = TestChain::new(10, 9);
|
|
|
|
let head = chain.header_by_number(18).unwrap().clone();
|
|
let head_hash = head.hash();
|
|
let lower_bound_number = 0;
|
|
let parent_hash = chain.hash_by_number(17).unwrap();
|
|
|
|
known.insert(parent_hash);
|
|
|
|
// New block should be the only new one.
|
|
let expected_ancestry = vec![(head_hash, head.clone())];
|
|
|
|
let test_fut = Box::pin(async move {
|
|
let ancestry = determine_new_blocks(
|
|
ctx.sender(),
|
|
|h| known.is_known(h),
|
|
head_hash,
|
|
&head,
|
|
lower_bound_number,
|
|
).await.unwrap();
|
|
|
|
assert_eq!(
|
|
ancestry,
|
|
expected_ancestry,
|
|
);
|
|
});
|
|
|
|
futures::executor::block_on(test_fut);
|
|
}
|
|
|
|
#[test]
|
|
fn determine_new_block_before_finality_is_empty() {
|
|
let pool = TaskExecutor::new();
|
|
let (mut ctx, _handle) = make_subsystem_context::<(), _>(pool.clone());
|
|
|
|
let chain = TestChain::new(10, 9);
|
|
|
|
let head = chain.header_by_number(18).unwrap().clone();
|
|
let head_hash = head.hash();
|
|
let parent_hash = chain.hash_by_number(17).unwrap();
|
|
let mut known = TestKnownBlocks::default();
|
|
|
|
known.insert(parent_hash);
|
|
|
|
let test_fut = Box::pin(async move {
|
|
let after_finality = determine_new_blocks(
|
|
ctx.sender(),
|
|
|h| known.is_known(h),
|
|
head_hash,
|
|
&head,
|
|
17,
|
|
).await.unwrap();
|
|
|
|
let at_finality = determine_new_blocks(
|
|
ctx.sender(),
|
|
|h| known.is_known(h),
|
|
head_hash,
|
|
&head,
|
|
18,
|
|
).await.unwrap();
|
|
|
|
let before_finality = determine_new_blocks(
|
|
ctx.sender(),
|
|
|h| known.is_known(h),
|
|
head_hash,
|
|
&head,
|
|
19,
|
|
).await.unwrap();
|
|
|
|
assert_eq!(
|
|
after_finality,
|
|
vec![(head_hash, head.clone())],
|
|
);
|
|
|
|
assert_eq!(
|
|
at_finality,
|
|
Vec::new(),
|
|
);
|
|
|
|
assert_eq!(
|
|
before_finality,
|
|
Vec::new(),
|
|
);
|
|
});
|
|
|
|
futures::executor::block_on(test_fut);
|
|
}
|
|
|
|
#[test]
|
|
fn determine_new_blocks_does_not_request_genesis() {
|
|
let pool = TaskExecutor::new();
|
|
let (mut ctx, mut handle) = make_subsystem_context::<(), _>(pool.clone());
|
|
|
|
let chain = TestChain::new(1, 2);
|
|
|
|
let head = chain.header_by_number(2).unwrap().clone();
|
|
let head_hash = head.hash();
|
|
let known = TestKnownBlocks::default();
|
|
|
|
let expected_ancestry = (1..=2)
|
|
.map(|n| chain.header_by_number(n).map(|h| (h.hash(), h.clone())).unwrap())
|
|
.rev()
|
|
.collect::<Vec<_>>();
|
|
|
|
let test_fut = Box::pin(async move {
|
|
let ancestry = determine_new_blocks(
|
|
ctx.sender(),
|
|
|h| known.is_known(h),
|
|
head_hash,
|
|
&head,
|
|
0,
|
|
).await.unwrap();
|
|
|
|
assert_eq!(ancestry, expected_ancestry);
|
|
});
|
|
|
|
let aux_fut = Box::pin(async move {
|
|
assert_matches!(
|
|
handle.recv().await,
|
|
AllMessages::ChainApi(ChainApiMessage::BlockHeader(h, tx)) => {
|
|
assert_eq!(h, chain.hash_by_number(1).unwrap());
|
|
let _ = tx.send(Ok(chain.header_by_hash(&h).map(|h| h.clone())));
|
|
}
|
|
);
|
|
});
|
|
|
|
futures::executor::block_on(futures::future::join(test_fut, aux_fut));
|
|
}
|
|
|
|
#[test]
|
|
fn determine_new_blocks_does_not_request_genesis_even_in_multi_ancestry() {
|
|
let pool = TaskExecutor::new();
|
|
let (mut ctx, mut handle) = make_subsystem_context::<(), _>(pool.clone());
|
|
|
|
let chain = TestChain::new(1, 3);
|
|
|
|
let head = chain.header_by_number(3).unwrap().clone();
|
|
let head_hash = head.hash();
|
|
let known = TestKnownBlocks::default();
|
|
|
|
let expected_ancestry = (1..=3)
|
|
.map(|n| chain.header_by_number(n).map(|h| (h.hash(), h.clone())).unwrap())
|
|
.rev()
|
|
.collect::<Vec<_>>();
|
|
|
|
let test_fut = Box::pin(async move {
|
|
let ancestry = determine_new_blocks(
|
|
ctx.sender(),
|
|
|h| known.is_known(h),
|
|
head_hash,
|
|
&head,
|
|
0,
|
|
).await.unwrap();
|
|
|
|
assert_eq!(ancestry, expected_ancestry);
|
|
});
|
|
|
|
let aux_fut = Box::pin(async move {
|
|
assert_matches!(
|
|
handle.recv().await,
|
|
AllMessages::ChainApi(ChainApiMessage::Ancestors {
|
|
hash: h,
|
|
k,
|
|
response_channel: tx,
|
|
}) => {
|
|
assert_eq!(h, head_hash);
|
|
assert_eq!(k, 2);
|
|
|
|
let _ = tx.send(Ok(chain.ancestry(&h, k as _)));
|
|
}
|
|
);
|
|
|
|
for _ in 0..2 {
|
|
assert_matches!(
|
|
handle.recv().await,
|
|
AllMessages::ChainApi(ChainApiMessage::BlockHeader(h, tx)) => {
|
|
let _ = tx.send(Ok(chain.header_by_hash(&h).map(|h| h.clone())));
|
|
}
|
|
);
|
|
}
|
|
});
|
|
|
|
futures::executor::block_on(futures::future::join(test_fut, aux_fut));
|
|
}
|
|
}
|