Fix a storage leak in parachains db (#5594)

* Fix cleanup of old votes.

* Cleanup.

* Get rid of redundant import

* Tests + logging

* Fix db key name.

* Add some reasoning to batch size.

* Add dispute data to indexed columns

* Fix fmt

* Add helper function.

* Fix typos.

* Update node/core/dispute-coordinator/src/db/v1.rs

Co-authored-by: Andronik <write@reusable.software>

* Update node/core/dispute-coordinator/src/db/v1.rs

Co-authored-by: Andronik <write@reusable.software>

* Add metric for how long cleanup takes.

Co-authored-by: Andronik <write@reusable.software>
This commit is contained in:
Robert Klotzner
2022-06-13 15:38:42 +02:00
committed by GitHub
parent 215ae1f134
commit c3c10ce4c4
7 changed files with 253 additions and 54 deletions
@@ -149,13 +149,6 @@ impl<'a, B: 'a + Backend> OverlayedBackend<'a, B> {
self.candidate_votes.insert((session, candidate_hash), Some(votes));
}
/// Prepare a deletion of the candidate votes under the indicated candidate.
///
/// Later calls to this function for the same candidate will override earlier ones.
pub fn delete_candidate_votes(&mut self, session: SessionIndex, candidate_hash: CandidateHash) {
self.candidate_votes.insert((session, candidate_hash), None);
}
/// Transform this backend into a set of write-ops to be written to the inner backend.
pub fn into_write_ops(self) -> impl Iterator<Item = BackendWriteOp> {
let earliest_session_ops = self
@@ -30,22 +30,80 @@ use parity_scale_codec::{Decode, Encode};
use crate::{
backend::{Backend, BackendWriteOp, OverlayedBackend},
error::{FatalError, FatalResult},
metrics::Metrics,
status::DisputeStatus,
DISPUTE_WINDOW,
DISPUTE_WINDOW, LOG_TARGET,
};
const RECENT_DISPUTES_KEY: &[u8; 15] = b"recent-disputes";
const EARLIEST_SESSION_KEY: &[u8; 16] = b"earliest-session";
const CANDIDATE_VOTES_SUBKEY: &[u8; 15] = b"candidate-votes";
/// Until what session have votes been cleaned up already?
const CLEANED_VOTES_WATERMARK_KEY: &[u8; 23] = b"cleaned-votes-watermark";
/// Restrict number of cleanup operations.
///
/// On the first run we are starting at session 0 going up all the way to the current session -
/// this should not be done at once, but rather in smaller batches so nodes won't get stalled by
/// this.
///
/// With a session duration of 1 hour and 30 parachains, a batch size of 300 corresponds to
/// fewer than 3_000_000 key purges in the worst case. That is already quite a lot; at the
/// same time we have around 21_000 sessions on Kusama. This means that at 300 purged sessions
/// per session, cleaning everything up will take around 3 days. Depending on how severe disk
/// usage becomes, we might want to bump the batch size, at the cost of risking performance
/// issues at session boundaries.
#[cfg(test)]
const MAX_CLEAN_BATCH_SIZE: u32 = 10;
#[cfg(not(test))]
const MAX_CLEAN_BATCH_SIZE: u32 = 300;
pub struct DbBackend {
inner: Arc<dyn Database>,
config: ColumnConfiguration,
metrics: Metrics,
}
impl DbBackend {
pub fn new(db: Arc<dyn Database>, config: ColumnConfiguration) -> Self {
Self { inner: db, config }
pub fn new(db: Arc<dyn Database>, config: ColumnConfiguration, metrics: Metrics) -> Self {
Self { inner: db, config, metrics }
}
/// Cleanup old votes.
///
/// Should be called whenever a new earliest session gets written.
///
/// Appends deletions for candidate votes of sessions older than `earliest_session` to the
/// given transaction, resuming from the persisted watermark and purging at most
/// `MAX_CLEAN_BATCH_SIZE` sessions per call so a single write cannot stall the node.
fn add_vote_cleanup_tx(
&mut self,
tx: &mut DBTransaction,
earliest_session: SessionIndex,
) -> FatalResult<()> {
// Cleanup old votes in db:
// Resume from where the previous cleanup left off (0 on a fresh database).
let watermark = load_cleaned_votes_watermark(&*self.inner, &self.config)?.unwrap_or(0);
// Cap the work per call: purge at most `MAX_CLEAN_BATCH_SIZE` sessions in one transaction.
let clean_until = if earliest_session.saturating_sub(watermark) > MAX_CLEAN_BATCH_SIZE {
watermark + MAX_CLEAN_BATCH_SIZE
} else {
earliest_session
};
gum::trace!(
target: LOG_TARGET,
?watermark,
?clean_until,
?earliest_session,
?MAX_CLEAN_BATCH_SIZE,
"WriteEarliestSession"
);
// Delete every candidate-votes entry whose key carries one of the obsolete session prefixes.
for index in watermark..clean_until {
gum::trace!(
target: LOG_TARGET,
?index,
encoded = ?candidate_votes_session_prefix(index),
"Cleaning votes for session index"
);
tx.delete_prefix(self.config.col_data, &candidate_votes_session_prefix(index));
}
// New watermark:
// Persist progress so the next call continues from `clean_until`.
tx.put_vec(self.config.col_data, CLEANED_VOTES_WATERMARK_KEY, clean_until.encode());
Ok(())
}
}
@@ -71,20 +129,32 @@ impl Backend for DbBackend {
/// Atomically writes the list of operations, with later operations taking precedence over
/// prior.
///
/// This also takes care of purging old votes (of obsolete sessions).
fn write<I>(&mut self, ops: I) -> FatalResult<()>
where
I: IntoIterator<Item = BackendWriteOp>,
{
let mut tx = DBTransaction::new();
// Make sure the whole process is timed, including the actual transaction flush:
let mut cleanup_timer = None;
for op in ops {
match op {
BackendWriteOp::WriteEarliestSession(session) => {
cleanup_timer = match cleanup_timer.take() {
None => Some(self.metrics.time_vote_cleanup()),
Some(t) => Some(t),
};
self.add_vote_cleanup_tx(&mut tx, session)?;
// Actually write the earliest session.
tx.put_vec(self.config.col_data, EARLIEST_SESSION_KEY, session.encode());
},
BackendWriteOp::WriteRecentDisputes(recent_disputes) => {
tx.put_vec(self.config.col_data, RECENT_DISPUTES_KEY, recent_disputes.encode());
},
BackendWriteOp::WriteCandidateVotes(session, candidate_hash, votes) => {
gum::trace!(target: LOG_TARGET, ?session, "Writing candidate votes");
tx.put_vec(
self.config.col_data,
&candidate_votes_key(session, &candidate_hash),
@@ -112,6 +182,15 @@ fn candidate_votes_key(session: SessionIndex, candidate_hash: &CandidateHash) ->
buf
}
/// Computes the key prefix under which all candidate votes of the given session are stored.
///
/// The prefix is the `candidate-votes` subkey followed by the big-endian encoded session
/// index, so all votes of one session can be removed with a single prefix deletion.
fn candidate_votes_session_prefix(session: SessionIndex) -> [u8; 15 + 4] {
let mut buf = [0u8; 15 + 4];
buf[..15].copy_from_slice(CANDIDATE_VOTES_SUBKEY);
// big-endian encoding is used to ensure lexicographic ordering.
buf[15..][..4].copy_from_slice(&session.to_be_bytes());
buf
}
/// Column configuration information for the DB.
#[derive(Debug, Clone)]
pub struct ColumnConfiguration {
@@ -244,9 +323,7 @@ pub(crate) fn note_current_session(
if pruned_disputes.len() != 0 {
overlay_db.write_recent_disputes(new_recent_disputes);
for ((session, candidate_hash), _) in pruned_disputes {
overlay_db.delete_candidate_votes(session, candidate_hash);
}
// Note: Deleting old candidate votes is handled in `write` based on the earliest session.
}
}
},
@@ -258,18 +335,114 @@ pub(crate) fn note_current_session(
Ok(())
}
/// Until which session votes have already been cleaned up.
///
/// That is, the db has already been purged of votes for sessions older than the returned
/// `SessionIndex`. Returns `Ok(None)` when no cleanup has been recorded yet.
fn load_cleaned_votes_watermark(
db: &dyn Database,
config: &ColumnConfiguration,
) -> FatalResult<Option<SessionIndex>> {
// Pass the variant constructor directly instead of a redundant closure
// (clippy::redundant_closure).
load_decode(db, config.col_data, CLEANED_VOTES_WATERMARK_KEY)
.map_err(FatalError::DbReadFailed)
}
#[cfg(test)]
mod tests {
use super::*;
use ::test_helpers::{dummy_candidate_receipt, dummy_hash};
use polkadot_primitives::v2::{Hash, Id as ParaId};
fn make_db() -> DbBackend {
let db = kvdb_memorydb::create(1);
let db = polkadot_node_subsystem_util::database::kvdb_impl::DbAdapter::new(db, &[]);
let db = polkadot_node_subsystem_util::database::kvdb_impl::DbAdapter::new(db, &[0]);
let store = Arc::new(db);
let config = ColumnConfiguration { col_data: 0 };
DbBackend::new(store, config)
DbBackend::new(store, config, Metrics::default())
}
#[test]
fn max_clean_batch_size_is_honored() {
// Checks that a single `write` purges at most `MAX_CLEAN_BATCH_SIZE` sessions worth of
// votes, and that the remainder gets cleaned up by subsequent writes.
let mut backend = make_db();
let mut overlay_db = OverlayedBackend::new(&backend);
let current_session = MAX_CLEAN_BATCH_SIZE + DISPUTE_WINDOW.get() + 3;
let earliest_session = current_session - DISPUTE_WINDOW.get();
overlay_db.write_earliest_session(0);
let candidate_hash = CandidateHash(Hash::repeat_byte(1));
// Populate votes for every session from 0 up to and including `current_session`.
for session in 0..current_session + 1 {
overlay_db.write_candidate_votes(
session,
candidate_hash,
CandidateVotes {
candidate_receipt: dummy_candidate_receipt(dummy_hash()),
valid: Vec::new(),
invalid: Vec::new(),
},
);
}
// Sanity: votes are visible in the overlay before anything is flushed.
assert!(overlay_db.load_candidate_votes(0, &candidate_hash).unwrap().is_some());
assert!(overlay_db
.load_candidate_votes(MAX_CLEAN_BATCH_SIZE - 1, &candidate_hash)
.unwrap()
.is_some());
assert!(overlay_db
.load_candidate_votes(MAX_CLEAN_BATCH_SIZE, &candidate_hash)
.unwrap()
.is_some());
// Cleanup only works for votes that have been written already - so write.
let write_ops = overlay_db.into_write_ops();
backend.write(write_ops).unwrap();
let mut overlay_db = OverlayedBackend::new(&backend);
gum::trace!(target: LOG_TARGET, ?current_session, "Noting current session");
note_current_session(&mut overlay_db, current_session).unwrap();
let write_ops = overlay_db.into_write_ops();
backend.write(write_ops).unwrap();
let mut overlay_db = OverlayedBackend::new(&backend);
// The first batch (sessions below `MAX_CLEAN_BATCH_SIZE`) should be purged now:
assert!(overlay_db
.load_candidate_votes(MAX_CLEAN_BATCH_SIZE - 1, &candidate_hash)
.unwrap()
.is_none());
// After batch size votes should still be there:
assert!(overlay_db
.load_candidate_votes(MAX_CLEAN_BATCH_SIZE, &candidate_hash)
.unwrap()
.is_some());
// Advance one session so the next write triggers another cleanup batch.
let current_session = current_session + 1;
let earliest_session = earliest_session + 1;
note_current_session(&mut overlay_db, current_session).unwrap();
let write_ops = overlay_db.into_write_ops();
backend.write(write_ops).unwrap();
let overlay_db = OverlayedBackend::new(&backend);
// All should be gone now:
assert!(overlay_db
.load_candidate_votes(earliest_session - 1, &candidate_hash)
.unwrap()
.is_none());
// Earliest session should still be there:
assert!(overlay_db
.load_candidate_votes(earliest_session, &candidate_hash)
.unwrap()
.is_some());
// Old current session should still be there as well:
assert!(overlay_db
.load_candidate_votes(current_session - 1, &candidate_hash)
.unwrap()
.is_some());
}
#[test]
@@ -368,13 +541,24 @@ mod tests {
let mut backend = make_db();
let mut overlay_db = OverlayedBackend::new(&backend);
overlay_db.delete_candidate_votes(1, CandidateHash(Hash::repeat_byte(1)));
overlay_db.write_candidate_votes(
1,
CandidateHash(Hash::repeat_byte(1)),
CandidateVotes {
candidate_receipt: dummy_candidate_receipt(dummy_hash()),
candidate_receipt: dummy_candidate_receipt(Hash::random()),
valid: Vec::new(),
invalid: Vec::new(),
},
);
let receipt = dummy_candidate_receipt(dummy_hash());
overlay_db.write_candidate_votes(
1,
CandidateHash(Hash::repeat_byte(1)),
CandidateVotes {
candidate_receipt: receipt.clone(),
valid: Vec::new(),
invalid: Vec::new(),
},
@@ -388,37 +572,9 @@ mod tests {
.load_candidate_votes(1, &CandidateHash(Hash::repeat_byte(1)))
.unwrap()
.unwrap()
.candidate_receipt
.descriptor
.para_id,
ParaId::from(1),
.candidate_receipt,
receipt,
);
let mut overlay_db = OverlayedBackend::new(&backend);
overlay_db.write_candidate_votes(
1,
CandidateHash(Hash::repeat_byte(1)),
CandidateVotes {
candidate_receipt: {
let mut receipt = dummy_candidate_receipt(dummy_hash());
receipt.descriptor.para_id = ParaId::from(5_u32);
receipt
},
valid: Vec::new(),
invalid: Vec::new(),
},
);
overlay_db.delete_candidate_votes(1, CandidateHash(Hash::repeat_byte(1)));
let write_ops = overlay_db.into_write_ops();
backend.write(write_ops).unwrap();
assert!(backend
.load_candidate_votes(1, &CandidateHash(Hash::repeat_byte(1)))
.unwrap()
.is_none());
}
#[test]
@@ -434,6 +590,7 @@ mod tests {
let new_earliest_session = 5;
let current_session = 5 + DISPUTE_WINDOW.get();
let super_old_no_dispute = 1;
let very_old = 3;
let slightly_old = 4;
let very_recent = current_session - 1;
@@ -457,6 +614,7 @@ mod tests {
.collect(),
);
overlay_db.write_candidate_votes(super_old_no_dispute, hash_a, blank_candidate_votes());
overlay_db.write_candidate_votes(very_old, hash_a, blank_candidate_votes());
overlay_db.write_candidate_votes(slightly_old, hash_b, blank_candidate_votes());
@@ -483,6 +641,16 @@ mod tests {
.collect(),
);
// Votes are only cleaned up after actual write:
let write_ops = overlay_db.into_write_ops();
backend.write(write_ops).unwrap();
let overlay_db = OverlayedBackend::new(&backend);
assert!(overlay_db
.load_candidate_votes(super_old_no_dispute, &hash_a)
.unwrap()
.is_none());
assert!(overlay_db.load_candidate_votes(very_old, &hash_a).unwrap().is_none());
assert!(overlay_db.load_candidate_votes(slightly_old, &hash_b).unwrap().is_none());
assert!(overlay_db
@@ -20,7 +20,7 @@ use futures::channel::oneshot;
use polkadot_node_subsystem::{errors::ChainApiError, SubsystemError};
use polkadot_node_subsystem_util::{rolling_session_window::SessionsUnavailable, runtime};
use crate::{participation, LOG_TARGET};
use crate::{db, participation, LOG_TARGET};
use parity_scale_codec::Error as CodecError;
pub type Result<T> = std::result::Result<T, Error>;
@@ -53,6 +53,10 @@ pub enum Error {
#[error("Writing to database failed: {0}")]
DbWriteFailed(std::io::Error),
#[fatal]
#[error("Reading from database failed: {0}")]
DbReadFailed(db::v1::Error),
#[fatal]
#[error("Oneshot for receiving block number from chain API got cancelled")]
CanceledBlockNumber,
@@ -127,7 +127,11 @@ impl Config {
impl<Context: Send> DisputeCoordinatorSubsystem {
fn start(self, ctx: Context) -> SpawnedSubsystem {
let future = async {
let backend = DbBackend::new(self.store.clone(), self.config.column_config());
let backend = DbBackend::new(
self.store.clone(),
self.config.column_config(),
self.metrics.clone(),
);
self.run(ctx, backend, Box::new(SystemClock))
.await
.map_err(|e| SubsystemError::with_origin("dispute-coordinator", e))
@@ -26,6 +26,8 @@ struct MetricsInner {
concluded: prometheus::CounterVec<prometheus::U64>,
/// Number of participations that have been queued.
queued_participations: prometheus::CounterVec<prometheus::U64>,
/// How long vote cleanup batches take.
vote_cleanup_time: prometheus::Histogram,
}
/// Candidate validation metrics.
@@ -74,6 +76,10 @@ impl Metrics {
metrics.queued_participations.with_label_values(&["best-effort"]).inc();
}
}
pub(crate) fn time_vote_cleanup(&self) -> Option<prometheus::prometheus::HistogramTimer> {
self.0.as_ref().map(|metrics| metrics.vote_cleanup_time.start_timer())
}
}
impl metrics::Metrics for Metrics {
@@ -116,6 +122,16 @@ impl metrics::Metrics for Metrics {
)?,
registry,
)?,
vote_cleanup_time: prometheus::register(
prometheus::Histogram::with_opts(
prometheus::HistogramOpts::new(
"polkadot_parachain_dispute_coordinator_vote_cleanup",
"Time spent cleaning up old votes per batch.",
)
.buckets([0.01, 0.1, 0.5, 1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 64.0].into()),
)?,
registry,
)?,
};
Ok(Metrics(Some(metrics)))
}
@@ -438,7 +438,8 @@ impl TestState {
self.subsystem_keystore.clone(),
Metrics::default(),
);
let backend = DbBackend::new(self.db.clone(), self.config.column_config());
let backend =
DbBackend::new(self.db.clone(), self.config.column_config(), Metrics::default());
let subsystem_task = subsystem.run(ctx, backend, Box::new(self.clock.clone()));
let test_task = test(self, ctx_handle);
@@ -2150,7 +2151,11 @@ fn negative_issue_local_statement_only_triggers_import() {
})
.await;
let backend = DbBackend::new(test_state.db.clone(), test_state.config.column_config());
let backend = DbBackend::new(
test_state.db.clone(),
test_state.config.column_config(),
Metrics::default(),
);
let votes = backend.load_candidate_votes(session, &candidate_hash).unwrap().unwrap();
assert_eq!(votes.invalid.len(), 1);
@@ -2198,7 +2203,11 @@ fn empty_import_still_writes_candidate_receipt() {
rx.await.unwrap();
let backend = DbBackend::new(test_state.db.clone(), test_state.config.column_config());
let backend = DbBackend::new(
test_state.db.clone(),
test_state.config.column_config(),
Metrics::default(),
);
let votes = backend.load_candidate_votes(session, &candidate_hash).unwrap().unwrap();
assert_eq!(votes.invalid.len(), 0);
@@ -2264,7 +2273,11 @@ fn redundant_votes_ignored() {
rx.await.unwrap();
let backend = DbBackend::new(test_state.db.clone(), test_state.config.column_config());
let backend = DbBackend::new(
test_state.db.clone(),
test_state.config.column_config(),
Metrics::default(),
);
let votes = backend.load_candidate_votes(session, &candidate_hash).unwrap().unwrap();
assert_eq!(votes.invalid.len(), 0);
@@ -33,7 +33,8 @@ pub(crate) mod columns {
pub const COL_APPROVAL_DATA: u32 = 2;
pub const COL_CHAIN_SELECTION_DATA: u32 = 3;
pub const COL_DISPUTE_COORDINATOR_DATA: u32 = 4;
pub const ORDERED_COL: &[u32] = &[COL_AVAILABILITY_META, COL_CHAIN_SELECTION_DATA];
pub const ORDERED_COL: &[u32] =
&[COL_AVAILABILITY_META, COL_CHAIN_SELECTION_DATA, COL_DISPUTE_COORDINATOR_DATA];
}
/// Columns used by different subsystems.