mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-13 03:31:10 +00:00
av-store: use determine_new_blocks (#3356)
* av-store: use determine_new_blocks * fix tests * update the guide * rename KnownBlocks * fix iteration order * add a test
This commit is contained in:
@@ -19,7 +19,7 @@
|
||||
#![recursion_limit="256"]
|
||||
#![warn(missing_docs)]
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::collections::{HashMap, HashSet, BTreeSet};
|
||||
use std::io;
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, SystemTime, SystemTimeError, UNIX_EPOCH};
|
||||
@@ -31,7 +31,7 @@ use kvdb::{KeyValueDB, DBTransaction};
|
||||
|
||||
use polkadot_primitives::v1::{
|
||||
Hash, BlockNumber, CandidateEvent, ValidatorIndex, CandidateHash,
|
||||
CandidateReceipt,
|
||||
CandidateReceipt, Header,
|
||||
};
|
||||
use polkadot_node_primitives::{
|
||||
ErasureChunk, AvailableData,
|
||||
@@ -41,7 +41,10 @@ use polkadot_subsystem::{
|
||||
ActiveLeavesUpdate,
|
||||
errors::{ChainApiError, RuntimeApiError},
|
||||
};
|
||||
use polkadot_node_subsystem_util::metrics::{self, prometheus};
|
||||
use polkadot_node_subsystem_util::{
|
||||
self as util,
|
||||
metrics::{self, prometheus},
|
||||
};
|
||||
use polkadot_subsystem::messages::{
|
||||
AvailabilityStoreMessage, ChainApiMessage, RuntimeApiMessage, RuntimeApiRequest,
|
||||
};
|
||||
@@ -444,6 +447,8 @@ pub struct AvailabilityStoreSubsystem {
|
||||
pruning_config: PruningConfig,
|
||||
config: Config,
|
||||
db: Arc<dyn KeyValueDB>,
|
||||
known_blocks: KnownUnfinalizedBlocks,
|
||||
finalized_number: Option<BlockNumber>,
|
||||
metrics: Metrics,
|
||||
clock: Box<dyn Clock>,
|
||||
}
|
||||
@@ -478,6 +483,41 @@ impl AvailabilityStoreSubsystem {
|
||||
db,
|
||||
metrics,
|
||||
clock,
|
||||
known_blocks: KnownUnfinalizedBlocks::default(),
|
||||
finalized_number: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// We keep the hashes and numbers of all unfinalized
|
||||
/// processed blocks in memory.
|
||||
#[derive(Default, Debug)]
|
||||
struct KnownUnfinalizedBlocks {
|
||||
by_hash: HashSet<Hash>,
|
||||
by_number: BTreeSet<(BlockNumber, Hash)>,
|
||||
}
|
||||
|
||||
impl KnownUnfinalizedBlocks {
|
||||
/// Check whether the block has been already processed.
|
||||
fn is_known(&self, hash: &Hash) -> bool {
|
||||
self.by_hash.contains(hash)
|
||||
}
|
||||
|
||||
/// Insert a new block into the known set.
|
||||
fn insert(&mut self, hash: Hash, number: BlockNumber) {
|
||||
self.by_hash.insert(hash);
|
||||
self.by_number.insert((number, hash));
|
||||
}
|
||||
|
||||
/// Prune all finalized blocks.
|
||||
fn prune_finalized(&mut self, finalized: BlockNumber) {
|
||||
// split_off returns everything after the given key, including the key
|
||||
let split_point = finalized.saturating_add(1);
|
||||
let mut finalized = self.by_number.split_off(&(split_point, Hash::zero()));
|
||||
// after split_off `finalized` actually contains unfinalized blocks, we need to swap
|
||||
std::mem::swap(&mut self.by_number, &mut finalized);
|
||||
for (_, block) in finalized {
|
||||
self.by_hash.remove(&block);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -547,6 +587,8 @@ where
|
||||
FromOverseer::Signal(OverseerSignal::BlockFinalized(hash, number)) => {
|
||||
let _timer = subsystem.metrics.time_process_block_finalized();
|
||||
|
||||
subsystem.finalized_number = Some(number);
|
||||
subsystem.known_blocks.prune_finalized(number);
|
||||
process_block_finalized(
|
||||
ctx,
|
||||
&subsystem,
|
||||
@@ -580,27 +622,6 @@ async fn process_block_activated(
|
||||
) -> Result<(), Error> {
|
||||
let now = subsystem.clock.now()?;
|
||||
|
||||
let candidate_events = {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
ctx.send_message(
|
||||
RuntimeApiMessage::Request(activated, RuntimeApiRequest::CandidateEvents(tx)).into()
|
||||
).await;
|
||||
|
||||
rx.await??
|
||||
};
|
||||
|
||||
let block_number = {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
ctx.send_message(
|
||||
ChainApiMessage::BlockNumber(activated, tx).into()
|
||||
).await;
|
||||
|
||||
match rx.await?? {
|
||||
None => return Ok(()),
|
||||
Some(n) => n,
|
||||
}
|
||||
};
|
||||
|
||||
let block_header = {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
|
||||
@@ -613,28 +634,77 @@ async fn process_block_activated(
|
||||
Some(n) => n,
|
||||
}
|
||||
};
|
||||
let block_number = block_header.number;
|
||||
|
||||
// We need to request the number of validators based on the parent state, as that is the number of validators
|
||||
// used to create this block.
|
||||
let new_blocks = util::determine_new_blocks(
|
||||
ctx.sender(),
|
||||
|hash| -> Result<bool, Error> {
|
||||
Ok(subsystem.known_blocks.is_known(hash))
|
||||
},
|
||||
activated,
|
||||
&block_header,
|
||||
subsystem.finalized_number.unwrap_or(block_number.saturating_sub(1)),
|
||||
).await?;
|
||||
|
||||
let mut tx = DBTransaction::new();
|
||||
// determine_new_blocks is descending in block height
|
||||
for (hash, header) in new_blocks.into_iter().rev() {
|
||||
process_new_head(
|
||||
ctx,
|
||||
&subsystem.db,
|
||||
&mut tx,
|
||||
&subsystem.config,
|
||||
&subsystem.pruning_config,
|
||||
now,
|
||||
hash,
|
||||
header,
|
||||
).await?;
|
||||
subsystem.known_blocks.insert(hash, block_number);
|
||||
}
|
||||
subsystem.db.write(tx)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn process_new_head(
|
||||
ctx: &mut impl SubsystemContext,
|
||||
db: &Arc<dyn KeyValueDB>,
|
||||
db_transaction: &mut DBTransaction,
|
||||
config: &Config,
|
||||
pruning_config: &PruningConfig,
|
||||
now: Duration,
|
||||
hash: Hash,
|
||||
header: Header,
|
||||
) -> Result<(), Error> {
|
||||
|
||||
let candidate_events = {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
ctx.send_message(
|
||||
RuntimeApiMessage::Request(hash, RuntimeApiRequest::CandidateEvents(tx)).into()
|
||||
).await;
|
||||
|
||||
rx.await??
|
||||
};
|
||||
|
||||
// We need to request the number of validators based on the parent state,
|
||||
// as that is the number of validators used to create this block.
|
||||
let n_validators = {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
ctx.send_message(
|
||||
RuntimeApiMessage::Request(block_header.parent_hash, RuntimeApiRequest::Validators(tx)).into()
|
||||
RuntimeApiMessage::Request(header.parent_hash, RuntimeApiRequest::Validators(tx)).into()
|
||||
).await;
|
||||
|
||||
rx.await??.len()
|
||||
};
|
||||
|
||||
let mut tx = DBTransaction::new();
|
||||
|
||||
for event in candidate_events {
|
||||
match event {
|
||||
CandidateEvent::CandidateBacked(receipt, _head, _core_index, _group_index) => {
|
||||
note_block_backed(
|
||||
&subsystem.db,
|
||||
&mut tx,
|
||||
&subsystem.config,
|
||||
&subsystem.pruning_config,
|
||||
db,
|
||||
db_transaction,
|
||||
config,
|
||||
pruning_config,
|
||||
now,
|
||||
n_validators,
|
||||
receipt,
|
||||
@@ -642,11 +712,11 @@ async fn process_block_activated(
|
||||
}
|
||||
CandidateEvent::CandidateIncluded(receipt, _head, _core_index, _group_index) => {
|
||||
note_block_included(
|
||||
&subsystem.db,
|
||||
&mut tx,
|
||||
&subsystem.config,
|
||||
&subsystem.pruning_config,
|
||||
(block_number, activated),
|
||||
db,
|
||||
db_transaction,
|
||||
config,
|
||||
pruning_config,
|
||||
(header.number, hash),
|
||||
receipt,
|
||||
)?;
|
||||
}
|
||||
@@ -654,8 +724,6 @@ async fn process_block_activated(
|
||||
}
|
||||
}
|
||||
|
||||
subsystem.db.write(tx)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -732,9 +800,10 @@ fn note_block_included(
|
||||
State::Unfinalized(at, mut within) => {
|
||||
if let Err(i) = within.binary_search(&be_block) {
|
||||
within.insert(i, be_block);
|
||||
State::Unfinalized(at, within)
|
||||
} else {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
State::Unfinalized(at, within)
|
||||
}
|
||||
State::Finalized(_at) => {
|
||||
// This should never happen as a candidate would have to be included after
|
||||
|
||||
@@ -266,6 +266,25 @@ fn runtime_api_error_does_not_stop_the_subsystem() {
|
||||
}),
|
||||
).await;
|
||||
|
||||
let header = Header {
|
||||
parent_hash: Hash::zero(),
|
||||
number: 1,
|
||||
state_root: Hash::zero(),
|
||||
extrinsics_root: Hash::zero(),
|
||||
digest: Default::default(),
|
||||
};
|
||||
|
||||
assert_matches!(
|
||||
overseer_recv(&mut virtual_overseer).await,
|
||||
AllMessages::ChainApi(ChainApiMessage::BlockHeader(
|
||||
relay_parent,
|
||||
tx,
|
||||
)) => {
|
||||
assert_eq!(relay_parent, new_leaf);
|
||||
tx.send(Ok(Some(header))).unwrap();
|
||||
}
|
||||
);
|
||||
|
||||
// runtime api call fails
|
||||
assert_matches!(
|
||||
overseer_recv(&mut virtual_overseer).await,
|
||||
@@ -765,6 +784,134 @@ fn stored_data_kept_until_finalized() {
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn we_dont_miss_anything_if_import_notifications_are_missed() {
|
||||
let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS));
|
||||
let test_state = TestState::default();
|
||||
|
||||
test_harness(test_state.clone(), store.clone(), |mut virtual_overseer| async move {
|
||||
overseer_signal(
|
||||
&mut virtual_overseer,
|
||||
OverseerSignal::BlockFinalized(Hash::zero(), 1)
|
||||
).await;
|
||||
|
||||
let header = Header {
|
||||
parent_hash: Hash::repeat_byte(3),
|
||||
number: 4,
|
||||
state_root: Hash::zero(),
|
||||
extrinsics_root: Hash::zero(),
|
||||
digest: Default::default(),
|
||||
};
|
||||
let new_leaf = Hash::repeat_byte(4);
|
||||
|
||||
overseer_signal(
|
||||
&mut virtual_overseer,
|
||||
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
|
||||
activated: vec![ActivatedLeaf {
|
||||
hash: new_leaf,
|
||||
number: 4,
|
||||
status: LeafStatus::Fresh,
|
||||
span: Arc::new(jaeger::Span::Disabled),
|
||||
}].into(),
|
||||
deactivated: vec![].into(),
|
||||
}),
|
||||
).await;
|
||||
|
||||
assert_matches!(
|
||||
overseer_recv(&mut virtual_overseer).await,
|
||||
AllMessages::ChainApi(ChainApiMessage::BlockHeader(
|
||||
relay_parent,
|
||||
tx,
|
||||
)) => {
|
||||
assert_eq!(relay_parent, new_leaf);
|
||||
tx.send(Ok(Some(header))).unwrap();
|
||||
}
|
||||
);
|
||||
|
||||
let new_heads = vec![
|
||||
(Hash::repeat_byte(2), Hash::repeat_byte(1)),
|
||||
(Hash::repeat_byte(3), Hash::repeat_byte(2)),
|
||||
(Hash::repeat_byte(4), Hash::repeat_byte(3)),
|
||||
];
|
||||
|
||||
assert_matches!(
|
||||
overseer_recv(&mut virtual_overseer).await,
|
||||
AllMessages::ChainApi(ChainApiMessage::Ancestors {
|
||||
hash,
|
||||
k,
|
||||
response_channel: tx,
|
||||
}) => {
|
||||
assert_eq!(hash, new_leaf);
|
||||
assert_eq!(k, 2);
|
||||
let _ = tx.send(Ok(vec![
|
||||
Hash::repeat_byte(3),
|
||||
Hash::repeat_byte(2),
|
||||
]));
|
||||
}
|
||||
);
|
||||
|
||||
assert_matches!(
|
||||
overseer_recv(&mut virtual_overseer).await,
|
||||
AllMessages::ChainApi(ChainApiMessage::BlockHeader(
|
||||
relay_parent,
|
||||
tx,
|
||||
)) => {
|
||||
assert_eq!(relay_parent, Hash::repeat_byte(3));
|
||||
tx.send(Ok(Some(Header {
|
||||
parent_hash: Hash::repeat_byte(2),
|
||||
number: 3,
|
||||
state_root: Hash::zero(),
|
||||
extrinsics_root: Hash::zero(),
|
||||
digest: Default::default(),
|
||||
}))).unwrap();
|
||||
}
|
||||
);
|
||||
|
||||
assert_matches!(
|
||||
overseer_recv(&mut virtual_overseer).await,
|
||||
AllMessages::ChainApi(ChainApiMessage::BlockHeader(
|
||||
relay_parent,
|
||||
tx,
|
||||
)) => {
|
||||
assert_eq!(relay_parent, Hash::repeat_byte(2));
|
||||
tx.send(Ok(Some(Header {
|
||||
parent_hash: Hash::repeat_byte(1),
|
||||
number: 2,
|
||||
state_root: Hash::zero(),
|
||||
extrinsics_root: Hash::zero(),
|
||||
digest: Default::default(),
|
||||
}))).unwrap();
|
||||
}
|
||||
);
|
||||
|
||||
for (head, parent) in new_heads {
|
||||
assert_matches!(
|
||||
overseer_recv(&mut virtual_overseer).await,
|
||||
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
|
||||
relay_parent,
|
||||
RuntimeApiRequest::CandidateEvents(tx),
|
||||
)) => {
|
||||
assert_eq!(relay_parent, head);
|
||||
tx.send(Ok(Vec::new())).unwrap();
|
||||
}
|
||||
);
|
||||
|
||||
assert_matches!(
|
||||
overseer_recv(&mut virtual_overseer).await,
|
||||
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
|
||||
relay_parent,
|
||||
RuntimeApiRequest::Validators(tx),
|
||||
)) => {
|
||||
assert_eq!(relay_parent, parent);
|
||||
tx.send(Ok(Vec::new())).unwrap();
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
virtual_overseer
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn forkfullness_works() {
|
||||
let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS));
|
||||
@@ -1003,6 +1150,17 @@ async fn import_leaf(
|
||||
}),
|
||||
).await;
|
||||
|
||||
assert_matches!(
|
||||
overseer_recv(virtual_overseer).await,
|
||||
AllMessages::ChainApi(ChainApiMessage::BlockHeader(
|
||||
relay_parent,
|
||||
tx,
|
||||
)) => {
|
||||
assert_eq!(relay_parent, new_leaf);
|
||||
tx.send(Ok(Some(header))).unwrap();
|
||||
}
|
||||
);
|
||||
|
||||
assert_matches!(
|
||||
overseer_recv(virtual_overseer).await,
|
||||
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
|
||||
@@ -1014,27 +1172,6 @@ async fn import_leaf(
|
||||
}
|
||||
);
|
||||
|
||||
assert_matches!(
|
||||
overseer_recv(virtual_overseer).await,
|
||||
AllMessages::ChainApi(ChainApiMessage::BlockNumber(
|
||||
relay_parent,
|
||||
tx,
|
||||
)) => {
|
||||
assert_eq!(relay_parent, new_leaf);
|
||||
tx.send(Ok(Some(block_number))).unwrap();
|
||||
}
|
||||
);
|
||||
|
||||
assert_matches!(
|
||||
overseer_recv(virtual_overseer).await,
|
||||
AllMessages::ChainApi(ChainApiMessage::BlockHeader(
|
||||
relay_parent,
|
||||
tx,
|
||||
)) => {
|
||||
assert_eq!(relay_parent, new_leaf);
|
||||
tx.send(Ok(Some(header))).unwrap();
|
||||
}
|
||||
);
|
||||
|
||||
assert_matches!(
|
||||
overseer_recv(virtual_overseer).await,
|
||||
|
||||
@@ -94,10 +94,10 @@ Output:
|
||||
## Functionality
|
||||
|
||||
For each head in the `activated` list:
|
||||
- Note any new candidates backed in the block. Update the `CandidateMeta` for each. If the `CandidateMeta` does not exist, create it as `Unavailable` with the current timestamp. Register a `"prune_by_time"` entry based on the current timestamp + 1 hour.
|
||||
- Note any new candidate included in the block. Update the `CandidateMeta` for each, performing a transition from `Unavailable` to `Unfinalized` if necessary. That includes removing the `"prune_by_time"` entry. Add the block hash and number to the state, if unfinalized. Add an `"unfinalized"` entry for the block and candidate.
|
||||
- Load all ancestors of the head back to the finalized block so we don't miss anything if import notifications are missed. If a `StoreChunk` message is received for a candidate which has no entry, then we will prematurely lose the data.
|
||||
- Note any new candidates backed in the head. Update the `CandidateMeta` for each. If the `CandidateMeta` does not exist, create it as `Unavailable` with the current timestamp. Register a `"prune_by_time"` entry based on the current timestamp + 1 hour.
|
||||
- Note any new candidate included in the head. Update the `CandidateMeta` for each, performing a transition from `Unavailable` to `Unfinalized` if necessary. That includes removing the `"prune_by_time"` entry. Add the head hash and number to the state, if unfinalized. Add an `"unfinalized"` entry for the block and candidate.
|
||||
- The `CandidateEvent` runtime API can be used for this purpose.
|
||||
- TODO: load all ancestors of the head back to the finalized block so we don't miss anything if import notifications are missed. If a `StoreChunk` message is received for a candidate which has no entry, then we will prematurely lose the data.
|
||||
|
||||
On `OverseerSignal::BlockFinalized(finalized)` events:
|
||||
- for each key in `iter_with_prefix("unfinalized")`
|
||||
@@ -110,7 +110,7 @@ On `OverseerSignal::BlockFinalized(finalized)` events:
|
||||
- For each candidate that we encounter under `f` which is not under the finalized block hash,
|
||||
- Remove all entries under `f` in the `Unfinalized` state.
|
||||
- If the `CandidateMeta` has state `Unfinalized` with an empty list of blocks, downgrade to `Unavailable` and re-schedule pruning under the timestamp + 1 hour. We do not prune here as the candidate still may be included in a descendent of the finalized chain.
|
||||
- Remove all `"unfinalized"` keys under `f`.
|
||||
- Remove all `"unfinalized"` keys under `f`.
|
||||
- Update last_finalized = finalized.
|
||||
|
||||
This is roughly `O(n * m)` where n is the number of blocks finalized since the last update, and `m` is the number of parachains.
|
||||
|
||||
Reference in New Issue
Block a user