Optimizations of av-store (#2223)

* Store all chunks and in a single transaction

* Adds chunks LRU to store

* Add pruning records metrics

* Use honest cache instead of LRU

* Remove unnecessary optional cache

* Fix review nits that are fixable
This commit is contained in:
Fedor Sakharov
2021-01-08 00:00:30 +03:00
committed by GitHub
parent 0d614374e9
commit b97f52a4c8
6 changed files with 122 additions and 77 deletions
+2 -2
View File
@@ -3354,9 +3354,9 @@ dependencies = [
[[package]] [[package]]
name = "lru" name = "lru"
version = "0.6.1" version = "0.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "be716eb6878ca2263eb5d00a781aa13264a794f519fe6af4fbb2668b2d5441c0" checksum = "3aae342b73d57ad0b8b364bd12584819f2c1fe9114285dfcf8b0722607671635"
dependencies = [ dependencies = [
"hashbrown", "hashbrown",
] ]
+118 -67
View File
@@ -94,7 +94,7 @@ impl Error {
} }
/// A wrapper type for delays. /// A wrapper type for delays.
#[derive(Debug, Decode, Encode, Eq)] #[derive(Clone, Debug, Decode, Encode, Eq)]
enum PruningDelay { enum PruningDelay {
/// This pruning should be triggered after this `Duration` from UNIX_EPOCH. /// This pruning should be triggered after this `Duration` from UNIX_EPOCH.
In(Duration), In(Duration),
@@ -315,13 +315,14 @@ impl PartialOrd for ChunkPruningRecord {
pub struct AvailabilityStoreSubsystem { pub struct AvailabilityStoreSubsystem {
pruning_config: PruningConfig, pruning_config: PruningConfig,
inner: Arc<dyn KeyValueDB>, inner: Arc<dyn KeyValueDB>,
chunks_cache: HashMap<CandidateHash, HashMap<u32, ErasureChunk>>,
metrics: Metrics, metrics: Metrics,
} }
impl AvailabilityStoreSubsystem { impl AvailabilityStoreSubsystem {
// Perform pruning of PoVs // Perform pruning of PoVs
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))] #[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
fn prune_povs(&self) -> Result<(), Error> { fn prune_povs(&mut self) -> Result<(), Error> {
let _timer = self.metrics.time_prune_povs(); let _timer = self.metrics.time_prune_povs();
let mut tx = DBTransaction::new(); let mut tx = DBTransaction::new();
@@ -335,20 +336,22 @@ impl AvailabilityStoreSubsystem {
for record in pov_pruning.drain(..outdated_records_count) { for record in pov_pruning.drain(..outdated_records_count) {
tracing::trace!(target: LOG_TARGET, record = ?record, "Removing record"); tracing::trace!(target: LOG_TARGET, record = ?record, "Removing record");
self.chunks_cache.remove(&record.candidate_hash);
tx.delete( tx.delete(
columns::DATA, columns::DATA,
available_data_key(&record.candidate_hash).as_slice(), available_data_key(&record.candidate_hash).as_slice(),
); );
} }
put_pov_pruning(&self.inner, Some(tx), pov_pruning)?; put_pov_pruning(&self.inner, Some(tx), pov_pruning, &self.metrics)?;
Ok(()) Ok(())
} }
// Perform pruning of chunks. // Perform pruning of chunks.
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))] #[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
fn prune_chunks(&self) -> Result<(), Error> { fn prune_chunks(&mut self) -> Result<(), Error> {
let _timer = self.metrics.time_prune_chunks(); let _timer = self.metrics.time_prune_chunks();
let mut tx = DBTransaction::new(); let mut tx = DBTransaction::new();
@@ -362,13 +365,15 @@ impl AvailabilityStoreSubsystem {
for record in chunk_pruning.drain(..outdated_records_count) { for record in chunk_pruning.drain(..outdated_records_count) {
tracing::trace!(target: LOG_TARGET, record = ?record, "Removing record"); tracing::trace!(target: LOG_TARGET, record = ?record, "Removing record");
self.chunks_cache.remove(&record.candidate_hash);
tx.delete( tx.delete(
columns::DATA, columns::DATA,
erasure_chunk_key(&record.candidate_hash, record.chunk_index).as_slice(), erasure_chunk_key(&record.candidate_hash, record.chunk_index).as_slice(),
); );
} }
put_chunk_pruning(&self.inner, Some(tx), chunk_pruning)?; put_chunk_pruning(&self.inner, Some(tx), chunk_pruning, &self.metrics)?;
Ok(()) Ok(())
} }
@@ -468,6 +473,7 @@ impl AvailabilityStoreSubsystem {
Ok(Self { Ok(Self {
pruning_config: PruningConfig::default(), pruning_config: PruningConfig::default(),
inner: Arc::new(db), inner: Arc::new(db),
chunks_cache: HashMap::new(),
metrics, metrics,
}) })
} }
@@ -477,6 +483,7 @@ impl AvailabilityStoreSubsystem {
Self { Self {
pruning_config, pruning_config,
inner, inner,
chunks_cache: HashMap::new(),
metrics: Metrics(None), metrics: Metrics(None),
} }
} }
@@ -535,7 +542,7 @@ where
ActiveLeavesUpdate { activated, .. }) ActiveLeavesUpdate { activated, .. })
) => { ) => {
for (activated, _span) in activated.into_iter() { for (activated, _span) in activated.into_iter() {
process_block_activated(ctx, &subsystem.inner, activated, &subsystem.metrics).await?; process_block_activated(ctx, subsystem, activated).await?;
} }
} }
FromOverseer::Signal(OverseerSignal::BlockFinalized(_hash, number)) => { FromOverseer::Signal(OverseerSignal::BlockFinalized(_hash, number)) => {
@@ -590,7 +597,7 @@ async fn process_block_finalized(
} }
} }
put_pov_pruning(db, None, pov_pruning)?; put_pov_pruning(db, None, pov_pruning, &subsystem.metrics)?;
} }
if let Some(mut chunk_pruning) = chunk_pruning(db) { if let Some(mut chunk_pruning) = chunk_pruning(db) {
@@ -609,23 +616,23 @@ async fn process_block_finalized(
} }
} }
put_chunk_pruning(db, None, chunk_pruning)?; put_chunk_pruning(db, None, chunk_pruning, &subsystem.metrics)?;
} }
Ok(()) Ok(())
} }
#[tracing::instrument(level = "trace", skip(ctx, db, metrics), fields(subsystem = LOG_TARGET))] #[tracing::instrument(level = "trace", skip(ctx, subsystem), fields(subsystem = LOG_TARGET))]
async fn process_block_activated<Context>( async fn process_block_activated<Context>(
ctx: &mut Context, ctx: &mut Context,
db: &Arc<dyn KeyValueDB>, subsystem: &mut AvailabilityStoreSubsystem,
hash: Hash, hash: Hash,
metrics: &Metrics,
) -> Result<(), Error> ) -> Result<(), Error>
where where
Context: SubsystemContext<Message=AvailabilityStoreMessage> Context: SubsystemContext<Message=AvailabilityStoreMessage>
{ {
let _timer = metrics.time_block_activated(); let _timer = subsystem.metrics.time_block_activated();
let db = &subsystem.inner;
let events = match request_candidate_events(ctx, hash).await { let events = match request_candidate_events(ctx, hash).await {
Ok(events) => events, Ok(events) => events,
@@ -649,6 +656,10 @@ where
} }
} }
for included in &included {
subsystem.chunks_cache.remove(&included);
}
if let Some(mut pov_pruning) = pov_pruning(db) { if let Some(mut pov_pruning) = pov_pruning(db) {
for record in pov_pruning.iter_mut() { for record in pov_pruning.iter_mut() {
if included.contains(&record.candidate_hash) { if included.contains(&record.candidate_hash) {
@@ -659,7 +670,7 @@ where
pov_pruning.sort(); pov_pruning.sort();
put_pov_pruning(db, None, pov_pruning)?; put_pov_pruning(db, None, pov_pruning, &subsystem.metrics)?;
} }
if let Some(mut chunk_pruning) = chunk_pruning(db) { if let Some(mut chunk_pruning) = chunk_pruning(db) {
@@ -672,7 +683,7 @@ where
chunk_pruning.sort(); chunk_pruning.sort();
put_chunk_pruning(db, None, chunk_pruning)?; put_chunk_pruning(db, None, chunk_pruning, &subsystem.metrics)?;
} }
Ok(()) Ok(())
@@ -742,11 +753,11 @@ where
tx.send(result?).map_err(|_| oneshot::Canceled)?; tx.send(result?).map_err(|_| oneshot::Canceled)?;
} }
StoreChunk { candidate_hash, relay_parent, validator_index, chunk, tx } => { StoreChunk { candidate_hash, relay_parent, chunk, tx } => {
let chunk_index = chunk.index; let chunk_index = chunk.index;
// Current block number is relay_parent block number + 1. // Current block number is relay_parent block number + 1.
let block_number = get_block_number(ctx, relay_parent).await? + 1; let block_number = get_block_number(ctx, relay_parent).await? + 1;
let result = store_chunk(subsystem, &candidate_hash, validator_index, chunk, block_number); let result = store_chunks(subsystem, &candidate_hash, vec![chunk], block_number);
tracing::trace!( tracing::trace!(
target: LOG_TARGET, target: LOG_TARGET,
@@ -802,14 +813,17 @@ fn chunk_pruning(db: &Arc<dyn KeyValueDB>) -> Option<Vec<ChunkPruningRecord>> {
query_inner(db, columns::META, &CHUNK_PRUNING_KEY) query_inner(db, columns::META, &CHUNK_PRUNING_KEY)
} }
#[tracing::instrument(level = "trace", skip(db, tx), fields(subsystem = LOG_TARGET))] #[tracing::instrument(level = "trace", skip(db, tx, metrics), fields(subsystem = LOG_TARGET))]
fn put_pov_pruning( fn put_pov_pruning(
db: &Arc<dyn KeyValueDB>, db: &Arc<dyn KeyValueDB>,
tx: Option<DBTransaction>, tx: Option<DBTransaction>,
mut pov_pruning: Vec<PoVPruningRecord>, mut pov_pruning: Vec<PoVPruningRecord>,
metrics: &Metrics,
) -> Result<(), Error> { ) -> Result<(), Error> {
let mut tx = tx.unwrap_or_default(); let mut tx = tx.unwrap_or_default();
metrics.block_pruning_records_size(pov_pruning.len());
pov_pruning.sort(); pov_pruning.sort();
tx.put_vec( tx.put_vec(
@@ -843,14 +857,17 @@ fn put_pov_pruning(
Ok(()) Ok(())
} }
#[tracing::instrument(level = "trace", skip(db, tx), fields(subsystem = LOG_TARGET))] #[tracing::instrument(level = "trace", skip(db, tx, metrics), fields(subsystem = LOG_TARGET))]
fn put_chunk_pruning( fn put_chunk_pruning(
db: &Arc<dyn KeyValueDB>, db: &Arc<dyn KeyValueDB>,
tx: Option<DBTransaction>, tx: Option<DBTransaction>,
mut chunk_pruning: Vec<ChunkPruningRecord>, mut chunk_pruning: Vec<ChunkPruningRecord>,
metrics: &Metrics,
) -> Result<(), Error> { ) -> Result<(), Error> {
let mut tx = tx.unwrap_or_default(); let mut tx = tx.unwrap_or_default();
metrics.chunk_pruning_records_size(chunk_pruning.len());
chunk_pruning.sort(); chunk_pruning.sort();
tx.put_vec( tx.put_vec(
@@ -910,16 +927,13 @@ fn store_available_data(
let block_number = available_data.validation_data.block_number; let block_number = available_data.validation_data.block_number;
if let Some(index) = id { let chunks = get_chunks(&available_data, n_validators as usize, &subsystem.metrics)?;
let chunks = get_chunks(&available_data, n_validators as usize, &subsystem.metrics)?; store_chunks(
store_chunk( subsystem,
subsystem, candidate_hash,
candidate_hash, chunks,
n_validators, block_number,
chunks[index as usize].clone(), )?;
block_number,
)?;
}
let stored_data = StoredAvailableData { let stored_data = StoredAvailableData {
data: available_data, data: available_data,
@@ -966,23 +980,19 @@ fn store_available_data(
} }
#[tracing::instrument(level = "trace", skip(subsystem), fields(subsystem = LOG_TARGET))] #[tracing::instrument(level = "trace", skip(subsystem), fields(subsystem = LOG_TARGET))]
fn store_chunk( fn store_chunks(
subsystem: &mut AvailabilityStoreSubsystem, subsystem: &mut AvailabilityStoreSubsystem,
candidate_hash: &CandidateHash, candidate_hash: &CandidateHash,
_n_validators: u32, chunks: Vec<ErasureChunk>,
chunk: ErasureChunk,
block_number: BlockNumber, block_number: BlockNumber,
) -> Result<(), Error> { ) -> Result<(), Error> {
let _timer = subsystem.metrics.time_store_chunk(); let _timer = subsystem.metrics.time_store_chunks();
let mut tx = DBTransaction::new(); let mut tx = DBTransaction::new();
let dbkey = erasure_chunk_key(candidate_hash, chunk.index);
let mut chunk_pruning = chunk_pruning(&subsystem.inner).unwrap_or_default(); let mut chunk_pruning = chunk_pruning(&subsystem.inner).unwrap_or_default();
let prune_at = PruningDelay::into_the_future(subsystem.pruning_config.keep_stored_block_for)?;
if let Some(delay) = prune_at.as_duration() { let prune_at = PruningDelay::into_the_future(subsystem.pruning_config.keep_stored_block_for)?;
if let Some(delay) = prune_at.clone().as_duration() {
tx.put_vec( tx.put_vec(
columns::META, columns::META,
&NEXT_CHUNK_PRUNING, &NEXT_CHUNK_PRUNING,
@@ -990,23 +1000,29 @@ fn store_chunk(
); );
} }
let pruning_record = ChunkPruningRecord { for chunk in chunks {
candidate_hash: candidate_hash.clone(), subsystem.chunks_cache.entry(*candidate_hash).or_default().insert(chunk.index, chunk.clone());
block_number,
candidate_state: CandidateState::Stored,
chunk_index: chunk.index,
prune_at,
};
let idx = chunk_pruning.binary_search(&pruning_record).unwrap_or_else(|insert_idx| insert_idx); let pruning_record = ChunkPruningRecord {
candidate_hash: candidate_hash.clone(),
block_number,
candidate_state: CandidateState::Stored,
chunk_index: chunk.index,
prune_at: prune_at.clone(),
};
chunk_pruning.insert(idx, pruning_record); let idx = chunk_pruning.binary_search(&pruning_record).unwrap_or_else(|insert_idx| insert_idx);
tx.put_vec( chunk_pruning.insert(idx, pruning_record);
columns::DATA,
&dbkey, let dbkey = erasure_chunk_key(candidate_hash, chunk.index);
chunk.encode(),
); tx.put_vec(
columns::DATA,
&dbkey,
chunk.encode(),
);
}
tx.put_vec( tx.put_vec(
columns::META, columns::META,
@@ -1027,6 +1043,12 @@ fn get_chunk(
) -> Result<Option<ErasureChunk>, Error> { ) -> Result<Option<ErasureChunk>, Error> {
let _timer = subsystem.metrics.time_get_chunk(); let _timer = subsystem.metrics.time_get_chunk();
if let Some(entry) = subsystem.chunks_cache.get(candidate_hash) {
if let Some(chunk) = entry.get(&index) {
return Ok(Some(chunk.clone()));
}
}
if let Some(chunk) = query_inner( if let Some(chunk) = query_inner(
&subsystem.inner, &subsystem.inner,
columns::DATA, columns::DATA,
@@ -1036,17 +1058,14 @@ fn get_chunk(
} }
if let Some(data) = available_data(&subsystem.inner, candidate_hash) { if let Some(data) = available_data(&subsystem.inner, candidate_hash) {
let mut chunks = get_chunks(&data.data, data.n_validators as usize, &subsystem.metrics)?; let chunks = get_chunks(&data.data, data.n_validators as usize, &subsystem.metrics)?;
let desired_chunk = chunks.get(index as usize).cloned(); let desired_chunk = chunks.get(index as usize).cloned();
for chunk in chunks.drain(..) { store_chunks(
store_chunk( subsystem,
subsystem, candidate_hash,
candidate_hash, chunks,
data.n_validators, data.data.validation_data.block_number,
chunk, )?;
data.data.validation_data.block_number,
)?;
}
return Ok(desired_chunk); return Ok(desired_chunk);
} }
@@ -1109,13 +1128,15 @@ fn get_chunks(data: &AvailableData, n_validators: usize, metrics: &Metrics) -> R
#[derive(Clone)] #[derive(Clone)]
struct MetricsInner { struct MetricsInner {
received_availability_chunks_total: prometheus::Counter<prometheus::U64>, received_availability_chunks_total: prometheus::Counter<prometheus::U64>,
chunk_pruning_records_total: prometheus::Gauge<prometheus::U64>,
block_pruning_records_total: prometheus::Gauge<prometheus::U64>,
prune_povs: prometheus::Histogram, prune_povs: prometheus::Histogram,
prune_chunks: prometheus::Histogram, prune_chunks: prometheus::Histogram,
process_block_finalized: prometheus::Histogram, process_block_finalized: prometheus::Histogram,
block_activated: prometheus::Histogram, block_activated: prometheus::Histogram,
process_message: prometheus::Histogram, process_message: prometheus::Histogram,
store_available_data: prometheus::Histogram, store_available_data: prometheus::Histogram,
store_chunk: prometheus::Histogram, store_chunks: prometheus::Histogram,
get_chunk: prometheus::Histogram, get_chunk: prometheus::Histogram,
} }
@@ -1133,6 +1154,22 @@ impl Metrics {
} }
} }
fn chunk_pruning_records_size(&self, count: usize) {
if let Some(metrics) = &self.0 {
use core::convert::TryFrom as _;
let total = u64::try_from(count).unwrap_or_default();
metrics.chunk_pruning_records_total.set(total);
}
}
fn block_pruning_records_size(&self, count: usize) {
if let Some(metrics) = &self.0 {
use core::convert::TryFrom as _;
let total = u64::try_from(count).unwrap_or_default();
metrics.block_pruning_records_total.set(total);
}
}
/// Provide a timer for `prune_povs` which observes on drop. /// Provide a timer for `prune_povs` which observes on drop.
fn time_prune_povs(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> { fn time_prune_povs(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
self.0.as_ref().map(|metrics| metrics.prune_povs.start_timer()) self.0.as_ref().map(|metrics| metrics.prune_povs.start_timer())
@@ -1164,8 +1201,8 @@ impl Metrics {
} }
/// Provide a timer for `store_chunk` which observes on drop. /// Provide a timer for `store_chunk` which observes on drop.
fn time_store_chunk(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> { fn time_store_chunks(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
self.0.as_ref().map(|metrics| metrics.store_chunk.start_timer()) self.0.as_ref().map(|metrics| metrics.store_chunks.start_timer())
} }
/// Provide a timer for `get_chunk` which observes on drop. /// Provide a timer for `get_chunk` which observes on drop.
@@ -1184,6 +1221,20 @@ impl metrics::Metrics for Metrics {
)?, )?,
registry, registry,
)?, )?,
chunk_pruning_records_total: prometheus::register(
prometheus::Gauge::new(
"parachain_chunk_pruning_records_total",
"Number of chunk pruning records kept by the storage.",
)?,
registry,
)?,
block_pruning_records_total: prometheus::register(
prometheus::Gauge::new(
"parachain_block_pruning_records_total",
"Number of block pruning records kept by the storage.",
)?,
registry,
)?,
prune_povs: prometheus::register( prune_povs: prometheus::register(
prometheus::Histogram::with_opts( prometheus::Histogram::with_opts(
prometheus::HistogramOpts::new( prometheus::HistogramOpts::new(
@@ -1238,11 +1289,11 @@ impl metrics::Metrics for Metrics {
)?, )?,
registry, registry,
)?, )?,
store_chunk: prometheus::register( store_chunks: prometheus::register(
prometheus::Histogram::with_opts( prometheus::Histogram::with_opts(
prometheus::HistogramOpts::new( prometheus::HistogramOpts::new(
"parachain_av_store_store_chunk", "parachain_av_store_store_chunks",
"Time spent within `av_store::store_chunk`", "Time spent within `av_store::store_chunks`",
) )
)?, )?,
registry, registry,
-3
View File
@@ -235,7 +235,6 @@ fn store_chunk_works() {
let chunk_msg = AvailabilityStoreMessage::StoreChunk { let chunk_msg = AvailabilityStoreMessage::StoreChunk {
candidate_hash, candidate_hash,
relay_parent, relay_parent,
validator_index,
chunk: chunk.clone(), chunk: chunk.clone(),
tx, tx,
}; };
@@ -385,7 +384,6 @@ fn stored_but_not_included_chunk_is_pruned() {
let chunk_msg = AvailabilityStoreMessage::StoreChunk { let chunk_msg = AvailabilityStoreMessage::StoreChunk {
candidate_hash, candidate_hash,
relay_parent, relay_parent,
validator_index,
chunk: chunk.clone(), chunk: chunk.clone(),
tx, tx,
}; };
@@ -589,7 +587,6 @@ fn stored_chunk_kept_until_finalized() {
let chunk_msg = AvailabilityStoreMessage::StoreChunk { let chunk_msg = AvailabilityStoreMessage::StoreChunk {
candidate_hash, candidate_hash,
relay_parent, relay_parent,
validator_index,
chunk: chunk.clone(), chunk: chunk.clone(),
tx, tx,
}; };
@@ -1036,7 +1036,6 @@ where
AvailabilityStoreMessage::StoreChunk { AvailabilityStoreMessage::StoreChunk {
candidate_hash, candidate_hash,
relay_parent, relay_parent,
validator_index,
chunk: erasure_chunk, chunk: erasure_chunk,
tx, tx,
} }
-2
View File
@@ -309,8 +309,6 @@ pub enum AvailabilityStoreMessage {
candidate_hash: CandidateHash, candidate_hash: CandidateHash,
/// A relevant relay parent. /// A relevant relay parent.
relay_parent: Hash, relay_parent: Hash,
/// The index of the validator this chunk belongs to.
validator_index: ValidatorIndex,
/// The chunk itself. /// The chunk itself.
chunk: ErasureChunk, chunk: ErasureChunk,
/// Sending side of the channel to send result to. /// Sending side of the channel to send result to.
@@ -174,9 +174,9 @@ enum AvailabilityStoreMessage {
/// Query a specific availability chunk of the candidate's erasure-coding by validator index. /// Query a specific availability chunk of the candidate's erasure-coding by validator index.
/// Returns the chunk and its inclusion proof against the candidate's erasure-root. /// Returns the chunk and its inclusion proof against the candidate's erasure-root.
QueryChunk(CandidateHash, ValidatorIndex, ResponseChannel<Option<AvailabilityChunkAndProof>>), QueryChunk(CandidateHash, ValidatorIndex, ResponseChannel<Option<AvailabilityChunkAndProof>>),
/// Store a specific chunk of the candidate's erasure-coding by validator index, with an /// Store a specific chunk of the candidate's erasure-coding, with an
/// accompanying proof. /// accompanying proof.
StoreChunk(CandidateHash, ValidatorIndex, AvailabilityChunkAndProof, ResponseChannel<Result<()>>), StoreChunk(CandidateHash, AvailabilityChunkAndProof, ResponseChannel<Result<()>>),
/// Store `AvailableData`. If `ValidatorIndex` is provided, also store this validator's /// Store `AvailableData`. If `ValidatorIndex` is provided, also store this validator's
/// `AvailabilityChunkAndProof`. /// `AvailabilityChunkAndProof`.
StoreAvailableData(CandidateHash, Option<ValidatorIndex>, u32, AvailableData, ResponseChannel<Result<()>>), StoreAvailableData(CandidateHash, Option<ValidatorIndex>, u32, AvailableData, ResponseChannel<Result<()>>),