Prevent possible rocksdb-corruption by running benchmark-storage (#11040)

* Prevent possible rocksdb-corruption by running benchmark-storage

Also adds a `sanitize_key` function to strip path-prefixes from the db-keys (for databases that
use prefixed keys such as rocksdb)

fixes #10998

* Fix @cheme 's annotations.

* Update utils/frame/benchmarking-cli/src/storage/write.rs

Co-authored-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* Make logic match the name of bool flag `invert_inserts`

* Remove unused lifetime

Co-authored-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>
This commit is contained in:
Falco Hirschenberger
2022-03-16 17:05:34 +01:00
committed by GitHub
parent 800cc1d4be
commit 7a54efd1f2
4 changed files with 46 additions and 35 deletions
+3 -5
View File
@@ -107,7 +107,8 @@ pub type DbState<B> =
sp_state_machine::TrieBackend<Arc<dyn sp_state_machine::Storage<HashFor<B>>>, HashFor<B>>;
/// Length of a [`DbHash`].
pub const DB_HASH_LEN: usize = 32;
const DB_HASH_LEN: usize = 32;
/// Hash type that this backend uses for the database.
pub type DbHash = sp_core::H256;
@@ -1351,10 +1352,7 @@ impl<Block: BlockT> Backend<Block> {
let mut removal: u64 = 0;
let mut bytes_removal: u64 = 0;
for (mut key, (val, rc)) in operation.db_updates.drain() {
if !self.storage.prefix_keys {
// Strip prefix
key.drain(0..key.len() - DB_HASH_LEN);
};
self.storage.db.sanitize_key(&mut key);
if rc > 0 {
ops += 1;
bytes += key.len() as u64 + val.len() as u64;
+4
View File
@@ -112,4 +112,8 @@ impl<H: Clone + AsRef<[u8]>> Database<H> for DbAdapter {
fn supports_ref_counting(&self) -> bool {
true
}
fn sanitize_key(&self, key: &mut Vec<u8>) {
let _prefix = key.drain(0..key.len() - crate::DB_HASH_LEN);
}
}
+5
View File
@@ -110,6 +110,11 @@ pub trait Database<H: Clone + AsRef<[u8]>>: Send + Sync {
fn supports_ref_counting(&self) -> bool {
false
}
/// Remove a possible path-prefix from the key.
///
/// Not all database implementations use a prefix for keys, so this function may be a noop.
fn sanitize_key(&self, _key: &mut Vec<u8>) {}
}
impl<H> std::fmt::Debug for dyn Database<H> {
@@ -17,7 +17,7 @@
use sc_cli::Result;
use sc_client_api::UsageProvider;
use sc_client_db::{DbHash, DbState, DB_HASH_LEN};
use sc_client_db::{DbHash, DbState};
use sp_api::StateBackend;
use sp_blockchain::HeaderBackend;
use sp_database::{ColumnId, Transaction};
@@ -27,7 +27,7 @@ use sp_runtime::{
};
use sp_trie::PrefixedMemoryDB;
use log::info;
use log::{info, trace};
use rand::prelude::*;
use std::{fmt::Debug, sync::Arc, time::Instant};
@@ -50,7 +50,6 @@ impl StorageCmd {
// Store the time that it took to write each value.
let mut record = BenchRecord::default();
let supports_rc = db.supports_ref_counting();
let block = BlockId::Number(client.usage_info().chain.best_number);
let header = client.header(block)?.ok_or("Header not found")?;
let original_root = *header.state_root();
@@ -62,15 +61,34 @@ impl StorageCmd {
let mut rng = Self::setup_rng();
kvs.shuffle(&mut rng);
// Generate all random values first; Make sure there are no collisions with existing
// db entries, so we can rollback all additions without corrupting existing entries.
for (k, original_v) in kvs.iter_mut() {
'retry: loop {
let mut new_v = vec![0; original_v.len()];
// Create a random value to overwrite with.
// NOTE: We use a possibly higher entropy than the original value,
// could be improved but acts as an over-estimation which is fine for now.
rng.fill_bytes(&mut new_v[..]);
let new_kv = vec![(k.as_ref(), Some(new_v.as_ref()))];
let (_, mut stx) = trie.storage_root(new_kv.iter().cloned(), self.state_version());
for (mut k, (_, rc)) in stx.drain().into_iter() {
if rc > 0 {
db.sanitize_key(&mut k);
if db.get(state_col, &k).is_some() {
trace!("Benchmark-store key creation: Key collision detected, retry");
continue 'retry
}
}
}
*original_v = new_v;
break
}
}
info!("Writing {} keys", kvs.len());
// Write each value in one commit.
for (k, original_v) in kvs.iter() {
// Create a random value to overwrite with.
// NOTE: We use a possibly higher entropy than the original value,
// could be improved but acts as an over-estimation which is fine for now.
let mut new_v = vec![0; original_v.len()];
rng.fill_bytes(&mut new_v[..]);
for (k, new_v) in kvs.iter() {
// Interesting part here:
let start = Instant::now();
// Create a TX that will modify the Trie in the DB and
@@ -78,12 +96,12 @@ impl StorageCmd {
let replace = vec![(k.as_ref(), Some(new_v.as_ref()))];
let (_, stx) = trie.storage_root(replace.iter().cloned(), self.state_version());
// Only the keep the insertions, since we do not want to benchmark pruning.
let tx = convert_tx::<Block>(stx.clone(), true, state_col, supports_rc);
let tx = convert_tx::<Block>(db.clone(), stx.clone(), false, state_col);
db.commit(tx).map_err(|e| format!("Writing to the Database: {}", e))?;
record.append(new_v.len(), start.elapsed())?;
// Now undo the changes by removing what was added.
let tx = convert_tx::<Block>(stx.clone(), false, state_col, supports_rc);
let tx = convert_tx::<Block>(db.clone(), stx.clone(), true, state_col);
db.commit(tx).map_err(|e| format!("Writing to the Database: {}", e))?;
}
Ok(record)
@@ -93,35 +111,21 @@ impl StorageCmd {
/// Converts a Trie transaction into a DB transaction.
/// Removals are ignored and will not be included in the final tx.
/// `invert_inserts` replaces all inserts with removals.
///
/// The keys of Trie transactions are prefixed, this is treated differently by each DB.
/// ParityDB can use an optimization where only the last `DB_HASH_LEN` byte are needed.
/// The last `DB_HASH_LEN` byte are the hash of the actual stored data, everything
/// before that is the route in the Patricia Trie.
/// RocksDB cannot do this and needs the whole route, hence no key truncating for RocksDB.
///
/// TODO:
/// This copies logic from [`sp_client_db::Backend::try_commit_operation`] and should be
/// refactored to use a canonical `sanitize_key` function from `sp_client_db` which
/// does not yet exist.
fn convert_tx<B: BlockT>(
db: Arc<dyn sp_database::Database<DbHash>>,
mut tx: PrefixedMemoryDB<HashFor<B>>,
invert_inserts: bool,
col: ColumnId,
supports_rc: bool,
) -> Transaction<DbHash> {
let mut ret = Transaction::<DbHash>::default();
for (mut k, (v, rc)) in tx.drain().into_iter() {
if supports_rc {
let _prefix = k.drain(0..k.len() - DB_HASH_LEN);
}
if rc > 0 {
db.sanitize_key(&mut k);
if invert_inserts {
ret.set(col, k.as_ref(), &v);
} else {
ret.remove(col, &k);
} else {
ret.set(col, &k, &v);
}
}
// < 0 means removal - ignored.