From aedf089691c17293c47205ce8e33625cf09769da Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Mon, 11 Jan 2021 12:46:09 -0500 Subject: [PATCH] alternate availability store schema (#2237) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * alternate availability store schema * improvements * tweaks * new DB schema and skeleton * expand skeleton and tweaks * handle backing and inclusion * let finality be handled later * handle finalized blocks * implement query methods * implement chunk storing * StoreAvailableData * fix an off-by-one * implement pruning * reinstate subsystem trait impl * reinstate metrics * fix warnings * remove chunks_cache * oops * actually store the available data * mockable pruning interval * fix tests * spacing * fix code grumbles * guide improvements * make time mockable * implement a mocked clock for testing * return DB errors * Update node/core/av-store/Cargo.toml Co-authored-by: Bastian Köcher * Update roadmap/implementers-guide/src/node/utility/availability-store.md Co-authored-by: Bastian Köcher * Update roadmap/implementers-guide/src/node/utility/availability-store.md Co-authored-by: Bastian Köcher * review grumbles & clarity * fix review grumbles * Add docs Co-authored-by: Andronik Ordian Co-authored-by: Bastian Köcher Co-authored-by: Andronik Ordian --- polkadot/Cargo.lock | 3 + polkadot/node/core/av-store/Cargo.toml | 3 + polkadot/node/core/av-store/src/lib.rs | 1983 ++++++++--------- polkadot/node/core/av-store/src/tests.rs | 602 ++--- .../src/node/utility/availability-store.md | 210 +- .../src/types/overseer-protocol.md | 6 +- 6 files changed, 1441 insertions(+), 1366 deletions(-) diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock index 8472139db9..2ad2e9c915 100644 --- a/polkadot/Cargo.lock +++ b/polkadot/Cargo.lock @@ -5044,6 +5044,7 @@ name = "polkadot-node-core-av-store" version = "0.1.0" dependencies = [ "assert_matches", + "bitvec", "env_logger 0.8.2", "futures 0.3.8", "futures-timer 3.0.2", @@ -5052,6 +5053,7 @@ dependencies = [ "kvdb-rocksdb", "log", "parity-scale-codec", + "parking_lot 0.11.1", "polkadot-erasure-coding", "polkadot-node-subsystem", "polkadot-node-subsystem-test-helpers", @@ -5060,6 +5062,7 @@ dependencies = [ "polkadot-primitives", "sc-service", "sp-core", + "sp-keyring", "thiserror", "tracing", "tracing-futures", diff --git a/polkadot/node/core/av-store/Cargo.toml b/polkadot/node/core/av-store/Cargo.toml index 4dee43dc62..3f34a1ee0f 100644 --- a/polkadot/node/core/av-store/Cargo.toml +++ b/polkadot/node/core/av-store/Cargo.toml @@ -12,6 +12,7 @@ kvdb-rocksdb = "0.10.0" thiserror = "1.0.23" tracing = "0.1.22" tracing-futures = "0.2.4" +bitvec = "0.17.4" parity-scale-codec = { version = "1.3.5", features = ["derive"] } erasure = { package = "polkadot-erasure-coding", path = "../../../erasure-coding" } @@ -31,3 +32,5 @@ kvdb-memorydb = "0.8.0" sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" } polkadot-node-subsystem-util = { path = "../../subsystem-util" } polkadot-node-subsystem-test-helpers = { path = "../../subsystem-test-helpers" } +sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master" } +parking_lot = "0.11.1" diff --git a/polkadot/node/core/av-store/src/lib.rs b/polkadot/node/core/av-store/src/lib.rs index ca95d777a1..a8b498e48b 100644 --- a/polkadot/node/core/av-store/src/lib.rs +++ b/polkadot/node/core/av-store/src/lib.rs @@ -19,21 +19,21 @@ #![recursion_limit="256"] #![warn(missing_docs)] -use std::cmp::Ordering; -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; use std::io; use std::path::PathBuf; use std::sync::Arc; use std::time::{Duration, SystemTime, SystemTimeError, UNIX_EPOCH}; -use parity_scale_codec::{Encode, Decode}; -use futures::{select, channel::oneshot, future::{self, Either}, Future, FutureExt}; +use parity_scale_codec::{Encode, Decode, Input, Error as CodecError}; +use futures::{select, channel::oneshot, future, FutureExt}; use futures_timer::Delay; use kvdb_rocksdb::{Database, DatabaseConfig}; use kvdb::{KeyValueDB, DBTransaction}; use polkadot_primitives::v1::{ Hash, AvailableData, BlockNumber, CandidateEvent, ErasureChunk, ValidatorIndex, CandidateHash, + CandidateReceipt, }; use polkadot_subsystem::{ FromOverseer, OverseerSignal, SubsystemError, Subsystem, SubsystemContext, SpawnedSubsystem, @@ -42,8 +42,12 @@ use polkadot_subsystem::{ }; use polkadot_node_subsystem_util::metrics::{self, prometheus}; use polkadot_subsystem::messages::{ - AllMessages, AvailabilityStoreMessage, ChainApiMessage, RuntimeApiMessage, RuntimeApiRequest, + AvailabilityStoreMessage, ChainApiMessage, RuntimeApiMessage, RuntimeApiRequest, }; +use bitvec::{vec::BitVec, order::Lsb0 as BitOrderLsb0}; + +#[cfg(test)] +mod tests; const LOG_TARGET: &str = "availability"; @@ -53,6 +57,282 @@ mod columns { pub const NUM_COLUMNS: u32 = 2; } +/// The following constants are used under normal conditions: + +const AVAILABLE_PREFIX: &[u8; 9] = b"available"; +const CHUNK_PREFIX: &[u8; 5] = b"chunk"; +const META_PREFIX: &[u8; 4] = b"meta"; +const UNFINALIZED_PREFIX: &[u8; 11] = b"unfinalized"; +const PRUNE_BY_TIME_PREFIX: &[u8; 13] = b"prune_by_time"; + +// We have some keys we want to map to empty values because existence of the key is enough. We use this because +// rocksdb doesn't support empty values. +const TOMBSTONE_VALUE: &[u8] = &*b" "; + +/// Unavailable blocks are kept for 1 hour. +const KEEP_UNAVAILABLE_FOR: Duration = Duration::from_secs(60 * 60); + +/// Finalized data is kept for 25 hours. +const KEEP_FINALIZED_FOR: Duration = Duration::from_secs(25 * 60 * 60); + +/// The pruning interval. +const PRUNING_INTERVAL: Duration = Duration::from_secs(60 * 5); + +/// Unix time wrapper with big-endian encoding. +#[derive(Debug, Clone, Copy, PartialEq, PartialOrd, Eq, Ord)] +struct BETimestamp(u64); + +impl Encode for BETimestamp { + fn size_hint(&self) -> usize { + std::mem::size_of::() + } + + fn using_encoded R>(&self, f: F) -> R { + f(&self.0.to_be_bytes()) + } +} + +impl Decode for BETimestamp { + fn decode(value: &mut I) -> Result { + <[u8; 8]>::decode(value).map(u64::from_be_bytes).map(Self) + } +} + +impl From for BETimestamp { + fn from(d: Duration) -> Self { + BETimestamp(d.as_secs()) + } +} + +impl Into for BETimestamp { + fn into(self) -> Duration { + Duration::from_secs(self.0) + } +} + +/// [`BlockNumber`] wrapper with big-endian encoding. +#[derive(Debug, Clone, PartialEq, PartialOrd, Eq, Ord)] +struct BEBlockNumber(BlockNumber); + +impl Encode for BEBlockNumber { + fn size_hint(&self) -> usize { + std::mem::size_of::() + } + + fn using_encoded R>(&self, f: F) -> R { + f(&self.0.to_be_bytes()) + } +} + +impl Decode for BEBlockNumber { + fn decode(value: &mut I) -> Result { + <[u8; std::mem::size_of::()]>::decode(value).map(BlockNumber::from_be_bytes).map(Self) + } +} + +#[derive(Debug, Encode, Decode)] +enum State { + /// Candidate data was first observed at the given time but is not available in any block. + #[codec(index = "0")] + Unavailable(BETimestamp), + /// The candidate was first observed at the given time and was included in the given list of unfinalized blocks, which may be + /// empty. The timestamp here is not used for pruning. Either one of these blocks will be finalized or the state will regress to + /// `State::Unavailable`, in which case the same timestamp will be reused. Blocks are sorted ascending first by block number and + /// then hash. + #[codec(index = "1")] + Unfinalized(BETimestamp, Vec<(BEBlockNumber, Hash)>), + /// Candidate data has appeared in a finalized block and did so at the given time. + #[codec(index = "2")] + Finalized(BETimestamp) +} + +// Meta information about a candidate. +#[derive(Debug, Encode, Decode)] +struct CandidateMeta { + state: State, + data_available: bool, + chunks_stored: BitVec, +} + +fn query_inner( + db: &Arc, + column: u32, + key: &[u8], +) -> Result, Error> { + match db.get(column, key) { + Ok(Some(raw)) => { + let res = D::decode(&mut &raw[..])?; + Ok(Some(res)) + } + Ok(None) => Ok(None), + Err(e) => { + tracing::warn!(target: LOG_TARGET, err = ?e, "Error reading from the availability store"); + Err(e.into()) + } + } +} + +fn write_available_data( + tx: &mut DBTransaction, + hash: &CandidateHash, + available_data: &AvailableData, +) { + let key = (AVAILABLE_PREFIX, hash).encode(); + + tx.put_vec(columns::DATA, &key[..], available_data.encode()); +} + +fn load_available_data( + db: &Arc, + hash: &CandidateHash, +) -> Result, Error> { + let key = (AVAILABLE_PREFIX, hash).encode(); + + query_inner(db, columns::DATA, &key) +} + +fn delete_available_data( + tx: &mut DBTransaction, + hash: &CandidateHash, +) { + let key = (AVAILABLE_PREFIX, hash).encode(); + + tx.delete(columns::DATA, &key[..]) +} + +fn load_chunk( + db: &Arc, + candidate_hash: &CandidateHash, + chunk_index: ValidatorIndex, +) -> Result, Error> { + let key = (CHUNK_PREFIX, candidate_hash, chunk_index).encode(); + + query_inner(db, columns::DATA, &key) +} + +fn write_chunk( + tx: &mut DBTransaction, + candidate_hash: &CandidateHash, + chunk_index: ValidatorIndex, + erasure_chunk: &ErasureChunk, +) { + let key = (CHUNK_PREFIX, candidate_hash, chunk_index).encode(); + + tx.put_vec(columns::DATA, &key, erasure_chunk.encode()); +} + +fn delete_chunk( + tx: &mut DBTransaction, + candidate_hash: &CandidateHash, + chunk_index: ValidatorIndex, +) { + let key = (CHUNK_PREFIX, candidate_hash, chunk_index).encode(); + + tx.delete(columns::DATA, &key[..]); +} + +fn load_meta( + db: &Arc, + hash: &CandidateHash, +) -> Result, Error> { + let key = (META_PREFIX, hash).encode(); + + query_inner(db, columns::META, &key) +} + +fn write_meta( + tx: &mut DBTransaction, + hash: &CandidateHash, + meta: &CandidateMeta, +) { + let key = (META_PREFIX, hash).encode(); + + tx.put_vec(columns::META, &key, meta.encode()); +} + +fn delete_meta(tx: &mut DBTransaction, hash: &CandidateHash) { + let key = (META_PREFIX, hash).encode(); + tx.delete(columns::META, &key[..]) +} + +fn delete_unfinalized_height( + tx: &mut DBTransaction, + block_number: BlockNumber, +) { + let prefix = (UNFINALIZED_PREFIX, BEBlockNumber(block_number)).encode(); + tx.delete_prefix(columns::META, &prefix); +} + +fn delete_unfinalized_inclusion( + tx: &mut DBTransaction, + block_number: BlockNumber, + block_hash: &Hash, + candidate_hash: &CandidateHash, +) { + let key = ( + UNFINALIZED_PREFIX, + BEBlockNumber(block_number), + block_hash, + candidate_hash, + ).encode(); + + tx.delete(columns::META, &key[..]); +} + +fn delete_pruning_key(tx: &mut DBTransaction, t: impl Into, h: &CandidateHash) { + let key = (PRUNE_BY_TIME_PREFIX, t.into(), h).encode(); + tx.delete(columns::META, &key); +} + +fn write_pruning_key(tx: &mut DBTransaction, t: impl Into, h: &CandidateHash) { + let t = t.into(); + let key = (PRUNE_BY_TIME_PREFIX, t, h).encode(); + tx.put(columns::META, &key, TOMBSTONE_VALUE); +} + +fn finalized_block_range(finalized: BlockNumber) -> (Vec, Vec) { + // We use big-endian encoding to iterate in ascending order. + let start = UNFINALIZED_PREFIX.encode(); + let end = (UNFINALIZED_PREFIX, BEBlockNumber(finalized + 1)).encode(); + + (start, end) +} + +fn write_unfinalized_block_contains( + tx: &mut DBTransaction, + n: BlockNumber, + h: &Hash, + ch: &CandidateHash, +) { + let key = (UNFINALIZED_PREFIX, BEBlockNumber(n), h, ch).encode(); + tx.put(columns::META, &key, TOMBSTONE_VALUE); +} + +fn pruning_range(now: impl Into) -> (Vec, Vec) { + let start = PRUNE_BY_TIME_PREFIX.encode(); + let end = (PRUNE_BY_TIME_PREFIX, BETimestamp(now.into().0 + 1)).encode(); + + (start, end) +} + +fn decode_unfinalized_key(s: &[u8]) -> Result<(BlockNumber, Hash, CandidateHash), CodecError> { + if !s.starts_with(UNFINALIZED_PREFIX) { + return Err("missing magic string".into()); + } + + <(BEBlockNumber, Hash, CandidateHash)>::decode(&mut &s[UNFINALIZED_PREFIX.len()..]) + .map(|(b, h, ch)| (b.0, h, ch)) +} + +fn decode_pruning_key(s: &[u8]) -> Result<(Duration, CandidateHash), CodecError> { + if !s.starts_with(PRUNE_BY_TIME_PREFIX) { + return Err("missing magic string".into()); + } + + <(BETimestamp, CandidateHash)>::decode(&mut &s[PRUNE_BY_TIME_PREFIX.len()..]) + .map(|(t, ch)| (t.into(), ch)) +} + #[derive(Debug, thiserror::Error)] #[allow(missing_docs)] pub enum Error { @@ -77,6 +357,9 @@ pub enum Error { #[error(transparent)] Time(#[from] SystemTimeError), + #[error(transparent)] + Codec(#[from] CodecError), + #[error("Custom databases are not supported")] CustomDatabase, } @@ -93,336 +376,31 @@ impl Error { } } -/// A wrapper type for delays. -#[derive(Clone, Debug, Decode, Encode, Eq)] -enum PruningDelay { - /// This pruning should be triggered after this `Duration` from UNIX_EPOCH. - In(Duration), - - /// Data is in the state where it has no expiration. - Indefinite, -} - -impl PruningDelay { - fn now() -> Result { - Ok(SystemTime::now().duration_since(UNIX_EPOCH)?.into()) - } - - fn into_the_future(duration: Duration) -> Result { - Ok(Self::In(SystemTime::now().duration_since(UNIX_EPOCH)? + duration)) - } - - fn as_duration(&self) -> Option { - match self { - PruningDelay::In(d) => Some(*d), - PruningDelay::Indefinite => None, - } - } -} - -impl From for PruningDelay { - fn from(d: Duration) -> Self { - Self::In(d) - } -} - -impl PartialEq for PruningDelay { - fn eq(&self, other: &Self) -> bool { - match (self, other) { - (PruningDelay::In(this), PruningDelay::In(that)) => {this == that}, - (PruningDelay::Indefinite, PruningDelay::Indefinite) => true, - _ => false, - } - } -} - -impl PartialOrd for PruningDelay { - fn partial_cmp(&self, other: &Self) -> Option { - match (self, other) { - (PruningDelay::In(this), PruningDelay::In(that)) => this.partial_cmp(that), - (PruningDelay::In(_), PruningDelay::Indefinite) => Some(Ordering::Less), - (PruningDelay::Indefinite, PruningDelay::In(_)) => Some(Ordering::Greater), - (PruningDelay::Indefinite, PruningDelay::Indefinite) => Some(Ordering::Equal), - } - } -} - -impl Ord for PruningDelay { - fn cmp(&self, other: &Self) -> Ordering { - match (self, other) { - (PruningDelay::In(this), PruningDelay::In(that)) => this.cmp(that), - (PruningDelay::In(_), PruningDelay::Indefinite) => Ordering::Less, - (PruningDelay::Indefinite, PruningDelay::In(_)) => Ordering::Greater, - (PruningDelay::Indefinite, PruningDelay::Indefinite) => Ordering::Equal, - } - } -} - -/// A key for chunk pruning records. -const CHUNK_PRUNING_KEY: [u8; 14] = *b"chunks_pruning"; - -/// A key for PoV pruning records. -const POV_PRUNING_KEY: [u8; 11] = *b"pov_pruning"; - -/// A key for a cached value of next scheduled PoV pruning. -const NEXT_POV_PRUNING: [u8; 16] = *b"next_pov_pruning"; - -/// A key for a cached value of next scheduled chunk pruning. -const NEXT_CHUNK_PRUNING: [u8; 18] = *b"next_chunk_pruning"; - -/// The following constants are used under normal conditions: - -/// Stored block is kept available for 1 hour. -const KEEP_STORED_BLOCK_FOR: Duration = Duration::from_secs(60 * 60); - -/// Finalized block is kept for 1 day. -const KEEP_FINALIZED_BLOCK_FOR: Duration = Duration::from_secs(24 * 60 * 60); - -/// Keep chunk of the finalized block for 1 day + 1 hour. -const KEEP_FINALIZED_CHUNK_FOR: Duration = Duration::from_secs(25 * 60 * 60); - -/// At which point in time since UNIX_EPOCH we need to wakeup and do next pruning of blocks. -/// Essenially this is the first element in the sorted array of pruning data, -/// we just want to cache it here to avoid lifting the whole array just to look at the head. -/// -/// This record exists under `NEXT_POV_PRUNING` key, if it does not either: -/// a) There are no records and nothing has to be pruned. -/// b) There are records but all of them are in `Included` state and do not have exact time to -/// be pruned. -#[derive(Decode, Encode)] -struct NextPoVPruning(Duration); - -impl NextPoVPruning { - // After which duration from `now` this should fire. - fn should_fire_in(&self) -> Result { - Ok(self.0.checked_sub(SystemTime::now().duration_since(UNIX_EPOCH)?).unwrap_or_default()) - } -} - -/// At which point in time since UNIX_EPOCH we need to wakeup and do next pruning of chunks. -/// Essentially this is the first element in the sorted array of pruning data, -/// we just want to cache it here to avoid lifting the whole array just to look at the head. -/// -/// This record exists under `NEXT_CHUNK_PRUNING` key, if it does not either: -/// a) There are no records and nothing has to be pruned. -/// b) There are records but all of them are in `Included` state and do not have exact time to -/// be pruned. -#[derive(Decode, Encode)] -struct NextChunkPruning(Duration); - -impl NextChunkPruning { - // After which amount of seconds into the future from `now` this should fire. - fn should_fire_in(&self) -> Result { - Ok(self.0.checked_sub(SystemTime::now().duration_since(UNIX_EPOCH)?).unwrap_or_default()) - } -} - /// Struct holding pruning timing configuration. /// The only purpose of this structure is to use different timing /// configurations in production and in testing. #[derive(Clone)] struct PruningConfig { - /// How long should a stored block stay available. - keep_stored_block_for: Duration, + /// How long unavailable data should be kept. + keep_unavailable_for: Duration, - /// How long should a finalized block stay available. - keep_finalized_block_for: Duration, + /// How long finalized data should be kept. + keep_finalized_for: Duration, - /// How long should a chunk of a finalized block stay available. - keep_finalized_chunk_for: Duration, + /// How often to perform data pruning. + pruning_interval: Duration, } impl Default for PruningConfig { fn default() -> Self { Self { - keep_stored_block_for: KEEP_STORED_BLOCK_FOR, - keep_finalized_block_for: KEEP_FINALIZED_BLOCK_FOR, - keep_finalized_chunk_for: KEEP_FINALIZED_CHUNK_FOR, + keep_unavailable_for: KEEP_UNAVAILABLE_FOR, + keep_finalized_for: KEEP_FINALIZED_FOR, + pruning_interval: PRUNING_INTERVAL, } } } -#[derive(Debug, Decode, Encode, Eq, PartialEq)] -enum CandidateState { - Stored, - Included, - Finalized, -} - -#[derive(Debug, Decode, Encode, Eq)] -struct PoVPruningRecord { - candidate_hash: CandidateHash, - block_number: BlockNumber, - candidate_state: CandidateState, - prune_at: PruningDelay, -} - -impl PartialEq for PoVPruningRecord { - fn eq(&self, other: &Self) -> bool { - self.candidate_hash == other.candidate_hash - } -} - -impl Ord for PoVPruningRecord { - fn cmp(&self, other: &Self) -> Ordering { - if self.candidate_hash == other.candidate_hash { - return Ordering::Equal; - } - - self.prune_at.cmp(&other.prune_at) - } -} - -impl PartialOrd for PoVPruningRecord { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } -} - -#[derive(Debug, Decode, Encode, Eq)] -struct ChunkPruningRecord { - candidate_hash: CandidateHash, - block_number: BlockNumber, - candidate_state: CandidateState, - chunk_index: u32, - prune_at: PruningDelay, -} - -impl PartialEq for ChunkPruningRecord { - fn eq(&self, other: &Self) -> bool { - self.candidate_hash == other.candidate_hash && - self.chunk_index == other.chunk_index - } -} - -impl Ord for ChunkPruningRecord { - fn cmp(&self, other: &Self) -> Ordering { - if self.candidate_hash == other.candidate_hash { - return Ordering::Equal; - } - - self.prune_at.cmp(&other.prune_at) - } -} - -impl PartialOrd for ChunkPruningRecord { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } -} - -/// An implementation of the Availability Store subsystem. -pub struct AvailabilityStoreSubsystem { - pruning_config: PruningConfig, - inner: Arc, - chunks_cache: HashMap>, - metrics: Metrics, -} - -impl AvailabilityStoreSubsystem { - // Perform pruning of PoVs - #[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))] - fn prune_povs(&mut self) -> Result<(), Error> { - let _timer = self.metrics.time_prune_povs(); - - let mut tx = DBTransaction::new(); - let mut pov_pruning = pov_pruning(&self.inner).unwrap_or_default(); - let now = PruningDelay::now()?; - - tracing::trace!(target: LOG_TARGET, "Pruning PoVs"); - let outdated_records_count = pov_pruning.iter() - .take_while(|r| r.prune_at <= now) - .count(); - - for record in pov_pruning.drain(..outdated_records_count) { - tracing::trace!(target: LOG_TARGET, record = ?record, "Removing record"); - - self.chunks_cache.remove(&record.candidate_hash); - tx.delete( - columns::DATA, - available_data_key(&record.candidate_hash).as_slice(), - ); - } - - put_pov_pruning(&self.inner, Some(tx), pov_pruning, &self.metrics)?; - - Ok(()) - } - - // Perform pruning of chunks. - #[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))] - fn prune_chunks(&mut self) -> Result<(), Error> { - let _timer = self.metrics.time_prune_chunks(); - - let mut tx = DBTransaction::new(); - let mut chunk_pruning = chunk_pruning(&self.inner).unwrap_or_default(); - let now = PruningDelay::now()?; - - tracing::trace!(target: LOG_TARGET, "Pruning Chunks"); - let outdated_records_count = chunk_pruning.iter() - .take_while(|r| r.prune_at <= now) - .count(); - - for record in chunk_pruning.drain(..outdated_records_count) { - tracing::trace!(target: LOG_TARGET, record = ?record, "Removing record"); - - self.chunks_cache.remove(&record.candidate_hash); - tx.delete( - columns::DATA, - erasure_chunk_key(&record.candidate_hash, record.chunk_index).as_slice(), - ); - } - - put_chunk_pruning(&self.inner, Some(tx), chunk_pruning, &self.metrics)?; - - Ok(()) - } - - // Return a `Future` that either resolves when another PoV pruning has to happen - // or is indefinitely `pending` in case no pruning has to be done. - // Just a helper to `select` over multiple things at once. - #[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))] - fn maybe_prune_povs(&self) -> Result, Error> { - let future = match get_next_pov_pruning_time(&self.inner) { - Some(pruning) => { - Either::Left(Delay::new(pruning.should_fire_in()?)) - } - None => Either::Right(future::pending::<()>()), - }; - - Ok(future) - } - - // Return a `Future` that either resolves when another chunk pruning has to happen - // or is indefinitely `pending` in case no pruning has to be done. - // Just a helper to `select` over multiple things at once. - #[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))] - fn maybe_prune_chunks(&self) -> Result, Error> { - let future = match get_next_chunk_pruning_time(&self.inner) { - Some(pruning) => { - Either::Left(Delay::new(pruning.should_fire_in()?)) - } - None => Either::Right(future::pending::<()>()), - }; - - Ok(future) - } -} - -fn available_data_key(candidate_hash: &CandidateHash) -> Vec { - (candidate_hash, 0i8).encode() -} - -fn erasure_chunk_key(candidate_hash: &CandidateHash, index: u32) -> Vec { - (candidate_hash, index, 0i8).encode() -} - -#[derive(Encode, Decode)] -struct StoredAvailableData { - data: AvailableData, - n_validators: u32, -} - /// Configuration for the availability store. pub struct Config { /// Total cache size in megabytes. If `None` the default (128 MiB per column) is used. @@ -448,6 +426,27 @@ impl std::convert::TryFrom for Config { } } +trait Clock: Send + Sync { + // Returns time since unix epoch. + fn now(&self) -> Result; +} + +struct SystemClock; + +impl Clock for SystemClock { + fn now(&self) -> Result { + SystemTime::now().duration_since(UNIX_EPOCH).map_err(Into::into) + } +} + +/// An implementation of the Availability Store subsystem. +pub struct AvailabilityStoreSubsystem { + pruning_config: PruningConfig, + db: Arc, + metrics: Metrics, + clock: Box, +} + impl AvailabilityStoreSubsystem { /// Create a new `AvailabilityStoreSubsystem` with a given config on disk. pub fn new_on_disk(config: Config, metrics: Metrics) -> io::Result { @@ -472,624 +471,23 @@ impl AvailabilityStoreSubsystem { Ok(Self { pruning_config: PruningConfig::default(), - inner: Arc::new(db), - chunks_cache: HashMap::new(), + db: Arc::new(db), metrics, + clock: Box::new(SystemClock), }) } #[cfg(test)] - fn new_in_memory(inner: Arc, pruning_config: PruningConfig) -> Self { + fn new_in_memory( + db: Arc, + pruning_config: PruningConfig, + clock: Box, + ) -> Self { Self { pruning_config, - inner, - chunks_cache: HashMap::new(), + db, metrics: Metrics(None), - } - } -} - -fn get_next_pov_pruning_time(db: &Arc) -> Option { - query_inner(db, columns::META, &NEXT_POV_PRUNING) -} - -fn get_next_chunk_pruning_time(db: &Arc) -> Option { - query_inner(db, columns::META, &NEXT_CHUNK_PRUNING) -} - -#[tracing::instrument(skip(subsystem, ctx), fields(subsystem = LOG_TARGET))] -async fn run(mut subsystem: AvailabilityStoreSubsystem, mut ctx: Context) -where - Context: SubsystemContext, -{ - loop { - let res = run_iteration(&mut subsystem, &mut ctx).await; - match res { - Err(e) => { - e.trace(); - - if let Error::Subsystem(SubsystemError::Context(_)) = e { - break; - } - } - Ok(true) => { - tracing::info!(target: LOG_TARGET, "received `Conclude` signal, exiting"); - break; - }, - Ok(false) => continue, - } - } -} - -#[tracing::instrument(level = "trace", skip(subsystem, ctx), fields(subsystem = LOG_TARGET))] -async fn run_iteration(subsystem: &mut AvailabilityStoreSubsystem, ctx: &mut Context) - -> Result -where - Context: SubsystemContext, -{ - // Every time the following two methods are called a read from DB is performed. - // But given that these are very small values which are essentially a newtype - // wrappers around `Duration` (`NextChunkPruning` and `NextPoVPruning`) and also the - // fact of the frequent reads itself we assume these to end up cached in the memory - // anyway and thus these db reads to be reasonably fast. - let pov_pruning_time = subsystem.maybe_prune_povs()?; - let chunk_pruning_time = subsystem.maybe_prune_chunks()?; - - let mut pov_pruning_time = pov_pruning_time.fuse(); - let mut chunk_pruning_time = chunk_pruning_time.fuse(); - - select! { - incoming = ctx.recv().fuse() => { - match incoming? { - FromOverseer::Signal(OverseerSignal::Conclude) => return Ok(true), - FromOverseer::Signal(OverseerSignal::ActiveLeaves( - ActiveLeavesUpdate { activated, .. }) - ) => { - for (activated, _span) in activated.into_iter() { - process_block_activated(ctx, subsystem, activated).await?; - } - } - FromOverseer::Signal(OverseerSignal::BlockFinalized(_hash, number)) => { - process_block_finalized(subsystem, &subsystem.inner, number).await?; - } - FromOverseer::Communication { msg } => { - process_message(subsystem, ctx, msg).await?; - } - } - } - _ = pov_pruning_time => { - subsystem.prune_povs()?; - } - _ = chunk_pruning_time => { - subsystem.prune_chunks()?; - } - complete => return Ok(true), - } - - Ok(false) -} - -/// As soon as certain block is finalized its pruning records and records of all -/// blocks that we keep that are `older` than the block in question have to be updated. -/// -/// The state of data has to be changed from -/// `CandidateState::Included` to `CandidateState::Finalized` and their pruning times have -/// to be updated to `now` + keep_finalized_{block, chunk}_for`. -#[tracing::instrument(level = "trace", skip(subsystem, db), fields(subsystem = LOG_TARGET))] -async fn process_block_finalized( - subsystem: &AvailabilityStoreSubsystem, - db: &Arc, - block_number: BlockNumber, -) -> Result<(), Error> { - let _timer = subsystem.metrics.time_process_block_finalized(); - - if let Some(mut pov_pruning) = pov_pruning(db) { - // Since the records are sorted by time in which they need to be pruned and not by block - // numbers we have to iterate through the whole collection here. - for record in pov_pruning.iter_mut() { - if record.block_number <= block_number { - tracing::trace!( - target: LOG_TARGET, - block_number = %record.block_number, - "Updating pruning record for finalized block", - ); - - record.prune_at = PruningDelay::into_the_future( - subsystem.pruning_config.keep_finalized_block_for - )?; - record.candidate_state = CandidateState::Finalized; - } - } - - put_pov_pruning(db, None, pov_pruning, &subsystem.metrics)?; - } - - if let Some(mut chunk_pruning) = chunk_pruning(db) { - for record in chunk_pruning.iter_mut() { - if record.block_number <= block_number { - tracing::trace!( - target: LOG_TARGET, - block_number = %record.block_number, - "Updating chunk pruning record for finalized block", - ); - - record.prune_at = PruningDelay::into_the_future( - subsystem.pruning_config.keep_finalized_chunk_for - )?; - record.candidate_state = CandidateState::Finalized; - } - } - - put_chunk_pruning(db, None, chunk_pruning, &subsystem.metrics)?; - } - - Ok(()) -} - -#[tracing::instrument(level = "trace", skip(ctx, subsystem), fields(subsystem = LOG_TARGET))] -async fn process_block_activated( - ctx: &mut Context, - subsystem: &mut AvailabilityStoreSubsystem, - hash: Hash, -) -> Result<(), Error> -where - Context: SubsystemContext -{ - let _timer = subsystem.metrics.time_block_activated(); - let db = &subsystem.inner; - - let events = match request_candidate_events(ctx, hash).await { - Ok(events) => events, - Err(err) => { - tracing::debug!(target: LOG_TARGET, err = ?err, "requesting candidate events failed"); - return Ok(()); - } - }; - - tracing::trace!(target: LOG_TARGET, hash = %hash, "block activated"); - let mut included = HashSet::new(); - - for event in events.into_iter() { - if let CandidateEvent::CandidateIncluded(receipt, _) = event { - tracing::trace!( - target: LOG_TARGET, - hash = %receipt.hash(), - "Candidate {:?} was included", receipt.hash(), - ); - included.insert(receipt.hash()); - } - } - - for included in &included { - subsystem.chunks_cache.remove(&included); - } - - if let Some(mut pov_pruning) = pov_pruning(db) { - for record in pov_pruning.iter_mut() { - if included.contains(&record.candidate_hash) { - record.prune_at = PruningDelay::Indefinite; - record.candidate_state = CandidateState::Included; - } - } - - pov_pruning.sort(); - - put_pov_pruning(db, None, pov_pruning, &subsystem.metrics)?; - } - - if let Some(mut chunk_pruning) = chunk_pruning(db) { - for record in chunk_pruning.iter_mut() { - if included.contains(&record.candidate_hash) { - record.prune_at = PruningDelay::Indefinite; - record.candidate_state = CandidateState::Included; - } - } - - chunk_pruning.sort(); - - put_chunk_pruning(db, None, chunk_pruning, &subsystem.metrics)?; - } - - Ok(()) -} - -#[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))] -async fn request_candidate_events( - ctx: &mut Context, - hash: Hash, -) -> Result, Error> -where - Context: SubsystemContext -{ - let (tx, rx) = oneshot::channel(); - - let msg = AllMessages::RuntimeApi(RuntimeApiMessage::Request( - hash, - RuntimeApiRequest::CandidateEvents(tx), - )); - - ctx.send_message(msg.into()).await; - - Ok(rx.await??) -} - -#[tracing::instrument(level = "trace", skip(subsystem, ctx), fields(subsystem = LOG_TARGET))] -async fn process_message( - subsystem: &mut AvailabilityStoreSubsystem, - ctx: &mut Context, - msg: AvailabilityStoreMessage, -) -> Result<(), Error> -where - Context: SubsystemContext -{ - use AvailabilityStoreMessage::*; - - let _timer = subsystem.metrics.time_process_message(); - - match msg { - QueryAvailableData(hash, tx) => { - tx.send(available_data(&subsystem.inner, &hash).map(|d| d.data)).map_err(|_| oneshot::Canceled)?; - } - QueryDataAvailability(hash, tx) => { - let result = available_data(&subsystem.inner, &hash).is_some(); - - tracing::trace!( - target: LOG_TARGET, - candidate_hash = ?hash, - availability = ?result, - "Queried data availability", - ); - - tx.send(result).map_err(|_| oneshot::Canceled)?; - } - QueryChunk(hash, id, tx) => { - tx.send(get_chunk(subsystem, &hash, id)?).map_err(|_| oneshot::Canceled)?; - } - QueryChunkAvailability(hash, id, tx) => { - let result = get_chunk(subsystem, &hash, id).map(|r| r.is_some()); - - tracing::trace!( - target: LOG_TARGET, - candidate_hash = ?hash, - availability = ?result, - "Queried chunk availability", - ); - - tx.send(result?).map_err(|_| oneshot::Canceled)?; - } - StoreChunk { candidate_hash, relay_parent, chunk, tx } => { - let chunk_index = chunk.index; - // Current block number is relay_parent block number + 1. - let block_number = get_block_number(ctx, relay_parent).await? + 1; - let result = store_chunks(subsystem, &candidate_hash, vec![chunk], block_number); - - tracing::trace!( - target: LOG_TARGET, - %chunk_index, - ?candidate_hash, - %block_number, - ?result, - "Stored chunk", - ); - - match result { - Err(e) => { - tx.send(Err(())).map_err(|_| oneshot::Canceled)?; - return Err(e); - } - Ok(()) => { - tx.send(Ok(())).map_err(|_| oneshot::Canceled)?; - } - } - } - StoreAvailableData(hash, id, n_validators, av_data, tx) => { - let result = store_available_data(subsystem, &hash, id, n_validators, av_data); - - tracing::trace!(target: LOG_TARGET, candidate_hash = ?hash, ?result, "Stored available data"); - - match result { - Err(e) => { - tx.send(Err(())).map_err(|_| oneshot::Canceled)?; - return Err(e); - } - Ok(()) => { - tx.send(Ok(())).map_err(|_| oneshot::Canceled)?; - } - } - } - } - - Ok(()) -} - -fn available_data( - db: &Arc, - candidate_hash: &CandidateHash, -) -> Option { - query_inner(db, columns::DATA, &available_data_key(candidate_hash)) -} - -fn pov_pruning(db: &Arc) -> Option> { - query_inner(db, columns::META, &POV_PRUNING_KEY) -} - -fn chunk_pruning(db: &Arc) -> Option> { - query_inner(db, columns::META, &CHUNK_PRUNING_KEY) -} - -#[tracing::instrument(level = "trace", skip(db, tx, metrics), fields(subsystem = LOG_TARGET))] -fn put_pov_pruning( - db: &Arc, - tx: Option, - mut pov_pruning: Vec, - metrics: &Metrics, -) -> Result<(), Error> { - let mut tx = tx.unwrap_or_default(); - - metrics.block_pruning_records_size(pov_pruning.len()); - - pov_pruning.sort(); - - tx.put_vec( - columns::META, - &POV_PRUNING_KEY, - pov_pruning.encode(), - ); - - match pov_pruning.get(0) { - // We want to wake up in case we have some records that are not scheduled to be kept - // indefinitely (data is included and waiting to move to the finalized state) and so - // the is at least one value that is not `PruningDelay::Indefinite`. - Some(PoVPruningRecord { prune_at: PruningDelay::In(prune_at), .. }) => { - tx.put_vec( - columns::META, - &NEXT_POV_PRUNING, - NextPoVPruning(*prune_at).encode(), - ); - } - _ => { - // If there is no longer any records, delete the cached pruning time record. - tx.delete( - columns::META, - &NEXT_POV_PRUNING, - ); - } - } - - db.write(tx)?; - - Ok(()) -} - -#[tracing::instrument(level = "trace", skip(db, tx, metrics), fields(subsystem = LOG_TARGET))] -fn put_chunk_pruning( - db: &Arc, - tx: Option, - mut chunk_pruning: Vec, - metrics: &Metrics, -) -> Result<(), Error> { - let mut tx = tx.unwrap_or_default(); - - metrics.chunk_pruning_records_size(chunk_pruning.len()); - - chunk_pruning.sort(); - - tx.put_vec( - columns::META, - &CHUNK_PRUNING_KEY, - chunk_pruning.encode(), - ); - - match chunk_pruning.get(0) { - Some(ChunkPruningRecord { prune_at: PruningDelay::In(prune_at), .. }) => { - tx.put_vec( - columns::META, - &NEXT_CHUNK_PRUNING, - NextChunkPruning(*prune_at).encode(), - ); - } - _ => { - tx.delete( - columns::META, - &NEXT_CHUNK_PRUNING, - ); - } - } - - db.write(tx)?; - - Ok(()) -} - -// produces a block number by block's hash. -// in the the event of an invalid `block_hash`, returns `Ok(0)` -async fn get_block_number( - ctx: &mut Context, - block_hash: Hash, -) -> Result -where - Context: SubsystemContext, -{ - let (tx, rx) = oneshot::channel(); - - ctx.send_message(AllMessages::ChainApi(ChainApiMessage::BlockNumber(block_hash, tx))).await; - - Ok(rx.await??.map(|number| number).unwrap_or_default()) -} - -#[tracing::instrument(level = "trace", skip(subsystem, available_data), fields(subsystem = LOG_TARGET))] -fn store_available_data( - subsystem: &mut AvailabilityStoreSubsystem, - candidate_hash: &CandidateHash, - id: Option, - n_validators: u32, - available_data: AvailableData, -) -> Result<(), Error> { - let _timer = subsystem.metrics.time_store_available_data(); - - let mut tx = DBTransaction::new(); - - let block_number = available_data.validation_data.block_number; - - let chunks = get_chunks(&available_data, n_validators as usize, &subsystem.metrics)?; - store_chunks( - subsystem, - candidate_hash, - chunks, - block_number, - )?; - - let stored_data = StoredAvailableData { - data: available_data, - n_validators, - }; - - let mut pov_pruning = pov_pruning(&subsystem.inner).unwrap_or_default(); - let prune_at = PruningDelay::into_the_future(subsystem.pruning_config.keep_stored_block_for)?; - - if let Some(next_pruning) = prune_at.as_duration() { - tx.put_vec( - columns::META, - &NEXT_POV_PRUNING, - NextPoVPruning(next_pruning).encode(), - ); - } - - let pruning_record = PoVPruningRecord { - candidate_hash: *candidate_hash, - block_number, - candidate_state: CandidateState::Stored, - prune_at, - }; - - let idx = pov_pruning.binary_search(&pruning_record).unwrap_or_else(|insert_idx| insert_idx); - - pov_pruning.insert(idx, pruning_record); - - tx.put_vec( - columns::DATA, - available_data_key(&candidate_hash).as_slice(), - stored_data.encode(), - ); - - tx.put_vec( - columns::META, - &POV_PRUNING_KEY, - pov_pruning.encode(), - ); - - subsystem.inner.write(tx)?; - - Ok(()) -} - -#[tracing::instrument(level = "trace", skip(subsystem), fields(subsystem = LOG_TARGET))] -fn store_chunks( - subsystem: &mut AvailabilityStoreSubsystem, - candidate_hash: &CandidateHash, - chunks: Vec, - block_number: BlockNumber, -) -> Result<(), Error> { - let _timer = subsystem.metrics.time_store_chunks(); - - let mut tx = DBTransaction::new(); - let mut chunk_pruning = chunk_pruning(&subsystem.inner).unwrap_or_default(); - - let prune_at = PruningDelay::into_the_future(subsystem.pruning_config.keep_stored_block_for)?; - if let Some(delay) = prune_at.clone().as_duration() { - tx.put_vec( - columns::META, - &NEXT_CHUNK_PRUNING, - NextChunkPruning(delay).encode(), - ); - } - - for chunk in &chunks { - let pruning_record = ChunkPruningRecord { - candidate_hash: candidate_hash.clone(), - block_number, - candidate_state: CandidateState::Stored, - chunk_index: chunk.index, - prune_at: prune_at.clone(), - }; - - let idx = chunk_pruning.binary_search(&pruning_record).unwrap_or_else(|insert_idx| insert_idx); - - chunk_pruning.insert(idx, pruning_record); - - let dbkey = erasure_chunk_key(candidate_hash, chunk.index); - - tx.put_vec( - columns::DATA, - &dbkey, - chunk.encode(), - ); - } - - subsystem.chunks_cache.entry(*candidate_hash).or_default().extend(chunks.into_iter().map(|c| (c.index, c))); - - tx.put_vec( - columns::META, - &CHUNK_PRUNING_KEY, - chunk_pruning.encode(), - ); - - subsystem.inner.write(tx)?; - - Ok(()) -} - -#[tracing::instrument(level = "trace", skip(subsystem), fields(subsystem = LOG_TARGET))] -fn get_chunk( - subsystem: &mut AvailabilityStoreSubsystem, - candidate_hash: &CandidateHash, - index: u32, -) -> Result, Error> { - let _timer = subsystem.metrics.time_get_chunk(); - - if let Some(entry) = subsystem.chunks_cache.get(candidate_hash) { - if let Some(chunk) = entry.get(&index) { - return Ok(Some(chunk.clone())); - } - } - - if let Some(chunk) = query_inner( - &subsystem.inner, - columns::DATA, - &erasure_chunk_key(candidate_hash, index) - ) { - return Ok(Some(chunk)); - } - - if let Some(data) = available_data(&subsystem.inner, candidate_hash) { - let chunks = get_chunks(&data.data, data.n_validators as usize, &subsystem.metrics)?; - let desired_chunk = chunks.get(index as usize).cloned(); - store_chunks( - subsystem, - candidate_hash, - chunks, - data.data.validation_data.block_number, - )?; - return Ok(desired_chunk); - } - - Ok(None) -} - -fn query_inner( - db: &Arc, - column: u32, - key: &[u8], -) -> Option { - match db.get(column, key) { - Ok(Some(raw)) => { - let res = D::decode(&mut &raw[..]).expect("all stored data serialized correctly; qed"); - Some(res) - } - Ok(None) => None, - Err(e) => { - tracing::warn!(target: LOG_TARGET, err = ?e, "Error reading from the availability store"); - None + clock, } } } @@ -1110,37 +508,663 @@ where } } -#[tracing::instrument(level = "trace", skip(metrics), fields(subsystem = LOG_TARGET))] -fn get_chunks(data: &AvailableData, n_validators: usize, metrics: &Metrics) -> Result, Error> { - let chunks = erasure::obtain_chunks_v1(n_validators, data)?; - metrics.on_chunks_received(chunks.len()); +#[tracing::instrument(skip(subsystem, ctx), fields(subsystem = LOG_TARGET))] +async fn run(mut subsystem: AvailabilityStoreSubsystem, mut ctx: Context) +where + Context: SubsystemContext, +{ + let mut next_pruning = Delay::new(subsystem.pruning_config.pruning_interval).fuse(); + + loop { + let res = run_iteration(&mut ctx, &mut subsystem, &mut next_pruning).await; + match res { + Err(e) => { + e.trace(); + + if let Error::Subsystem(SubsystemError::Context(_)) = e { + break; + } + } + Ok(true) => { + tracing::info!(target: LOG_TARGET, "received `Conclude` signal, exiting"); + break; + }, + Ok(false) => continue, + } + } +} + +#[tracing::instrument(level = "trace", skip(subsystem, ctx), fields(subsystem = LOG_TARGET))] +async fn run_iteration( + ctx: &mut Context, + subsystem: &mut AvailabilityStoreSubsystem, + mut next_pruning: &mut future::Fuse, +) + -> Result +where + Context: SubsystemContext, +{ + select! { + incoming = ctx.recv().fuse() => { + match incoming? { + FromOverseer::Signal(OverseerSignal::Conclude) => return Ok(true), + FromOverseer::Signal(OverseerSignal::ActiveLeaves( + ActiveLeavesUpdate { activated, .. }) + ) => { + for (activated, _span) in activated.into_iter() { + let _timer = subsystem.metrics.time_block_activated(); + process_block_activated(ctx, subsystem, activated).await?; + } + } + FromOverseer::Signal(OverseerSignal::BlockFinalized(hash, number)) => { + let _timer = subsystem.metrics.time_process_block_finalized(); + + process_block_finalized( + ctx, + &subsystem, + hash, + number, + ).await?; + } + FromOverseer::Communication { msg } => { + let _timer = subsystem.metrics.time_process_message(); + process_message(subsystem, msg)?; + } + } + } + _ = next_pruning => { + // It's important to set the delay before calling `prune_all` because an error in `prune_all` + // could lead to the delay not being set again. Then we would never prune anything anymore. + *next_pruning = Delay::new(subsystem.pruning_config.pruning_interval).fuse(); + + let _timer = subsystem.metrics.time_pruning(); + prune_all(&subsystem.db, &*subsystem.clock)?; + } + } + + Ok(false) +} + +async fn process_block_activated( + ctx: &mut impl SubsystemContext, + subsystem: &mut AvailabilityStoreSubsystem, + activated: Hash, +) -> Result<(), Error> { + let now = subsystem.clock.now()?; + + let candidate_events = { + let (tx, rx) = oneshot::channel(); + ctx.send_message( + RuntimeApiMessage::Request(activated, RuntimeApiRequest::CandidateEvents(tx)).into() + ).await; + + rx.await?? + }; + + let block_number = { + let (tx, rx) = oneshot::channel(); + ctx.send_message( + ChainApiMessage::BlockNumber(activated, tx).into() + ).await; + + match rx.await?? { + None => return Ok(()), + Some(n) => n, + } + }; + + let block_header = { + let (tx, rx) = oneshot::channel(); + + ctx.send_message( + ChainApiMessage::BlockHeader(activated, tx).into() + ).await; + + match rx.await?? { + None => return Ok(()), + Some(n) => n, + } + }; + + // We need to request the number of validators based on the parent state, as that is the number of validators + // used to create this block. + let n_validators = { + let (tx, rx) = oneshot::channel(); + ctx.send_message( + RuntimeApiMessage::Request(block_header.parent_hash, RuntimeApiRequest::Validators(tx)).into() + ).await; + + rx.await??.len() + }; + + let mut tx = DBTransaction::new(); + + for event in candidate_events { + match event { + CandidateEvent::CandidateBacked(receipt, _head) => { + note_block_backed( + &subsystem.db, + &mut tx, + &subsystem.pruning_config, + now, + n_validators, + receipt, + )?; + } + CandidateEvent::CandidateIncluded(receipt, _head) => { + note_block_included( + &subsystem.db, + &mut tx, + &subsystem.pruning_config, + (block_number, activated), + receipt, + )?; + } + _ => {} + } + } + + subsystem.db.write(tx)?; + + Ok(()) +} + +fn note_block_backed( + db: &Arc, + db_transaction: &mut DBTransaction, + pruning_config: &PruningConfig, + now: Duration, + n_validators: usize, + candidate: CandidateReceipt, +) -> Result<(), Error> { + let candidate_hash = candidate.hash(); + + if load_meta(db, &candidate_hash)?.is_none() { + let meta = CandidateMeta { + state: State::Unavailable(now.into()), + data_available: false, + chunks_stored: bitvec::bitvec![BitOrderLsb0, u8; 0; n_validators], + }; + + let prune_at = now + pruning_config.keep_unavailable_for; + + write_pruning_key(db_transaction, prune_at, &candidate_hash); + write_meta(db_transaction, &candidate_hash, &meta); + } + + Ok(()) +} + +fn note_block_included( + db: &Arc, + db_transaction: &mut DBTransaction, + pruning_config:&PruningConfig, + block: (BlockNumber, Hash), + candidate: CandidateReceipt, +) -> Result<(), Error> { + let candidate_hash = candidate.hash(); + + match load_meta(db, &candidate_hash)? { + None => { + // This is alarming. We've observed a block being included without ever seeing it backed. + // Warn and ignore. + tracing::warn!( + target: LOG_TARGET, + "Candidate {}, included without being backed?", + candidate_hash, + ); + } + Some(mut meta) => { + let be_block = (BEBlockNumber(block.0), block.1); + + meta.state = match meta.state { + State::Unavailable(at) => { + let at_d: Duration = at.into(); + let prune_at = at_d + pruning_config.keep_unavailable_for; + delete_pruning_key(db_transaction, prune_at, &candidate_hash); + + State::Unfinalized(at, vec![be_block]) + } + State::Unfinalized(at, mut within) => { + if let Err(i) = within.binary_search(&be_block) { + within.insert(i, be_block); + } + + State::Unfinalized(at, within) + } + State::Finalized(_at) => { + // This should never happen as a candidate would have to be included after + // finality. + return Ok(()) + } + }; + + write_unfinalized_block_contains(db_transaction, block.0, &block.1, &candidate_hash); + write_meta(db_transaction, &candidate_hash, &meta); + } + } + + Ok(()) +} + +macro_rules! peek_num { + ($iter:ident) => { + match $iter.peek() { + Some((k, _)) => decode_unfinalized_key(&k[..]).ok().map(|(b, _, _)| b), + None => None + } + } +} + +async fn process_block_finalized( + ctx: &mut impl SubsystemContext, + subsystem: &AvailabilityStoreSubsystem, + finalized_hash: Hash, + finalized_number: BlockNumber, +) -> Result<(), Error> { + let now = subsystem.clock.now()?; + + let mut next_possible_batch = 0; + loop { + let mut db_transaction = DBTransaction::new(); + let (start_prefix, end_prefix) = finalized_block_range(finalized_number); + + // We have to do some juggling here of the `iter` to make sure it doesn't cross the `.await` boundary + // as it is not `Send`. That is why we create the iterator once within this loop, drop it, + // do an asynchronous request, and then instantiate the exact same iterator again. + let batch_num = { + let mut iter = subsystem.db.iter_with_prefix(columns::META, &start_prefix) + .take_while(|(k, _)| &k[..] < &end_prefix[..]) + .peekable(); + + match peek_num!(iter) { + None => break, // end of iterator. + Some(n) => n, + } + }; + + if batch_num < next_possible_batch { continue } // sanity. + next_possible_batch = batch_num + 1; + + let batch_finalized_hash = if batch_num == finalized_number { + finalized_hash + } else { + let (tx, rx) = oneshot::channel(); + ctx.send_message(ChainApiMessage::FinalizedBlockHash(batch_num, tx).into()).await; + + match rx.await?? { + None => { + tracing::warn!(target: LOG_TARGET, + "Availability store was informed that block #{} is finalized, \ + but chain API has no finalized hash.", + batch_num, + ); + + break + } + Some(h) => h, + } + }; + + let iter = subsystem.db.iter_with_prefix(columns::META, &start_prefix) + .take_while(|(k, _)| &k[..] < &end_prefix[..]) + .peekable(); + + let batch = load_all_at_finalized_height(iter, batch_num, batch_finalized_hash); + + // Now that we've iterated over the entire batch at this finalized height, + // update the meta. + + delete_unfinalized_height(&mut db_transaction, batch_num); + + update_blocks_at_finalized_height( + &subsystem, + &mut db_transaction, + batch, + batch_num, + now, + )?; + + // We need to write at the end of the loop so the prefix iterator doesn't pick up the same values again + // in the next iteration. Another unfortunate effect of having to re-initialize the iterator. + subsystem.db.write(db_transaction)?; + } + + Ok(()) +} + +// loads all candidates at the finalized height and maps them to `true` if finalized +// and `false` if unfinalized. +fn load_all_at_finalized_height( + mut iter: std::iter::Peekable, Box<[u8]>)>>, + block_number: BlockNumber, + finalized_hash: Hash, +) -> impl IntoIterator { + // maps candidate hashes to true if finalized, false otherwise. + let mut candidates = HashMap::new(); + + // Load all candidates that were included at this height. + loop { + match peek_num!(iter) { + None => break, // end of iterator. + Some(n) if n != block_number => break, // end of batch. + _ => {} + } + + let (k, _v) = iter.next().expect("`peek` used to check non-empty; qed"); + let (_, block_hash, candidate_hash) = decode_unfinalized_key(&k[..]) + .expect("`peek_num` checks validity of key; qed"); + + if block_hash == finalized_hash { + candidates.insert(candidate_hash, true); + } else { + candidates.entry(candidate_hash).or_insert(false); + } + } + + candidates +} + +fn update_blocks_at_finalized_height( + subsystem: &AvailabilityStoreSubsystem, + db_transaction: &mut DBTransaction, + candidates: impl IntoIterator, + block_number: BlockNumber, + now: Duration, +) -> Result<(), Error> { + for (candidate_hash, is_finalized) in candidates { + let mut meta = match load_meta(&subsystem.db, &candidate_hash)? { + None => { + tracing::warn!( + target: LOG_TARGET, + "Dangling candidate metadata for {}", + candidate_hash, + ); + + continue; + } + Some(c) => c, + }; + + if is_finalized { + // Clear everything else related to this block. We're finalized now! + match meta.state { + State::Finalized(_) => continue, // sanity + State::Unavailable(at) => { + // This is also not going to happen; the very fact that we are + // iterating over the candidate here indicates that `State` should + // be `Unfinalized`. + delete_pruning_key(db_transaction, at, &candidate_hash); + } + State::Unfinalized(_, blocks) => { + for (block_num, block_hash) in blocks.iter().cloned() { + // this exact height is all getting cleared out anyway. + if block_num.0 != block_number { + delete_unfinalized_inclusion( + db_transaction, + block_num.0, + &block_hash, + &candidate_hash, + ); + } + } + } + } + + meta.state = State::Finalized(now.into()); + + // Write the meta and a pruning record. + write_meta(db_transaction, &candidate_hash, &meta); + write_pruning_key( + db_transaction, + now + subsystem.pruning_config.keep_finalized_for, + &candidate_hash, + ); + } else { + meta.state = match meta.state { + State::Finalized(_) => continue, // sanity. + State::Unavailable(_) => continue, // sanity. + State::Unfinalized(at, mut blocks) => { + // Clear out everything at this height. + blocks.retain(|(n, _)| n.0 != block_number); + + // If empty, we need to go back to being unavailable as we aren't + // aware of any blocks this is included in. + if blocks.is_empty() { + let at_d: Duration = at.into(); + let prune_at = at_d + subsystem.pruning_config.keep_unavailable_for; + write_pruning_key(db_transaction, prune_at, &candidate_hash); + State::Unavailable(at) + } else { + State::Unfinalized(at, blocks) + } + } + }; + + // Update the meta entry. + write_meta(db_transaction, &candidate_hash, &meta) + } + } + + Ok(()) +} + +fn process_message( + subsystem: &mut AvailabilityStoreSubsystem, + msg: AvailabilityStoreMessage, +) -> Result<(), Error> { + match msg { + AvailabilityStoreMessage::QueryAvailableData(candidate, tx) => { + let _ = tx.send(load_available_data(&subsystem.db, &candidate)?); + } + AvailabilityStoreMessage::QueryDataAvailability(candidate, tx) => { + let a = load_meta(&subsystem.db, &candidate)?.map_or(false, |m| m.data_available); + let _ = tx.send(a); + } + AvailabilityStoreMessage::QueryChunk(candidate, validator_index, tx) => { + let _timer = subsystem.metrics.time_get_chunk(); + let _ = tx.send(load_chunk(&subsystem.db, &candidate, validator_index)?); + } + AvailabilityStoreMessage::QueryChunkAvailability(candidate, validator_index, tx) => { + let a = load_meta(&subsystem.db, &candidate)? + .map_or(false, |m| *m.chunks_stored.get(validator_index as usize).unwrap_or(&false)); + let _ = tx.send(a); + } + AvailabilityStoreMessage::StoreChunk { + candidate_hash, + relay_parent: _, + chunk, + tx, + } => { + subsystem.metrics.on_chunks_received(1); + let _timer = subsystem.metrics.time_store_chunk(); + + match store_chunk(&subsystem.db, candidate_hash, chunk) { + Ok(true) => { + let _ = tx.send(Ok(())); + } + Ok(false) => { + let _ = tx.send(Err(())); + } + Err(e) => { + let _ = tx.send(Err(())); + return Err(e) + } + } + } + AvailabilityStoreMessage::StoreAvailableData(candidate, _our_index, n_validators, available_data, tx) => { + subsystem.metrics.on_chunks_received(n_validators as _); + + let _timer = subsystem.metrics.time_store_available_data(); + + let res = store_available_data( + &subsystem, + candidate, + n_validators as _, + available_data, + ); + + match res { + Ok(()) => { + let _ = tx.send(Ok(())); + } + Err(e) => { + let _ = tx.send(Err(())); + return Err(e) + } + } + } + } + + Ok(()) +} + +// Ok(true) on success, Ok(false) on failure, and Err on internal error. +fn store_chunk( + db: &Arc, + candidate_hash: CandidateHash, + chunk: ErasureChunk, +) -> Result { + let mut tx = DBTransaction::new(); + + let mut meta = match load_meta(db, &candidate_hash)? { + Some(m) => m, + None => return Ok(false), // we weren't informed of this candidate by import events. + }; + + match meta.chunks_stored.get(chunk.index as usize).map(|b| *b) { + Some(true) => return Ok(true), // already stored. + Some(false) => { + meta.chunks_stored.set(chunk.index as usize, true); + + write_chunk(&mut tx, &candidate_hash, chunk.index, &chunk); + write_meta(&mut tx, &candidate_hash, &meta); + } + None => return Ok(false), // out of bounds. + } + + db.write(tx)?; + Ok(true) +} + +// Ok(true) on success, Ok(false) on failure, and Err on internal error. +fn store_available_data( + subsystem: &AvailabilityStoreSubsystem, + candidate_hash: CandidateHash, + n_validators: usize, + available_data: AvailableData, +) -> Result<(), Error> { + let mut tx = DBTransaction::new(); + + let mut meta = match load_meta(&subsystem.db, &candidate_hash)? { + Some(m) => { + if m.data_available { + return Ok(()); // already stored. + } + + m + }, + None => { + let now = subsystem.clock.now()?; + + // Write a pruning record. + let prune_at = now + subsystem.pruning_config.keep_unavailable_for; + write_pruning_key(&mut tx, prune_at, &candidate_hash); + + CandidateMeta { + state: State::Unavailable(now.into()), + data_available: false, + chunks_stored: BitVec::new(), + } + } + }; + + let chunks = erasure::obtain_chunks_v1(n_validators, &available_data)?; let branches = erasure::branches(chunks.as_ref()); - Ok(chunks - .iter() + let erasure_chunks = chunks.iter() .zip(branches.map(|(proof, _)| proof)) .enumerate() .map(|(index, (chunk, proof))| ErasureChunk { chunk: chunk.clone(), proof, index: index as u32, - }) - .collect() - ) + }); + + for chunk in erasure_chunks { + write_chunk(&mut tx, &candidate_hash, chunk.index, &chunk); + } + + meta.data_available = true; + meta.chunks_stored = bitvec::bitvec![BitOrderLsb0, u8; 1; n_validators]; + + write_meta(&mut tx, &candidate_hash, &meta); + write_available_data(&mut tx, &candidate_hash, &available_data); + + subsystem.db.write(tx)?; + Ok(()) +} + +fn prune_all(db: &Arc, clock: &dyn Clock) -> Result<(), Error> { + let now = clock.now()?; + let (range_start, range_end) = pruning_range(now); + + let mut tx = DBTransaction::new(); + let iter = db.iter_with_prefix(columns::META, &range_start[..]) + .take_while(|(k, _)| &k[..] < &range_end[..]); + + for (k, _v) in iter { + tx.delete(columns::META, &k[..]); + + let (_, candidate_hash) = match decode_pruning_key(&k[..]) { + Ok(m) => m, + Err(_) => continue, // sanity + }; + + delete_meta(&mut tx, &candidate_hash); + + // Clean up all attached data of the candidate. + if let Some(meta) = load_meta(db, &candidate_hash)? { + // delete available data. + if meta.data_available { + delete_available_data(&mut tx, &candidate_hash) + } + + // delete chunks. + for (i, b) in meta.chunks_stored.iter().enumerate() { + if *b { + delete_chunk(&mut tx, &candidate_hash, i as _); + } + } + + // delete unfinalized block references. Pruning references don't need to be + // manually taken care of as we are deleting them as we go in the outer loop. + if let State::Unfinalized(_, blocks) = meta.state { + for (block_number, block_hash) in blocks { + delete_unfinalized_inclusion( + &mut tx, + block_number.0, + &block_hash, + &candidate_hash, + ); + } + } + } + } + + db.write(tx)?; + Ok(()) } #[derive(Clone)] struct MetricsInner { received_availability_chunks_total: prometheus::Counter, - chunk_pruning_records_total: prometheus::Gauge, - block_pruning_records_total: prometheus::Gauge, - prune_povs: prometheus::Histogram, - prune_chunks: prometheus::Histogram, + pruning: prometheus::Histogram, process_block_finalized: prometheus::Histogram, block_activated: prometheus::Histogram, process_message: prometheus::Histogram, store_available_data: prometheus::Histogram, - store_chunks: prometheus::Histogram, + store_chunk: prometheus::Histogram, get_chunk: prometheus::Histogram, } @@ -1158,30 +1182,9 @@ impl Metrics { } } - fn chunk_pruning_records_size(&self, count: usize) { - if let Some(metrics) = &self.0 { - use core::convert::TryFrom as _; - let total = u64::try_from(count).unwrap_or_default(); - metrics.chunk_pruning_records_total.set(total); - } - } - - fn block_pruning_records_size(&self, count: usize) { - if let Some(metrics) = &self.0 { - use core::convert::TryFrom as _; - let total = u64::try_from(count).unwrap_or_default(); - metrics.block_pruning_records_total.set(total); - } - } - /// Provide a timer for `prune_povs` which observes on drop. - fn time_prune_povs(&self) -> Option { - self.0.as_ref().map(|metrics| metrics.prune_povs.start_timer()) - } - - /// Provide a timer for `prune_chunks` which observes on drop. - fn time_prune_chunks(&self) -> Option { - self.0.as_ref().map(|metrics| metrics.prune_chunks.start_timer()) + fn time_pruning(&self) -> Option { + self.0.as_ref().map(|metrics| metrics.pruning.start_timer()) } /// Provide a timer for `process_block_finalized` which observes on drop. @@ -1205,8 +1208,8 @@ impl Metrics { } /// Provide a timer for `store_chunk` which observes on drop. - fn time_store_chunks(&self) -> Option { - self.0.as_ref().map(|metrics| metrics.store_chunks.start_timer()) + fn time_store_chunk(&self) -> Option { + self.0.as_ref().map(|metrics| metrics.store_chunk.start_timer()) } /// Provide a timer for `get_chunk` which observes on drop. @@ -1225,34 +1228,11 @@ impl metrics::Metrics for Metrics { )?, registry, )?, - chunk_pruning_records_total: prometheus::register( - prometheus::Gauge::new( - "parachain_chunk_pruning_records_total", - "Number of chunk pruning records kept by the storage.", - )?, - registry, - )?, - block_pruning_records_total: prometheus::register( - prometheus::Gauge::new( - "parachain_block_pruning_records_total", - "Number of block pruning records kept by the storage.", - )?, - registry, - )?, - prune_povs: prometheus::register( + pruning: prometheus::register( prometheus::Histogram::with_opts( prometheus::HistogramOpts::new( - "parachain_av_store_prune_povs", - "Time spent within `av_store::prune_povs`", - ) - )?, - registry, - )?, - prune_chunks: prometheus::register( - prometheus::Histogram::with_opts( - prometheus::HistogramOpts::new( - "parachain_av_store_prune_chunks", - "Time spent within `av_store::prune_chunks`", + "parachain_av_store_pruning", + "Time spent within `av_store::prune_all`", ) )?, registry, @@ -1261,7 +1241,7 @@ impl metrics::Metrics for Metrics { prometheus::Histogram::with_opts( prometheus::HistogramOpts::new( "parachain_av_store_process_block_finalized", - "Time spent within `av_store::block_finalized`", + "Time spent within `av_store::process_block_finalized`", ) )?, registry, @@ -1270,7 +1250,7 @@ impl metrics::Metrics for Metrics { prometheus::Histogram::with_opts( prometheus::HistogramOpts::new( "parachain_av_store_block_activated", - "Time spent within `av_store::block_activated`", + "Time spent within `av_store::process_block_activated`", ) )?, registry, @@ -1293,11 +1273,11 @@ impl metrics::Metrics for Metrics { )?, registry, )?, - store_chunks: prometheus::register( + store_chunk: prometheus::register( prometheus::Histogram::with_opts( prometheus::HistogramOpts::new( - "parachain_av_store_store_chunks", - "Time spent within `av_store::store_chunks`", + "parachain_av_store_store_chunk", + "Time spent within `av_store::store_chunk`", ) )?, registry, @@ -1306,7 +1286,7 @@ impl metrics::Metrics for Metrics { prometheus::Histogram::with_opts( prometheus::HistogramOpts::new( "parachain_av_store_get_chunk", - "Time spent within `av_store::get_chunk`", + "Time spent fetching requested chunks.`", ) )?, registry, @@ -1315,6 +1295,3 @@ impl metrics::Metrics for Metrics { Ok(Metrics(Some(metrics))) } } - -#[cfg(test)] -mod tests; diff --git a/polkadot/node/core/av-store/src/tests.rs b/polkadot/node/core/av-store/src/tests.rs index 1a71981160..74cbc0cb39 100644 --- a/polkadot/node/core/av-store/src/tests.rs +++ b/polkadot/node/core/av-store/src/tests.rs @@ -26,13 +26,15 @@ use futures::{ use polkadot_primitives::v1::{ AvailableData, BlockData, CandidateDescriptor, CandidateReceipt, HeadData, - PersistedValidationData, PoV, Id as ParaId, CandidateHash, + PersistedValidationData, PoV, Id as ParaId, CandidateHash, Header, ValidatorId, }; use polkadot_node_subsystem_util::TimeoutExt; use polkadot_subsystem::{ - ActiveLeavesUpdate, errors::RuntimeApiError, JaegerSpan, + ActiveLeavesUpdate, errors::RuntimeApiError, JaegerSpan, messages::AllMessages, }; use polkadot_node_subsystem_test_helpers as test_helpers; +use sp_keyring::Sr25519Keyring; +use parking_lot::Mutex; struct TestHarness { virtual_overseer: test_helpers::TestSubsystemContextHandle, @@ -60,9 +62,41 @@ impl TestCandidateBuilder { } } +#[derive(Clone)] +struct TestClock { + inner: Arc>, +} + +impl TestClock { + fn now(&self) -> Duration { + self.inner.lock().clone() + } + + fn inc(&self, by: Duration) { + *self.inner.lock() += by; + } +} + +impl Clock for TestClock { + fn now(&self) -> Result { + Ok(TestClock::now(self)) + } +} + + +#[derive(Clone)] struct TestState { persisted_validation_data: PersistedValidationData, pruning_config: PruningConfig, + clock: TestClock, +} + +impl TestState { + // pruning is only polled periodically, so we sometimes need to delay until + // we're sure the subsystem has done pruning. + async fn wait_for_pruning(&self) { + Delay::new(self.pruning_config.pruning_interval * 2).await + } } impl Default for TestState { @@ -77,20 +111,26 @@ impl Default for TestState { }; let pruning_config = PruningConfig { - keep_stored_block_for: Duration::from_secs(1), - keep_finalized_block_for: Duration::from_secs(2), - keep_finalized_chunk_for: Duration::from_secs(2), + keep_unavailable_for: Duration::from_secs(1), + keep_finalized_for: Duration::from_secs(2), + pruning_interval: Duration::from_millis(250), + }; + + let clock = TestClock { + inner: Arc::new(Mutex::new(Duration::from_secs(0))), }; Self { persisted_validation_data, pruning_config, + clock, } } } + fn test_harness>( - pruning_config: PruningConfig, + state: TestState, store: Arc, test: impl FnOnce(TestHarness) -> T, ) { @@ -109,7 +149,12 @@ fn test_harness>( let pool = sp_core::testing::TaskExecutor::new(); let (context, virtual_overseer) = test_helpers::make_subsystem_context(pool.clone()); - let subsystem = AvailabilityStoreSubsystem::new_in_memory(store, pruning_config); + let subsystem = AvailabilityStoreSubsystem::new_in_memory( + store, + state.pruning_config.clone(), + Box::new(state.clock), + ); + let subsystem = run(subsystem, context); let test_fut = test(TestHarness { @@ -170,11 +215,17 @@ async fn overseer_signal( .expect(&format!("{:?} is more than enough for sending signals.", TIMEOUT)); } +fn with_tx(db: &Arc, f: impl FnOnce(&mut DBTransaction)) { + let mut tx = DBTransaction::new(); + f(&mut tx); + db.write(tx).unwrap(); +} + #[test] fn runtime_api_error_does_not_stop_the_subsystem() { let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS)); - test_harness(PruningConfig::default(), store, |test_harness| async move { + test_harness(TestState::default(), store, |test_harness| async move { let TestHarness { mut virtual_overseer } = test_harness; let new_leaf = Hash::repeat_byte(0x01); @@ -218,7 +269,59 @@ fn runtime_api_error_does_not_stop_the_subsystem() { #[test] fn store_chunk_works() { let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS)); - test_harness(PruningConfig::default(), store.clone(), |test_harness| async move { + test_harness(TestState::default(), store.clone(), |test_harness| async move { + let TestHarness { mut virtual_overseer } = test_harness; + let relay_parent = Hash::repeat_byte(32); + let candidate_hash = CandidateHash(Hash::repeat_byte(33)); + let validator_index = 5; + let n_validators = 10; + + let chunk = ErasureChunk { + chunk: vec![1, 2, 3], + index: validator_index, + proof: vec![vec![3, 4, 5]], + }; + + // Ensure an entry already exists. In reality this would come from watching + // chain events. + with_tx(&store, |tx| { + super::write_meta(tx, &candidate_hash, &CandidateMeta { + data_available: false, + chunks_stored: bitvec::bitvec![BitOrderLsb0, u8; 0; n_validators], + state: State::Unavailable(BETimestamp(0)), + }); + }); + + let (tx, rx) = oneshot::channel(); + + let chunk_msg = AvailabilityStoreMessage::StoreChunk { + candidate_hash, + relay_parent, + chunk: chunk.clone(), + tx, + }; + + overseer_send(&mut virtual_overseer, chunk_msg.into()).await; + assert_eq!(rx.await.unwrap(), Ok(())); + + let (tx, rx) = oneshot::channel(); + let query_chunk = AvailabilityStoreMessage::QueryChunk( + candidate_hash, + validator_index, + tx, + ); + + overseer_send(&mut virtual_overseer, query_chunk.into()).await; + + assert_eq!(rx.await.unwrap().unwrap(), chunk); + }); +} + + +#[test] +fn store_chunk_does_nothing_if_no_entry_already() { + let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS)); + test_harness(TestState::default(), store.clone(), |test_harness| async move { let TestHarness { mut virtual_overseer } = test_harness; let relay_parent = Hash::repeat_byte(32); let candidate_hash = CandidateHash(Hash::repeat_byte(33)); @@ -240,19 +343,7 @@ fn store_chunk_works() { }; overseer_send(&mut virtual_overseer, chunk_msg.into()).await; - - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::ChainApi(ChainApiMessage::BlockNumber( - hash, - tx, - )) => { - assert_eq!(hash, relay_parent); - tx.send(Ok(Some(4))).unwrap(); - } - ); - - assert_eq!(rx.await.unwrap(), Ok(())); + assert_eq!(rx.await.unwrap(), Err(())); let (tx, rx) = oneshot::channel(); let query_chunk = AvailabilityStoreMessage::QueryChunk( @@ -263,7 +354,52 @@ fn store_chunk_works() { overseer_send(&mut virtual_overseer, query_chunk.into()).await; - assert_eq!(rx.await.unwrap().unwrap(), chunk); + assert!(rx.await.unwrap().is_none()); + }); +} + +#[test] +fn query_chunk_checks_meta() { + let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS)); + test_harness(TestState::default(), store.clone(), |test_harness| async move { + let TestHarness { mut virtual_overseer } = test_harness; + let candidate_hash = CandidateHash(Hash::repeat_byte(33)); + let validator_index = 5; + let n_validators = 10; + + // Ensure an entry already exists. In reality this would come from watching + // chain events. + with_tx(&store, |tx| { + super::write_meta(tx, &candidate_hash, &CandidateMeta { + data_available: false, + chunks_stored: { + let mut v = bitvec::bitvec![BitOrderLsb0, u8; 0; n_validators]; + v.set(validator_index as usize, true); + v + }, + state: State::Unavailable(BETimestamp(0)), + }); + }); + + let (tx, rx) = oneshot::channel(); + let query_chunk = AvailabilityStoreMessage::QueryChunkAvailability( + candidate_hash, + validator_index, + tx, + ); + + overseer_send(&mut virtual_overseer, query_chunk.into()).await; + assert!(rx.await.unwrap()); + + let (tx, rx) = oneshot::channel(); + let query_chunk = AvailabilityStoreMessage::QueryChunkAvailability( + candidate_hash, + validator_index + 1, + tx, + ); + + overseer_send(&mut virtual_overseer, query_chunk.into()).await; + assert!(!rx.await.unwrap()); }); } @@ -271,7 +407,7 @@ fn store_chunk_works() { fn store_block_works() { let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS)); let test_state = TestState::default(); - test_harness(test_state.pruning_config.clone(), store.clone(), |test_harness| async move { + test_harness(test_state.clone(), store.clone(), |test_harness| async move { let TestHarness { mut virtual_overseer } = test_harness; let candidate_hash = CandidateHash(Hash::repeat_byte(1)); let validator_index = 5; @@ -283,7 +419,7 @@ fn store_block_works() { let available_data = AvailableData { pov: Arc::new(pov), - validation_data: test_state.persisted_validation_data, + validation_data: test_state.persisted_validation_data.clone(), }; @@ -319,13 +455,12 @@ fn store_block_works() { }); } - #[test] fn store_pov_and_query_chunk_works() { let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS)); let test_state = TestState::default(); - test_harness(test_state.pruning_config.clone(), store.clone(), |test_harness| async move { + test_harness(test_state.clone(), store.clone(), |test_harness| async move { let TestHarness { mut virtual_overseer } = test_harness; let candidate_hash = CandidateHash(Hash::repeat_byte(1)); let n_validators = 10; @@ -336,11 +471,10 @@ fn store_pov_and_query_chunk_works() { let available_data = AvailableData { pov: Arc::new(pov), - validation_data: test_state.persisted_validation_data, + validation_data: test_state.persisted_validation_data.clone(), }; - let no_metrics = Metrics(None); - let chunks_expected = get_chunks(&available_data, n_validators as usize, &no_metrics).unwrap(); + let chunks_expected = erasure::obtain_chunks_v1(n_validators as _, &available_data).unwrap(); let (tx, rx) = oneshot::channel(); let block_msg = AvailabilityStoreMessage::StoreAvailableData( @@ -358,71 +492,17 @@ fn store_pov_and_query_chunk_works() { for validator_index in 0..n_validators { let chunk = query_chunk(&mut virtual_overseer, candidate_hash, validator_index).await.unwrap(); - assert_eq!(chunk, chunks_expected[validator_index as usize]); + assert_eq!(chunk.chunk, chunks_expected[validator_index as usize]); } }); } -#[test] -fn stored_but_not_included_chunk_is_pruned() { - let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS)); - let test_state = TestState::default(); - - test_harness(test_state.pruning_config.clone(), store.clone(), |test_harness| async move { - let TestHarness { mut virtual_overseer } = test_harness; - let candidate_hash = CandidateHash(Hash::repeat_byte(1)); - let relay_parent = Hash::repeat_byte(2); - let validator_index = 5; - - let chunk = ErasureChunk { - chunk: vec![1, 2, 3], - index: validator_index, - proof: vec![vec![3, 4, 5]], - }; - - let (tx, rx) = oneshot::channel(); - let chunk_msg = AvailabilityStoreMessage::StoreChunk { - candidate_hash, - relay_parent, - chunk: chunk.clone(), - tx, - }; - - overseer_send(&mut virtual_overseer, chunk_msg.into()).await; - - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::ChainApi(ChainApiMessage::BlockNumber( - hash, - tx, - )) => { - assert_eq!(hash, relay_parent); - tx.send(Ok(Some(4))).unwrap(); - } - ); - - rx.await.unwrap().unwrap(); - - // At this point data should be in the store. - assert_eq!( - query_chunk(&mut virtual_overseer, candidate_hash, validator_index).await.unwrap(), - chunk, - ); - - // Wait for twice as long as the stored block kept for. - Delay::new(test_state.pruning_config.keep_stored_block_for * 2).await; - - // The block was not included by this point so it should be pruned now. - assert!(query_chunk(&mut virtual_overseer, candidate_hash, validator_index).await.is_none()); - }); -} - #[test] fn stored_but_not_included_data_is_pruned() { let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS)); let test_state = TestState::default(); - test_harness(test_state.pruning_config.clone(), store.clone(), |test_harness| async move { + test_harness(test_state.clone(), store.clone(), |test_harness| async move { let TestHarness { mut virtual_overseer } = test_harness; let candidate_hash = CandidateHash(Hash::repeat_byte(1)); let n_validators = 10; @@ -433,7 +513,7 @@ fn stored_but_not_included_data_is_pruned() { let available_data = AvailableData { pov: Arc::new(pov), - validation_data: test_state.persisted_validation_data, + validation_data: test_state.persisted_validation_data.clone(), }; let (tx, rx) = oneshot::channel(); @@ -455,8 +535,9 @@ fn stored_but_not_included_data_is_pruned() { available_data, ); - // Wait for twice as long as the stored block kept for. - Delay::new(test_state.pruning_config.keep_stored_block_for * 2).await; + // Wait until pruning. + test_state.clock.inc(test_state.pruning_config.keep_unavailable_for); + test_state.wait_for_pruning().await; // The block was not included by this point so it should be pruned now. assert!(query_available_data(&mut virtual_overseer, candidate_hash).await.is_none()); @@ -468,7 +549,7 @@ fn stored_data_kept_until_finalized() { let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS)); let test_state = TestState::default(); - test_harness(test_state.pruning_config.clone(), store.clone(), |test_harness| async move { + test_harness(test_state.clone(), store.clone(), |test_harness| async move { let TestHarness { mut virtual_overseer } = test_harness; let n_validators = 10; @@ -487,9 +568,12 @@ fn stored_data_kept_until_finalized() { let available_data = AvailableData { pov: Arc::new(pov), - validation_data: test_state.persisted_validation_data, + validation_data: test_state.persisted_validation_data.clone(), }; + let parent = Hash::repeat_byte(2); + let block_number = 10; + let (tx, rx) = oneshot::channel(); let block_msg = AvailabilityStoreMessage::StoreAvailableData( candidate_hash, @@ -509,29 +593,17 @@ fn stored_data_kept_until_finalized() { available_data, ); - let new_leaf = Hash::repeat_byte(2); - overseer_signal( + let new_leaf = import_leaf( &mut virtual_overseer, - OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { - activated: vec![(new_leaf, Arc::new(JaegerSpan::Disabled))].into(), - deactivated: vec![].into(), - }), + parent, + block_number, + vec![CandidateEvent::CandidateIncluded(candidate, HeadData::default())], + (0..n_validators).map(|_| Sr25519Keyring::Alice.public().into()).collect(), ).await; - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - relay_parent, - RuntimeApiRequest::CandidateEvents(tx), - )) => { - assert_eq!(relay_parent, new_leaf); - tx.send(Ok(vec![ - CandidateEvent::CandidateIncluded(candidate, HeadData::default()), - ])).unwrap(); - } - ); - - Delay::new(test_state.pruning_config.keep_stored_block_for * 10).await; + // Wait until unavailable data would definitely be pruned. + test_state.clock.inc(test_state.pruning_config.keep_unavailable_for * 10); + test_state.wait_for_pruning().await; // At this point data should _still_ be in the store. assert_eq!( @@ -539,13 +611,18 @@ fn stored_data_kept_until_finalized() { available_data, ); + assert!( + query_all_chunks(&mut virtual_overseer, candidate_hash, n_validators, true).await + ); + overseer_signal( &mut virtual_overseer, - OverseerSignal::BlockFinalized(new_leaf, 10) + OverseerSignal::BlockFinalized(new_leaf, block_number) ).await; - // Wait for a half of the time finalized data should be available for - Delay::new(test_state.pruning_config.keep_finalized_block_for / 2).await; + // Wait until unavailable data would definitely be pruned. + test_state.clock.inc(test_state.pruning_config.keep_finalized_for / 2); + test_state.wait_for_pruning().await; // At this point data should _still_ be in the store. assert_eq!( @@ -553,115 +630,21 @@ fn stored_data_kept_until_finalized() { available_data, ); - // Wait until it is should be gone. - Delay::new(test_state.pruning_config.keep_finalized_block_for).await; + assert!( + query_all_chunks(&mut virtual_overseer, candidate_hash, n_validators, true).await + ); + + // Wait until it definitely should be gone. + test_state.clock.inc(test_state.pruning_config.keep_finalized_for); + test_state.wait_for_pruning().await; // At this point data should be gone from the store. assert!( query_available_data(&mut virtual_overseer, candidate_hash).await.is_none(), ); - }); -} -#[test] -fn stored_chunk_kept_until_finalized() { - let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS)); - let test_state = TestState::default(); - - test_harness(test_state.pruning_config.clone(), store.clone(), |test_harness| async move { - let TestHarness { mut virtual_overseer } = test_harness; - let relay_parent = Hash::repeat_byte(2); - let validator_index = 5; - let candidate = TestCandidateBuilder { - ..Default::default() - }.build(); - let candidate_hash = candidate.hash(); - - let chunk = ErasureChunk { - chunk: vec![1, 2, 3], - index: validator_index, - proof: vec![vec![3, 4, 5]], - }; - - let (tx, rx) = oneshot::channel(); - let chunk_msg = AvailabilityStoreMessage::StoreChunk { - candidate_hash, - relay_parent, - chunk: chunk.clone(), - tx, - }; - - overseer_send(&mut virtual_overseer, chunk_msg.into()).await; - - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::ChainApi(ChainApiMessage::BlockNumber( - hash, - tx, - )) => { - assert_eq!(hash, relay_parent); - tx.send(Ok(Some(4))).unwrap(); - } - ); - - rx.await.unwrap().unwrap(); - - // At this point data should be in the store. - assert_eq!( - query_chunk(&mut virtual_overseer, candidate_hash, validator_index).await.unwrap(), - chunk, - ); - - let new_leaf = Hash::repeat_byte(2); - overseer_signal( - &mut virtual_overseer, - OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { - activated: vec![(new_leaf, Arc::new(JaegerSpan::Disabled))].into(), - deactivated: vec![].into(), - }), - ).await; - - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - relay_parent, - RuntimeApiRequest::CandidateEvents(tx), - )) => { - assert_eq!(relay_parent, new_leaf); - tx.send(Ok(vec![ - CandidateEvent::CandidateIncluded(candidate, HeadData::default()), - ])).unwrap(); - } - ); - - Delay::new(test_state.pruning_config.keep_stored_block_for * 10).await; - - // At this point data should _still_ be in the store. - assert_eq!( - query_chunk(&mut virtual_overseer, candidate_hash, validator_index).await.unwrap(), - chunk, - ); - - overseer_signal( - &mut virtual_overseer, - OverseerSignal::BlockFinalized(new_leaf, 10) - ).await; - - // Wait for a half of the time finalized data should be available for - Delay::new(test_state.pruning_config.keep_finalized_block_for / 2).await; - - // At this point data should _still_ be in the store. - assert_eq!( - query_chunk(&mut virtual_overseer, candidate_hash, validator_index).await.unwrap(), - chunk, - ); - - // Wait until it is should be gone. - Delay::new(test_state.pruning_config.keep_finalized_chunk_for).await; - - // At this point data should be gone from the store. assert!( - query_available_data(&mut virtual_overseer, candidate_hash).await.is_none(), + query_all_chunks(&mut virtual_overseer, candidate_hash, n_validators, false).await ); }); } @@ -671,9 +654,14 @@ fn forkfullness_works() { let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS)); let test_state = TestState::default(); - test_harness(test_state.pruning_config.clone(), store.clone(), |test_harness| async move { + test_harness(test_state.clone(), store.clone(), |test_harness| async move { let TestHarness { mut virtual_overseer } = test_harness; let n_validators = 10; + let block_number_1 = 5; + let block_number_2 = 5; + let validators: Vec<_> = (0..n_validators).map(|_| Sr25519Keyring::Alice.public().into()).collect(); + let parent_1 = Hash::repeat_byte(3); + let parent_2 = Hash::repeat_byte(4); let pov_1 = PoV { block_data: BlockData(vec![1, 2, 3]), @@ -708,7 +696,7 @@ fn forkfullness_works() { let available_data_2 = AvailableData { pov: Arc::new(pov_2), - validation_data: test_state.persisted_validation_data, + validation_data: test_state.persisted_validation_data.clone(), }; let (tx, rx) = oneshot::channel(); @@ -747,47 +735,25 @@ fn forkfullness_works() { available_data_2, ); - - let new_leaf_1 = Hash::repeat_byte(2); - let new_leaf_2 = Hash::repeat_byte(3); - - overseer_signal( + let new_leaf_1 = import_leaf( &mut virtual_overseer, - OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { - activated: vec![(new_leaf_1, Arc::new(JaegerSpan::Disabled)), (new_leaf_2, Arc::new(JaegerSpan::Disabled))].into(), - deactivated: vec![].into(), - }), + parent_1, + block_number_1, + vec![CandidateEvent::CandidateIncluded(candidate_1, HeadData::default())], + validators.clone(), ).await; - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - leaf, - RuntimeApiRequest::CandidateEvents(tx), - )) => { - assert_eq!(leaf, new_leaf_1); - tx.send(Ok(vec![ - CandidateEvent::CandidateIncluded(candidate_1, HeadData::default()), - ])).unwrap(); - } - ); - - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - leaf, - RuntimeApiRequest::CandidateEvents(tx), - )) => { - assert_eq!(leaf, new_leaf_2); - tx.send(Ok(vec![ - CandidateEvent::CandidateIncluded(candidate_2, HeadData::default()), - ])).unwrap(); - } - ); + let _new_leaf_2 = import_leaf( + &mut virtual_overseer, + parent_2, + block_number_2, + vec![CandidateEvent::CandidateIncluded(candidate_2, HeadData::default())], + validators.clone(), + ).await; overseer_signal( &mut virtual_overseer, - OverseerSignal::BlockFinalized(new_leaf_1, 5) + OverseerSignal::BlockFinalized(new_leaf_1, block_number_1) ).await; // Data of both candidates should be still present in the DB. @@ -800,10 +766,41 @@ fn forkfullness_works() { query_available_data(&mut virtual_overseer, candidate_2_hash).await.unwrap(), available_data_2, ); - // Wait for longer than finalized blocks should be kept for - Delay::new(test_state.pruning_config.keep_finalized_block_for + Duration::from_secs(1)).await; - // Data of both candidates should be gone now. + assert!( + query_all_chunks(&mut virtual_overseer, candidate_1_hash, n_validators, true).await, + ); + + assert!( + query_all_chunks(&mut virtual_overseer, candidate_2_hash, n_validators, true).await, + ); + + // Candidate 2 should now be considered unavailable and will be pruned. + test_state.clock.inc(test_state.pruning_config.keep_unavailable_for); + test_state.wait_for_pruning().await; + + assert_eq!( + query_available_data(&mut virtual_overseer, candidate_1_hash).await.unwrap(), + available_data_1, + ); + + assert!( + query_available_data(&mut virtual_overseer, candidate_2_hash).await.is_none(), + ); + + assert!( + query_all_chunks(&mut virtual_overseer, candidate_1_hash, n_validators, true).await, + ); + + assert!( + query_all_chunks(&mut virtual_overseer, candidate_2_hash, n_validators, false).await, + ); + + // Wait for longer than finalized blocks should be kept for + test_state.clock.inc(test_state.pruning_config.keep_finalized_for); + test_state.wait_for_pruning().await; + + // Everything should be pruned now. assert!( query_available_data(&mut virtual_overseer, candidate_1_hash).await.is_none(), ); @@ -811,6 +808,14 @@ fn forkfullness_works() { assert!( query_available_data(&mut virtual_overseer, candidate_2_hash).await.is_none(), ); + + assert!( + query_all_chunks(&mut virtual_overseer, candidate_1_hash, n_validators, false).await, + ); + + assert!( + query_all_chunks(&mut virtual_overseer, candidate_2_hash, n_validators, false).await, + ); }); } @@ -838,3 +843,88 @@ async fn query_chunk( rx.await.unwrap() } + +async fn query_all_chunks( + virtual_overseer: &mut test_helpers::TestSubsystemContextHandle, + candidate_hash: CandidateHash, + n_validators: u32, + expect_present: bool, +) -> bool { + for i in 0..n_validators { + if query_chunk(virtual_overseer, candidate_hash, i).await.is_some() != expect_present { + return false + } + } + true +} + +async fn import_leaf( + virtual_overseer: &mut test_helpers::TestSubsystemContextHandle, + parent_hash: Hash, + block_number: BlockNumber, + events: Vec, + validators: Vec, +) -> Hash { + let header = Header { + parent_hash, + number: block_number, + state_root: Hash::zero(), + extrinsics_root: Hash::zero(), + digest: Default::default(), + }; + let new_leaf = header.hash(); + + overseer_signal( + virtual_overseer, + OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { + activated: vec![(new_leaf, Arc::new(JaegerSpan::Disabled))].into(), + deactivated: vec![].into(), + }), + ).await; + + assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::CandidateEvents(tx), + )) => { + assert_eq!(relay_parent, new_leaf); + tx.send(Ok(events)).unwrap(); + } + ); + + assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::ChainApi(ChainApiMessage::BlockNumber( + relay_parent, + tx, + )) => { + assert_eq!(relay_parent, new_leaf); + tx.send(Ok(Some(block_number))).unwrap(); + } + ); + + assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::ChainApi(ChainApiMessage::BlockHeader( + relay_parent, + tx, + )) => { + assert_eq!(relay_parent, new_leaf); + tx.send(Ok(Some(header))).unwrap(); + } + ); + + assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::Validators(tx), + )) => { + assert_eq!(relay_parent, parent_hash); + tx.send(Ok(validators)).unwrap(); + } + ); + + new_leaf +} diff --git a/polkadot/roadmap/implementers-guide/src/node/utility/availability-store.md b/polkadot/roadmap/implementers-guide/src/node/utility/availability-store.md index f8d6f1b67a..7d2ec5e7bb 100644 --- a/polkadot/roadmap/implementers-guide/src/node/utility/availability-store.md +++ b/polkadot/roadmap/implementers-guide/src/node/utility/availability-store.md @@ -9,25 +9,19 @@ The two data types: For each of these data we have pruning rules that determine how long we need to keep that data available. -PoV hypothetically only need to be kept around until the block where the data was made fully available is finalized. However, disputes can revert finality, so we need to be a bit more conservative. We should keep the PoV until a block that finalized availability of it has been finalized for 1 day. +PoV hypothetically only need to be kept around until the block where the data was made fully available is finalized. However, disputes can revert finality, so we need to be a bit more conservative and we add a delay. We should keep the PoV until a block that finalized availability of it has been finalized for 1 day + 1 hour. -> TODO: arbitrary, but extracting `acceptance_period` is kind of hard here... - -Availability chunks need to be kept available until the dispute period for the corresponding candidate has ended. We can accomplish this by using the same criterion as the above, plus a delay. This gives us a pruning condition of the block finalizing availability of the chunk being final for 1 day + 1 hour. - -> TODO: again, concrete acceptance-period would be nicer here, but complicates things +Availability chunks need to be kept available until the dispute period for the corresponding candidate has ended. We can accomplish this by using the same criterion as the above. This gives us a pruning condition of the block finalizing availability of the chunk being final for 1 day + 1 hour. There is also the case where a validator commits to make a PoV available, but the corresponding candidate is never backed. In this case, we keep the PoV available for 1 hour. -> TODO: ideally would be an upper bound on how far back contextual execution is OK. +There may be multiple competing blocks all ending the availability phase for a particular candidate. Until finality, it will be unclear which of those is actually the canonical chain, so the pruning records for PoVs and Availability chunks should keep track of all such blocks. -There may be multiple competing blocks all ending the availability phase for a particular candidate. Until (and slightly beyond) finality, it will be unclear which of those is actually the canonical chain, so the pruning records for PoVs and Availability chunks should keep track of all such blocks. - -## Lifetime of the PoV in the storage +## Lifetime of the block data and chunks in storage ```dot process digraph { - label = "Block life FSM\n\n\n"; + label = "Block data FSM\n\n\n"; labelloc = "t"; rankdir="LR"; @@ -39,32 +33,56 @@ digraph { st -> inc [label = "Block\nincluded"] st -> prn [label = "Stored block\ntimed out"] inc -> fin [label = "Block\nfinalized"] - fin -> prn [label = "Block keep time\n(1 day) elapsed"] -} -``` - -## Lifetime of the chunk in the storage - -```dot process -digraph { - label = "Chunk life FSM\n\n\n"; - labelloc = "t"; - rankdir="LR"; - - chst [label = "Chunk\nStored"; shape = circle] - st [label = "Block\nStored"; shape = circle] - inc [label = "Included"; shape = circle] - fin [label = "Finalized"; shape = circle] - prn [label = "Pruned"; shape = circle] - - chst -> inc [label = "Block\nincluded"] - st -> inc [label = "Block\nincluded"] - st -> prn [label = "Stored block\ntimed out"] - inc -> fin [label = "Block\nfinalized"] + inc -> st [label = "Competing blocks\nfinalized"] fin -> prn [label = "Block keep time\n(1 day + 1 hour) elapsed"] } ``` +## Database Schema + +We use an underlying Key-Value database where we assume we have the following operations available: + * `write(key, value)` + * `read(key) -> Option` + * `iter_with_prefix(prefix) -> Iterator<(key, value)>` - gives all keys and values in lexicographical order where the key starts with `prefix`. + +We use this database to encode the following schema: + +``` +("available", CandidateHash) -> Option +("chunk", CandidateHash, u32) -> Option +("meta", CandidateHash) -> Option + +("unfinalized", BlockNumber, BlockHash, CandidateHash) -> Option<()> +("prune_by_time", Timestamp, CandidateHash) -> Option<()> +``` + +Timestamps are the wall-clock seconds since unix epoch. Timestamps and block numbers are both encoded as big-endian so lexicographic order is ascending. + +The meta information that we track per-candidate is defined as the `CandidateMeta` struct + +```rust +struct CandidateMeta { + state: State, + data_available: bool, + chunks_stored: Bitfield, +} + +enum State { + /// Candidate data was first observed at the given time but is not available in any block. + Unavailable(Timestamp), + /// The candidate was first observed at the given time and was included in the given list of unfinalized blocks, which may be + /// empty. The timestamp here is not used for pruning. Either one of these blocks will be finalized or the state will regress to + /// `State::Unavailable`, in which case the same timestamp will be reused. + Unfinalized(Timestamp, Vec<(BlockNumber, BlockHash)>), + /// Candidate data has appeared in a finalized block and did so at the given time. + Finalized(Timestamp) +} +``` + +We maintain the invariant that if a candidate has a meta entry, its available data exists on disk if `data_available` is true. All chunks mentioned in the meta entry are available. + +Additionally, there is exactly one `prune_by_time` entry which holds the candidate hash unless the state is `Unfinalized`. There may be zero, one, or many "unfinalized" keys with the given candidate, and this will correspond to the `state` of the meta entry. + ## Protocol Input: [`AvailabilityStoreMessage`][ASM] @@ -72,97 +90,81 @@ Input: [`AvailabilityStoreMessage`][ASM] Output: - [`RuntimeApiMessage`][RAM] + ## Functionality -On `ActiveLeavesUpdate`: - For each head in the `activated` list: - - Note any new candidates backed in the block. Update pruning records for any stored `PoVBlock`s. - - Note any newly-included candidates backed in the block. Update pruning records for any stored availability chunks. + - Note any new candidates backed in the block. Update the `CandidateMeta` for each. If the `CandidateMeta` does not exist, create it as `Unavailable` with the current timestamp. Register a `"prune_by_time"` entry based on the current timestamp + 1 hour. + - Note any new candidate included in the block. Update the `CandidateMeta` for each, performing a transition from `Unavailable` to `Unfinalized` if necessary. That includes removing the `"prune_by_time"` entry. Add the block hash and number to the state, if unfinalized. Add an `"unfinalized"` entry for the block and candidate. + - The `CandidateEvent` runtime API can be used for this purpose. + - TODO: load all ancestors of the head back to the finalized block so we don't miss anything if import notifications are missed. If a `StoreChunk` message is received for a candidate which has no entry, then we will prematurely lose the data. -On `OverseerSignal::BlockFinalized(_)` events: +On `OverseerSignal::BlockFinalized(finalized)` events: + - for each key in `iter_with_prefix("unfinalized")` + - Stop if the key is beyond `("unfinalized, finalized)` + - For each block number f that we encounter, load the finalized hash for that block. + - The state of each `CandidateMeta` we encounter here must be `Unfinalized`, since we loaded the candidate from an `"unfinalized"` key. + - For each candidate that we encounter under `f` and the finalized block hash, + - Update the `CandidateMeta` to have `State::Finalized`. Remove all `"unfinalized"` entries from the old `Unfinalized` state. + - Register a `"prune_by_time"` entry for the candidate based on the current time + 1 day + 1 hour. + - For each candidate that we encounter under `f` which is not under the finalized block hash, + - Remove all entries under `f` in the `Unfinalized` state. + - If the `CandidateMeta` has state `Unfinalized` with an empty list of blocks, downgrade to `Unavailable` and re-schedule pruning under the timestamp + 1 hour. We do not prune here as the candidate still may be included in a descendent of the finalized chain. + - Remove all `"unfinalized"` keys under `f`. + - Update last_finalized = finalized. -- Handle all pruning based on the newly-finalized block. + This is roughly `O(n * m)` where n is the number of blocks finalized since the last update, and `m` is the number of parachains. -On `QueryPoV` message: +On `QueryAvailableData` message: -- Return the PoV block, if any, for that candidate hash. + - Query `("available", candidate_hash)` + + This is `O(n)` in the size of the data, which may be large. + +On `QueryDataAvailability` message: + + - Query whether `("meta", candidate_hash)` exists and `data_available == true`. + + This is `O(n)` in the size of the metadata which is small. On `QueryChunk` message: -- Determine if we have the chunk indicated by the parameters and return it and its inclusion proof via the response channel if so. + - Query `("chunk", candidate_hash, index)` + + This is `O(n)` in the size of the data, which may be large. + +On `QueryChunkAvailability message: + + - Query whether `("meta", candidate_hash)` exists and the bit at `index` is set. + + This is `O(n)` in the size of the metadata which is small. On `StoreChunk` message: -- Store the chunk along with its inclusion proof under the candidate hash and validator index. + - If there is a `CandidateMeta` under the candidate hash, set the bit of the erasure-chunk in the `chunks_stored` bitfield to `1`. If it was not `1` already, write the chunk under `("chunk", candidate_hash, chunk_index)`. -On `StorePoV` message: + This is `O(n)` in the size of the chunk. -- Store the block, if the validator index is provided, store the respective chunk as well. +On `StoreAvailableData` message: -On finality event: + - If there is no `CandidateMeta` under the candidate hash, create it with `State::Unavailable(now)`. Load the `CandidateMeta` otherwise. + - Store `data` under `("available", candidate_hash)` and set `data_available` to true. + - Store each chunk under `("chunk", candidate_hash, index)` and set every bit in `chunks_stored` to `1`. -- For the finalized block and any earlier block (if any) update pruning records of `PoV`s and chunks to keep them for respective periods after finality. + This is `O(n)` in the size of the data as the aggregate size of the chunks is proportional to the data. -### Note any backed, included and timedout candidates in the block by `hash`. +Every 5 minutes, run a pruning routine: -- Create a `(sender, receiver)` pair. -- Dispatch a [`RuntimeApiMessage`][RAM]`::Request(hash, RuntimeApiRequest::CandidateEvents(sender)` and listen on the receiver for a response. -- For every event in the response:`CandidateEvent::CandidateIncluded`. - * For every `CandidateEvent::CandidateBacked` do nothing - * For every `CandidateEvent::CandidateIncluded` update pruning records of any blocks that the node stored previously. - * For every `CandidateEvent::CandidateTimedOut` use pruning records to prune the data; delete the info from records. + - for each key in `iter_with_prefix("prune_by_time")`: + - If the key is beyond ("prune_by_time", now), return. + - Remove the key. + - Extract `candidate_hash` from the key. + - Load and remove the `("meta", candidate_hash)` + - For each erasure chunk bit set, remove `("chunk", candidate_hash, bit_index)`. + - If `data_available`, remove `("available", candidate_hash) -## Schema - -### PoV pruning - -We keep a record about every PoV we keep, tracking its state and the time after which this PoV should be pruned. - -As the state of the `Candidate` changes, so does the `Prune At` time according to the rules defined earlier. - -| Record 1 | .. | Record N | -|----------------|----|----------------| -| CandidateHash1 | .. | CandidateHashN | -| Prune At | .. | Prune At | -| CandidateState | .. | CandidateState | - -### Chunk pruning - -Chunk pruning is organized in a similar schema as PoV pruning. - -| Record 1 | .. | Record N | -|----------------|----|----------------| -| CandidateHash1 | .. | CandidateHashN | -| Prune At | .. | Prune At | -| CandidateState | .. | CandidateState | - -### Included blocks caching - -In order to process finality events correctly we need to cache the set of parablocks included into each relay block beginning with the last finalized block and up to the most recent heads. We have to cache this data since we are only able to query this info from the state for the `k` last blocks where `k` is a relatively small number (for more info see `Assumptions`) - -These are used to update Chunk pruning and PoV pruning records upon finality: -When another block finality notification is received: - - For any record older than this block: - - Update pruning - - Remove the record - -| Relay Block N | .. | Chain Head 1 | Chain Head 2 | -|---------------|----|--------------|--------------| -| CandidateN_1 Included | .. | Candidate1_1 Included | Candidate2_1 Included | -| CandidateN_2 Included | .. | Candidate1_2 Included | Candidete2_2 Included | -| .. | .. | .. | .. | -| CandidateN_M Included | .. | Candidate1_K Included | Candidate2_L Included | - -> TODO: It's likely we will have to have a way to go from block hash to `BlockNumber` to make this work. - -### Blocks - -Blocks are simply stored as `(Hash, AvailableData)` key-value pairs. - -### Chunks - -Chunks are stored as `(Hash, Vec)` key-value pairs. + This is O(n * m) in the amount of candidates and average size of the data stored. This is probably the most expensive operation but does not need + to be run very often. ## Basic scenarios to test diff --git a/polkadot/roadmap/implementers-guide/src/types/overseer-protocol.md b/polkadot/roadmap/implementers-guide/src/types/overseer-protocol.md index 02219a91f1..44d0d8064d 100644 --- a/polkadot/roadmap/implementers-guide/src/types/overseer-protocol.md +++ b/polkadot/roadmap/implementers-guide/src/types/overseer-protocol.md @@ -173,12 +173,12 @@ enum AvailabilityStoreMessage { QueryDataAvailability(CandidateHash, ResponseChannel), /// Query a specific availability chunk of the candidate's erasure-coding by validator index. /// Returns the chunk and its inclusion proof against the candidate's erasure-root. - QueryChunk(CandidateHash, ValidatorIndex, ResponseChannel>), + QueryChunk(CandidateHash, ValidatorIndex, ResponseChannel>), /// Store a specific chunk of the candidate's erasure-coding, with an /// accompanying proof. - StoreChunk(CandidateHash, AvailabilityChunkAndProof, ResponseChannel>), + StoreChunk(CandidateHash, ErasureChunk, ResponseChannel>), /// Store `AvailableData`. If `ValidatorIndex` is provided, also store this validator's - /// `AvailabilityChunkAndProof`. + /// `ErasureChunk`. StoreAvailableData(CandidateHash, Option, u32, AvailableData, ResponseChannel>), } ```