diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock
index 8033adade8..9800da4d0d 100644
--- a/polkadot/Cargo.lock
+++ b/polkadot/Cargo.lock
@@ -3354,9 +3354,9 @@ dependencies = [
 
 [[package]]
 name = "lru"
-version = "0.6.1"
+version = "0.6.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "be716eb6878ca2263eb5d00a781aa13264a794f519fe6af4fbb2668b2d5441c0"
+checksum = "3aae342b73d57ad0b8b364bd12584819f2c1fe9114285dfcf8b0722607671635"
 dependencies = [
  "hashbrown",
 ]
diff --git a/polkadot/node/core/av-store/src/lib.rs b/polkadot/node/core/av-store/src/lib.rs
index 77f5dfb0c5..27973921d2 100644
--- a/polkadot/node/core/av-store/src/lib.rs
+++ b/polkadot/node/core/av-store/src/lib.rs
@@ -94,7 +94,7 @@ impl Error {
 }
 
 /// A wrapper type for delays.
-#[derive(Debug, Decode, Encode, Eq)]
+#[derive(Clone, Debug, Decode, Encode, Eq)]
 enum PruningDelay {
 	/// This pruning should be triggered after this `Duration` from UNIX_EPOCH.
 	In(Duration),
@@ -315,13 +315,14 @@ impl PartialOrd for ChunkPruningRecord {
 pub struct AvailabilityStoreSubsystem {
 	pruning_config: PruningConfig,
 	inner: Arc<dyn KeyValueDB>,
+	chunks_cache: HashMap<CandidateHash, HashMap<u32, ErasureChunk>>,
 	metrics: Metrics,
 }
 
 impl AvailabilityStoreSubsystem {
 	// Perform pruning of PoVs
 	#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
-	fn prune_povs(&self) -> Result<(), Error> {
+	fn prune_povs(&mut self) -> Result<(), Error> {
 		let _timer = self.metrics.time_prune_povs();
 
 		let mut tx = DBTransaction::new();
@@ -335,20 +336,22 @@ impl AvailabilityStoreSubsystem {
 
 		for record in pov_pruning.drain(..outdated_records_count) {
 			tracing::trace!(target: LOG_TARGET, record = ?record, "Removing record");
+
+			self.chunks_cache.remove(&record.candidate_hash);
 			tx.delete(
 				columns::DATA,
 				available_data_key(&record.candidate_hash).as_slice(),
 			);
 		}
 
-		put_pov_pruning(&self.inner, Some(tx), pov_pruning)?;
+		put_pov_pruning(&self.inner, Some(tx), pov_pruning, &self.metrics)?;
 
 		Ok(())
 	}
 
 	// Perform pruning of chunks.
 	#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
-	fn prune_chunks(&self) -> Result<(), Error> {
+	fn prune_chunks(&mut self) -> Result<(), Error> {
 		let _timer = self.metrics.time_prune_chunks();
 
 		let mut tx = DBTransaction::new();
@@ -362,13 +365,15 @@ impl AvailabilityStoreSubsystem {
 
 		for record in chunk_pruning.drain(..outdated_records_count) {
 			tracing::trace!(target: LOG_TARGET, record = ?record, "Removing record");
+
+			self.chunks_cache.remove(&record.candidate_hash);
 			tx.delete(
 				columns::DATA,
 				erasure_chunk_key(&record.candidate_hash, record.chunk_index).as_slice(),
 			);
 		}
 
-		put_chunk_pruning(&self.inner, Some(tx), chunk_pruning)?;
+		put_chunk_pruning(&self.inner, Some(tx), chunk_pruning, &self.metrics)?;
 
 		Ok(())
 	}
@@ -468,6 +473,7 @@ impl AvailabilityStoreSubsystem {
 		Ok(Self {
 			pruning_config: PruningConfig::default(),
 			inner: Arc::new(db),
+			chunks_cache: HashMap::new(),
 			metrics,
 		})
 	}
@@ -477,6 +483,7 @@ impl AvailabilityStoreSubsystem {
 		Self {
 			pruning_config,
 			inner,
+			chunks_cache: HashMap::new(),
 			metrics: Metrics(None),
 		}
 	}
@@ -535,7 +542,7 @@ where
 				ActiveLeavesUpdate { activated, .. })
 			) => {
 				for (activated, _span) in activated.into_iter() {
-					process_block_activated(ctx, &subsystem.inner, activated, &subsystem.metrics).await?;
+					process_block_activated(ctx, subsystem, activated).await?;
 				}
 			}
 			FromOverseer::Signal(OverseerSignal::BlockFinalized(_hash, number)) => {
@@ -590,7 +597,7 @@ async fn process_block_finalized(
 			}
 		}
 
-		put_pov_pruning(db, None, pov_pruning)?;
+		put_pov_pruning(db, None, pov_pruning, &subsystem.metrics)?;
 	}
 
 	if let Some(mut chunk_pruning) = chunk_pruning(db) {
@@ -609,23 +616,23 @@ async fn process_block_finalized(
 			}
 		}
 
-		put_chunk_pruning(db, None, chunk_pruning)?;
+		put_chunk_pruning(db, None, chunk_pruning, &subsystem.metrics)?;
 	}
 
 	Ok(())
 }
 
-#[tracing::instrument(level = "trace", skip(ctx, db, metrics), fields(subsystem = LOG_TARGET))]
+#[tracing::instrument(level = "trace", skip(ctx, subsystem), fields(subsystem = LOG_TARGET))]
 async fn process_block_activated<Context>(
 	ctx: &mut Context,
-	db: &Arc<dyn KeyValueDB>,
+	subsystem: &mut AvailabilityStoreSubsystem,
 	hash: Hash,
-	metrics: &Metrics,
 ) -> Result<(), Error>
 where
 	Context: SubsystemContext<Message = AvailabilityStoreMessage>
 {
-	let _timer = metrics.time_block_activated();
+	let _timer = subsystem.metrics.time_block_activated();
+	let db = &subsystem.inner;
 
 	let events = match request_candidate_events(ctx, hash).await {
 		Ok(events) => events,
@@ -649,6 +656,10 @@ where
 		}
 	}
 
+	for candidate_hash in &included {
+		subsystem.chunks_cache.remove(candidate_hash);
+	}
+
 	if let Some(mut pov_pruning) = pov_pruning(db) {
 		for record in pov_pruning.iter_mut() {
 			if included.contains(&record.candidate_hash) {
@@ -659,7 +670,7 @@ where
 
 		pov_pruning.sort();
 
-		put_pov_pruning(db, None, pov_pruning)?;
+		put_pov_pruning(db, None, pov_pruning, &subsystem.metrics)?;
 	}
 
 	if let Some(mut chunk_pruning) = chunk_pruning(db) {
@@ -672,7 +683,7 @@ where
 		chunk_pruning.sort();
 
-		put_chunk_pruning(db, None, chunk_pruning)?;
+		put_chunk_pruning(db, None, chunk_pruning, &subsystem.metrics)?;
 	}
 
 	Ok(())
 }
@@ -742,11 +753,11 @@ where
 
 			tx.send(result?).map_err(|_| oneshot::Canceled)?;
 		}
-		StoreChunk { candidate_hash, relay_parent, validator_index, chunk, tx } => {
+		StoreChunk { candidate_hash, relay_parent, chunk, tx } => {
 			let chunk_index = chunk.index;
 			// Current block number is relay_parent block number + 1.
 			let block_number = get_block_number(ctx, relay_parent).await? + 1;
-			let result = store_chunk(subsystem, &candidate_hash, validator_index, chunk, block_number);
+			let result = store_chunks(subsystem, &candidate_hash, vec![chunk], block_number);
 
 			tracing::trace!(
 				target: LOG_TARGET,
@@ -802,14 +813,17 @@ fn chunk_pruning(db: &Arc<dyn KeyValueDB>) -> Option<Vec<ChunkPruningRecord>> {
 	query_inner(db, columns::META, &CHUNK_PRUNING_KEY)
 }
 
-#[tracing::instrument(level = "trace", skip(db, tx), fields(subsystem = LOG_TARGET))]
+#[tracing::instrument(level = "trace", skip(db, tx, metrics), fields(subsystem = LOG_TARGET))]
 fn put_pov_pruning(
 	db: &Arc<dyn KeyValueDB>,
 	tx: Option<DBTransaction>,
 	mut pov_pruning: Vec<PoVPruningRecord>,
+	metrics: &Metrics,
 ) -> Result<(), Error> {
 	let mut tx = tx.unwrap_or_default();
 
+	metrics.block_pruning_records_size(pov_pruning.len());
+
 	pov_pruning.sort();
 
 	tx.put_vec(
@@ -843,14 +857,17 @@ fn put_pov_pruning(
 	Ok(())
 }
 
-#[tracing::instrument(level = "trace", skip(db, tx), fields(subsystem = LOG_TARGET))]
+#[tracing::instrument(level = "trace", skip(db, tx, metrics), fields(subsystem = LOG_TARGET))]
 fn put_chunk_pruning(
 	db: &Arc<dyn KeyValueDB>,
 	tx: Option<DBTransaction>,
 	mut chunk_pruning: Vec<ChunkPruningRecord>,
+	metrics: &Metrics,
 ) -> Result<(), Error> {
 	let mut tx = tx.unwrap_or_default();
 
+	metrics.chunk_pruning_records_size(chunk_pruning.len());
+
 	chunk_pruning.sort();
 
 	tx.put_vec(
@@ -910,16 +927,13 @@ fn store_available_data(
 
 	let block_number = available_data.validation_data.block_number;
 
-	if let Some(index) = id {
-		let chunks = get_chunks(&available_data, n_validators as usize, &subsystem.metrics)?;
-		store_chunk(
-			subsystem,
-			candidate_hash,
-			n_validators,
-			chunks[index as usize].clone(),
-			block_number,
-		)?;
-	}
+	let chunks = get_chunks(&available_data, n_validators as usize, &subsystem.metrics)?;
+	store_chunks(
+		subsystem,
+		candidate_hash,
+		chunks,
+		block_number,
+	)?;
 
 	let stored_data = StoredAvailableData {
 		data: available_data,
@@ -966,23 +980,19 @@
 }
 
 #[tracing::instrument(level = "trace", skip(subsystem), fields(subsystem = LOG_TARGET))]
-fn store_chunk(
+fn store_chunks(
 	subsystem: &mut AvailabilityStoreSubsystem,
 	candidate_hash: &CandidateHash,
-	_n_validators: u32,
-	chunk: ErasureChunk,
+	chunks: Vec<ErasureChunk>,
 	block_number: BlockNumber,
 ) -> Result<(), Error> {
-	let _timer = subsystem.metrics.time_store_chunk();
+	let _timer = subsystem.metrics.time_store_chunks();
 
 	let mut tx = DBTransaction::new();
-
-	let dbkey = erasure_chunk_key(candidate_hash, chunk.index);
-
 	let mut chunk_pruning = chunk_pruning(&subsystem.inner).unwrap_or_default();
-	let prune_at = PruningDelay::into_the_future(subsystem.pruning_config.keep_stored_block_for)?;
 
-	if let Some(delay) = prune_at.as_duration() {
+	let prune_at = PruningDelay::into_the_future(subsystem.pruning_config.keep_stored_block_for)?;
+	if let Some(delay) = prune_at.clone().as_duration() {
 		tx.put_vec(
 			columns::META,
 			&NEXT_CHUNK_PRUNING,
@@ -990,23 +1000,29 @@
 		);
 	}
 
-	let pruning_record = ChunkPruningRecord {
-		candidate_hash: candidate_hash.clone(),
-		block_number,
-		candidate_state: CandidateState::Stored,
-		chunk_index: chunk.index,
-		prune_at,
-	};
+	for chunk in chunks {
+		subsystem.chunks_cache.entry(*candidate_hash).or_default().insert(chunk.index, chunk.clone());
 
-	let idx = chunk_pruning.binary_search(&pruning_record).unwrap_or_else(|insert_idx| insert_idx);
+		let pruning_record = ChunkPruningRecord {
+			candidate_hash: candidate_hash.clone(),
+			block_number,
+			candidate_state: CandidateState::Stored,
+			chunk_index: chunk.index,
+			prune_at: prune_at.clone(),
+		};
 
-	chunk_pruning.insert(idx, pruning_record);
+		let idx = chunk_pruning.binary_search(&pruning_record).unwrap_or_else(|insert_idx| insert_idx);
 
-	tx.put_vec(
-		columns::DATA,
-		&dbkey,
-		chunk.encode(),
-	);
+		chunk_pruning.insert(idx, pruning_record);
+
+		let dbkey = erasure_chunk_key(candidate_hash, chunk.index);
+
+		tx.put_vec(
+			columns::DATA,
+			&dbkey,
+			chunk.encode(),
+		);
+	}
 
 	tx.put_vec(
 		columns::META,
@@ -1027,6 +1043,12 @@ fn get_chunk(
 ) -> Result<Option<ErasureChunk>, Error> {
 	let _timer = subsystem.metrics.time_get_chunk();
 
+	if let Some(entry) = subsystem.chunks_cache.get(candidate_hash) {
+		if let Some(chunk) = entry.get(&index) {
+			return Ok(Some(chunk.clone()));
+		}
+	}
+
 	if let Some(chunk) = query_inner(
 		&subsystem.inner,
 		columns::DATA,
@@ -1036,17 +1058,14 @@ fn get_chunk(
 	}
 
 	if let Some(data) = available_data(&subsystem.inner, candidate_hash) {
-		let mut chunks = get_chunks(&data.data, data.n_validators as usize, &subsystem.metrics)?;
+		let chunks = get_chunks(&data.data, data.n_validators as usize, &subsystem.metrics)?;
 		let desired_chunk = chunks.get(index as usize).cloned();
-		for chunk in chunks.drain(..) {
-			store_chunk(
-				subsystem,
-				candidate_hash,
-				data.n_validators,
-				chunk,
-				data.data.validation_data.block_number,
-			)?;
-		}
+		store_chunks(
+			subsystem,
+			candidate_hash,
+			chunks,
+			data.data.validation_data.block_number,
+		)?;
 		return Ok(desired_chunk);
 	}
 
@@ -1109,13 +1128,15 @@ fn get_chunks(data: &AvailableData, n_validators: usize, metrics: &Metrics) -> R
 
 #[derive(Clone)]
 struct MetricsInner {
 	received_availability_chunks_total: prometheus::Counter<prometheus::U64>,
+	chunk_pruning_records_total: prometheus::Gauge<prometheus::U64>,
+	block_pruning_records_total: prometheus::Gauge<prometheus::U64>,
 	prune_povs: prometheus::Histogram,
 	prune_chunks: prometheus::Histogram,
 	process_block_finalized: prometheus::Histogram,
 	block_activated: prometheus::Histogram,
 	process_message: prometheus::Histogram,
 	store_available_data: prometheus::Histogram,
-	store_chunk: prometheus::Histogram,
+	store_chunks: prometheus::Histogram,
 	get_chunk: prometheus::Histogram,
 }
@@ -1133,6 +1154,22 @@ impl Metrics {
 		}
 	}
+	fn chunk_pruning_records_size(&self, count: usize) {
+		if let Some(metrics) = &self.0 {
+			use core::convert::TryFrom as _;
+			let total = u64::try_from(count).unwrap_or_default();
+			metrics.chunk_pruning_records_total.set(total);
+		}
+	}
+
+	fn block_pruning_records_size(&self, count: usize) {
+		if let Some(metrics) = &self.0 {
+			use core::convert::TryFrom as _;
+			let total = u64::try_from(count).unwrap_or_default();
+			metrics.block_pruning_records_total.set(total);
+		}
+	}
+
 	/// Provide a timer for `prune_povs` which observes on drop.
 	fn time_prune_povs(&self) -> Option<prometheus::prometheus::HistogramTimer> {
 		self.0.as_ref().map(|metrics| metrics.prune_povs.start_timer())
 	}
@@ -1164,8 +1201,8 @@ impl Metrics {
 	}
 
-	/// Provide a timer for `store_chunk` which observes on drop.
-	fn time_store_chunk(&self) -> Option<prometheus::prometheus::HistogramTimer> {
-		self.0.as_ref().map(|metrics| metrics.store_chunk.start_timer())
+	/// Provide a timer for `store_chunks` which observes on drop.
+	fn time_store_chunks(&self) -> Option<prometheus::prometheus::HistogramTimer> {
+		self.0.as_ref().map(|metrics| metrics.store_chunks.start_timer())
 	}
 
 	/// Provide a timer for `get_chunk` which observes on drop.
@@ -1184,6 +1221,20 @@ impl metrics::Metrics for Metrics {
 				)?,
 				registry,
 			)?,
+			chunk_pruning_records_total: prometheus::register(
+				prometheus::Gauge::new(
+					"parachain_chunk_pruning_records_total",
+					"Number of chunk pruning records kept by the storage.",
+				)?,
+				registry,
+			)?,
+			block_pruning_records_total: prometheus::register(
+				prometheus::Gauge::new(
+					"parachain_block_pruning_records_total",
+					"Number of block pruning records kept by the storage.",
+				)?,
+				registry,
+			)?,
 			prune_povs: prometheus::register(
 				prometheus::Histogram::with_opts(
 					prometheus::HistogramOpts::new(
@@ -1238,11 +1289,11 @@ impl metrics::Metrics for Metrics {
 				)?,
 				registry,
 			)?,
-			store_chunk: prometheus::register(
+			store_chunks: prometheus::register(
 				prometheus::Histogram::with_opts(
 					prometheus::HistogramOpts::new(
-						"parachain_av_store_store_chunk",
-						"Time spent within `av_store::store_chunk`",
+						"parachain_av_store_store_chunks",
+						"Time spent within `av_store::store_chunks`",
 					)
 				)?,
 				registry,
diff --git a/polkadot/node/core/av-store/src/tests.rs b/polkadot/node/core/av-store/src/tests.rs
index 1d3bb26d4e..1a71981160 100644
--- a/polkadot/node/core/av-store/src/tests.rs
+++ b/polkadot/node/core/av-store/src/tests.rs
@@ -235,7 +235,6 @@ fn store_chunk_works() {
 		let chunk_msg = AvailabilityStoreMessage::StoreChunk {
 			candidate_hash,
 			relay_parent,
-			validator_index,
 			chunk: chunk.clone(),
 			tx,
 		};
@@ -385,7 +384,6 @@ fn stored_but_not_included_chunk_is_pruned() {
 		let chunk_msg = AvailabilityStoreMessage::StoreChunk {
 			candidate_hash,
 			relay_parent,
-			validator_index,
 			chunk: chunk.clone(),
 			tx,
 		};
@@ -589,7 +587,6 @@ fn stored_chunk_kept_until_finalized() {
 		let chunk_msg = AvailabilityStoreMessage::StoreChunk {
 			candidate_hash,
 			relay_parent,
-			validator_index,
 			chunk: chunk.clone(),
 			tx,
 		};
diff --git a/polkadot/node/network/availability-distribution/src/lib.rs b/polkadot/node/network/availability-distribution/src/lib.rs
index c35e2868bc..80d60dc75c 100644
--- a/polkadot/node/network/availability-distribution/src/lib.rs
+++ b/polkadot/node/network/availability-distribution/src/lib.rs
@@ -1036,7 +1036,6 @@ where
 		AvailabilityStoreMessage::StoreChunk {
 			candidate_hash,
 			relay_parent,
-			validator_index,
 			chunk: erasure_chunk,
 			tx,
 		}
diff --git a/polkadot/node/subsystem/src/messages.rs b/polkadot/node/subsystem/src/messages.rs
index bbd2bc37d7..3b2006bea0 100644
--- a/polkadot/node/subsystem/src/messages.rs
+++ b/polkadot/node/subsystem/src/messages.rs
@@ -309,8 +309,6 @@ pub enum AvailabilityStoreMessage {
 		candidate_hash: CandidateHash,
 		/// A relevant relay parent.
 		relay_parent: Hash,
-		/// The index of the validator this chunk belongs to.
-		validator_index: ValidatorIndex,
 		/// The chunk itself.
 		chunk: ErasureChunk,
 		/// Sending side of the channel to send result to.
diff --git a/polkadot/roadmap/implementers-guide/src/types/overseer-protocol.md b/polkadot/roadmap/implementers-guide/src/types/overseer-protocol.md
index 3c3456bc71..392927f9eb 100644
--- a/polkadot/roadmap/implementers-guide/src/types/overseer-protocol.md
+++ b/polkadot/roadmap/implementers-guide/src/types/overseer-protocol.md
@@ -174,9 +174,9 @@ enum AvailabilityStoreMessage {
     /// Query a specific availability chunk of the candidate's erasure-coding by validator index.
     /// Returns the chunk and its inclusion proof against the candidate's erasure-root.
     QueryChunk(CandidateHash, ValidatorIndex, ResponseChannel<Option<AvailabilityChunkAndProof>>),
-    /// Store a specific chunk of the candidate's erasure-coding by validator index, with an
+    /// Store a specific chunk of the candidate's erasure-coding, with an
     /// accompanying proof.
-    StoreChunk(CandidateHash, ValidatorIndex, AvailabilityChunkAndProof, ResponseChannel<Result<()>>),
+    StoreChunk(CandidateHash, AvailabilityChunkAndProof, ResponseChannel<Result<()>>),
     /// Store `AvailableData`. If `ValidatorIndex` is provided, also store this validator's
     /// `AvailabilityChunkAndProof`.
    StoreAvailableData(CandidateHash, Option<ValidatorIndex>, u32, AvailableData, ResponseChannel<Result<()>>),
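To illustrate the caching scheme this diff introduces (cache on `store_chunks`, consult the cache in `get_chunk` before the database, drop a candidate's entry on pruning or inclusion), here is a minimal, self-contained sketch. The `CandidateHash` and `ErasureChunk` definitions below are simplified stand-ins for illustration only, not the real polkadot primitives, and `ChunksCache` is a hypothetical wrapper around the same `HashMap<CandidateHash, HashMap<u32, ErasureChunk>>` shape used by `AvailabilityStoreSubsystem::chunks_cache`:

```rust
use std::collections::HashMap;

// Stand-ins for the real polkadot types; only the shape matters here.
type CandidateHash = [u8; 32];

#[derive(Clone, Debug, PartialEq)]
struct ErasureChunk {
	index: u32,
	chunk: Vec<u8>,
}

/// Read-through cache keyed by candidate, then by chunk index.
#[derive(Default)]
struct ChunksCache(HashMap<CandidateHash, HashMap<u32, ErasureChunk>>);

impl ChunksCache {
	/// On `store_chunks`: every chunk written to the DB is also cached.
	fn store(&mut self, candidate: CandidateHash, chunks: Vec<ErasureChunk>) {
		let entry = self.0.entry(candidate).or_default();
		for chunk in chunks {
			entry.insert(chunk.index, chunk);
		}
	}

	/// On `get_chunk`: the cache is consulted before touching the DB.
	fn get(&self, candidate: &CandidateHash, index: u32) -> Option<ErasureChunk> {
		self.0.get(candidate).and_then(|e| e.get(&index)).cloned()
	}

	/// On pruning or block inclusion: the whole candidate entry is dropped.
	fn remove(&mut self, candidate: &CandidateHash) {
		self.0.remove(candidate);
	}
}

fn main() {
	let mut cache = ChunksCache::default();
	let candidate = [0u8; 32];
	cache.store(candidate, vec![ErasureChunk { index: 0, chunk: b"data".to_vec() }]);
	assert!(cache.get(&candidate, 0).is_some()); // hit: no DB read needed
	cache.remove(&candidate); // e.g. the candidate was pruned
	assert!(cache.get(&candidate, 0).is_none()); // miss: falls through to the DB
}
```

Note the eviction choice: the cache is cleared per candidate (not per chunk), both when pruning records expire and as soon as a candidate is seen as included in `process_block_activated`, so entries only live for the short window between a chunk being stored and the candidate landing on chain.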