From 3827f6e6f8b03ad072048780cbc6d70e0f6a0556 Mon Sep 17 00:00:00 2001 From: Andronik Ordian Date: Wed, 23 Jun 2021 23:26:33 +0200 Subject: [PATCH] av-store: use determine_new_blocks (#3356) * av-store: use determine_new_blocks * fix tests * update the guide * rename KnownBlocks * fix iteration order * add a test --- polkadot/node/core/av-store/src/lib.rs | 153 +++++++++++---- polkadot/node/core/av-store/src/tests.rs | 179 ++++++++++++++++-- .../src/node/utility/availability-store.md | 8 +- 3 files changed, 273 insertions(+), 67 deletions(-) diff --git a/polkadot/node/core/av-store/src/lib.rs b/polkadot/node/core/av-store/src/lib.rs index 1c5e8708e5..4c646f5e2d 100644 --- a/polkadot/node/core/av-store/src/lib.rs +++ b/polkadot/node/core/av-store/src/lib.rs @@ -19,7 +19,7 @@ #![recursion_limit="256"] #![warn(missing_docs)] -use std::collections::HashMap; +use std::collections::{HashMap, HashSet, BTreeSet}; use std::io; use std::sync::Arc; use std::time::{Duration, SystemTime, SystemTimeError, UNIX_EPOCH}; @@ -31,7 +31,7 @@ use kvdb::{KeyValueDB, DBTransaction}; use polkadot_primitives::v1::{ Hash, BlockNumber, CandidateEvent, ValidatorIndex, CandidateHash, - CandidateReceipt, + CandidateReceipt, Header, }; use polkadot_node_primitives::{ ErasureChunk, AvailableData, @@ -41,7 +41,10 @@ use polkadot_subsystem::{ ActiveLeavesUpdate, errors::{ChainApiError, RuntimeApiError}, }; -use polkadot_node_subsystem_util::metrics::{self, prometheus}; +use polkadot_node_subsystem_util::{ + self as util, + metrics::{self, prometheus}, +}; use polkadot_subsystem::messages::{ AvailabilityStoreMessage, ChainApiMessage, RuntimeApiMessage, RuntimeApiRequest, }; @@ -444,6 +447,8 @@ pub struct AvailabilityStoreSubsystem { pruning_config: PruningConfig, config: Config, db: Arc, + known_blocks: KnownUnfinalizedBlocks, + finalized_number: Option, metrics: Metrics, clock: Box, } @@ -478,6 +483,41 @@ impl AvailabilityStoreSubsystem { db, metrics, clock, + known_blocks: KnownUnfinalizedBlocks::default(), + finalized_number: None, + } + } +} + +/// We keep the hashes and numbers of all unfinalized +/// processed blocks in memory. +#[derive(Default, Debug)] +struct KnownUnfinalizedBlocks { + by_hash: HashSet, + by_number: BTreeSet<(BlockNumber, Hash)>, +} + +impl KnownUnfinalizedBlocks { + /// Check whether the block has been already processed. + fn is_known(&self, hash: &Hash) -> bool { + self.by_hash.contains(hash) + } + + /// Insert a new block into the known set. + fn insert(&mut self, hash: Hash, number: BlockNumber) { + self.by_hash.insert(hash); + self.by_number.insert((number, hash)); + } + + /// Prune all finalized blocks. + fn prune_finalized(&mut self, finalized: BlockNumber) { + // split_off returns everything after the given key, including the key + let split_point = finalized.saturating_add(1); + let mut finalized = self.by_number.split_off(&(split_point, Hash::zero())); + // after split_off `finalized` actually contains unfinalized blocks, we need to swap + std::mem::swap(&mut self.by_number, &mut finalized); + for (_, block) in finalized { + self.by_hash.remove(&block); } } } @@ -547,6 +587,8 @@ where FromOverseer::Signal(OverseerSignal::BlockFinalized(hash, number)) => { let _timer = subsystem.metrics.time_process_block_finalized(); + subsystem.finalized_number = Some(number); + subsystem.known_blocks.prune_finalized(number); process_block_finalized( ctx, &subsystem, @@ -580,27 +622,6 @@ async fn process_block_activated( ) -> Result<(), Error> { let now = subsystem.clock.now()?; - let candidate_events = { - let (tx, rx) = oneshot::channel(); - ctx.send_message( - RuntimeApiMessage::Request(activated, RuntimeApiRequest::CandidateEvents(tx)).into() - ).await; - - rx.await?? - }; - - let block_number = { - let (tx, rx) = oneshot::channel(); - ctx.send_message( - ChainApiMessage::BlockNumber(activated, tx).into() - ).await; - - match rx.await?? { - None => return Ok(()), - Some(n) => n, - } - }; - let block_header = { let (tx, rx) = oneshot::channel(); @@ -613,28 +634,77 @@ async fn process_block_activated( Some(n) => n, } }; + let block_number = block_header.number; - // We need to request the number of validators based on the parent state, as that is the number of validators - // used to create this block. + let new_blocks = util::determine_new_blocks( + ctx.sender(), + |hash| -> Result { + Ok(subsystem.known_blocks.is_known(hash)) + }, + activated, + &block_header, + subsystem.finalized_number.unwrap_or(block_number.saturating_sub(1)), + ).await?; + + let mut tx = DBTransaction::new(); + // determine_new_blocks is descending in block height + for (hash, header) in new_blocks.into_iter().rev() { + process_new_head( + ctx, + &subsystem.db, + &mut tx, + &subsystem.config, + &subsystem.pruning_config, + now, + hash, + header, + ).await?; + subsystem.known_blocks.insert(hash, block_number); + } + subsystem.db.write(tx)?; + + Ok(()) +} + +async fn process_new_head( + ctx: &mut impl SubsystemContext, + db: &Arc, + db_transaction: &mut DBTransaction, + config: &Config, + pruning_config: &PruningConfig, + now: Duration, + hash: Hash, + header: Header, +) -> Result<(), Error> { + + let candidate_events = { + let (tx, rx) = oneshot::channel(); + ctx.send_message( + RuntimeApiMessage::Request(hash, RuntimeApiRequest::CandidateEvents(tx)).into() + ).await; + + rx.await?? + }; + + // We need to request the number of validators based on the parent state, + // as that is the number of validators used to create this block. let n_validators = { let (tx, rx) = oneshot::channel(); ctx.send_message( - RuntimeApiMessage::Request(block_header.parent_hash, RuntimeApiRequest::Validators(tx)).into() + RuntimeApiMessage::Request(header.parent_hash, RuntimeApiRequest::Validators(tx)).into() ).await; rx.await??.len() }; - let mut tx = DBTransaction::new(); - for event in candidate_events { match event { CandidateEvent::CandidateBacked(receipt, _head, _core_index, _group_index) => { note_block_backed( - &subsystem.db, - &mut tx, - &subsystem.config, - &subsystem.pruning_config, + db, + db_transaction, + config, + pruning_config, now, n_validators, receipt, @@ -642,11 +712,11 @@ async fn process_block_activated( } CandidateEvent::CandidateIncluded(receipt, _head, _core_index, _group_index) => { note_block_included( - &subsystem.db, - &mut tx, - &subsystem.config, - &subsystem.pruning_config, - (block_number, activated), + db, + db_transaction, + config, + pruning_config, + (header.number, hash), receipt, )?; } @@ -654,8 +724,6 @@ async fn process_block_activated( } } - subsystem.db.write(tx)?; - Ok(()) } @@ -732,9 +800,10 @@ fn note_block_included( State::Unfinalized(at, mut within) => { if let Err(i) = within.binary_search(&be_block) { within.insert(i, be_block); + State::Unfinalized(at, within) + } else { + return Ok(()); } - - State::Unfinalized(at, within) } State::Finalized(_at) => { // This should never happen as a candidate would have to be included after diff --git a/polkadot/node/core/av-store/src/tests.rs b/polkadot/node/core/av-store/src/tests.rs index 718b52306a..e923545a78 100644 --- a/polkadot/node/core/av-store/src/tests.rs +++ b/polkadot/node/core/av-store/src/tests.rs @@ -266,6 +266,25 @@ fn runtime_api_error_does_not_stop_the_subsystem() { }), ).await; + let header = Header { + parent_hash: Hash::zero(), + number: 1, + state_root: Hash::zero(), + extrinsics_root: Hash::zero(), + digest: Default::default(), + }; + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::ChainApi(ChainApiMessage::BlockHeader( + relay_parent, + tx, + )) => { + assert_eq!(relay_parent, new_leaf); + tx.send(Ok(Some(header))).unwrap(); + } + ); + // runtime api call fails assert_matches!( overseer_recv(&mut virtual_overseer).await, @@ -765,6 +784,134 @@ fn stored_data_kept_until_finalized() { }); } +#[test] +fn we_dont_miss_anything_if_import_notifications_are_missed() { + let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS)); + let test_state = TestState::default(); + + test_harness(test_state.clone(), store.clone(), |mut virtual_overseer| async move { + overseer_signal( + &mut virtual_overseer, + OverseerSignal::BlockFinalized(Hash::zero(), 1) + ).await; + + let header = Header { + parent_hash: Hash::repeat_byte(3), + number: 4, + state_root: Hash::zero(), + extrinsics_root: Hash::zero(), + digest: Default::default(), + }; + let new_leaf = Hash::repeat_byte(4); + + overseer_signal( + &mut virtual_overseer, + OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { + activated: vec![ActivatedLeaf { + hash: new_leaf, + number: 4, + status: LeafStatus::Fresh, + span: Arc::new(jaeger::Span::Disabled), + }].into(), + deactivated: vec![].into(), + }), + ).await; + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::ChainApi(ChainApiMessage::BlockHeader( + relay_parent, + tx, + )) => { + assert_eq!(relay_parent, new_leaf); + tx.send(Ok(Some(header))).unwrap(); + } + ); + + let new_heads = vec![ + (Hash::repeat_byte(2), Hash::repeat_byte(1)), + (Hash::repeat_byte(3), Hash::repeat_byte(2)), + (Hash::repeat_byte(4), Hash::repeat_byte(3)), + ]; + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::ChainApi(ChainApiMessage::Ancestors { + hash, + k, + response_channel: tx, + }) => { + assert_eq!(hash, new_leaf); + assert_eq!(k, 2); + let _ = tx.send(Ok(vec![ + Hash::repeat_byte(3), + Hash::repeat_byte(2), + ])); + } + ); + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::ChainApi(ChainApiMessage::BlockHeader( + relay_parent, + tx, + )) => { + assert_eq!(relay_parent, Hash::repeat_byte(3)); + tx.send(Ok(Some(Header { + parent_hash: Hash::repeat_byte(2), + number: 3, + state_root: Hash::zero(), + extrinsics_root: Hash::zero(), + digest: Default::default(), + }))).unwrap(); + } + ); + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::ChainApi(ChainApiMessage::BlockHeader( + relay_parent, + tx, + )) => { + assert_eq!(relay_parent, Hash::repeat_byte(2)); + tx.send(Ok(Some(Header { + parent_hash: Hash::repeat_byte(1), + number: 2, + state_root: Hash::zero(), + extrinsics_root: Hash::zero(), + digest: Default::default(), + }))).unwrap(); + } + ); + + for (head, parent) in new_heads { + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::CandidateEvents(tx), + )) => { + assert_eq!(relay_parent, head); + tx.send(Ok(Vec::new())).unwrap(); + } + ); + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::Validators(tx), + )) => { + assert_eq!(relay_parent, parent); + tx.send(Ok(Vec::new())).unwrap(); + } + ); + } + + virtual_overseer + }); +} + #[test] fn forkfullness_works() { let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS)); @@ -1003,6 +1150,17 @@ async fn import_leaf( }), ).await; + assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::ChainApi(ChainApiMessage::BlockHeader( + relay_parent, + tx, + )) => { + assert_eq!(relay_parent, new_leaf); + tx.send(Ok(Some(header))).unwrap(); + } + ); + assert_matches!( overseer_recv(virtual_overseer).await, AllMessages::RuntimeApi(RuntimeApiMessage::Request( @@ -1014,27 +1172,6 @@ async fn import_leaf( } ); - assert_matches!( - overseer_recv(virtual_overseer).await, - AllMessages::ChainApi(ChainApiMessage::BlockNumber( - relay_parent, - tx, - )) => { - assert_eq!(relay_parent, new_leaf); - tx.send(Ok(Some(block_number))).unwrap(); - } - ); - - assert_matches!( - overseer_recv(virtual_overseer).await, - AllMessages::ChainApi(ChainApiMessage::BlockHeader( - relay_parent, - tx, - )) => { - assert_eq!(relay_parent, new_leaf); - tx.send(Ok(Some(header))).unwrap(); - } - ); assert_matches!( overseer_recv(virtual_overseer).await, diff --git a/polkadot/roadmap/implementers-guide/src/node/utility/availability-store.md b/polkadot/roadmap/implementers-guide/src/node/utility/availability-store.md index e4f94cdcb7..aec10b1967 100644 --- a/polkadot/roadmap/implementers-guide/src/node/utility/availability-store.md +++ b/polkadot/roadmap/implementers-guide/src/node/utility/availability-store.md @@ -94,10 +94,10 @@ Output: ## Functionality For each head in the `activated` list: - - Note any new candidates backed in the block. Update the `CandidateMeta` for each. If the `CandidateMeta` does not exist, create it as `Unavailable` with the current timestamp. Register a `"prune_by_time"` entry based on the current timestamp + 1 hour. - - Note any new candidate included in the block. Update the `CandidateMeta` for each, performing a transition from `Unavailable` to `Unfinalized` if necessary. That includes removing the `"prune_by_time"` entry. Add the block hash and number to the state, if unfinalized. Add an `"unfinalized"` entry for the block and candidate. + - Load all ancestors of the head back to the finalized block so we don't miss anything if import notifications are missed. If a `StoreChunk` message is received for a candidate which has no entry, then we will prematurely lose the data. + - Note any new candidates backed in the head. Update the `CandidateMeta` for each. If the `CandidateMeta` does not exist, create it as `Unavailable` with the current timestamp. Register a `"prune_by_time"` entry based on the current timestamp + 1 hour. + - Note any new candidate included in the head. Update the `CandidateMeta` for each, performing a transition from `Unavailable` to `Unfinalized` if necessary. That includes removing the `"prune_by_time"` entry. Add the head hash and number to the state, if unfinalized. Add an `"unfinalized"` entry for the block and candidate. - The `CandidateEvent` runtime API can be used for this purpose. - - TODO: load all ancestors of the head back to the finalized block so we don't miss anything if import notifications are missed. If a `StoreChunk` message is received for a candidate which has no entry, then we will prematurely lose the data. On `OverseerSignal::BlockFinalized(finalized)` events: - for each key in `iter_with_prefix("unfinalized")` @@ -110,7 +110,7 @@ On `OverseerSignal::BlockFinalized(finalized)` events: - For each candidate that we encounter under `f` which is not under the finalized block hash, - Remove all entries under `f` in the `Unfinalized` state. - If the `CandidateMeta` has state `Unfinalized` with an empty list of blocks, downgrade to `Unavailable` and re-schedule pruning under the timestamp + 1 hour. We do not prune here as the candidate still may be included in a descendent of the finalized chain. - - Remove all `"unfinalized"` keys under `f`. + - Remove all `"unfinalized"` keys under `f`. - Update last_finalized = finalized. This is roughly `O(n * m)` where n is the number of blocks finalized since the last update, and `m` is the number of parachains.