New database trait (#5549)

* Introduce trait

* The trait

* Generic

* Basic impls.

* Remove unneeded bounds

* Minor changes

* Switch over to the new DB trait

* Integrated parity-db and added CLI for db selection

* Default impl.

* Fix logs.

* Started integrating subdb

* Apply suggestions from code review

Co-Authored-By: Cecile Tonglet <cecile@parity.io>

* Apply suggestions from code review

Co-Authored-By: Nikolay Volf <nikvolf@gmail.com>

* Enable subdb

* Bump parity-db

* Fixed CLI macro

* Fixed browser build

* Fixed features

* Sort out features

* Use parity-db from crates.io

* Typo

Co-authored-by: arkpar <arkady.paronyan@gmail.com>
Co-authored-by: Cecile Tonglet <cecile@parity.io>
Co-authored-by: Nikolay Volf <nikvolf@gmail.com>
This commit is contained in:
Gavin Wood
2020-04-15 14:38:39 +02:00
committed by GitHub
parent 3426d662f7
commit 91af5b6fcc
43 changed files with 1036 additions and 579 deletions
+24 -28
View File
@@ -18,17 +18,17 @@
use std::sync::Arc;
use kvdb::{KeyValueDB, DBTransaction};
use sp_blockchain::{Error as ClientError, Result as ClientResult};
use codec::{Encode, Decode};
use sp_runtime::generic::BlockId;
use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor};
use crate::utils::{self, db_err, meta_keys};
use sp_database::{Database, Transaction};
use crate::utils::{self, meta_keys};
use crate::cache::{CacheItemT, ComplexBlockId};
use crate::cache::list_cache::{CommitOperation, Fork};
use crate::cache::list_entry::{Entry, StorageEntry};
use crate::DbHash;
/// Single list-cache metadata.
#[derive(Debug)]
@@ -97,19 +97,19 @@ pub struct DbColumns {
pub struct DbStorage {
name: Vec<u8>,
meta_key: Vec<u8>,
db: Arc<dyn KeyValueDB>,
db: Arc<dyn Database<DbHash>>,
columns: DbColumns,
}
impl DbStorage {
/// Create new database-backed list cache storage.
pub fn new(name: Vec<u8>, db: Arc<dyn KeyValueDB>, columns: DbColumns) -> Self {
pub fn new(name: Vec<u8>, db: Arc<dyn Database<DbHash>>, columns: DbColumns) -> Self {
let meta_key = meta::key(&name);
DbStorage { name, meta_key, db, columns }
}
/// Get reference to the database.
pub fn db(&self) -> &Arc<dyn KeyValueDB> { &self.db }
pub fn db(&self) -> &Arc<dyn Database<DbHash>> { &self.db }
/// Get reference to the database columns.
pub fn columns(&self) -> &DbColumns { &self.columns }
@@ -135,49 +135,45 @@ impl<Block: BlockT, T: CacheItemT> Storage<Block, T> for DbStorage {
}
fn read_meta(&self) -> ClientResult<Metadata<Block>> {
self.db.get(self.columns.meta, &self.meta_key)
.map_err(db_err)
.and_then(|meta| match meta {
Some(meta) => meta::decode(&*meta),
None => Ok(Metadata {
finalized: None,
unfinalized: Vec::new(),
}),
match self.db.get(self.columns.meta, &self.meta_key) {
Some(meta) => meta::decode(&*meta),
None => Ok(Metadata {
finalized: None,
unfinalized: Vec::new(),
})
}
}
fn read_entry(&self, at: &ComplexBlockId<Block>) -> ClientResult<Option<StorageEntry<Block, T>>> {
self.db.get(self.columns.cache, &self.encode_block_id(at))
.map_err(db_err)
.and_then(|entry| match entry {
Some(entry) => StorageEntry::<Block, T>::decode(&mut &entry[..])
.map_err(|_| ClientError::Backend("Failed to decode cache entry".into()))
.map(Some),
None => Ok(None),
})
match self.db.get(self.columns.cache, &self.encode_block_id(at)) {
Some(entry) => StorageEntry::<Block, T>::decode(&mut &entry[..])
.map_err(|_| ClientError::Backend("Failed to decode cache entry".into()))
.map(Some),
None => Ok(None),
}
}
}
/// Database-backed list cache storage transaction.
pub struct DbStorageTransaction<'a> {
storage: &'a DbStorage,
tx: &'a mut DBTransaction,
tx: &'a mut Transaction<DbHash>,
}
impl<'a> DbStorageTransaction<'a> {
/// Create new database transaction.
pub fn new(storage: &'a DbStorage, tx: &'a mut DBTransaction) -> Self {
pub fn new(storage: &'a DbStorage, tx: &'a mut Transaction<DbHash>) -> Self {
DbStorageTransaction { storage, tx }
}
}
impl<'a, Block: BlockT, T: CacheItemT> StorageTransaction<Block, T> for DbStorageTransaction<'a> {
fn insert_storage_entry(&mut self, at: &ComplexBlockId<Block>, entry: &StorageEntry<Block, T>) {
self.tx.put(self.storage.columns.cache, &self.storage.encode_block_id(at), &entry.encode());
self.tx.set_from_vec(self.storage.columns.cache, &self.storage.encode_block_id(at), entry.encode());
}
fn remove_storage_entry(&mut self, at: &ComplexBlockId<Block>) {
self.tx.delete(self.storage.columns.cache, &self.storage.encode_block_id(at));
self.tx.remove(self.storage.columns.cache, &self.storage.encode_block_id(at));
}
fn update_meta(
@@ -186,10 +182,10 @@ impl<'a, Block: BlockT, T: CacheItemT> StorageTransaction<Block, T> for DbStorag
unfinalized: &[Fork<Block, T>],
operation: &CommitOperation<Block, T>,
) {
self.tx.put(
self.tx.set_from_vec(
self.storage.columns.meta,
&self.storage.meta_key,
&meta::encode(best_finalized_entry, unfinalized, operation));
meta::encode(best_finalized_entry, unfinalized, operation));
}
}
+11 -11
View File
@@ -19,14 +19,14 @@
use std::{sync::Arc, collections::{HashMap, hash_map::Entry}};
use parking_lot::RwLock;
use kvdb::{KeyValueDB, DBTransaction};
use sc_client_api::blockchain::{well_known_cache_keys::{self, Id as CacheKeyId}, Cache as BlockchainCache};
use sp_blockchain::Result as ClientResult;
use sp_database::{Database, Transaction};
use codec::{Encode, Decode};
use sp_runtime::generic::BlockId;
use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor, Zero};
use crate::utils::{self, COLUMN_META, db_err};
use crate::utils::{self, COLUMN_META};
use crate::DbHash;
use self::list_cache::{ListCache, PruningStrategy};
@@ -78,7 +78,7 @@ impl<T> CacheItemT for T where T: Clone + Decode + Encode + PartialEq {}
/// Database-backed blockchain data cache.
pub struct DbCache<Block: BlockT> {
cache_at: HashMap<CacheKeyId, ListCache<Block, Vec<u8>, self::list_storage::DbStorage>>,
db: Arc<dyn KeyValueDB>,
db: Arc<dyn Database<DbHash>>,
key_lookup_column: u32,
header_column: u32,
cache_column: u32,
@@ -89,7 +89,7 @@ pub struct DbCache<Block: BlockT> {
impl<Block: BlockT> DbCache<Block> {
/// Create new cache.
pub fn new(
db: Arc<dyn KeyValueDB>,
db: Arc<dyn Database<DbHash>>,
key_lookup_column: u32,
header_column: u32,
cache_column: u32,
@@ -113,7 +113,7 @@ impl<Block: BlockT> DbCache<Block> {
}
/// Begin cache transaction.
pub fn transaction<'a>(&'a mut self, tx: &'a mut DBTransaction) -> DbCacheTransaction<'a, Block> {
pub fn transaction<'a>(&'a mut self, tx: &'a mut Transaction<DbHash>) -> DbCacheTransaction<'a, Block> {
DbCacheTransaction {
cache: self,
tx,
@@ -125,7 +125,7 @@ impl<Block: BlockT> DbCache<Block> {
/// Begin cache transaction with given ops.
pub fn transaction_with_ops<'a>(
&'a mut self,
tx: &'a mut DBTransaction,
tx: &'a mut Transaction<DbHash>,
ops: DbCacheTransactionOps<Block>,
) -> DbCacheTransaction<'a, Block> {
DbCacheTransaction {
@@ -169,7 +169,7 @@ impl<Block: BlockT> DbCache<Block> {
fn get_cache_helper<'a, Block: BlockT>(
cache_at: &'a mut HashMap<CacheKeyId, ListCache<Block, Vec<u8>, self::list_storage::DbStorage>>,
name: CacheKeyId,
db: &Arc<dyn KeyValueDB>,
db: &Arc<dyn Database<DbHash>>,
key_lookup: u32,
header: u32,
cache: u32,
@@ -215,7 +215,7 @@ impl<Block: BlockT> DbCacheTransactionOps<Block> {
/// Database-backed blockchain data cache transaction valid for single block import.
pub struct DbCacheTransaction<'a, Block: BlockT> {
cache: &'a mut DbCache<Block>,
tx: &'a mut DBTransaction,
tx: &'a mut Transaction<DbHash>,
cache_at_ops: HashMap<CacheKeyId, self::list_cache::CommitOperations<Block, Vec<u8>>>,
best_finalized_block: Option<ComplexBlockId<Block>>,
}
@@ -328,7 +328,7 @@ impl<Block: BlockT> BlockchainCache<Block> for DbCacheSync<Block> {
let genesis_hash = cache.genesis_hash;
let cache_contents = vec![(*key, data)].into_iter().collect();
let db = cache.db.clone();
let mut dbtx = DBTransaction::new();
let mut dbtx = Transaction::new();
let tx = cache.transaction(&mut dbtx);
let tx = tx.on_block_insert(
ComplexBlockId::new(Default::default(), Zero::zero()),
@@ -337,7 +337,7 @@ impl<Block: BlockT> BlockchainCache<Block> for DbCacheSync<Block> {
EntryType::Genesis,
)?;
let tx_ops = tx.into_ops();
db.write(dbtx).map_err(db_err)?;
db.commit(dbtx);
cache.commit(tx_ops)?;
Ok(())
}
@@ -19,7 +19,6 @@
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use hash_db::Prefix;
use kvdb::{KeyValueDB, DBTransaction};
use codec::{Decode, Encode};
use parking_lot::RwLock;
use sp_blockchain::{Error as ClientError, Result as ClientResult};
@@ -27,12 +26,14 @@ use sp_trie::MemoryDB;
use sc_client_api::backend::PrunableStateChangesTrieStorage;
use sp_blockchain::{well_known_cache_keys, Cache as BlockchainCache};
use sp_core::{ChangesTrieConfiguration, ChangesTrieConfigurationRange, convert_hash};
use sp_database::Transaction;
use sp_runtime::traits::{
Block as BlockT, Header as HeaderT, HashFor, NumberFor, One, Zero, CheckedSub,
};
use sp_runtime::generic::{BlockId, DigestItem, ChangesTrieSignal};
use sp_state_machine::{DBValue, ChangesTrieBuildCache, ChangesTrieCacheAction};
use crate::utils::{self, Meta, meta_keys, db_err};
use sp_state_machine::{ChangesTrieBuildCache, ChangesTrieCacheAction};
use crate::{Database, DbHash};
use crate::utils::{self, Meta, meta_keys};
use crate::cache::{
DbCacheSync, DbCache, DbCacheTransactionOps,
ComplexBlockId, EntryType as CacheEntryType,
@@ -76,7 +77,7 @@ impl<Block: BlockT> From<DbCacheTransactionOps<Block>> for DbChangesTrieStorageT
/// Stores all tries in separate DB column.
/// Lock order: meta, tries_meta, cache, build_cache.
pub struct DbChangesTrieStorage<Block: BlockT> {
db: Arc<dyn KeyValueDB>,
db: Arc<dyn Database<DbHash>>,
meta_column: u32,
changes_tries_column: u32,
key_lookup_column: u32,
@@ -111,7 +112,7 @@ struct ChangesTriesMeta<Block: BlockT> {
impl<Block: BlockT> DbChangesTrieStorage<Block> {
/// Create new changes trie storage.
pub fn new(
db: Arc<dyn KeyValueDB>,
db: Arc<dyn Database<DbHash>>,
meta_column: u32,
changes_tries_column: u32,
key_lookup_column: u32,
@@ -149,7 +150,7 @@ impl<Block: BlockT> DbChangesTrieStorage<Block> {
/// Commit new changes trie.
pub fn commit(
&self,
tx: &mut DBTransaction,
tx: &mut Transaction<DbHash>,
mut changes_trie: MemoryDB<HashFor<Block>>,
parent_block: ComplexBlockId<Block>,
block: ComplexBlockId<Block>,
@@ -160,7 +161,7 @@ impl<Block: BlockT> DbChangesTrieStorage<Block> {
) -> ClientResult<DbChangesTrieStorageTransaction<Block>> {
// insert changes trie, associated with block, into DB
for (key, (val, _)) in changes_trie.drain() {
tx.put(self.changes_tries_column, key.as_ref(), &val);
tx.set(self.changes_tries_column, key.as_ref(), &val);
}
// if configuration has not been changed AND block is not finalized => nothing to do here
@@ -205,7 +206,7 @@ impl<Block: BlockT> DbChangesTrieStorage<Block> {
/// Called when block is finalized.
pub fn finalize(
&self,
tx: &mut DBTransaction,
tx: &mut Transaction<DbHash>,
parent_block_hash: Block::Hash,
block_hash: Block::Hash,
block_num: NumberFor<Block>,
@@ -254,7 +255,7 @@ impl<Block: BlockT> DbChangesTrieStorage<Block> {
/// When block is reverted.
pub fn revert(
&self,
tx: &mut DBTransaction,
tx: &mut Transaction<DbHash>,
block: &ComplexBlockId<Block>,
) -> ClientResult<DbChangesTrieStorageTransaction<Block>> {
Ok(self.cache.0.write().transaction(tx)
@@ -280,7 +281,7 @@ impl<Block: BlockT> DbChangesTrieStorage<Block> {
/// Prune obsolete changes tries.
fn prune(
&self,
tx: &mut DBTransaction,
tx: &mut Transaction<DbHash>,
block_hash: Block::Hash,
block_num: NumberFor<Block>,
new_header: Option<&Block::Header>,
@@ -313,7 +314,7 @@ impl<Block: BlockT> DbChangesTrieStorage<Block> {
hash: convert_hash(&block_hash),
number: block_num,
},
|node| tx.delete(self.changes_tries_column, node.as_ref()),
|node| tx.remove(self.changes_tries_column, node.as_ref()),
);
next_digest_range_start = end + One::one();
@@ -486,18 +487,17 @@ where
self.build_cache.read().with_changed_keys(root, functor)
}
fn get(&self, key: &Block::Hash, _prefix: Prefix) -> Result<Option<DBValue>, String> {
self.db.get(self.changes_tries_column, key.as_ref())
.map_err(|err| format!("{}", err))
fn get(&self, key: &Block::Hash, _prefix: Prefix) -> Result<Option<Vec<u8>>, String> {
Ok(self.db.get(self.changes_tries_column, key.as_ref()))
}
}
/// Read changes tries metadata from database.
fn read_tries_meta<Block: BlockT>(
db: &dyn KeyValueDB,
db: &dyn Database<DbHash>,
meta_column: u32,
) -> ClientResult<ChangesTriesMeta<Block>> {
match db.get(meta_column, meta_keys::CHANGES_TRIES_META).map_err(db_err)? {
match db.get(meta_column, meta_keys::CHANGES_TRIES_META) {
Some(h) => match Decode::decode(&mut &h[..]) {
Ok(h) => Ok(h),
Err(err) => Err(ClientError::Backend(format!("Error decoding changes tries metadata: {}", err))),
@@ -511,11 +511,11 @@ fn read_tries_meta<Block: BlockT>(
/// Write changes tries metadata from database.
fn write_tries_meta<Block: BlockT>(
tx: &mut DBTransaction,
tx: &mut Transaction<DbHash>,
meta_column: u32,
meta: &ChangesTriesMeta<Block>,
) {
tx.put(meta_column, meta_keys::CHANGES_TRIES_META, &meta.encode());
tx.set_from_vec(meta_column, meta_keys::CHANGES_TRIES_META, meta.encode());
}
#[cfg(test)]
@@ -707,7 +707,7 @@ mod tests {
let finalize_block = |number| {
let header = backend.blockchain().header(BlockId::Number(number)).unwrap().unwrap();
let mut tx = DBTransaction::new();
let mut tx = Transaction::new();
let cache_ops = backend.changes_tries_storage.finalize(
&mut tx,
*header.parent_hash(),
@@ -716,7 +716,7 @@ mod tests {
None,
None,
).unwrap();
backend.storage.db.write(tx).unwrap();
backend.storage.db.commit(tx);
backend.changes_tries_storage.post_commit(Some(cache_ops));
};
+17 -18
View File
@@ -16,23 +16,21 @@
//! Functionality for reading and storing children hashes from db.
use kvdb::{KeyValueDB, DBTransaction};
use codec::{Encode, Decode};
use sp_blockchain;
use std::hash::Hash;
use sp_database::{Database, Transaction};
use crate::DbHash;
/// Returns the hashes of the children blocks of the block with `parent_hash`.
pub fn read_children<
K: Eq + Hash + Clone + Encode + Decode,
V: Eq + Hash + Clone + Encode + Decode,
>(db: &dyn KeyValueDB, column: u32, prefix: &[u8], parent_hash: K) -> sp_blockchain::Result<Vec<V>> {
>(db: &dyn Database<DbHash>, column: u32, prefix: &[u8], parent_hash: K) -> sp_blockchain::Result<Vec<V>> {
let mut buf = prefix.to_vec();
parent_hash.using_encoded(|s| buf.extend(s));
let raw_val_opt = match db.get(column, &buf[..]) {
Ok(raw_val_opt) => raw_val_opt,
Err(_) => return Err(sp_blockchain::Error::Backend("Error reading value from database".into())),
};
let raw_val_opt = db.get(column, &buf[..]);
let raw_val = match raw_val_opt {
Some(val) => val,
@@ -53,7 +51,7 @@ pub fn write_children<
K: Eq + Hash + Clone + Encode + Decode,
V: Eq + Hash + Clone + Encode + Decode,
>(
tx: &mut DBTransaction,
tx: &mut Transaction<DbHash>,
column: u32,
prefix: &[u8],
parent_hash: K,
@@ -61,34 +59,35 @@ pub fn write_children<
) {
let mut key = prefix.to_vec();
parent_hash.using_encoded(|s| key.extend(s));
tx.put_vec(column, &key[..], children_hashes.encode());
tx.set_from_vec(column, &key[..], children_hashes.encode());
}
/// Prepare transaction to remove the children of `parent_hash`.
pub fn remove_children<
K: Eq + Hash + Clone + Encode + Decode,
>(
tx: &mut DBTransaction,
tx: &mut Transaction<DbHash>,
column: u32,
prefix: &[u8],
parent_hash: K,
) {
let mut key = prefix.to_vec();
parent_hash.using_encoded(|s| key.extend(s));
tx.delete(column, &key[..]);
tx.remove(column, &key);
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::Arc;
#[test]
fn children_write_read_remove() {
const PREFIX: &[u8] = b"children";
let db = ::kvdb_memorydb::create(1);
let db = Arc::new(sp_database::MemDb::default());
let mut tx = DBTransaction::new();
let mut tx = Transaction::new();
let mut children1 = Vec::new();
children1.push(1_3);
@@ -100,19 +99,19 @@ mod tests {
children2.push(1_6);
write_children(&mut tx, 0, PREFIX, 1_2, children2);
db.write(tx.clone()).expect("(2) Committing transaction failed");
db.commit(tx.clone());
let r1: Vec<u32> = read_children(&db, 0, PREFIX, 1_1).expect("(1) Getting r1 failed");
let r2: Vec<u32> = read_children(&db, 0, PREFIX, 1_2).expect("(1) Getting r2 failed");
let r1: Vec<u32> = read_children(&*db, 0, PREFIX, 1_1).expect("(1) Getting r1 failed");
let r2: Vec<u32> = read_children(&*db, 0, PREFIX, 1_2).expect("(1) Getting r2 failed");
assert_eq!(r1, vec![1_3, 1_5]);
assert_eq!(r2, vec![1_4, 1_6]);
remove_children(&mut tx, 0, PREFIX, 1_2);
db.write(tx).expect("(2) Committing transaction failed");
db.commit(tx);
let r1: Vec<u32> = read_children(&db, 0, PREFIX, 1_1).expect("(2) Getting r1 failed");
let r2: Vec<u32> = read_children(&db, 0, PREFIX, 1_2).expect("(2) Getting r2 failed");
let r1: Vec<u32> = read_children(&*db, 0, PREFIX, 1_1).expect("(2) Getting r1 failed");
let r2: Vec<u32> = read_children(&*db, 0, PREFIX, 1_2).expect("(2) Getting r2 failed");
assert_eq!(r1, vec![1_3, 1_5]);
assert_eq!(r2.len(), 0);
+112 -162
View File
@@ -14,7 +14,7 @@
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
//! Client backend that uses RocksDB database as storage.
//! Client backend that is backed by a database.
//!
//! # Canonicality vs. Finality
//!
@@ -40,9 +40,13 @@ mod storage_cache;
mod upgrade;
mod utils;
mod stats;
#[cfg(feature = "parity-db")]
mod parity_db;
#[cfg(feature = "subdb")]
mod subdb;
use std::sync::Arc;
use std::path::PathBuf;
use std::path::{Path, PathBuf};
use std::io;
use std::collections::HashMap;
@@ -57,8 +61,8 @@ use sp_blockchain::{
};
use codec::{Decode, Encode};
use hash_db::Prefix;
use kvdb::{KeyValueDB, DBTransaction};
use sp_trie::{MemoryDB, PrefixedMemoryDB, prefixed_key};
use sp_database::Transaction;
use parking_lot::RwLock;
use sp_core::{ChangesTrieConfiguration, traits::CodeExecutor};
use sp_core::storage::{well_known_keys, ChildInfo};
@@ -75,7 +79,7 @@ use sp_state_machine::{
StorageCollection, ChildStorageCollection,
backend::Backend as StateBackend, StateMachineStats,
};
use crate::utils::{DatabaseType, Meta, db_err, meta_keys, read_db, read_meta};
use crate::utils::{DatabaseType, Meta, meta_keys, read_db, read_meta};
use crate::changes_tries_storage::{DbChangesTrieStorage, DbChangesTrieStorageTransaction};
use sc_client::leaves::{LeafSet, FinalizationDisplaced};
use sc_state_db::StateDb;
@@ -83,15 +87,15 @@ use sp_blockchain::{CachedHeaderMetadata, HeaderMetadata, HeaderMetadataCache};
use crate::storage_cache::{CachingState, SyncingCachingState, SharedCache, new_shared_cache};
use crate::stats::StateUsageStats;
use log::{trace, debug, warn};
pub use sc_state_db::PruningMode;
use prometheus_endpoint::Registry;
// Re-export the Database trait so that one can pass an implementation of it.
pub use sp_database::Database;
pub use sc_state_db::PruningMode;
#[cfg(any(feature = "kvdb-rocksdb", test))]
pub use bench::BenchmarkingState;
#[cfg(feature = "test-helpers")]
use sc_client::in_mem::Backend as InMemoryBackend;
const CANONICALIZATION_DELAY: u64 = 4096;
const MIN_BLOCKS_TO_KEEP_CHANGES_TRIES_FOR: u32 = 32768;
@@ -103,8 +107,8 @@ pub type DbState<B> = sp_state_machine::TrieBackend<
Arc<dyn sp_state_machine::Storage<HashFor<B>>>, HashFor<B>
>;
/// Re-export the KVDB trait so that one can pass an implementation of it.
pub use kvdb;
/// Hash type that this backend uses for the database.
pub type DbHash = [u8; 32];
/// A reference tracking state.
///
@@ -279,17 +283,42 @@ pub struct DatabaseSettings {
}
/// Where to find the database..
#[derive(Clone)]
pub enum DatabaseSettingsSrc {
/// Load a database from a given path. Recommended for most uses.
Path {
/// Load a RocksDB database from a given path. Recommended for most uses.
RocksDb {
/// Path to the database.
path: PathBuf,
/// Cache size in MiB.
cache_size: usize,
},
/// Load a ParityDb database from a given path.
ParityDb {
/// Path to the database.
path: PathBuf,
},
/// Load a Subdb database from a given path.
SubDb {
/// Path to the database.
path: PathBuf,
},
/// Use a custom already-open database.
Custom(Arc<dyn KeyValueDB>),
Custom(Arc<dyn Database<DbHash>>),
}
impl DatabaseSettingsSrc {
/// Return dabase path for databases that are on the disk.
pub fn path(&self) -> Option<&Path> {
match self {
DatabaseSettingsSrc::RocksDb { path, .. } => Some(path.as_path()),
DatabaseSettingsSrc::ParityDb { path, .. } => Some(path.as_path()),
DatabaseSettingsSrc::SubDb { path, .. } => Some(path.as_path()),
DatabaseSettingsSrc::Custom(_) => None,
}
}
}
/// Create an instance of db-backed client.
@@ -357,26 +386,26 @@ struct PendingBlock<Block: BlockT> {
}
// wrapper that implements trait required for state_db
struct StateMetaDb<'a>(&'a dyn KeyValueDB);
struct StateMetaDb<'a>(&'a dyn Database<DbHash>);
impl<'a> sc_state_db::MetaDb for StateMetaDb<'a> {
type Error = io::Error;
fn get_meta(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
self.0.get(columns::STATE_META, key).map(|r| r.map(|v| v.to_vec()))
Ok(self.0.get(columns::STATE_META, key))
}
}
/// Block database
pub struct BlockchainDb<Block: BlockT> {
db: Arc<dyn KeyValueDB>,
db: Arc<dyn Database<DbHash>>,
meta: Arc<RwLock<Meta<NumberFor<Block>, Block::Hash>>>,
leaves: RwLock<LeafSet<Block::Hash, NumberFor<Block>>>,
header_metadata_cache: HeaderMetadataCache<Block>,
}
impl<Block: BlockT> BlockchainDb<Block> {
fn new(db: Arc<dyn KeyValueDB>) -> ClientResult<Self> {
fn new(db: Arc<dyn Database<DbHash>>) -> ClientResult<Self> {
let meta = read_meta::<Block>(&*db, columns::HEADER)?;
let leaves = LeafSet::read_from_db(&*db, columns::META, meta_keys::LEAF_PREFIX)?;
Ok(BlockchainDb {
@@ -547,11 +576,11 @@ pub struct BlockImportOperation<Block: BlockT> {
}
impl<Block: BlockT> BlockImportOperation<Block> {
fn apply_aux(&mut self, transaction: &mut DBTransaction) {
fn apply_aux(&mut self, transaction: &mut Transaction<DbHash>) {
for (key, maybe_val) in self.aux_ops.drain(..) {
match maybe_val {
Some(val) => transaction.put_vec(columns::AUX, &key, val),
None => transaction.delete(columns::AUX, &key),
Some(val) => transaction.set_from_vec(columns::AUX, &key, val),
None => transaction.remove(columns::AUX, &key),
}
}
}
@@ -676,7 +705,7 @@ impl<Block: BlockT> sc_client_api::backend::BlockImportOperation<Block> for Bloc
}
struct StorageDb<Block: BlockT> {
pub db: Arc<dyn KeyValueDB>,
pub db: Arc<dyn Database<DbHash>>,
pub state_db: StateDb<Block::Hash, Vec<u8>>,
}
@@ -693,7 +722,7 @@ impl<Block: BlockT> sc_state_db::NodeDb for StorageDb<Block> {
type Key = [u8];
fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
self.db.get(columns::STATE, key).map(|r| r.map(|v| v.to_vec()))
Ok(self.db.get(columns::STATE, key))
}
}
@@ -776,13 +805,14 @@ impl<Block: BlockT> Backend<Block> {
/// The pruning window is how old a block must be before the state is pruned.
pub fn new(config: DatabaseSettings, canonicalization_delay: u64) -> ClientResult<Self> {
let db = crate::utils::open_database::<Block>(&config, DatabaseType::Full)?;
Self::from_kvdb(db as Arc<_>, canonicalization_delay, &config)
Self::from_database(db as Arc<_>, canonicalization_delay, &config)
}
/// Create new memory-backed client backend for tests.
#[cfg(any(test, feature = "test-helpers"))]
pub fn new_test(keep_blocks: u32, canonicalization_delay: u64) -> Self {
let db = Arc::new(kvdb_memorydb::create(crate::utils::NUM_COLUMNS));
let db = kvdb_memorydb::create(crate::utils::NUM_COLUMNS);
let db = sp_database::as_database(db);
let db_setting = DatabaseSettings {
state_cache_size: 16777216,
state_cache_child_ratio: Some((50, 100)),
@@ -793,8 +823,8 @@ impl<Block: BlockT> Backend<Block> {
Self::new(db_setting, canonicalization_delay).expect("failed to create test-db")
}
fn from_kvdb(
db: Arc<dyn KeyValueDB>,
fn from_database(
db: Arc<dyn Database<DbHash>>,
canonicalization_delay: u64,
config: &DatabaseSettings,
) -> ClientResult<Self> {
@@ -843,61 +873,6 @@ impl<Block: BlockT> Backend<Block> {
})
}
/// Returns in-memory blockchain that contains the same set of blocks as self.
#[cfg(feature = "test-helpers")]
pub fn as_in_memory(&self) -> InMemoryBackend<Block> {
use sc_client_api::backend::{Backend as ClientBackend, BlockImportOperation};
use sc_client::blockchain::Backend as BlockchainBackend;
let inmem = InMemoryBackend::<Block>::new();
// get all headers hashes && sort them by number (could be duplicate)
let mut headers: Vec<(NumberFor<Block>, Block::Hash, Block::Header)> = Vec::new();
for (_, header) in self.blockchain.db.iter(columns::HEADER) {
let header = Block::Header::decode(&mut &header[..]).unwrap();
let hash = header.hash();
let number = *header.number();
let pos = headers.binary_search_by(|item| item.0.cmp(&number));
match pos {
Ok(pos) => headers.insert(pos, (number, hash, header)),
Err(pos) => headers.insert(pos, (number, hash, header)),
}
}
// insert all other headers + bodies + justifications
let info = self.blockchain.info();
for (number, hash, header) in headers {
let id = BlockId::Hash(hash);
let justification = self.blockchain.justification(id).unwrap();
let body = self.blockchain.body(id).unwrap();
let state = self.state_at(id).unwrap().pairs();
let new_block_state = if number.is_zero() {
NewBlockState::Final
} else if hash == info.best_hash {
NewBlockState::Best
} else {
NewBlockState::Normal
};
let mut op = inmem.begin_operation().unwrap();
op.set_block_data(header, body, justification, new_block_state).unwrap();
op.update_db_storage(vec![(None, state.into_iter().map(|(k, v)| (k, Some(v))).collect())])
.unwrap();
inmem.commit_operation(op).unwrap();
}
// and now finalize the best block we have
inmem.finalize_block(BlockId::Hash(info.finalized_hash), None).unwrap();
inmem
}
/// Returns total number of blocks (headers) in the block DB.
#[cfg(feature = "test-helpers")]
pub fn blocks_count(&self) -> u64 {
self.blockchain.db.iter(columns::HEADER).count() as u64
}
/// Handle setting head within a transaction. `route_to` should be the last
/// block that existed in the database. `best_to` should be the best block
/// to be set.
@@ -907,7 +882,7 @@ impl<Block: BlockT> Backend<Block> {
/// to be best, `route_to` should equal to `best_to`.
fn set_head_with_transaction(
&self,
transaction: &mut DBTransaction,
transaction: &mut Transaction<DbHash>,
route_to: Block::Hash,
best_to: (NumberFor<Block>, Block::Hash),
) -> ClientResult<(Vec<Block::Hash>, Vec<Block::Hash>)> {
@@ -957,7 +932,7 @@ impl<Block: BlockT> Backend<Block> {
}
let lookup_key = utils::number_and_hash_to_lookup_key(best_to.0, &best_to.1)?;
transaction.put(columns::META, meta_keys::BEST_BLOCK, &lookup_key);
transaction.set_from_vec(columns::META, meta_keys::BEST_BLOCK, lookup_key);
utils::insert_number_to_key_mapping(
transaction,
columns::KEY_LOOKUP,
@@ -984,7 +959,7 @@ impl<Block: BlockT> Backend<Block> {
fn finalize_block_with_transaction(
&self,
transaction: &mut DBTransaction,
transaction: &mut Transaction<DbHash>,
hash: &Block::Hash,
header: &Block::Header,
last_finalized: Option<Block::Hash>,
@@ -1005,10 +980,10 @@ impl<Block: BlockT> Backend<Block> {
)?;
if let Some(justification) = justification {
transaction.put(
transaction.set_from_vec(
columns::JUSTIFICATION,
&utils::number_and_hash_to_lookup_key(number, hash)?,
&justification.encode(),
justification.encode(),
);
}
Ok((*hash, number, false, true))
@@ -1017,7 +992,7 @@ impl<Block: BlockT> Backend<Block> {
// performs forced canonicalization with a delay after importing a non-finalized block.
fn force_delayed_canonicalize(
&self,
transaction: &mut DBTransaction,
transaction: &mut Transaction<DbHash>,
hash: Block::Hash,
number: NumberFor<Block>,
)
@@ -1051,7 +1026,7 @@ impl<Block: BlockT> Backend<Block> {
fn try_commit_operation(&self, mut operation: BlockImportOperation<Block>)
-> ClientResult<()>
{
let mut transaction = DBTransaction::new();
let mut transaction = Transaction::new();
let mut finalization_displaced_leaves = None;
operation.apply_aux(&mut transaction);
@@ -1103,17 +1078,17 @@ impl<Block: BlockT> Backend<Block> {
header_metadata,
);
transaction.put(columns::HEADER, &lookup_key, &pending_block.header.encode());
transaction.set_from_vec(columns::HEADER, &lookup_key, pending_block.header.encode());
if let Some(body) = &pending_block.body {
transaction.put(columns::BODY, &lookup_key, &body.encode());
transaction.set_from_vec(columns::BODY, &lookup_key, body.encode());
}
if let Some(justification) = pending_block.justification {
transaction.put(columns::JUSTIFICATION, &lookup_key, &justification.encode());
transaction.set_from_vec(columns::JUSTIFICATION, &lookup_key, justification.encode());
}
if number.is_zero() {
transaction.put(columns::META, meta_keys::FINALIZED_BLOCK, &lookup_key);
transaction.put(columns::META, meta_keys::GENESIS_HASH, hash.as_ref());
transaction.set_from_vec(columns::META, meta_keys::FINALIZED_BLOCK, lookup_key);
transaction.set(columns::META, meta_keys::GENESIS_HASH, hash.as_ref());
// for tests, because config is set from within the reset_storage
if operation.changes_trie_config_update.is_none() {
@@ -1260,31 +1235,17 @@ impl<Block: BlockT> Backend<Block> {
None
};
let write_result = self.storage.db.write(transaction).map_err(db_err);
self.storage.db.commit(transaction);
if let Some((
number,
hash,
enacted,
retracted,
displaced_leaf,
_displaced_leaf,
is_best,
mut cache,
)) = imported {
if let Err(e) = write_result {
let mut leaves = self.blockchain.leaves.write();
let mut undo = leaves.undo();
if let Some(displaced_leaf) = displaced_leaf {
undo.undo_import(displaced_leaf);
}
if let Some(finalization_displaced) = finalization_displaced_leaves {
undo.undo_finalization(finalization_displaced);
}
return Err(e)
}
cache.sync_cache(
&enacted,
&retracted,
@@ -1317,7 +1278,7 @@ impl<Block: BlockT> Backend<Block> {
// was not a child of the last finalized block.
fn note_finalized(
&self,
transaction: &mut DBTransaction,
transaction: &mut Transaction<DbHash>,
is_inserted: bool,
f_header: &Block::Header,
f_hash: Block::Hash,
@@ -1328,7 +1289,7 @@ impl<Block: BlockT> Backend<Block> {
if self.storage.state_db.best_canonical().map(|c| f_num.saturated_into::<u64>() > c).unwrap_or(true) {
let lookup_key = utils::number_and_hash_to_lookup_key(f_num, f_hash.clone())?;
transaction.put(columns::META, meta_keys::FINALIZED_BLOCK, &lookup_key);
transaction.set_from_vec(columns::META, meta_keys::FINALIZED_BLOCK, lookup_key);
let commit = self.storage.state_db.canonicalize_block(&f_hash)
.map_err(|e: sc_state_db::Error<io::Error>| sp_blockchain::Error::from(format!("State database error: {:?}", e)))?;
@@ -1357,18 +1318,18 @@ impl<Block: BlockT> Backend<Block> {
}
}
fn apply_state_commit(transaction: &mut DBTransaction, commit: sc_state_db::CommitSet<Vec<u8>>) {
fn apply_state_commit(transaction: &mut Transaction<DbHash>, commit: sc_state_db::CommitSet<Vec<u8>>) {
for (key, val) in commit.data.inserted.into_iter() {
transaction.put(columns::STATE, &key[..], &val);
transaction.set_from_vec(columns::STATE, &key[..], val);
}
for key in commit.data.deleted.into_iter() {
transaction.delete(columns::STATE, &key[..]);
transaction.remove(columns::STATE, &key[..]);
}
for (key, val) in commit.meta.inserted.into_iter() {
transaction.put(columns::STATE_META, &key[..], &val);
transaction.set_from_vec(columns::STATE_META, &key[..], val);
}
for key in commit.meta.deleted.into_iter() {
transaction.delete(columns::STATE_META, &key[..]);
transaction.remove(columns::STATE_META, &key[..]);
}
}
@@ -1380,19 +1341,19 @@ impl<Block> sc_client_api::backend::AuxStore for Backend<Block> where Block: Blo
I: IntoIterator<Item=&'a(&'c [u8], &'c [u8])>,
D: IntoIterator<Item=&'a &'b [u8]>,
>(&self, insert: I, delete: D) -> ClientResult<()> {
let mut transaction = DBTransaction::new();
let mut transaction = Transaction::new();
for (k, v) in insert {
transaction.put(columns::AUX, k, v);
transaction.set(columns::AUX, k, v);
}
for k in delete {
transaction.delete(columns::AUX, k);
transaction.remove(columns::AUX, k);
}
self.storage.db.write(transaction).map_err(db_err)?;
self.storage.db.commit(transaction);
Ok(())
}
fn get_aux(&self, key: &[u8]) -> ClientResult<Option<Vec<u8>>> {
Ok(self.storage.db.get(columns::AUX, key).map(|r| r.map(|v| v.to_vec())).map_err(db_err)?)
Ok(self.storage.db.get(columns::AUX, key))
}
}
@@ -1453,36 +1414,24 @@ impl<Block: BlockT> sc_client_api::backend::Backend<Block> for Backend<Block> {
fn finalize_block(&self, block: BlockId<Block>, justification: Option<Justification>)
-> ClientResult<()>
{
let mut transaction = DBTransaction::new();
let mut transaction = Transaction::new();
let hash = self.blockchain.expect_block_hash_from_id(&block)?;
let header = self.blockchain.expect_header(block)?;
let mut displaced = None;
let commit = |displaced| {
let mut changes_trie_cache_ops = None;
let (hash, number, is_best, is_finalized) = self.finalize_block_with_transaction(
&mut transaction,
&hash,
&header,
None,
justification,
&mut changes_trie_cache_ops,
displaced,
)?;
self.storage.db.write(transaction).map_err(db_err)?;
self.blockchain.update_meta(hash, number, is_best, is_finalized);
self.changes_tries_storage.post_commit(changes_trie_cache_ops);
Ok(())
};
match commit(&mut displaced) {
Ok(()) => self.storage.state_db.apply_pending(),
e @ Err(_) => {
self.storage.state_db.revert_pending();
if let Some(displaced) = displaced {
self.blockchain.leaves.write().undo().undo_finalization(displaced);
}
return e;
}
}
let mut changes_trie_cache_ops = None;
let (hash, number, is_best, is_finalized) = self.finalize_block_with_transaction(
&mut transaction,
&hash,
&header,
None,
justification,
&mut changes_trie_cache_ops,
&mut displaced,
)?;
self.storage.db.commit(transaction);
self.blockchain.update_meta(hash, number, is_best, is_finalized);
self.changes_tries_storage.post_commit(changes_trie_cache_ops);
Ok(())
}
@@ -1497,11 +1446,12 @@ impl<Block: BlockT> sc_client_api::backend::Backend<Block> for Backend<Block> {
fn usage_info(&self) -> Option<UsageInfo> {
let (io_stats, state_stats) = self.io_stats.take_or_else(||
(
self.storage.db.io_stats(kvdb::IoStatsKind::SincePrevious),
// TODO: implement DB stats and cache size retrieval
kvdb::IoStats::empty(),
self.state_usage.take(),
)
);
let database_cache = MemorySize::from_bytes(parity_util_mem::malloc_size(&*self.storage.db));
let database_cache = MemorySize::from_bytes(0);
let state_cache = MemorySize::from_bytes(
(*&self.shared_cache).lock().used_storage_cache_size(),
);
@@ -1547,7 +1497,7 @@ impl<Block: BlockT> sc_client_api::backend::Backend<Block> for Backend<Block> {
if best_number.is_zero() {
return Ok(c.saturated_into::<NumberFor<Block>>())
}
let mut transaction = DBTransaction::new();
let mut transaction = Transaction::new();
match self.storage.state_db.revert_one() {
Some(commit) => {
apply_state_commit(&mut transaction, commit);
@@ -1571,13 +1521,13 @@ impl<Block: BlockT> sc_client_api::backend::Backend<Block> for Backend<Block> {
removed_number,
),
)?;
transaction.put(columns::META, meta_keys::BEST_BLOCK, &key);
if update_finalized {
transaction.put(columns::META, meta_keys::FINALIZED_BLOCK, &key);
transaction.set_from_vec(columns::META, meta_keys::FINALIZED_BLOCK, key.clone());
}
transaction.delete(columns::KEY_LOOKUP, removed.hash().as_ref());
transaction.set_from_vec(columns::META, meta_keys::BEST_BLOCK, key);
transaction.remove(columns::KEY_LOOKUP, removed.hash().as_ref());
children::remove_children(&mut transaction, columns::META, meta_keys::CHILDREN_PREFIX, best_hash);
self.storage.db.write(transaction).map_err(db_err)?;
self.storage.db.commit(transaction);
self.changes_tries_storage.post_commit(Some(changes_trie_cache_ops));
self.blockchain.update_meta(best_hash, best_number, true, update_finalized);
}
@@ -1591,12 +1541,12 @@ impl<Block: BlockT> sc_client_api::backend::Backend<Block> for Backend<Block> {
let reverted = revert_blocks()?;
let revert_leaves = || -> ClientResult<()> {
let mut transaction = DBTransaction::new();
let mut transaction = Transaction::new();
let mut leaves = self.blockchain.leaves.write();
leaves.revert(best_hash, best_number);
leaves.prepare_transaction(&mut transaction, columns::META, meta_keys::LEAF_PREFIX);
self.storage.db.write(transaction).map_err(db_err)?;
self.storage.db.commit(transaction);
Ok(())
};
@@ -1959,7 +1909,7 @@ pub(crate) mod tests {
assert_eq!(backend.storage.db.get(
columns::STATE,
&sp_trie::prefixed_key::<BlakeTwo256>(&key, EMPTY_PREFIX)
).unwrap().unwrap(), &b"hello"[..]);
).unwrap(), &b"hello"[..]);
hash
};
@@ -1996,7 +1946,7 @@ pub(crate) mod tests {
assert_eq!(backend.storage.db.get(
columns::STATE,
&sp_trie::prefixed_key::<BlakeTwo256>(&key, EMPTY_PREFIX)
).unwrap().unwrap(), &b"hello"[..]);
).unwrap(), &b"hello"[..]);
hash
};
@@ -2034,7 +1984,7 @@ pub(crate) mod tests {
assert!(backend.storage.db.get(
columns::STATE,
&sp_trie::prefixed_key::<BlakeTwo256>(&key, EMPTY_PREFIX)
).unwrap().is_some());
).is_some());
hash
};
@@ -2068,7 +2018,7 @@ pub(crate) mod tests {
assert!(backend.storage.db.get(
columns::STATE,
&sp_trie::prefixed_key::<BlakeTwo256>(&key, EMPTY_PREFIX)
).unwrap().is_none());
).is_none());
}
backend.finalize_block(BlockId::Number(1), None).unwrap();
@@ -2077,7 +2027,7 @@ pub(crate) mod tests {
assert!(backend.storage.db.get(
columns::STATE,
&sp_trie::prefixed_key::<BlakeTwo256>(&key, EMPTY_PREFIX)
).unwrap().is_none());
).is_none());
}
#[test]
+58 -56
View File
@@ -20,8 +20,6 @@ use std::{sync::Arc, collections::HashMap};
use std::convert::TryInto;
use parking_lot::RwLock;
use kvdb::{KeyValueDB, DBTransaction};
use sc_client_api::{backend::{AuxStore, NewBlockState}, UsageInfo};
use sc_client::blockchain::{
BlockStatus, Cache as BlockchainCache,Info as BlockchainInfo,
@@ -33,13 +31,14 @@ use sp_blockchain::{
HeaderBackend as BlockchainHeaderBackend,
well_known_cache_keys,
};
use sp_database::{Database, Transaction};
use sc_client::light::blockchain::Storage as LightBlockchainStorage;
use codec::{Decode, Encode};
use sp_runtime::generic::{DigestItem, BlockId};
use sp_runtime::traits::{Block as BlockT, Header as HeaderT, Zero, One, NumberFor, HashFor};
use crate::cache::{DbCacheSync, DbCache, ComplexBlockId, EntryType as CacheEntryType};
use crate::utils::{self, meta_keys, DatabaseType, Meta, db_err, read_db, block_id_to_lookup_key, read_meta};
use crate::{DatabaseSettings, FrozenForDuration};
use crate::utils::{self, meta_keys, DatabaseType, Meta, read_db, block_id_to_lookup_key, read_meta};
use crate::{DatabaseSettings, FrozenForDuration, DbHash};
use log::{trace, warn, debug};
pub(crate) mod columns {
@@ -59,7 +58,7 @@ const CHANGES_TRIE_CHT_PREFIX: u8 = 1;
/// Light blockchain storage. Stores most recent headers + CHTs for older headers.
/// Locks order: meta, cache.
pub struct LightStorage<Block: BlockT> {
db: Arc<dyn KeyValueDB>,
db: Arc<dyn Database<DbHash>>,
meta: RwLock<Meta<NumberFor<Block>, Block::Hash>>,
cache: Arc<DbCacheSync<Block>>,
header_metadata_cache: HeaderMetadataCache<Block>,
@@ -78,14 +77,11 @@ impl<Block: BlockT> LightStorage<Block> {
/// Create new memory-backed `LightStorage` for tests.
#[cfg(any(test, feature = "test-helpers"))]
pub fn new_test() -> Self {
use utils::NUM_COLUMNS;
let db = Arc::new(::kvdb_memorydb::create(NUM_COLUMNS));
let db = Arc::new(sp_database::MemDb::default());
Self::from_kvdb(db as Arc<_>).expect("failed to create test-db")
}
fn from_kvdb(db: Arc<dyn KeyValueDB>) -> ClientResult<Self> {
fn from_kvdb(db: Arc<dyn Database<DbHash>>) -> ClientResult<Self> {
let meta = read_meta::<Block>(&*db, columns::HEADER)?;
let cache = DbCache::new(
db.clone(),
@@ -230,7 +226,7 @@ impl<Block: BlockT> LightStorage<Block> {
/// to be best, `route_to` should equal to `best_to`.
fn set_head_with_transaction(
&self,
transaction: &mut DBTransaction,
transaction: &mut Transaction<DbHash>,
route_to: Block::Hash,
best_to: (NumberFor<Block>, Block::Hash),
) -> ClientResult<()> {
@@ -266,7 +262,7 @@ impl<Block: BlockT> LightStorage<Block> {
}
}
transaction.put(columns::META, meta_keys::BEST_BLOCK, &lookup_key);
transaction.set_from_vec(columns::META, meta_keys::BEST_BLOCK, lookup_key);
utils::insert_number_to_key_mapping(
transaction,
columns::KEY_LOOKUP,
@@ -280,7 +276,7 @@ impl<Block: BlockT> LightStorage<Block> {
// Note that a block is finalized. Only call with child of last finalized block.
fn note_finalized(
&self,
transaction: &mut DBTransaction,
transaction: &mut Transaction<DbHash>,
header: &Block::Header,
hash: Block::Hash,
) -> ClientResult<()> {
@@ -293,7 +289,7 @@ impl<Block: BlockT> LightStorage<Block> {
}
let lookup_key = utils::number_and_hash_to_lookup_key(header.number().clone(), hash)?;
transaction.put(columns::META, meta_keys::FINALIZED_BLOCK, &lookup_key);
transaction.set_from_vec(columns::META, meta_keys::FINALIZED_BLOCK, lookup_key);
// build new CHT(s) if required
if let Some(new_cht_number) = cht::is_build_required(cht::size(), *header.number()) {
@@ -309,7 +305,7 @@ impl<Block: BlockT> LightStorage<Block> {
let new_header_cht_root = cht::compute_root::<Block::Header, HashFor<Block>, _>(
cht::size(), new_cht_number, cht_range.map(|num| self.hash(num))
)?;
transaction.put(
transaction.set(
columns::CHT,
&cht_key(HEADER_CHT_PREFIX, new_cht_start)?,
new_header_cht_root.as_ref()
@@ -327,7 +323,7 @@ impl<Block: BlockT> LightStorage<Block> {
cht::size(), new_cht_number, cht_range
.map(|num| self.changes_trie_root(BlockId::Number(num)))
)?;
transaction.put(
transaction.set(
columns::CHT,
&cht_key(CHANGES_TRIE_CHT_PREFIX, new_cht_start)?,
new_changes_trie_cht_root.as_ref()
@@ -350,7 +346,7 @@ impl<Block: BlockT> LightStorage<Block> {
prune_block,
hash
)?;
transaction.delete(columns::HEADER, &lookup_key);
transaction.remove(columns::HEADER, &lookup_key);
}
prune_block += One::one();
}
@@ -377,7 +373,7 @@ impl<Block: BlockT> LightStorage<Block> {
}
let cht_start = cht::start_number(cht_size, cht_number);
self.db.get(columns::CHT, &cht_key(cht_type, cht_start)?).map_err(db_err)?
self.db.get(columns::CHT, &cht_key(cht_type, cht_start)?)
.ok_or_else(no_cht_for_block)
.and_then(|hash| Block::Hash::decode(&mut &*hash).map_err(|_| no_cht_for_block()))
.map(Some)
@@ -394,18 +390,19 @@ impl<Block> AuxStore for LightStorage<Block>
I: IntoIterator<Item=&'a(&'c [u8], &'c [u8])>,
D: IntoIterator<Item=&'a &'b [u8]>,
>(&self, insert: I, delete: D) -> ClientResult<()> {
let mut transaction = DBTransaction::new();
let mut transaction = Transaction::new();
for (k, v) in insert {
transaction.put(columns::AUX, k, v);
transaction.set(columns::AUX, k, v);
}
for k in delete {
transaction.delete(columns::AUX, k);
transaction.remove(columns::AUX, k);
}
self.db.write(transaction).map_err(db_err)
self.db.commit(transaction);
Ok(())
}
fn get_aux(&self, key: &[u8]) -> ClientResult<Option<Vec<u8>>> {
self.db.get(columns::AUX, key).map(|r| r.map(|v| v.to_vec())).map_err(db_err)
Ok(self.db.get(columns::AUX, key))
}
}
@@ -419,7 +416,7 @@ impl<Block> LightBlockchainStorage<Block> for LightStorage<Block>
leaf_state: NewBlockState,
aux_ops: Vec<(Vec<u8>, Option<Vec<u8>>)>,
) -> ClientResult<()> {
let mut transaction = DBTransaction::new();
let mut transaction = Transaction::new();
let hash = header.hash();
let number = *header.number();
@@ -427,8 +424,8 @@ impl<Block> LightBlockchainStorage<Block> for LightStorage<Block>
for (key, maybe_val) in aux_ops {
match maybe_val {
Some(val) => transaction.put_vec(columns::AUX, &key, val),
None => transaction.delete(columns::AUX, &key),
Some(val) => transaction.set_from_vec(columns::AUX, &key, val),
None => transaction.remove(columns::AUX, &key),
}
}
@@ -445,7 +442,7 @@ impl<Block> LightBlockchainStorage<Block> for LightStorage<Block>
number,
hash,
)?;
transaction.put(columns::HEADER, &lookup_key, &header.encode());
transaction.set_from_vec(columns::HEADER, &lookup_key, header.encode());
let header_metadata = CachedHeaderMetadata::from(&header);
self.header_metadata_cache.insert_header_metadata(
@@ -456,7 +453,7 @@ impl<Block> LightBlockchainStorage<Block> for LightStorage<Block>
let is_genesis = number.is_zero();
if is_genesis {
self.cache.0.write().set_genesis_hash(hash);
transaction.put(columns::META, meta_keys::GENESIS_HASH, hash.as_ref());
transaction.set(columns::META, meta_keys::GENESIS_HASH, hash.as_ref());
}
let finalized = match leaf_state {
@@ -493,7 +490,7 @@ impl<Block> LightBlockchainStorage<Block> for LightStorage<Block>
debug!("Light DB Commit {:?} ({})", hash, number);
self.db.write(transaction).map_err(db_err)?;
self.db.commit(transaction);
cache.commit(cache_ops)
.expect("only fails if cache with given name isn't loaded yet;\
cache is already loaded because there are cache_ops; qed");
@@ -509,9 +506,9 @@ impl<Block> LightBlockchainStorage<Block> for LightStorage<Block>
let hash = header.hash();
let number = header.number();
let mut transaction = DBTransaction::new();
let mut transaction = Transaction::new();
self.set_head_with_transaction(&mut transaction, hash.clone(), (number.clone(), hash.clone()))?;
self.db.write(transaction).map_err(db_err)?;
self.db.commit(transaction);
self.update_meta(hash, header.number().clone(), true, false);
Ok(())
} else {
@@ -537,7 +534,7 @@ impl<Block> LightBlockchainStorage<Block> for LightStorage<Block>
fn finalize_header(&self, id: BlockId<Block>) -> ClientResult<()> {
if let Some(header) = self.header(id)? {
let mut transaction = DBTransaction::new();
let mut transaction = Transaction::new();
let hash = header.hash();
let number = *header.number();
self.note_finalized(&mut transaction, &header, hash.clone())?;
@@ -550,7 +547,7 @@ impl<Block> LightBlockchainStorage<Block> for LightStorage<Block>
)?
.into_ops();
self.db.write(transaction).map_err(db_err)?;
self.db.commit(transaction);
cache.commit(cache_ops)
.expect("only fails if cache with given name isn't loaded yet;\
cache is already loaded because there are cache_ops; qed");
@@ -575,8 +572,9 @@ impl<Block> LightBlockchainStorage<Block> for LightStorage<Block>
fn usage_info(&self) -> Option<UsageInfo> {
use sc_client_api::{MemoryInfo, IoInfo, MemorySize};
let database_cache = MemorySize::from_bytes(parity_util_mem::malloc_size(&*self.db));
let io_stats = self.io_stats.take_or_else(|| self.db.io_stats(kvdb::IoStatsKind::SincePrevious));
// TODO: reimplement IO stats
let database_cache = MemorySize::from_bytes(0);
let io_stats = self.io_stats.take_or_else(|| kvdb::IoStats::empty());
Some(UsageInfo {
memory: MemoryInfo {
@@ -732,21 +730,25 @@ pub(crate) mod tests {
#[test]
fn import_header_works() {
let db = LightStorage::new_test();
let raw_db = Arc::new(sp_database::MemDb::default());
let db = LightStorage::from_kvdb(raw_db.clone()).unwrap();
let genesis_hash = insert_block(&db, HashMap::new(), || default_header(&Default::default(), 0));
assert_eq!(db.db.iter(columns::HEADER).count(), 1);
assert_eq!(db.db.iter(columns::KEY_LOOKUP).count(), 2);
assert_eq!(raw_db.count(columns::HEADER), 1);
assert_eq!(raw_db.count(columns::KEY_LOOKUP), 2);
let _ = insert_block(&db, HashMap::new(), || default_header(&genesis_hash, 1));
assert_eq!(db.db.iter(columns::HEADER).count(), 2);
assert_eq!(db.db.iter(columns::KEY_LOOKUP).count(), 4);
assert_eq!(raw_db.count(columns::HEADER), 2);
assert_eq!(raw_db.count(columns::KEY_LOOKUP), 4);
}
#[test]
fn finalized_ancient_headers_are_replaced_with_cht() {
fn insert_headers<F: Fn(&Hash, u64) -> Header>(header_producer: F) -> LightStorage<Block> {
let db = LightStorage::new_test();
fn insert_headers<F: Fn(&Hash, u64) -> Header>(header_producer: F) ->
(Arc<sp_database::MemDb<DbHash>>, LightStorage<Block>)
{
let raw_db = Arc::new(sp_database::MemDb::default());
let db = LightStorage::from_kvdb(raw_db.clone()).unwrap();
let cht_size: u64 = cht::size();
let ucht_size: usize = cht_size as _;
@@ -758,8 +760,8 @@ pub(crate) mod tests {
for number in 0..cht::size() {
prev_hash = insert_block(&db, HashMap::new(), || header_producer(&prev_hash, 1 + number));
}
assert_eq!(db.db.iter(columns::HEADER).count(), 1 + ucht_size);
assert_eq!(db.db.iter(columns::CHT).count(), 0);
assert_eq!(raw_db.count(columns::HEADER), 1 + ucht_size);
assert_eq!(raw_db.count(columns::CHT), 0);
// insert next SIZE blocks && ensure that nothing is pruned
for number in 0..(cht_size as _) {
@@ -769,8 +771,8 @@ pub(crate) mod tests {
|| header_producer(&prev_hash, 1 + cht_size + number),
);
}
assert_eq!(db.db.iter(columns::HEADER).count(), 1 + ucht_size + ucht_size);
assert_eq!(db.db.iter(columns::CHT).count(), 0);
assert_eq!(raw_db.count(columns::HEADER), 1 + ucht_size + ucht_size);
assert_eq!(raw_db.count(columns::CHT), 0);
// insert block #{2 * cht::size() + 1} && check that new CHT is created + headers of this CHT are pruned
// nothing is yet finalized, so nothing is pruned.
@@ -779,23 +781,23 @@ pub(crate) mod tests {
HashMap::new(),
|| header_producer(&prev_hash, 1 + cht_size + cht_size),
);
assert_eq!(db.db.iter(columns::HEADER).count(), 2 + ucht_size + ucht_size);
assert_eq!(db.db.iter(columns::CHT).count(), 0);
assert_eq!(raw_db.count(columns::HEADER), 2 + ucht_size + ucht_size);
assert_eq!(raw_db.count(columns::CHT), 0);
// now finalize the block.
for i in (0..(ucht_size + ucht_size)).map(|i| i + 1) {
db.finalize_header(BlockId::Number(i as _)).unwrap();
}
db.finalize_header(BlockId::Hash(prev_hash)).unwrap();
db
(raw_db, db)
}
// when headers are created without changes tries roots
let db = insert_headers(default_header);
let (raw_db, db) = insert_headers(default_header);
let cht_size: u64 = cht::size();
assert_eq!(db.db.iter(columns::HEADER).count(), (1 + cht_size + 1) as usize);
assert_eq!(db.db.iter(columns::KEY_LOOKUP).count(), (2 * (1 + cht_size + 1)) as usize);
assert_eq!(db.db.iter(columns::CHT).count(), 1);
assert_eq!(raw_db.count(columns::HEADER), (1 + cht_size + 1) as usize);
assert_eq!(raw_db.count(columns::KEY_LOOKUP), (2 * (1 + cht_size + 1)) as usize);
assert_eq!(raw_db.count(columns::CHT), 1);
assert!((0..cht_size as _).all(|i| db.header(BlockId::Number(1 + i)).unwrap().is_none()));
assert!(db.header_cht_root(cht_size, cht_size / 2).unwrap().is_some());
assert!(db.header_cht_root(cht_size, cht_size + cht_size / 2).unwrap().is_none());
@@ -803,9 +805,9 @@ pub(crate) mod tests {
assert!(db.changes_trie_cht_root(cht_size, cht_size + cht_size / 2).unwrap().is_none());
// when headers are created with changes tries roots
let db = insert_headers(header_with_changes_trie);
assert_eq!(db.db.iter(columns::HEADER).count(), (1 + cht_size + 1) as usize);
assert_eq!(db.db.iter(columns::CHT).count(), 2);
let (raw_db, db) = insert_headers(header_with_changes_trie);
assert_eq!(raw_db.count(columns::HEADER), (1 + cht_size + 1) as usize);
assert_eq!(raw_db.count(columns::CHT), 2);
assert!((0..cht_size as _).all(|i| db.header(BlockId::Number(1 + i)).unwrap().is_none()));
assert!(db.header_cht_root(cht_size, cht_size / 2).unwrap().is_some());
assert!(db.header_cht_root(cht_size, cht_size + cht_size / 2).unwrap().is_none());
+9 -16
View File
@@ -21,14 +21,13 @@ use std::{
sync::Arc,
};
use crate::columns;
use kvdb::KeyValueDB;
use crate::{columns, Database, DbHash, Transaction};
use parking_lot::Mutex;
/// Offchain local storage
#[derive(Clone)]
pub struct LocalStorage {
db: Arc<dyn KeyValueDB>,
db: Arc<dyn Database<DbHash>>,
locks: Arc<Mutex<HashMap<Vec<u8>, Arc<Mutex<()>>>>>,
}
@@ -43,12 +42,13 @@ impl LocalStorage {
/// Create new offchain storage for tests (backed by memorydb)
#[cfg(any(test, feature = "test-helpers"))]
pub fn new_test() -> Self {
let db = Arc::new(kvdb_memorydb::create(crate::utils::NUM_COLUMNS));
let db = kvdb_memorydb::create(crate::utils::NUM_COLUMNS);
let db = sp_database::as_database(db);
Self::new(db as _)
}
/// Create offchain local storage with given `KeyValueDB` backend.
pub fn new(db: Arc<dyn KeyValueDB>) -> Self {
pub fn new(db: Arc<dyn Database<DbHash>>) -> Self {
Self {
db,
locks: Default::default(),
@@ -59,20 +59,15 @@ impl LocalStorage {
impl sp_core::offchain::OffchainStorage for LocalStorage {
fn set(&mut self, prefix: &[u8], key: &[u8], value: &[u8]) {
let key: Vec<u8> = prefix.iter().chain(key).cloned().collect();
let mut tx = self.db.transaction();
tx.put(columns::OFFCHAIN, &key, value);
let mut tx = Transaction::new();
tx.set(columns::OFFCHAIN, &key, value);
if let Err(e) = self.db.write(tx) {
log::warn!("Error writing to the offchain DB: {:?}", e);
}
self.db.commit(tx);
}
fn get(&self, prefix: &[u8], key: &[u8]) -> Option<Vec<u8>> {
let key: Vec<u8> = prefix.iter().chain(key).cloned().collect();
self.db.get(columns::OFFCHAIN, &key)
.ok()
.and_then(|x| x)
.map(|v| v.to_vec())
}
fn compare_and_set(
@@ -91,9 +86,7 @@ impl sp_core::offchain::OffchainStorage for LocalStorage {
let is_set;
{
let _key_guard = key_lock.lock();
let val = self.db.get(columns::OFFCHAIN, &key)
.ok()
.and_then(|x| x);
let val = self.db.get(columns::OFFCHAIN, &key);
is_set = val.as_ref().map(|x| &**x) == old_value;
if is_set {
+56
View File
@@ -0,0 +1,56 @@
// Copyright 2017-2020 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
/// A `Database` adapter for parity-db.
use sp_database::{Database, Change, Transaction, ColumnId};
struct DbAdapter(parity_db::Db);
fn handle_err<T>(result: parity_db::Result<T>) -> T {
match result {
Ok(r) => r,
Err(e) => {
panic!("Critical database eror: {:?}", e);
}
}
}
/// Wrap RocksDb database into a trait object that implements `sp_database::Database`
pub fn open<H: Clone>(path: &std::path::Path, num_columns: u32) -> parity_db::Result<std::sync::Arc<dyn Database<H>>> {
let db = parity_db::Db::with_columns(path, num_columns as u8)?;
Ok(std::sync::Arc::new(DbAdapter(db)))
}
impl<H: Clone> Database<H> for DbAdapter {
fn commit(&self, transaction: Transaction<H>) {
handle_err(self.0.commit(transaction.0.into_iter().map(|change|
match change {
Change::Set(col, key, value) => (col as u8, key, Some(value)),
Change::Remove(col, key) => (col as u8, key, None),
_ => unimplemented!(),
}))
);
}
fn get(&self, col: ColumnId, key: &[u8]) -> Option<Vec<u8>> {
handle_err(self.0.get(col as u8, key))
}
fn lookup(&self, _hash: &H) -> Option<Vec<u8>> {
unimplemented!();
}
}
+87
View File
@@ -0,0 +1,87 @@
// Copyright 2017-2020 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
/// A `Database` adapter for subdb.
use sp_database::{self, ColumnId};
use parking_lot::RwLock;
use blake2_rfc::blake2b::blake2b;
use codec::Encode;
use subdb::{Database, KeyType};
/// A database hidden behind an RwLock, so that it implements Send + Sync.
///
/// Construct by creating a `Database` and then using `.into()`.
pub struct DbAdapter<H: KeyType>(RwLock<Database<H>>);
/// Wrap RocksDb database into a trait object that implements `sp_database::Database`
pub fn open<H: KeyType + 'static>(
path: &std::path::Path,
_num_columns: u32,
) -> Result<std::sync::Arc<dyn sp_database::Database<H>>, subdb::Error> {
let db = subdb::Options::from_path(path.into()).open()?;
Ok(std::sync::Arc::new(DbAdapter(RwLock::new(db))))
}
impl<H: KeyType> sp_database::Database<H> for DbAdapter<H> {
fn get(&self, col: ColumnId, key: &[u8]) -> Option<Vec<u8>> {
let mut hash = H::default();
(col, key).using_encoded(|d|
hash.as_mut().copy_from_slice(blake2b(32, &[], d).as_bytes())
);
self.0.read().get(&hash)
}
fn with_get(&self, col: ColumnId, key: &[u8], f: &mut dyn FnMut(&[u8])) {
let mut hash = H::default();
(col, key).using_encoded(|d|
hash.as_mut().copy_from_slice(blake2b(32, &[], d).as_bytes())
);
let _ = self.0.read().get_ref(&hash).map(|d| f(d.as_ref()));
}
fn set(&self, col: ColumnId, key: &[u8], value: &[u8]) {
let mut hash = H::default();
(col, key).using_encoded(|d|
hash.as_mut().copy_from_slice(blake2b(32, &[], d).as_bytes())
);
self.0.write().insert(&value, &hash);
}
fn remove(&self, col: ColumnId, key: &[u8]) {
let mut hash = H::default();
(col, key).using_encoded(|d|
hash.as_mut().copy_from_slice(blake2b(32, &[], d).as_bytes())
);
let _ = self.0.write().remove(&hash);
}
fn lookup(&self, hash: &H) -> Option<Vec<u8>> {
self.0.read().get(hash)
}
fn with_lookup(&self, hash: &H, f: &mut dyn FnMut(&[u8])) {
let _ = self.0.read().get_ref(hash).map(|d| f(d.as_ref()));
}
fn store(&self, hash: &H, preimage: &[u8]) {
self.0.write().insert(preimage, hash);
}
fn release(&self, hash: &H) {
let _ = self.0.write().remove(hash);
}
}
+14 -89
View File
@@ -19,18 +19,9 @@
use std::fs;
use std::io::{Read, Write, ErrorKind};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use codec::Encode;
use kvdb_rocksdb::{Database, DatabaseConfig};
use parking_lot::RwLock;
use sp_blockchain::{well_known_cache_keys, Cache};
use sp_core::ChangesTrieConfiguration;
use sp_runtime::traits::Block as BlockT;
use crate::{
cache::{ComplexBlockId, DbCache, DbCacheSync},
utils::{DatabaseType, check_database_type, db_err, read_genesis_hash},
};
use crate::utils::DatabaseType;
/// Version file name.
const VERSION_FILE_NAME: &'static str = "db_version";
@@ -38,69 +29,21 @@ const VERSION_FILE_NAME: &'static str = "db_version";
/// Current db version.
const CURRENT_VERSION: u32 = 1;
/// Number of columns in v0.
const V0_NUM_COLUMNS: u32 = 10;
/// Upgrade database to current version.
pub fn upgrade_db<Block: BlockT>(db_path: &Path, db_type: DatabaseType) -> sp_blockchain::Result<()> {
let db_version = current_version(db_path)?;
match db_version {
0 => migrate_0_to_1::<Block>(db_path, db_type)?,
1 => (),
_ => Err(sp_blockchain::Error::Backend(format!("Future database version: {}", db_version)))?,
pub fn upgrade_db<Block: BlockT>(db_path: &Path, _db_type: DatabaseType) -> sp_blockchain::Result<()> {
let is_empty = db_path.read_dir().map_or(true, |mut d| d.next().is_none());
if !is_empty {
let db_version = current_version(db_path)?;
match db_version {
0 => Err(sp_blockchain::Error::Backend(format!("Unsupported database version: {}", db_version)))?,
1 => (),
_ => Err(sp_blockchain::Error::Backend(format!("Future database version: {}", db_version)))?,
}
}
update_version(db_path)
}
/// Migration from version0 to version1:
/// 1) the number of columns has changed from 10 to 11;
/// 2) changes tries configuration are now cached.
fn migrate_0_to_1<Block: BlockT>(db_path: &Path, db_type: DatabaseType) -> sp_blockchain::Result<()> {
{
let db = open_database(db_path, db_type, V0_NUM_COLUMNS)?;
db.add_column().map_err(db_err)?;
db.flush().map_err(db_err)?;
}
let db = open_database(db_path, db_type, V0_NUM_COLUMNS + 1)?;
const V0_FULL_KEY_LOOKUP_COLUMN: u32 = 3;
const V0_FULL_HEADER_COLUMN: u32 = 4;
const V0_FULL_CACHE_COLUMN: u32 = 10; // that's the column we have just added
const V0_LIGHT_KEY_LOOKUP_COLUMN: u32 = 1;
const V0_LIGHT_HEADER_COLUMN: u32 = 2;
const V0_LIGHT_CACHE_COLUMN: u32 = 3;
let (key_lookup_column, header_column, cache_column) = match db_type {
DatabaseType::Full => (
V0_FULL_KEY_LOOKUP_COLUMN,
V0_FULL_HEADER_COLUMN,
V0_FULL_CACHE_COLUMN,
),
DatabaseType::Light => (
V0_LIGHT_KEY_LOOKUP_COLUMN,
V0_LIGHT_HEADER_COLUMN,
V0_LIGHT_CACHE_COLUMN,
),
};
let genesis_hash: Option<Block::Hash> = read_genesis_hash(&db)?;
if let Some(genesis_hash) = genesis_hash {
let cache: DbCacheSync<Block> = DbCacheSync(RwLock::new(DbCache::new(
Arc::new(db),
key_lookup_column,
header_column,
cache_column,
genesis_hash,
ComplexBlockId::new(genesis_hash, 0.into()),
)));
let changes_trie_config: Option<ChangesTrieConfiguration> = None;
cache.initialize(&well_known_cache_keys::CHANGES_TRIE_CONFIG, changes_trie_config.encode())?;
}
Ok(())
}
/// Reads current database version from the file at given path.
/// If the file does not exist returns 0.
@@ -118,14 +61,9 @@ fn current_version(path: &Path) -> sp_blockchain::Result<u32> {
}
}
/// Opens database of given type with given number of columns.
fn open_database(db_path: &Path, db_type: DatabaseType, db_columns: u32) -> sp_blockchain::Result<Database> {
let db_path = db_path.to_str()
.ok_or_else(|| sp_blockchain::Error::Backend("Invalid database path".into()))?;
let db_cfg = DatabaseConfig::with_columns(db_columns);
let db = Database::open(&db_cfg, db_path).map_err(db_err)?;
check_database_type(&db, db_type)?;
Ok(db)
/// Maps database error to client error
fn db_err(err: std::io::Error) -> sp_blockchain::Error {
sp_blockchain::Error::Backend(format!("{}", err))
}
/// Writes current database version to the file.
@@ -152,8 +90,6 @@ mod tests {
use super::*;
fn create_db(db_path: &Path, version: Option<u32>) {
let db_cfg = DatabaseConfig::with_columns(V0_NUM_COLUMNS);
Database::open(&db_cfg, db_path.to_str().unwrap()).unwrap();
if let Some(version) = version {
fs::create_dir_all(db_path).unwrap();
let mut file = fs::File::create(version_file_path(db_path)).unwrap();
@@ -166,7 +102,7 @@ mod tests {
state_cache_size: 0,
state_cache_child_ratio: None,
pruning: PruningMode::ArchiveAll,
source: DatabaseSettingsSrc::Path { path: db_path.to_owned(), cache_size: 128 },
source: DatabaseSettingsSrc::RocksDb { path: db_path.to_owned(), cache_size: 128 },
}, DatabaseType::Full).map(|_| ())
}
@@ -184,15 +120,4 @@ mod tests {
open_database(db_dir.path()).unwrap();
assert_eq!(current_version(db_dir.path()).unwrap(), CURRENT_VERSION);
}
#[test]
fn upgrade_from_0_to_1_works() {
for version_from_file in &[None, Some(0)] {
let db_dir = tempfile::TempDir::new().unwrap();
let db_path = db_dir.path();
create_db(db_path, *version_from_file);
open_database(db_path).unwrap();
assert_eq!(current_version(db_path).unwrap(), CURRENT_VERSION);
}
}
}
+53 -52
View File
@@ -18,21 +18,19 @@
//! full and light storages.
use std::sync::Arc;
use std::{io, convert::TryInto};
use std::convert::TryInto;
use kvdb::{KeyValueDB, DBTransaction};
#[cfg(any(feature = "kvdb-rocksdb", test))]
use kvdb_rocksdb::{Database, DatabaseConfig};
use log::debug;
use codec::Decode;
use sp_trie::DBValue;
use sp_database::Transaction;
use sp_runtime::generic::BlockId;
use sp_runtime::traits::{
Block as BlockT, Header as HeaderT, Zero,
UniqueSaturatedFrom, UniqueSaturatedInto,
};
use crate::{DatabaseSettings, DatabaseSettingsSrc};
use crate::{DatabaseSettings, DatabaseSettingsSrc, Database, DbHash};
/// Number of columns in the db. Must be the same for both full && light dbs.
/// Otherwise RocksDb will fail to open database && check its type.
@@ -136,35 +134,35 @@ pub fn lookup_key_to_number<N>(key: &[u8]) -> sp_blockchain::Result<N> where
/// Delete number to hash mapping in DB transaction.
pub fn remove_number_to_key_mapping<N: TryInto<u32>>(
transaction: &mut DBTransaction,
transaction: &mut Transaction<DbHash>,
key_lookup_col: u32,
number: N,
) -> sp_blockchain::Result<()> {
transaction.delete(key_lookup_col, number_index_key(number)?.as_ref());
transaction.remove(key_lookup_col, number_index_key(number)?.as_ref());
Ok(())
}
/// Remove key mappings.
pub fn remove_key_mappings<N: TryInto<u32>, H: AsRef<[u8]>>(
transaction: &mut DBTransaction,
transaction: &mut Transaction<DbHash>,
key_lookup_col: u32,
number: N,
hash: H,
) -> sp_blockchain::Result<()> {
remove_number_to_key_mapping(transaction, key_lookup_col, number)?;
transaction.delete(key_lookup_col, hash.as_ref());
transaction.remove(key_lookup_col, hash.as_ref());
Ok(())
}
/// Place a number mapping into the database. This maps number to current perceived
/// block hash at that position.
pub fn insert_number_to_key_mapping<N: TryInto<u32> + Clone, H: AsRef<[u8]>>(
transaction: &mut DBTransaction,
transaction: &mut Transaction<DbHash>,
key_lookup_col: u32,
number: N,
hash: H,
) -> sp_blockchain::Result<()> {
transaction.put_vec(
transaction.set_from_vec(
key_lookup_col,
number_index_key(number.clone())?.as_ref(),
number_and_hash_to_lookup_key(number, hash)?,
@@ -174,12 +172,12 @@ pub fn insert_number_to_key_mapping<N: TryInto<u32> + Clone, H: AsRef<[u8]>>(
/// Insert a hash to key mapping in the database.
pub fn insert_hash_to_key_mapping<N: TryInto<u32>, H: AsRef<[u8]> + Clone>(
transaction: &mut DBTransaction,
transaction: &mut Transaction<DbHash>,
key_lookup_col: u32,
number: N,
hash: H,
) -> sp_blockchain::Result<()> {
transaction.put_vec(
transaction.set_from_vec(
key_lookup_col,
hash.clone().as_ref(),
number_and_hash_to_lookup_key(number, hash)?,
@@ -191,42 +189,35 @@ pub fn insert_hash_to_key_mapping<N: TryInto<u32>, H: AsRef<[u8]> + Clone>(
/// block lookup key is the DB-key header, block and justification are stored under.
/// looks up lookup key by hash from DB as necessary.
pub fn block_id_to_lookup_key<Block>(
db: &dyn KeyValueDB,
db: &dyn Database<DbHash>,
key_lookup_col: u32,
id: BlockId<Block>
) -> Result<Option<Vec<u8>>, sp_blockchain::Error> where
Block: BlockT,
::sp_runtime::traits::NumberFor<Block>: UniqueSaturatedFrom<u64> + UniqueSaturatedInto<u64>,
{
let res = match id {
Ok(match id {
BlockId::Number(n) => db.get(
key_lookup_col,
number_index_key(n)?.as_ref(),
),
BlockId::Hash(h) => db.get(key_lookup_col, h.as_ref()),
};
res.map_err(db_err)
BlockId::Hash(h) => db.get(key_lookup_col, h.as_ref())
})
}
/// Maps database error to client error
pub fn db_err(err: io::Error) -> sp_blockchain::Error {
sp_blockchain::Error::Backend(format!("{}", err))
}
/// Open RocksDB database.
/// Opens the configured database.
pub fn open_database<Block: BlockT>(
config: &DatabaseSettings,
db_type: DatabaseType,
) -> sp_blockchain::Result<Arc<dyn KeyValueDB>> {
let db: Arc<dyn KeyValueDB> = match &config.source {
) -> sp_blockchain::Result<Arc<dyn Database<DbHash>>> {
let db: Arc<dyn Database<DbHash>> = match &config.source {
#[cfg(any(feature = "kvdb-rocksdb", test))]
DatabaseSettingsSrc::Path { path, cache_size } => {
DatabaseSettingsSrc::RocksDb { path, cache_size } => {
// first upgrade database to required version
crate::upgrade::upgrade_db::<Block>(&path, db_type)?;
// and now open database assuming that it has the latest version
let mut db_config = DatabaseConfig::with_columns(NUM_COLUMNS);
let mut db_config = kvdb_rocksdb::DatabaseConfig::with_columns(NUM_COLUMNS);
let state_col_budget = (*cache_size as f64 * 0.9) as usize;
let other_col_budget = (cache_size - state_col_budget) / (NUM_COLUMNS as usize - 1);
let mut memory_budget = std::collections::HashMap::new();
@@ -245,21 +236,32 @@ pub fn open_database<Block: BlockT>(
log::trace!(
target: "db",
"Open database at {}, state column budget: {} MiB, others({}) column cache: {} MiB",
"Open RocksDB database at {}, state column budget: {} MiB, others({}) column cache: {} MiB",
path,
state_col_budget,
NUM_COLUMNS,
other_col_budget,
);
Arc::new(Database::open(&db_config, &path).map_err(db_err)?)
let db = kvdb_rocksdb::Database::open(&db_config, &path)
.map_err(|err| sp_blockchain::Error::Backend(format!("{}", err)))?;
sp_database::as_database(db)
},
#[cfg(not(any(feature = "kvdb-rocksdb", test)))]
DatabaseSettingsSrc::Path { .. } => {
let msg = "Try to open RocksDB database with RocksDB disabled".into();
return Err(sp_blockchain::Error::Backend(msg));
#[cfg(feature = "subdb")]
DatabaseSettingsSrc::SubDb { path } => {
crate::subdb::open(&path, NUM_COLUMNS)
.map_err(|e| sp_blockchain::Error::Backend(format!("{:?}", e)))?
},
#[cfg(feature = "parity-db")]
DatabaseSettingsSrc::ParityDb { path } => {
crate::parity_db::open(&path, NUM_COLUMNS)
.map_err(|e| sp_blockchain::Error::Backend(format!("{:?}", e)))?
},
DatabaseSettingsSrc::Custom(db) => db.clone(),
_ => {
let msg = "Trying to open a unsupported database".into();
return Err(sp_blockchain::Error::Backend(msg));
},
};
check_database_type(&*db, db_type)?;
@@ -268,8 +270,8 @@ pub fn open_database<Block: BlockT>(
}
/// Check database type.
pub fn check_database_type(db: &dyn KeyValueDB, db_type: DatabaseType) -> sp_blockchain::Result<()> {
match db.get(COLUMN_META, meta_keys::TYPE).map_err(db_err)? {
pub fn check_database_type(db: &dyn Database<DbHash>, db_type: DatabaseType) -> sp_blockchain::Result<()> {
match db.get(COLUMN_META, meta_keys::TYPE) {
Some(stored_type) => {
if db_type.as_str().as_bytes() != &*stored_type {
return Err(sp_blockchain::Error::Backend(
@@ -277,9 +279,9 @@ pub fn check_database_type(db: &dyn KeyValueDB, db_type: DatabaseType) -> sp_blo
}
},
None => {
let mut transaction = DBTransaction::new();
transaction.put(COLUMN_META, meta_keys::TYPE, db_type.as_str().as_bytes());
db.write(transaction).map_err(db_err)?;
let mut transaction = Transaction::new();
transaction.set(COLUMN_META, meta_keys::TYPE, db_type.as_str().as_bytes());
db.commit(transaction)
},
}
@@ -288,7 +290,7 @@ pub fn check_database_type(db: &dyn KeyValueDB, db_type: DatabaseType) -> sp_blo
/// Read database column entry for the given block.
pub fn read_db<Block>(
db: &dyn KeyValueDB,
db: &dyn Database<DbHash>,
col_index: u32,
col: u32,
id: BlockId<Block>
@@ -297,14 +299,14 @@ pub fn read_db<Block>(
Block: BlockT,
{
block_id_to_lookup_key(db, col_index, id).and_then(|key| match key {
Some(key) => db.get(col, key.as_ref()).map_err(db_err),
Some(key) => Ok(db.get(col, key.as_ref())),
None => Ok(None),
})
}
/// Read a header from the database.
pub fn read_header<Block: BlockT>(
db: &dyn KeyValueDB,
db: &dyn Database<DbHash>,
col_index: u32,
col: u32,
id: BlockId<Block>,
@@ -322,7 +324,7 @@ pub fn read_header<Block: BlockT>(
/// Required header from the database.
pub fn require_header<Block: BlockT>(
db: &dyn KeyValueDB,
db: &dyn Database<DbHash>,
col_index: u32,
col: u32,
id: BlockId<Block>,
@@ -334,7 +336,7 @@ pub fn require_header<Block: BlockT>(
}
/// Read meta from the database.
pub fn read_meta<Block>(db: &dyn KeyValueDB, col_header: u32) -> Result<
pub fn read_meta<Block>(db: &dyn Database<DbHash>, col_header: u32) -> Result<
Meta<<<Block as BlockT>::Header as HeaderT>::Number, Block::Hash>,
sp_blockchain::Error,
>
@@ -353,11 +355,10 @@ pub fn read_meta<Block>(db: &dyn KeyValueDB, col_header: u32) -> Result<
};
let load_meta_block = |desc, key| -> Result<_, sp_blockchain::Error> {
if let Some(Some(header)) = db.get(COLUMN_META, key).and_then(|id|
match id {
Some(id) => db.get(col_header, &id).map(|h| h.map(|b| Block::Header::decode(&mut &b[..]).ok())),
None => Ok(None),
}).map_err(db_err)?
if let Some(Some(header)) = match db.get(COLUMN_META, key) {
Some(id) => db.get(col_header, &id).map(|b| Block::Header::decode(&mut &b[..]).ok()),
None => None,
}
{
let hash = header.hash();
debug!("DB Opened blockchain db, fetched {} = {:?} ({})", desc, hash, header.number());
@@ -380,8 +381,8 @@ pub fn read_meta<Block>(db: &dyn KeyValueDB, col_header: u32) -> Result<
}
/// Read genesis hash from database.
pub fn read_genesis_hash<Hash: Decode>(db: &dyn KeyValueDB) -> sp_blockchain::Result<Option<Hash>> {
match db.get(COLUMN_META, meta_keys::GENESIS_HASH).map_err(db_err)? {
pub fn read_genesis_hash<Hash: Decode>(db: &dyn Database<DbHash>) -> sp_blockchain::Result<Option<Hash>> {
match db.get(COLUMN_META, meta_keys::GENESIS_HASH) {
Some(h) => match Decode::decode(&mut &h[..]) {
Ok(h) => Ok(Some(h)),
Err(err) => Err(sp_blockchain::Error::Backend(