diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock index 83add4090c..ce62da6696 100644 --- a/polkadot/Cargo.lock +++ b/polkadot/Cargo.lock @@ -4932,7 +4932,9 @@ version = "0.1.0" dependencies = [ "assert_matches", "derive_more 0.99.11", + "env_logger", "futures 0.3.5", + "futures-timer 3.0.2", "kvdb", "kvdb-memorydb", "kvdb-rocksdb", @@ -4944,6 +4946,7 @@ dependencies = [ "polkadot-node-subsystem-util", "polkadot-overseer", "polkadot-primitives", + "smallvec 1.4.2", "sp-core", ] diff --git a/polkadot/node/core/av-store/Cargo.toml b/polkadot/node/core/av-store/Cargo.toml index f1a2c5ade2..27dd7bd733 100644 --- a/polkadot/node/core/av-store/Cargo.toml +++ b/polkadot/node/core/av-store/Cargo.toml @@ -7,21 +7,24 @@ edition = "2018" [dependencies] derive_more = "0.99.9" futures = "0.3.5" -log = "0.4.8" - +futures-timer = "3.0.2" kvdb = "0.7.0" kvdb-rocksdb = "0.9.1" -codec = { package = "parity-scale-codec", version = "1.3.1", features = ["derive"] } +log = "0.4.8" +codec = { package = "parity-scale-codec", version = "1.3.1", features = ["derive"] } erasure = { package = "polkadot-erasure-coding", path = "../../../erasure-coding" } -polkadot-overseer = { path = "../../overseer" } -polkadot-primitives = { path = "../../../primitives" } polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" } polkadot-node-subsystem-util = { path = "../../subsystem-util" } +polkadot-overseer = { path = "../../overseer" } +polkadot-primitives = { path = "../../../primitives" } [dev-dependencies] -sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" } -futures = { version = "0.3.5", features = ["thread-pool"] } -polkadot-node-subsystem-test-helpers = { path = "../../subsystem-test-helpers" } -kvdb-memorydb = "0.7.0" +env_logger = "0.7.1" assert_matches = "1.3.0" +smallvec = "1.4.2" +kvdb-memorydb = "0.7.0" + +sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" } +polkadot-node-subsystem-util = { path = "../../subsystem-util" } +polkadot-node-subsystem-test-helpers = { path = "../../subsystem-test-helpers" } diff --git a/polkadot/node/core/av-store/src/lib.rs b/polkadot/node/core/av-store/src/lib.rs index e32b3c1526..e4e26d2940 100644 --- a/polkadot/node/core/av-store/src/lib.rs +++ b/polkadot/node/core/av-store/src/lib.rs @@ -19,36 +19,44 @@ #![recursion_limit="256"] #![warn(missing_docs)] -use std::collections::HashMap; +use std::cmp::Ordering; +use std::collections::{HashMap, HashSet}; use std::io; use std::path::PathBuf; use std::sync::Arc; +use std::time::{Duration, SystemTime, SystemTimeError, UNIX_EPOCH}; use codec::{Encode, Decode}; -use futures::{select, channel::oneshot, FutureExt}; +use futures::{select, channel::oneshot, future::{self, Either}, Future, FutureExt}; +use futures_timer::Delay; use kvdb_rocksdb::{Database, DatabaseConfig}; use kvdb::{KeyValueDB, DBTransaction}; use polkadot_primitives::v1::{ - Hash, AvailableData, ErasureChunk, ValidatorIndex, + Hash, AvailableData, BlockNumber, CandidateEvent, ErasureChunk, ValidatorIndex, }; use polkadot_subsystem::{ - FromOverseer, SubsystemError, Subsystem, SubsystemContext, SpawnedSubsystem, + FromOverseer, OverseerSignal, SubsystemError, Subsystem, SubsystemContext, SpawnedSubsystem, + ActiveLeavesUpdate, + errors::{ChainApiError, RuntimeApiError}, }; -use polkadot_node_subsystem_util::{ - metrics::{self, prometheus}, +use polkadot_node_subsystem_util::metrics::{self, prometheus}; +use polkadot_subsystem::messages::{ + AllMessages, AvailabilityStoreMessage, ChainApiMessage, RuntimeApiMessage, RuntimeApiRequest, }; -use polkadot_subsystem::messages::AvailabilityStoreMessage; const LOG_TARGET: &str = "availability"; mod columns { pub const DATA: u32 = 0; - pub const NUM_COLUMNS: u32 = 1; + pub const META: u32 = 1; + pub const NUM_COLUMNS: u32 = 2; } #[derive(Debug, derive_more::From)] enum Error { + #[from] + Chain(ChainApiError), #[from] Erasure(erasure::Error), #[from] @@ -56,15 +64,316 @@ enum Error { #[from] Oneshot(oneshot::Canceled), #[from] + Runtime(RuntimeApiError), + #[from] Subsystem(SubsystemError), + #[from] + Time(SystemTimeError), +} + +/// A wrapper type for delays. +#[derive(Debug, Decode, Encode, Eq)] +enum PruningDelay { + /// This pruning should be triggered after this `Duration` from UNIX_EPOCH. + In(Duration), + + /// Data is in the state where it has no expiration. + Indefinite, +} + +impl PruningDelay { + fn now() -> Result { + Ok(SystemTime::now().duration_since(UNIX_EPOCH)?.into()) + } + + fn into_the_future(duration: Duration) -> Result { + Ok(Self::In(SystemTime::now().duration_since(UNIX_EPOCH)? + duration)) + } + + fn as_duration(&self) -> Option { + match self { + PruningDelay::In(d) => Some(*d), + PruningDelay::Indefinite => None, + } + } +} + +impl From for PruningDelay { + fn from(d: Duration) -> Self { + Self::In(d) + } +} + +impl PartialEq for PruningDelay { + fn eq(&self, other: &Self) -> bool { + match (self, other) { + (PruningDelay::In(this), PruningDelay::In(that)) => {this == that}, + (PruningDelay::Indefinite, PruningDelay::Indefinite) => true, + _ => false, + } + } +} + +impl PartialOrd for PruningDelay { + fn partial_cmp(&self, other: &Self) -> Option { + match (self, other) { + (PruningDelay::In(this), PruningDelay::In(that)) => this.partial_cmp(that), + (PruningDelay::In(_), PruningDelay::Indefinite) => Some(Ordering::Less), + (PruningDelay::Indefinite, PruningDelay::In(_)) => Some(Ordering::Greater), + (PruningDelay::Indefinite, PruningDelay::Indefinite) => Some(Ordering::Equal), + } + } +} + +impl Ord for PruningDelay { + fn cmp(&self, other: &Self) -> Ordering { + match (self, other) { + (PruningDelay::In(this), PruningDelay::In(that)) => this.cmp(that), + (PruningDelay::In(_), PruningDelay::Indefinite) => Ordering::Less, + (PruningDelay::Indefinite, PruningDelay::In(_)) => Ordering::Greater, + (PruningDelay::Indefinite, PruningDelay::Indefinite) => Ordering::Equal, + } + } +} + +/// A key for chunk pruning records. +const CHUNK_PRUNING_KEY: [u8; 14] = *b"chunks_pruning"; + +/// A key for PoV pruning records. +const POV_PRUNING_KEY: [u8; 11] = *b"pov_pruning"; + +/// A key for a cached value of next scheduled PoV pruning. +const NEXT_POV_PRUNING: [u8; 16] = *b"next_pov_pruning"; + +/// A key for a cached value of next scheduled chunk pruning. +const NEXT_CHUNK_PRUNING: [u8; 18] = *b"next_chunk_pruning"; + +/// The following constants are used under normal conditions: + +/// Stored block is kept available for 1 hour. +const KEEP_STORED_BLOCK_FOR: Duration = Duration::from_secs(60 * 60); + +/// Finalized block is kept for 1 day. +const KEEP_FINALIZED_BLOCK_FOR: Duration = Duration::from_secs(24 * 60 * 60); + +/// Keep chunk of the finalized block for 1 day + 1 hour. +const KEEP_FINALIZED_CHUNK_FOR: Duration = Duration::from_secs(25 * 60 * 60); + +/// At which point in time since UNIX_EPOCH we need to wakeup and do next pruning of blocks. +/// Essenially this is the first element in the sorted array of pruning data, +/// we just want to cache it here to avoid lifting the whole array just to look at the head. +/// +/// This record exists under `NEXT_POV_PRUNING` key, if it does not either: +/// a) There are no records and nothing has to be pruned. +/// b) There are records but all of them are in `Included` state and do not have exact time to +/// be pruned. +#[derive(Decode, Encode)] +struct NextPoVPruning(Duration); + +impl NextPoVPruning { + // After which duration from `now` this should fire. + fn should_fire_in(&self) -> Result { + Ok(self.0 - SystemTime::now().duration_since(UNIX_EPOCH)?) + } +} + +/// At which point in time since UNIX_EPOCH we need to wakeup and do next pruning of chunks. +/// Essentially this is the first element in the sorted array of pruning data, +/// we just want to cache it here to avoid lifting the whole array just to look at the head. +/// +/// This record exists under `NEXT_CHUNK_PRUNING` key, if it does not either: +/// a) There are no records and nothing has to be pruned. +/// b) There are records but all of them are in `Included` state and do not have exact time to +/// be pruned. +#[derive(Decode, Encode)] +struct NextChunkPruning(Duration); + +impl NextChunkPruning { + // After which amount of seconds into the future from `now` this should fire. + fn should_fire_in(&self) -> Result { + Ok(self.0 - SystemTime::now().duration_since(UNIX_EPOCH)?) + } +} + +/// Struct holding pruning timing configuration. +/// The only purpose of this structure is to use different timing +/// configurations in production and in testing. +#[derive(Clone)] +struct PruningConfig { + /// How long should a stored block stay available. + keep_stored_block_for: Duration, + + /// How long should a finalized block stay available. + keep_finalized_block_for: Duration, + + /// How long should a chunk of a finalized block stay available. + keep_finalized_chunk_for: Duration, +} + +impl Default for PruningConfig { + fn default() -> Self { + Self { + keep_stored_block_for: KEEP_STORED_BLOCK_FOR, + keep_finalized_block_for: KEEP_FINALIZED_BLOCK_FOR, + keep_finalized_chunk_for: KEEP_FINALIZED_CHUNK_FOR, + } + } +} + +#[derive(Debug, Decode, Encode, Eq, PartialEq)] +enum CandidateState { + Stored, + Included, + Finalized, +} + +#[derive(Debug, Decode, Encode, Eq)] +struct PoVPruningRecord { + candidate_hash: Hash, + block_number: BlockNumber, + candidate_state: CandidateState, + prune_at: PruningDelay, +} + +impl PartialEq for PoVPruningRecord { + fn eq(&self, other: &Self) -> bool { + self.candidate_hash == other.candidate_hash + } +} + +impl Ord for PoVPruningRecord { + fn cmp(&self, other: &Self) -> Ordering { + if self.candidate_hash == other.candidate_hash { + return Ordering::Equal; + } + + self.prune_at.cmp(&other.prune_at) + } +} + +impl PartialOrd for PoVPruningRecord { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +#[derive(Debug, Decode, Encode, Eq)] +struct ChunkPruningRecord { + candidate_hash: Hash, + block_number: BlockNumber, + candidate_state: CandidateState, + chunk_index: u32, + prune_at: PruningDelay, +} + +impl PartialEq for ChunkPruningRecord { + fn eq(&self, other: &Self) -> bool { + self.candidate_hash == other.candidate_hash && + self.chunk_index == other.chunk_index + } +} + +impl Ord for ChunkPruningRecord { + fn cmp(&self, other: &Self) -> Ordering { + if self.candidate_hash == other.candidate_hash { + return Ordering::Equal; + } + + self.prune_at.cmp(&other.prune_at) + } +} + +impl PartialOrd for ChunkPruningRecord { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } } /// An implementation of the Availability Store subsystem. pub struct AvailabilityStoreSubsystem { + pruning_config: PruningConfig, inner: Arc, metrics: Metrics, } +impl AvailabilityStoreSubsystem { + // Perform pruning of PoVs + fn prune_povs(&self) -> Result<(), Error> { + let mut tx = DBTransaction::new(); + let mut pov_pruning = pov_pruning(&self.inner).unwrap_or_default(); + let now = PruningDelay::now()?; + + log::trace!(target: LOG_TARGET, "Pruning PoVs"); + let outdated_records_count = pov_pruning.iter() + .take_while(|r| r.prune_at <= now) + .count(); + + for record in pov_pruning.drain(..outdated_records_count) { + log::trace!(target: LOG_TARGET, "Removing record {:?}", record); + tx.delete( + columns::DATA, + available_data_key(&record.candidate_hash).as_slice(), + ); + } + + put_pov_pruning(&self.inner, Some(tx), pov_pruning)?; + + Ok(()) + } + + // Perform pruning of chunks. + fn prune_chunks(&self) -> Result<(), Error> { + let mut tx = DBTransaction::new(); + let mut chunk_pruning = chunk_pruning(&self.inner).unwrap_or_default(); + let now = PruningDelay::now()?; + + log::trace!(target: LOG_TARGET, "Pruning Chunks"); + let outdated_records_count = chunk_pruning.iter() + .take_while(|r| r.prune_at <= now) + .count(); + + for record in chunk_pruning.drain(..outdated_records_count) { + log::trace!(target: LOG_TARGET, "Removing record {:?}", record); + tx.delete( + columns::DATA, + erasure_chunk_key(&record.candidate_hash, record.chunk_index).as_slice(), + ); + } + + put_chunk_pruning(&self.inner, Some(tx), chunk_pruning)?; + + Ok(()) + } + + // Return a `Future` that either resolves when another PoV pruning has to happen + // or is indefinitely `pending` in case no pruning has to be done. + // Just a helper to `select` over multiple things at once. + fn maybe_prune_povs(&self) -> Result, Error> { + let future = match get_next_pov_pruning_time(&self.inner) { + Some(pruning) => { + Either::Left(Delay::new(pruning.should_fire_in()?)) + } + None => Either::Right(future::pending::<()>()), + }; + + Ok(future) + } + + // Return a `Future` that either resolves when another chunk pruning has to happen + // or is indefinitely `pending` in case no pruning has to be done. + // Just a helper to `select` over multiple things at once. + fn maybe_prune_chunks(&self) -> Result, Error> { + let future = match get_next_chunk_pruning_time(&self.inner) { + Some(pruning) => { + Either::Left(Delay::new(pruning.should_fire_in()?)) + } + None => Either::Right(future::pending::<()>()), + }; + + Ok(future) + } +} + fn available_data_key(candidate_hash: &Hash) -> Vec { (candidate_hash, 0i8).encode() } @@ -110,38 +419,74 @@ impl AvailabilityStoreSubsystem { let db = Database::open(&db_config, &path)?; Ok(Self { + pruning_config: PruningConfig::default(), inner: Arc::new(db), metrics, }) } #[cfg(test)] - fn new_in_memory(inner: Arc) -> Self { + fn new_in_memory(inner: Arc, pruning_config: PruningConfig) -> Self { Self { + pruning_config, inner, metrics: Metrics(None), } } } -async fn run(subsystem: AvailabilityStoreSubsystem, mut ctx: Context) +fn get_next_pov_pruning_time(db: &Arc) -> Option { + query_inner(db, columns::META, &NEXT_POV_PRUNING) +} + +fn get_next_chunk_pruning_time(db: &Arc) -> Option { + query_inner(db, columns::META, &NEXT_CHUNK_PRUNING) +} + +async fn run(mut subsystem: AvailabilityStoreSubsystem, mut ctx: Context) -> Result<(), Error> where Context: SubsystemContext, { let ctx = &mut ctx; loop { + // Every time the following two methods are called a read from DB is performed. + // But given that these are very small values which are essentially a newtype + // wrappers around `Duration` (`NextChunkPruning` and `NextPoVPruning`) and also the + // fact of the frequent reads itself we assume these to end up cached in the memory + // anyway and thus these db reads to be reasonably fast. + let pov_pruning_time = subsystem.maybe_prune_povs()?; + let chunk_pruning_time = subsystem.maybe_prune_chunks()?; + + let mut pov_pruning_time = pov_pruning_time.fuse(); + let mut chunk_pruning_time = chunk_pruning_time.fuse(); + select! { incoming = ctx.recv().fuse() => { match incoming { - Ok(FromOverseer::Signal(Conclude)) => break, - Ok(FromOverseer::Signal(_)) => (), + Ok(FromOverseer::Signal(OverseerSignal::Conclude)) => break, + Ok(FromOverseer::Signal(OverseerSignal::ActiveLeaves( + ActiveLeavesUpdate { activated, .. }) + )) => { + for activated in activated.into_iter() { + process_block_activated(ctx, &subsystem.inner, activated).await?; + } + } + Ok(FromOverseer::Signal(OverseerSignal::BlockFinalized(hash))) => { + process_block_finalized(&subsystem, ctx, &subsystem.inner, hash).await?; + } Ok(FromOverseer::Communication { msg }) => { - process_message(&subsystem.inner, &subsystem.metrics, msg)?; + process_message(&mut subsystem, ctx, msg).await?; } Err(_) => break, } } + pov_pruning_time = pov_pruning_time => { + subsystem.prune_povs()?; + } + chunk_pruning_time = chunk_pruning_time => { + subsystem.prune_chunks()?; + } complete => break, } } @@ -149,23 +494,164 @@ where Ok(()) } -fn process_message(db: &Arc, metrics: &Metrics, msg: AvailabilityStoreMessage) -> Result<(), Error> { +/// As soon as certain block is finalized its pruning records and records of all +/// blocks that we keep that are `older` than the block in question have to be updated. +/// +/// The state of data has to be changed from +/// `CandidateState::Included` to `CandidateState::Finalized` and their pruning times have +/// to be updated to `now` + keep_finalized_{block, chunk}_for`. +async fn process_block_finalized( + subsystem: &AvailabilityStoreSubsystem, + ctx: &mut Context, + db: &Arc, + hash: Hash, +) -> Result<(), Error> +where + Context: SubsystemContext +{ + let block_number = get_block_number(ctx, hash).await?; + + if let Some(mut pov_pruning) = pov_pruning(db) { + // Since the records are sorted by time in which they need to be pruned and not by block + // numbers we have to iterate through the whole collection here. + for record in pov_pruning.iter_mut() { + if record.block_number <= block_number { + log::trace!( + target: LOG_TARGET, + "Updating pruning record for finalized block {}", + record.candidate_hash, + ); + + record.prune_at = PruningDelay::into_the_future( + subsystem.pruning_config.keep_finalized_block_for + )?; + record.candidate_state = CandidateState::Finalized; + } + } + + put_pov_pruning(db, None, pov_pruning)?; + } + + if let Some(mut chunk_pruning) = chunk_pruning(db) { + for record in chunk_pruning.iter_mut() { + if record.block_number <= block_number { + log::trace!( + target: LOG_TARGET, + "Updating chunk pruning record for finalized block {}", + record.candidate_hash, + ); + + record.prune_at = PruningDelay::into_the_future( + subsystem.pruning_config.keep_finalized_chunk_for + )?; + record.candidate_state = CandidateState::Finalized; + } + } + + put_chunk_pruning(db, None, chunk_pruning)?; + } + + Ok(()) +} + +async fn process_block_activated( + ctx: &mut Context, + db: &Arc, + hash: Hash, +) -> Result<(), Error> +where + Context: SubsystemContext +{ + let events = request_candidate_events(ctx, hash).await?; + + log::trace!(target: LOG_TARGET, "block activated {}", hash); + let mut included = HashSet::new(); + + for event in events.into_iter() { + if let CandidateEvent::CandidateIncluded(receipt, _) = event { + log::trace!(target: LOG_TARGET, "Candidate {} was included", receipt.hash()); + included.insert(receipt.hash()); + } + } + + if let Some(mut pov_pruning) = pov_pruning(db) { + for record in pov_pruning.iter_mut() { + if included.contains(&record.candidate_hash) { + record.prune_at = PruningDelay::Indefinite; + record.candidate_state = CandidateState::Included; + } + } + + pov_pruning.sort(); + + put_pov_pruning(db, None, pov_pruning)?; + } + + if let Some(mut chunk_pruning) = chunk_pruning(db) { + for record in chunk_pruning.iter_mut() { + if included.contains(&record.candidate_hash) { + record.prune_at = PruningDelay::Indefinite; + record.candidate_state = CandidateState::Included; + } + } + + chunk_pruning.sort(); + + put_chunk_pruning(db, None, chunk_pruning)?; + } + + Ok(()) +} + +async fn request_candidate_events( + ctx: &mut Context, + hash: Hash, +) -> Result, Error> +where + Context: SubsystemContext +{ + let (tx, rx) = oneshot::channel(); + + let msg = AllMessages::RuntimeApi(RuntimeApiMessage::Request( + hash, + RuntimeApiRequest::CandidateEvents(tx), + )); + + ctx.send_message(msg.into()).await?; + + Ok(rx.await??) +} + +async fn process_message( + subsystem: &mut AvailabilityStoreSubsystem, + ctx: &mut Context, + msg: AvailabilityStoreMessage, +) -> Result<(), Error> +where + Context: SubsystemContext +{ use AvailabilityStoreMessage::*; match msg { QueryAvailableData(hash, tx) => { - tx.send(available_data(db, &hash).map(|d| d.data)).map_err(|_| oneshot::Canceled)?; + tx.send(available_data(&subsystem.inner, &hash).map(|d| d.data)) + .map_err(|_| oneshot::Canceled)?; } QueryDataAvailability(hash, tx) => { - tx.send(available_data(db, &hash).is_some()).map_err(|_| oneshot::Canceled)?; + tx.send(available_data(&subsystem.inner, &hash).is_some()) + .map_err(|_| oneshot::Canceled)?; } QueryChunk(hash, id, tx) => { - tx.send(get_chunk(db, &hash, id, metrics)?).map_err(|_| oneshot::Canceled)?; + tx.send(get_chunk(subsystem, &hash, id)?) + .map_err(|_| oneshot::Canceled)?; } QueryChunkAvailability(hash, id, tx) => { - tx.send(get_chunk(db, &hash, id, metrics)?.is_some()).map_err(|_| oneshot::Canceled)?; + tx.send(get_chunk(subsystem, &hash, id)?.is_some()) + .map_err(|_| oneshot::Canceled)?; } - StoreChunk(hash, id, chunk, tx) => { - match store_chunk(db, &hash, id, chunk) { + StoreChunk { candidate_hash, relay_parent, validator_index, chunk, tx } => { + // Current block number is relay_parent block number + 1. + let block_number = get_block_number(ctx, relay_parent).await? + 1; + match store_chunk(subsystem, &candidate_hash, validator_index, chunk, block_number) { Err(e) => { tx.send(Err(())).map_err(|_| oneshot::Canceled)?; return Err(e); @@ -176,7 +662,7 @@ fn process_message(db: &Arc, metrics: &Metrics, msg: Availabilit } } StoreAvailableData(hash, id, n_validators, av_data, tx) => { - match store_available_data(db, &hash, id, n_validators, av_data, metrics) { + match store_available_data(subsystem, &hash, id, n_validators, av_data) { Err(e) => { tx.send(Err(())).map_err(|_| oneshot::Canceled)?; return Err(e); @@ -195,19 +681,126 @@ fn available_data(db: &Arc, candidate_hash: &Hash) -> Option) -> Option> { + query_inner(db, columns::META, &POV_PRUNING_KEY) +} + +fn chunk_pruning(db: &Arc) -> Option> { + query_inner(db, columns::META, &CHUNK_PRUNING_KEY) +} + +fn put_pov_pruning( db: &Arc, + tx: Option, + mut pov_pruning: Vec, +) -> Result<(), Error> { + let mut tx = tx.unwrap_or_default(); + + pov_pruning.sort(); + + tx.put_vec( + columns::META, + &POV_PRUNING_KEY, + pov_pruning.encode(), + ); + + match pov_pruning.get(0) { + // We want to wake up in case we have some records that are not scheduled to be kept + // indefinitely (data is included and waiting to move to the finalized state) and so + // the is at least one value that is not `PruningDelay::Indefinite`. + Some(PoVPruningRecord { prune_at: PruningDelay::In(prune_at), .. }) => { + tx.put_vec( + columns::META, + &NEXT_POV_PRUNING, + NextPoVPruning(*prune_at).encode(), + ); + } + _ => { + // If there is no longer any records, delete the cached pruning time record. + tx.delete( + columns::META, + &NEXT_POV_PRUNING, + ); + } + } + + db.write(tx)?; + + Ok(()) +} + +fn put_chunk_pruning( + db: &Arc, + tx: Option, + mut chunk_pruning: Vec, +) -> Result<(), Error> { + let mut tx = tx.unwrap_or_default(); + + chunk_pruning.sort(); + + tx.put_vec( + columns::META, + &CHUNK_PRUNING_KEY, + chunk_pruning.encode(), + ); + + match chunk_pruning.get(0) { + Some(ChunkPruningRecord { prune_at: PruningDelay::In(prune_at), .. }) => { + tx.put_vec( + columns::META, + &NEXT_CHUNK_PRUNING, + NextChunkPruning(*prune_at).encode(), + ); + } + _ => { + tx.delete( + columns::META, + &NEXT_CHUNK_PRUNING, + ); + } + } + + db.write(tx)?; + + Ok(()) +} + +// produces a block number by block's hash. +// in the the event of an invalid `block_hash`, returns `Ok(0)` +async fn get_block_number( + ctx: &mut Context, + block_hash: Hash, +) -> Result +where + Context: SubsystemContext, +{ + let (tx, rx) = oneshot::channel(); + + ctx.send_message(AllMessages::ChainApi(ChainApiMessage::BlockNumber(block_hash, tx))).await?; + + Ok(rx.await??.map(|number| number).unwrap_or_default()) +} + +fn store_available_data( + subsystem: &mut AvailabilityStoreSubsystem, candidate_hash: &Hash, id: Option, n_validators: u32, available_data: AvailableData, - metrics: &Metrics, ) -> Result<(), Error> { let mut tx = DBTransaction::new(); + let block_number = available_data.validation_data.block_number; + if let Some(index) = id { - let chunks = get_chunks(&available_data, n_validators as usize, metrics)?; - store_chunk(db, candidate_hash, n_validators, chunks[index as usize].clone())?; + let chunks = get_chunks(&available_data, n_validators as usize, &subsystem.metrics)?; + store_chunk( + subsystem, + candidate_hash, + n_validators, + chunks[index as usize].clone(), + block_number, + )?; } let stored_data = StoredAvailableData { @@ -215,45 +808,120 @@ fn store_available_data( n_validators, }; + let mut pov_pruning = pov_pruning(&subsystem.inner).unwrap_or_default(); + let prune_at = PruningDelay::into_the_future(subsystem.pruning_config.keep_stored_block_for)?; + + if let Some(next_pruning) = prune_at.as_duration() { + tx.put_vec( + columns::META, + &NEXT_POV_PRUNING, + NextPoVPruning(next_pruning).encode(), + ); + } + + let pruning_record = PoVPruningRecord { + candidate_hash: candidate_hash.clone(), + block_number, + candidate_state: CandidateState::Stored, + prune_at, + }; + + let idx = pov_pruning.binary_search(&pruning_record).unwrap_or_else(|insert_idx| insert_idx); + + pov_pruning.insert(idx, pruning_record); + tx.put_vec( columns::DATA, available_data_key(&candidate_hash).as_slice(), stored_data.encode(), ); - db.write(tx)?; + tx.put_vec( + columns::META, + &POV_PRUNING_KEY, + pov_pruning.encode(), + ); + + subsystem.inner.write(tx)?; Ok(()) } -fn store_chunk(db: &Arc, candidate_hash: &Hash, _n_validators: u32, chunk: ErasureChunk) - -> Result<(), Error> -{ +fn store_chunk( + subsystem: &mut AvailabilityStoreSubsystem, + candidate_hash: &Hash, + _n_validators: u32, + chunk: ErasureChunk, + block_number: BlockNumber, +) -> Result<(), Error> { let mut tx = DBTransaction::new(); let dbkey = erasure_chunk_key(candidate_hash, chunk.index); - tx.put_vec(columns::DATA, &dbkey, chunk.encode()); - db.write(tx)?; + let mut chunk_pruning = chunk_pruning(&subsystem.inner).unwrap_or_default(); + let prune_at = PruningDelay::into_the_future(subsystem.pruning_config.keep_stored_block_for)?; + + if let Some(delay) = prune_at.as_duration() { + tx.put_vec( + columns::META, + &NEXT_CHUNK_PRUNING, + NextChunkPruning(delay).encode(), + ); + } + + let pruning_record = ChunkPruningRecord { + candidate_hash: candidate_hash.clone(), + block_number, + candidate_state: CandidateState::Stored, + chunk_index: chunk.index, + prune_at, + }; + + let idx = chunk_pruning.binary_search(&pruning_record).unwrap_or_else(|insert_idx| insert_idx); + + chunk_pruning.insert(idx, pruning_record); + + tx.put_vec( + columns::DATA, + &dbkey, + chunk.encode(), + ); + + tx.put_vec( + columns::META, + &CHUNK_PRUNING_KEY, + chunk_pruning.encode(), + ); + + subsystem.inner.write(tx)?; Ok(()) } -fn get_chunk(db: &Arc, candidate_hash: &Hash, index: u32, metrics: &Metrics) - -> Result, Error> -{ +fn get_chunk( + subsystem: &mut AvailabilityStoreSubsystem, + candidate_hash: &Hash, + index: u32, +) -> Result, Error> { if let Some(chunk) = query_inner( - db, + &subsystem.inner, columns::DATA, - &erasure_chunk_key(candidate_hash, index)) { + &erasure_chunk_key(candidate_hash, index) + ) { return Ok(Some(chunk)); } - if let Some(data) = available_data(db, candidate_hash) { - let mut chunks = get_chunks(&data.data, data.n_validators as usize, metrics)?; + if let Some(data) = available_data(&subsystem.inner, candidate_hash) { + let mut chunks = get_chunks(&data.data, data.n_validators as usize, &subsystem.metrics)?; let desired_chunk = chunks.get(index as usize).cloned(); for chunk in chunks.drain(..) { - store_chunk(db, candidate_hash, data.n_validators, chunk)?; + store_chunk( + subsystem, + candidate_hash, + data.n_validators, + chunk, + data.data.validation_data.block_number, + )?; } return Ok(desired_chunk); } @@ -282,7 +950,7 @@ impl Subsystem for AvailabilityStoreSubsystem fn start(self, ctx: Context) -> SpawnedSubsystem { let future = Box::pin(async move { if let Err(e) = run(self, ctx).await { - log::error!(target: "availabilitystore", "Subsystem exited with an error {:?}", e); + log::error!(target: LOG_TARGET, "Subsystem exited with an error {:?}", e); } }); @@ -347,223 +1015,4 @@ impl metrics::Metrics for Metrics { } #[cfg(test)] -mod tests { - use super::*; - use futures::{ - future, - channel::oneshot, - executor, - Future, - }; - use std::cell::RefCell; - use polkadot_primitives::v1::{ - AvailableData, BlockData, HeadData, PersistedValidationData, PoV, - }; - use polkadot_node_subsystem_test_helpers as test_helpers; - - struct TestHarness { - virtual_overseer: test_helpers::TestSubsystemContextHandle, - } - - thread_local! { - static TIME_NOW: RefCell> = RefCell::new(None); - } - - struct TestState { - persisted_validation_data: PersistedValidationData, - } - - impl Default for TestState { - fn default() -> Self { - - let persisted_validation_data = PersistedValidationData { - parent_head: HeadData(vec![7, 8, 9]), - block_number: Default::default(), - hrmp_mqc_heads: Vec::new(), - }; - Self { - persisted_validation_data, - } - } - } - - fn test_harness>( - store: Arc, - test: impl FnOnce(TestHarness) -> T, - ) { - let pool = sp_core::testing::TaskExecutor::new(); - let (context, virtual_overseer) = test_helpers::make_subsystem_context(pool.clone()); - - let subsystem = AvailabilityStoreSubsystem::new_in_memory(store); - let subsystem = run(subsystem, context); - - let test_fut = test(TestHarness { - virtual_overseer, - }); - - futures::pin_mut!(test_fut); - futures::pin_mut!(subsystem); - - executor::block_on(future::select(test_fut, subsystem)); - } - - #[test] - fn store_chunk_works() { - let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS)); - test_harness(store.clone(), |test_harness| async move { - let TestHarness { mut virtual_overseer } = test_harness; - let relay_parent = Hash::from([1; 32]); - let validator_index = 5; - - let chunk = ErasureChunk { - chunk: vec![1, 2, 3], - index: validator_index, - proof: vec![vec![3, 4, 5]], - }; - - let (tx, rx) = oneshot::channel(); - - let chunk_msg = AvailabilityStoreMessage::StoreChunk( - relay_parent, - validator_index, - chunk.clone(), - tx, - ); - - virtual_overseer.send(FromOverseer::Communication{ msg: chunk_msg }).await; - assert_eq!(rx.await.unwrap(), Ok(())); - - let (tx, rx) = oneshot::channel(); - let query_chunk = AvailabilityStoreMessage::QueryChunk( - relay_parent, - validator_index, - tx, - ); - - virtual_overseer.send(FromOverseer::Communication{ msg: query_chunk }).await; - - assert_eq!(rx.await.unwrap().unwrap(), chunk); - }); - } - - #[test] - fn store_block_works() { - let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS)); - let test_state = TestState::default(); - test_harness(store.clone(), |test_harness| async move { - let TestHarness { mut virtual_overseer } = test_harness; - let candidate_hash = Hash::from([1; 32]); - let validator_index = 5; - let n_validators = 10; - - let pov = PoV { - block_data: BlockData(vec![4, 5, 6]), - }; - - let available_data = AvailableData { - pov, - validation_data: test_state.persisted_validation_data, - }; - - - let (tx, rx) = oneshot::channel(); - let block_msg = AvailabilityStoreMessage::StoreAvailableData( - candidate_hash, - Some(validator_index), - n_validators, - available_data.clone(), - tx, - ); - - virtual_overseer.send(FromOverseer::Communication{ msg: block_msg }).await; - assert_eq!(rx.await.unwrap(), Ok(())); - - let pov = query_available_data(&mut virtual_overseer, candidate_hash).await.unwrap(); - assert_eq!(pov, available_data); - - let chunk = query_chunk(&mut virtual_overseer, candidate_hash, validator_index).await.unwrap(); - - let chunks = erasure::obtain_chunks_v1(10, &available_data).unwrap(); - - let mut branches = erasure::branches(chunks.as_ref()); - - let branch = branches.nth(5).unwrap(); - let expected_chunk = ErasureChunk { - chunk: branch.1.to_vec(), - index: 5, - proof: branch.0, - }; - - assert_eq!(chunk, expected_chunk); - }); - } - - - #[test] - fn store_pov_and_query_chunk_works() { - let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS)); - let test_state = TestState::default(); - - test_harness(store.clone(), |test_harness| async move { - let TestHarness { mut virtual_overseer } = test_harness; - let candidate_hash = Hash::from([1; 32]); - let n_validators = 10; - - let pov = PoV { - block_data: BlockData(vec![4, 5, 6]), - }; - - let available_data = AvailableData { - pov, - validation_data: test_state.persisted_validation_data, - }; - - let no_metrics = Metrics(None); - let chunks_expected = get_chunks(&available_data, n_validators as usize, &no_metrics).unwrap(); - - let (tx, rx) = oneshot::channel(); - let block_msg = AvailabilityStoreMessage::StoreAvailableData( - candidate_hash, - None, - n_validators, - available_data, - tx, - ); - - virtual_overseer.send(FromOverseer::Communication{ msg: block_msg }).await; - - assert_eq!(rx.await.unwrap(), Ok(())); - - for validator_index in 0..n_validators { - let chunk = query_chunk(&mut virtual_overseer, candidate_hash, validator_index).await.unwrap(); - - assert_eq!(chunk, chunks_expected[validator_index as usize]); - } - }); - } - - async fn query_available_data( - virtual_overseer: &mut test_helpers::TestSubsystemContextHandle, - candidate_hash: Hash, - ) -> Option { - let (tx, rx) = oneshot::channel(); - - let query = AvailabilityStoreMessage::QueryAvailableData(candidate_hash, tx); - virtual_overseer.send(FromOverseer::Communication{ msg: query }).await; - - rx.await.unwrap() - } - - async fn query_chunk( - virtual_overseer: &mut test_helpers::TestSubsystemContextHandle, - candidate_hash: Hash, - index: u32, - ) -> Option { - let (tx, rx) = oneshot::channel(); - - let query = AvailabilityStoreMessage::QueryChunk(candidate_hash, index, tx); - virtual_overseer.send(FromOverseer::Communication{ msg: query }).await; - - rx.await.unwrap() - } -} +mod tests; diff --git a/polkadot/node/core/av-store/src/tests.rs b/polkadot/node/core/av-store/src/tests.rs new file mode 100644 index 0000000000..680eb74b93 --- /dev/null +++ b/polkadot/node/core/av-store/src/tests.rs @@ -0,0 +1,828 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +use super::*; + +use assert_matches::assert_matches; +use futures::{ + future, + channel::oneshot, + executor, + Future, +}; +use smallvec::smallvec; + +use polkadot_primitives::v1::{ + AvailableData, BlockData, CandidateDescriptor, CandidateReceipt, HeadData, + PersistedValidationData, PoV, Id as ParaId, +}; +use polkadot_node_subsystem_util::TimeoutExt; +use polkadot_subsystem::ActiveLeavesUpdate; +use polkadot_node_subsystem_test_helpers as test_helpers; + +struct TestHarness { + virtual_overseer: test_helpers::TestSubsystemContextHandle, +} + +#[derive(Default)] +struct TestCandidateBuilder { + para_id: ParaId, + pov_hash: Hash, + relay_parent: Hash, + commitments_hash: Hash, +} + +impl TestCandidateBuilder { + fn build(self) -> CandidateReceipt { + CandidateReceipt { + descriptor: CandidateDescriptor { + para_id: self.para_id, + pov_hash: self.pov_hash, + relay_parent: self.relay_parent, + ..Default::default() + }, + commitments_hash: self.commitments_hash, + } + } +} + +struct TestState { + persisted_validation_data: PersistedValidationData, + pruning_config: PruningConfig, +} + +impl Default for TestState { + fn default() -> Self { + let persisted_validation_data = PersistedValidationData { + parent_head: HeadData(vec![7, 8, 9]), + block_number: 5, + hrmp_mqc_heads: Vec::new(), + }; + + let pruning_config = PruningConfig { + keep_stored_block_for: Duration::from_secs(1), + keep_finalized_block_for: Duration::from_secs(2), + keep_finalized_chunk_for: Duration::from_secs(2), + }; + + Self { + persisted_validation_data, + pruning_config, + } + } +} + +fn test_harness>( + pruning_config: PruningConfig, + store: Arc, + test: impl FnOnce(TestHarness) -> T, +) { + let _ = env_logger::builder() + .is_test(true) + .filter( + Some("polkadot_node_core_av_store"), + log::LevelFilter::Trace, + ) + .filter( + Some(LOG_TARGET), + log::LevelFilter::Trace, + ) + .try_init(); + + let pool = sp_core::testing::TaskExecutor::new(); + let (context, virtual_overseer) = test_helpers::make_subsystem_context(pool.clone()); + + let subsystem = AvailabilityStoreSubsystem::new_in_memory(store, pruning_config); + let subsystem = run(subsystem, context); + + let test_fut = test(TestHarness { + virtual_overseer, + }); + + futures::pin_mut!(test_fut); + futures::pin_mut!(subsystem); + + executor::block_on(future::select(test_fut, subsystem)); +} + +const TIMEOUT: Duration = Duration::from_millis(100); + +async fn overseer_send( + overseer: &mut test_helpers::TestSubsystemContextHandle, + msg: AvailabilityStoreMessage, +) { + log::trace!("Sending message:\n{:?}", &msg); + overseer + .send(FromOverseer::Communication { msg }) + .timeout(TIMEOUT) + .await + .expect(&format!("{:?} is more than enough for sending messages.", TIMEOUT)); +} + +async fn overseer_recv( + overseer: &mut test_helpers::TestSubsystemContextHandle, +) -> AllMessages { + let msg = overseer_recv_with_timeout(overseer, TIMEOUT) + .await + .expect(&format!("{:?} is more than enough to receive messages", TIMEOUT)); + + log::trace!("Received message:\n{:?}", &msg); + + msg +} + +async fn overseer_recv_with_timeout( + overseer: &mut test_helpers::TestSubsystemContextHandle, + timeout: Duration, +) -> Option { + log::trace!("Waiting for message..."); + overseer + .recv() + .timeout(timeout) + .await +} + +async fn overseer_signal( + overseer: &mut test_helpers::TestSubsystemContextHandle, + signal: OverseerSignal, +) { + overseer + .send(FromOverseer::Signal(signal)) + .timeout(TIMEOUT) + .await + .expect(&format!("{:?} is more than enough for sending signals.", TIMEOUT)); +} + +#[test] +fn store_chunk_works() { + let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS)); + test_harness(PruningConfig::default(), store.clone(), |test_harness| async move { + let TestHarness { mut virtual_overseer } = test_harness; + let relay_parent = Hash::repeat_byte(32); + let candidate_hash = Hash::repeat_byte(33); + let validator_index = 5; + + let chunk = ErasureChunk { + chunk: vec![1, 2, 3], + index: validator_index, + proof: vec![vec![3, 4, 5]], + }; + + let (tx, rx) = oneshot::channel(); + + let chunk_msg = AvailabilityStoreMessage::StoreChunk { + candidate_hash, + relay_parent, + validator_index, + chunk: chunk.clone(), + tx, + }; + + overseer_send(&mut virtual_overseer, chunk_msg.into()).await; + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::ChainApi(ChainApiMessage::BlockNumber( + hash, + tx, + )) => { + assert_eq!(hash, relay_parent); + tx.send(Ok(Some(4))).unwrap(); + } + ); + + assert_eq!(rx.await.unwrap(), Ok(())); + + let (tx, rx) = oneshot::channel(); + let query_chunk = AvailabilityStoreMessage::QueryChunk( + candidate_hash, + validator_index, + tx, + ); + + overseer_send(&mut virtual_overseer, query_chunk.into()).await; + + assert_eq!(rx.await.unwrap().unwrap(), chunk); + }); +} + +#[test] +fn store_block_works() { + let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS)); + let test_state = TestState::default(); + test_harness(test_state.pruning_config.clone(), store.clone(), |test_harness| async move { + let TestHarness { mut virtual_overseer } = test_harness; + let candidate_hash = Hash::from([1; 32]); + let validator_index = 5; + let n_validators = 10; + + let pov = PoV { + block_data: BlockData(vec![4, 5, 6]), + }; + + let available_data = AvailableData { + pov, + validation_data: test_state.persisted_validation_data, + }; + + + let (tx, rx) = oneshot::channel(); + let block_msg = AvailabilityStoreMessage::StoreAvailableData( + candidate_hash, + Some(validator_index), + n_validators, + available_data.clone(), + tx, + ); + + virtual_overseer.send(FromOverseer::Communication{ msg: block_msg }).await; + assert_eq!(rx.await.unwrap(), Ok(())); + + let pov = query_available_data(&mut virtual_overseer, candidate_hash).await.unwrap(); + assert_eq!(pov, available_data); + + let chunk = query_chunk(&mut virtual_overseer, candidate_hash, validator_index).await.unwrap(); + + let chunks = erasure::obtain_chunks_v1(10, &available_data).unwrap(); + + let mut branches = erasure::branches(chunks.as_ref()); + + let branch = branches.nth(5).unwrap(); + let expected_chunk = ErasureChunk { + chunk: branch.1.to_vec(), + index: 5, + proof: branch.0, + }; + + assert_eq!(chunk, expected_chunk); + }); +} + + +#[test] +fn store_pov_and_query_chunk_works() { + let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS)); + let test_state = TestState::default(); + + test_harness(test_state.pruning_config.clone(), store.clone(), |test_harness| async move { + let TestHarness { mut virtual_overseer } = test_harness; + let candidate_hash = Hash::from([1; 32]); + let n_validators = 10; + + let pov = PoV { + block_data: BlockData(vec![4, 5, 6]), + }; + + let available_data = AvailableData { + pov, + validation_data: test_state.persisted_validation_data, + }; + + let no_metrics = Metrics(None); + let chunks_expected = get_chunks(&available_data, n_validators as usize, &no_metrics).unwrap(); + + let (tx, rx) = oneshot::channel(); + let block_msg = AvailabilityStoreMessage::StoreAvailableData( + candidate_hash, + None, + n_validators, + available_data, + tx, + ); + + virtual_overseer.send(FromOverseer::Communication{ msg: block_msg }).await; + + assert_eq!(rx.await.unwrap(), Ok(())); + + for validator_index in 0..n_validators { + let chunk = query_chunk(&mut virtual_overseer, candidate_hash, validator_index).await.unwrap(); + + assert_eq!(chunk, chunks_expected[validator_index as usize]); + } + }); +} + +#[test] +fn stored_but_not_included_chunk_is_pruned() { + let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS)); + let test_state = TestState::default(); + + test_harness(test_state.pruning_config.clone(), store.clone(), |test_harness| async move { + let TestHarness { mut virtual_overseer } = test_harness; + let candidate_hash = Hash::repeat_byte(1); + let relay_parent = Hash::repeat_byte(2); + let validator_index = 5; + + let chunk = ErasureChunk { + chunk: vec![1, 2, 3], + index: validator_index, + proof: vec![vec![3, 4, 5]], + }; + + let (tx, rx) = oneshot::channel(); + let chunk_msg = AvailabilityStoreMessage::StoreChunk { + candidate_hash, + relay_parent, + validator_index, + chunk: chunk.clone(), + tx, + }; + + overseer_send(&mut virtual_overseer, chunk_msg.into()).await; + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::ChainApi(ChainApiMessage::BlockNumber( + hash, + tx, + )) => { + assert_eq!(hash, relay_parent); + tx.send(Ok(Some(4))).unwrap(); + } + ); + + rx.await.unwrap().unwrap(); + + // At this point data should be in the store. + assert_eq!( + query_chunk(&mut virtual_overseer, candidate_hash, validator_index).await.unwrap(), + chunk, + ); + + // Wait for twice as long as the stored block kept for. + Delay::new(test_state.pruning_config.keep_stored_block_for * 2).await; + + // The block was not included by this point so it should be pruned now. + assert!(query_chunk(&mut virtual_overseer, candidate_hash, validator_index).await.is_none()); + }); +} + +#[test] +fn stored_but_not_included_data_is_pruned() { + let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS)); + let test_state = TestState::default(); + + test_harness(test_state.pruning_config.clone(), store.clone(), |test_harness| async move { + let TestHarness { mut virtual_overseer } = test_harness; + let candidate_hash = Hash::repeat_byte(1); + let n_validators = 10; + + let pov = PoV { + block_data: BlockData(vec![4, 5, 6]), + }; + + let available_data = AvailableData { + pov, + validation_data: test_state.persisted_validation_data, + }; + + let (tx, rx) = oneshot::channel(); + let block_msg = AvailabilityStoreMessage::StoreAvailableData( + candidate_hash, + None, + n_validators, + available_data.clone(), + tx, + ); + + virtual_overseer.send(FromOverseer::Communication{ msg: block_msg }).await; + + rx.await.unwrap().unwrap(); + + // At this point data should be in the store. + assert_eq!( + query_available_data(&mut virtual_overseer, candidate_hash).await.unwrap(), + available_data, + ); + + // Wait for twice as long as the stored block kept for. + Delay::new(test_state.pruning_config.keep_stored_block_for * 2).await; + + // The block was not included by this point so it should be pruned now. + assert!(query_available_data(&mut virtual_overseer, candidate_hash).await.is_none()); + }); +} + +#[test] +fn stored_data_kept_until_finalized() { + let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS)); + let test_state = TestState::default(); + + test_harness(test_state.pruning_config.clone(), store.clone(), |test_harness| async move { + let TestHarness { mut virtual_overseer } = test_harness; + let n_validators = 10; + + let pov = PoV { + block_data: BlockData(vec![4, 5, 6]), + }; + + let pov_hash = pov.hash(); + + let candidate = TestCandidateBuilder { + pov_hash, + ..Default::default() + }.build(); + + let candidate_hash = candidate.hash(); + + let available_data = AvailableData { + pov, + validation_data: test_state.persisted_validation_data, + }; + + let (tx, rx) = oneshot::channel(); + let block_msg = AvailabilityStoreMessage::StoreAvailableData( + candidate_hash, + None, + n_validators, + available_data.clone(), + tx, + ); + + virtual_overseer.send(FromOverseer::Communication{ msg: block_msg }).await; + + rx.await.unwrap().unwrap(); + + // At this point data should be in the store. + assert_eq!( + query_available_data(&mut virtual_overseer, candidate_hash).await.unwrap(), + available_data, + ); + + let new_leaf = Hash::repeat_byte(2); + overseer_signal( + &mut virtual_overseer, + OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { + activated: smallvec![new_leaf.clone()], + deactivated: smallvec![], + }), + ).await; + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::CandidateEvents(tx), + )) => { + assert_eq!(relay_parent, new_leaf); + tx.send(Ok(vec![ + CandidateEvent::CandidateIncluded(candidate, HeadData::default()), + ])).unwrap(); + } + ); + + Delay::new(test_state.pruning_config.keep_stored_block_for * 10).await; + + // At this point data should _still_ be in the store. + assert_eq!( + query_available_data(&mut virtual_overseer, candidate_hash).await.unwrap(), + available_data, + ); + + overseer_signal( + &mut virtual_overseer, + OverseerSignal::BlockFinalized(new_leaf) + ).await; + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::ChainApi(ChainApiMessage::BlockNumber( + hash, + tx, + )) => { + assert_eq!(hash, new_leaf); + tx.send(Ok(Some(10))).unwrap(); + } + ); + + // Wait for a half of the time finalized data should be available for + Delay::new(test_state.pruning_config.keep_finalized_block_for / 2).await; + + // At this point data should _still_ be in the store. + assert_eq!( + query_available_data(&mut virtual_overseer, candidate_hash).await.unwrap(), + available_data, + ); + + // Wait until it is should be gone. + Delay::new(test_state.pruning_config.keep_finalized_block_for).await; + + // At this point data should be gone from the store. + assert!( + query_available_data(&mut virtual_overseer, candidate_hash).await.is_none(), + ); + }); +} + +#[test] +fn stored_chunk_kept_until_finalized() { + let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS)); + let test_state = TestState::default(); + + test_harness(test_state.pruning_config.clone(), store.clone(), |test_harness| async move { + let TestHarness { mut virtual_overseer } = test_harness; + let relay_parent = Hash::repeat_byte(2); + let validator_index = 5; + let candidate = TestCandidateBuilder { + ..Default::default() + }.build(); + let candidate_hash = candidate.hash(); + + let chunk = ErasureChunk { + chunk: vec![1, 2, 3], + index: validator_index, + proof: vec![vec![3, 4, 5]], + }; + + let (tx, rx) = oneshot::channel(); + let chunk_msg = AvailabilityStoreMessage::StoreChunk { + candidate_hash, + relay_parent, + validator_index, + chunk: chunk.clone(), + tx, + }; + + overseer_send(&mut virtual_overseer, chunk_msg.into()).await; + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::ChainApi(ChainApiMessage::BlockNumber( + hash, + tx, + )) => { + assert_eq!(hash, relay_parent); + tx.send(Ok(Some(4))).unwrap(); + } + ); + + rx.await.unwrap().unwrap(); + + // At this point data should be in the store. + assert_eq!( + query_chunk(&mut virtual_overseer, candidate_hash, validator_index).await.unwrap(), + chunk, + ); + + let new_leaf = Hash::repeat_byte(2); + overseer_signal( + &mut virtual_overseer, + OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { + activated: smallvec![new_leaf.clone()], + deactivated: smallvec![], + }), + ).await; + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::CandidateEvents(tx), + )) => { + assert_eq!(relay_parent, new_leaf); + tx.send(Ok(vec![ + CandidateEvent::CandidateIncluded(candidate, HeadData::default()), + ])).unwrap(); + } + ); + + Delay::new(test_state.pruning_config.keep_stored_block_for * 10).await; + + // At this point data should _still_ be in the store. + assert_eq!( + query_chunk(&mut virtual_overseer, candidate_hash, validator_index).await.unwrap(), + chunk, + ); + + overseer_signal( + &mut virtual_overseer, + OverseerSignal::BlockFinalized(new_leaf) + ).await; + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::ChainApi(ChainApiMessage::BlockNumber( + hash, + tx, + )) => { + assert_eq!(hash, new_leaf); + tx.send(Ok(Some(10))).unwrap(); + } + ); + + // Wait for a half of the time finalized data should be available for + Delay::new(test_state.pruning_config.keep_finalized_block_for / 2).await; + + // At this point data should _still_ be in the store. + assert_eq!( + query_chunk(&mut virtual_overseer, candidate_hash, validator_index).await.unwrap(), + chunk, + ); + + // Wait until it is should be gone. + Delay::new(test_state.pruning_config.keep_finalized_chunk_for).await; + + // At this point data should be gone from the store. + assert!( + query_available_data(&mut virtual_overseer, candidate_hash).await.is_none(), + ); + }); +} + +#[test] +fn forkfullness_works() { + let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS)); + let test_state = TestState::default(); + + test_harness(test_state.pruning_config.clone(), store.clone(), |test_harness| async move { + let TestHarness { mut virtual_overseer } = test_harness; + let n_validators = 10; + + let pov_1 = PoV { + block_data: BlockData(vec![1, 2, 3]), + }; + + let pov_1_hash = pov_1.hash(); + + let pov_2 = PoV { + block_data: BlockData(vec![4, 5, 6]), + }; + + let pov_2_hash = pov_2.hash(); + + let candidate_1 = TestCandidateBuilder { + pov_hash: pov_1_hash, + ..Default::default() + }.build(); + + let candidate_1_hash = candidate_1.hash(); + + let candidate_2 = TestCandidateBuilder { + pov_hash: pov_2_hash, + ..Default::default() + }.build(); + + let candidate_2_hash = candidate_2.hash(); + + let available_data_1 = AvailableData { + pov: pov_1, + validation_data: test_state.persisted_validation_data.clone(), + }; + + let available_data_2 = AvailableData { + pov: pov_2, + validation_data: test_state.persisted_validation_data, + }; + + let (tx, rx) = oneshot::channel(); + let msg = AvailabilityStoreMessage::StoreAvailableData( + candidate_1_hash, + None, + n_validators, + available_data_1.clone(), + tx, + ); + + virtual_overseer.send(FromOverseer::Communication{ msg }).await; + + rx.await.unwrap().unwrap(); + + let (tx, rx) = oneshot::channel(); + let msg = AvailabilityStoreMessage::StoreAvailableData( + candidate_2_hash, + None, + n_validators, + available_data_2.clone(), + tx, + ); + + virtual_overseer.send(FromOverseer::Communication{ msg }).await; + + rx.await.unwrap().unwrap(); + + assert_eq!( + query_available_data(&mut virtual_overseer, candidate_1_hash).await.unwrap(), + available_data_1, + ); + + assert_eq!( + query_available_data(&mut virtual_overseer, candidate_2_hash).await.unwrap(), + available_data_2, + ); + + + let new_leaf_1 = Hash::repeat_byte(2); + let new_leaf_2 = Hash::repeat_byte(3); + + overseer_signal( + &mut virtual_overseer, + OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { + activated: smallvec![new_leaf_1.clone(), new_leaf_2.clone()], + deactivated: smallvec![], + }), + ).await; + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + leaf, + RuntimeApiRequest::CandidateEvents(tx), + )) => { + assert_eq!(leaf, new_leaf_1); + tx.send(Ok(vec![ + CandidateEvent::CandidateIncluded(candidate_1, HeadData::default()), + ])).unwrap(); + } + ); + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + leaf, + RuntimeApiRequest::CandidateEvents(tx), + )) => { + assert_eq!(leaf, new_leaf_2); + tx.send(Ok(vec![ + CandidateEvent::CandidateIncluded(candidate_2, HeadData::default()), + ])).unwrap(); + } + ); + + overseer_signal( + &mut virtual_overseer, + OverseerSignal::BlockFinalized(new_leaf_1) + ).await; + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::ChainApi(ChainApiMessage::BlockNumber( + hash, + tx, + )) => { + assert_eq!(hash, new_leaf_1); + tx.send(Ok(Some(5))).unwrap(); + } + ); + + + // Data of both candidates should be still present in the DB. + assert_eq!( + query_available_data(&mut virtual_overseer, candidate_1_hash).await.unwrap(), + available_data_1, + ); + + assert_eq!( + query_available_data(&mut virtual_overseer, candidate_2_hash).await.unwrap(), + available_data_2, + ); + // Wait for longer than finalized blocks should be kept for + Delay::new(test_state.pruning_config.keep_finalized_block_for + Duration::from_secs(1)).await; + + // Data of both candidates should be gone now. + assert!( + query_available_data(&mut virtual_overseer, candidate_1_hash).await.is_none(), + ); + + assert!( + query_available_data(&mut virtual_overseer, candidate_2_hash).await.is_none(), + ); + }); +} + +async fn query_available_data( + virtual_overseer: &mut test_helpers::TestSubsystemContextHandle, + candidate_hash: Hash, +) -> Option { + let (tx, rx) = oneshot::channel(); + + let query = AvailabilityStoreMessage::QueryAvailableData(candidate_hash, tx); + virtual_overseer.send(FromOverseer::Communication{ msg: query }).await; + + rx.await.unwrap() +} + +async fn query_chunk( + virtual_overseer: &mut test_helpers::TestSubsystemContextHandle, + candidate_hash: Hash, + index: u32, +) -> Option { + let (tx, rx) = oneshot::channel(); + + let query = AvailabilityStoreMessage::QueryChunk(candidate_hash, index, tx); + virtual_overseer.send(FromOverseer::Communication{ msg: query }).await; + + rx.await.unwrap() +} diff --git a/polkadot/node/network/availability-distribution/src/lib.rs b/polkadot/node/network/availability-distribution/src/lib.rs index 2674460065..b2e20ac8eb 100644 --- a/polkadot/node/network/availability-distribution/src/lib.rs +++ b/polkadot/node/network/availability-distribution/src/lib.rs @@ -661,6 +661,7 @@ where if let Err(_e) = store_chunk( ctx, message.candidate_hash.clone(), + live_candidate.descriptor.relay_parent.clone(), message.erasure_chunk.index, message.erasure_chunk.clone(), ).await? { @@ -949,6 +950,7 @@ where async fn store_chunk( ctx: &mut Context, candidate_hash: Hash, + relay_parent: Hash, validator_index: ValidatorIndex, erasure_chunk: ErasureChunk, ) -> Result> @@ -957,7 +959,13 @@ where { let (tx, rx) = oneshot::channel(); ctx.send_message(AllMessages::AvailabilityStore( - AvailabilityStoreMessage::StoreChunk(candidate_hash, validator_index, erasure_chunk, tx), + AvailabilityStoreMessage::StoreChunk { + candidate_hash, + relay_parent, + validator_index, + chunk: erasure_chunk, + tx, + } )).await?; rx.await.map_err::(Into::into) } diff --git a/polkadot/node/subsystem/src/messages.rs b/polkadot/node/subsystem/src/messages.rs index 22e9f097ff..5f2fef884f 100644 --- a/polkadot/node/subsystem/src/messages.rs +++ b/polkadot/node/subsystem/src/messages.rs @@ -305,7 +305,18 @@ pub enum AvailabilityStoreMessage { /// Store an `ErasureChunk` in the AV store. /// /// Return `Ok(())` if the store operation succeeded, `Err(())` if it failed. - StoreChunk(Hash, ValidatorIndex, ErasureChunk, oneshot::Sender>), + StoreChunk { + /// A hash of the candidate this chunk belongs to. + candidate_hash: Hash, + /// A relevant relay parent. + relay_parent: Hash, + /// The index of the validator this chunk belongs to. + validator_index: ValidatorIndex, + /// The chunk itself. + chunk: ErasureChunk, + /// Sending side of the channel to send result to. + tx: oneshot::Sender>, + }, /// Store a `AvailableData` in the AV store. /// If `ValidatorIndex` is present store corresponding chunk also. @@ -315,15 +326,10 @@ pub enum AvailabilityStoreMessage { } impl AvailabilityStoreMessage { - /// If the current variant contains the relay parent hash, return it. + /// In fact, none of the AvailabilityStore messages assume a particular relay parent. pub fn relay_parent(&self) -> Option { match self { - Self::QueryAvailableData(hash, _) => Some(*hash), - Self::QueryDataAvailability(hash, _) => Some(*hash), - Self::QueryChunk(hash, _, _) => Some(*hash), - Self::QueryChunkAvailability(hash, _, _) => Some(*hash), - Self::StoreChunk(hash, _, _, _) => Some(*hash), - Self::StoreAvailableData(hash, _, _, _, _) => Some(*hash), + _ => None, } } }