Storage chains part 1 (#7868)

* CLI options and DB upgrade

* Transaction storage

* Block pruning

* Block pruning test

* Style

* Naming

* Apply suggestions from code review

Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>

* Apply suggestions from code review

Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>

* Style

Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>
This commit is contained in:
Arkadiy Paronyan
2021-01-14 21:55:41 +03:00
committed by GitHub
parent 086c7946ca
commit b59c3297cc
13 changed files with 347 additions and 41 deletions
+200 -16
View File
@@ -66,7 +66,7 @@ use codec::{Decode, Encode};
use hash_db::Prefix;
use sp_trie::{MemoryDB, PrefixedMemoryDB, prefixed_key};
use sp_database::Transaction;
use sp_core::ChangesTrieConfiguration;
use sp_core::{Hasher, ChangesTrieConfiguration};
use sp_core::offchain::storage::{OffchainOverlayedChange, OffchainOverlayedChanges};
use sp_core::storage::{well_known_keys, ChildInfo};
use sp_arithmetic::traits::Saturating;
@@ -264,10 +264,33 @@ pub struct DatabaseSettings {
pub state_cache_size: usize,
/// Ratio of cache size dedicated to child tries.
pub state_cache_child_ratio: Option<(usize, usize)>,
/// Pruning mode.
pub pruning: PruningMode,
/// State pruning mode.
pub state_pruning: PruningMode,
/// Where to find the database.
pub source: DatabaseSettingsSrc,
/// Block pruning mode.
pub keep_blocks: KeepBlocks,
/// Block body/Transaction storage scheme.
pub transaction_storage: TransactionStorageMode,
}
/// Block pruning settings.
#[derive(Debug, Clone, Copy)]
pub enum KeepBlocks {
/// Keep full block history.
All,
/// Keep N recent finalized blocks.
Some(u32),
}
/// Block body storage scheme.
#[derive(Debug, Clone, Copy)]
pub enum TransactionStorageMode {
/// Store block body as an encoded list of full transactions in the BODY column
BlockBody,
/// Store a list of hashes in the BODY column and each transaction individually
/// in the TRANSACTION column.
StorageChain,
}
/// Where to find the database..
@@ -334,6 +357,8 @@ pub(crate) mod columns {
/// Offchain workers local storage
pub const OFFCHAIN: u32 = 9;
pub const CACHE: u32 = 10;
/// Transactions
pub const TRANSACTION: u32 = 11;
}
struct PendingBlock<Block: BlockT> {
@@ -372,10 +397,14 @@ pub struct BlockchainDb<Block: BlockT> {
leaves: RwLock<LeafSet<Block::Hash, NumberFor<Block>>>,
header_metadata_cache: Arc<HeaderMetadataCache<Block>>,
header_cache: Mutex<LinkedHashMap<Block::Hash, Option<Block::Header>>>,
transaction_storage: TransactionStorageMode,
}
impl<Block: BlockT> BlockchainDb<Block> {
fn new(db: Arc<dyn Database<DbHash>>) -> ClientResult<Self> {
fn new(
db: Arc<dyn Database<DbHash>>,
transaction_storage: TransactionStorageMode
) -> ClientResult<Self> {
let meta = read_meta::<Block>(&*db, columns::HEADER)?;
let leaves = LeafSet::read_from_db(&*db, columns::META, meta_keys::LEAF_PREFIX)?;
Ok(BlockchainDb {
@@ -384,6 +413,7 @@ impl<Block: BlockT> BlockchainDb<Block> {
meta: Arc::new(RwLock::new(meta)),
header_metadata_cache: Arc::new(HeaderMetadataCache::default()),
header_cache: Default::default(),
transaction_storage,
})
}
@@ -418,6 +448,20 @@ impl<Block: BlockT> BlockchainDb<Block> {
header.digest().log(DigestItem::as_changes_trie_root)
.cloned()))
}
fn extrinsic(&self, hash: &Block::Hash) -> ClientResult<Option<Block::Extrinsic>> {
match self.db.get(columns::TRANSACTION, hash.as_ref()) {
Some(ex) => {
match Decode::decode(&mut &ex[..]) {
Ok(ex) => Ok(Some(ex)),
Err(err) => Err(sp_blockchain::Error::Backend(
format!("Error decoding extrinsic {}: {}", hash, err)
)),
}
},
None => Ok(None),
}
}
}
impl<Block: BlockT> sc_client_api::blockchain::HeaderBackend<Block> for BlockchainDb<Block> {
@@ -476,11 +520,30 @@ impl<Block: BlockT> sc_client_api::blockchain::HeaderBackend<Block> for Blockcha
impl<Block: BlockT> sc_client_api::blockchain::Backend<Block> for BlockchainDb<Block> {
fn body(&self, id: BlockId<Block>) -> ClientResult<Option<Vec<Block::Extrinsic>>> {
match read_db(&*self.db, columns::KEY_LOOKUP, columns::BODY, id)? {
Some(body) => match Decode::decode(&mut &body[..]) {
Ok(body) => Ok(Some(body)),
Err(err) => return Err(sp_blockchain::Error::Backend(
format!("Error decoding body: {}", err)
)),
Some(body) => {
match self.transaction_storage {
TransactionStorageMode::BlockBody => match Decode::decode(&mut &body[..]) {
Ok(body) => Ok(Some(body)),
Err(err) => return Err(sp_blockchain::Error::Backend(
format!("Error decoding body: {}", err)
)),
},
TransactionStorageMode::StorageChain => {
match Vec::<Block::Hash>::decode(&mut &body[..]) {
Ok(hashes) => {
let extrinsics: ClientResult<Vec<Block::Extrinsic>> = hashes.into_iter().map(
|h| self.extrinsic(&h) .and_then(|maybe_ex| maybe_ex.ok_or_else(
|| sp_blockchain::Error::Backend(
format!("Missing transaction: {}", h))))
).collect();
Ok(Some(extrinsics?))
}
Err(err) => return Err(sp_blockchain::Error::Backend(
format!("Error decoding body list: {}", err)
)),
}
}
}
}
None => Ok(None),
}
@@ -855,6 +918,8 @@ pub struct Backend<Block: BlockT> {
shared_cache: SharedCache<Block>,
import_lock: Arc<RwLock<()>>,
is_archive: bool,
keep_blocks: KeepBlocks,
transaction_storage: TransactionStorageMode,
io_stats: FrozenForDuration<(kvdb::IoStats, StateUsageInfo)>,
state_usage: Arc<StateUsageStats>,
}
@@ -871,13 +936,29 @@ impl<Block: BlockT> Backend<Block> {
/// Create new memory-backed client backend for tests.
#[cfg(any(test, feature = "test-helpers"))]
pub fn new_test(keep_blocks: u32, canonicalization_delay: u64) -> Self {
Self::new_test_with_tx_storage(
keep_blocks,
canonicalization_delay,
TransactionStorageMode::BlockBody,
)
}
/// Create new memory-backed client backend for tests.
#[cfg(any(test, feature = "test-helpers"))]
fn new_test_with_tx_storage(
keep_blocks: u32,
canonicalization_delay: u64,
transaction_storage: TransactionStorageMode,
) -> Self {
let db = kvdb_memorydb::create(crate::utils::NUM_COLUMNS);
let db = sp_database::as_database(db);
let db_setting = DatabaseSettings {
state_cache_size: 16777216,
state_cache_child_ratio: Some((50, 100)),
pruning: PruningMode::keep_blocks(keep_blocks),
state_pruning: PruningMode::keep_blocks(keep_blocks),
source: DatabaseSettingsSrc::Custom(db),
keep_blocks: KeepBlocks::Some(keep_blocks),
transaction_storage,
};
Self::new(db_setting, canonicalization_delay).expect("failed to create test-db")
@@ -888,12 +969,12 @@ impl<Block: BlockT> Backend<Block> {
canonicalization_delay: u64,
config: &DatabaseSettings,
) -> ClientResult<Self> {
let is_archive_pruning = config.pruning.is_archive();
let blockchain = BlockchainDb::new(db.clone())?;
let is_archive_pruning = config.state_pruning.is_archive();
let blockchain = BlockchainDb::new(db.clone(), config.transaction_storage.clone())?;
let meta = blockchain.meta.clone();
let map_e = |e: sc_state_db::Error<io::Error>| sp_blockchain::Error::from_state_db(e);
let state_db: StateDb<_, _> = StateDb::new(
config.pruning.clone(),
config.state_pruning.clone(),
!config.source.supports_ref_counting(),
&StateMetaDb(&*db),
).map_err(map_e)?;
@@ -933,6 +1014,8 @@ impl<Block: BlockT> Backend<Block> {
is_archive: is_archive_pruning,
io_stats: FrozenForDuration::new(std::time::Duration::from_secs(1)),
state_usage: Arc::new(StateUsageStats::new()),
keep_blocks: config.keep_blocks.clone(),
transaction_storage: config.transaction_storage.clone(),
})
}
@@ -1140,7 +1223,21 @@ impl<Block: BlockT> Backend<Block> {
transaction.set_from_vec(columns::HEADER, &lookup_key, pending_block.header.encode());
if let Some(body) = &pending_block.body {
transaction.set_from_vec(columns::BODY, &lookup_key, body.encode());
match self.transaction_storage {
TransactionStorageMode::BlockBody => {
transaction.set_from_vec(columns::BODY, &lookup_key, body.encode());
},
TransactionStorageMode::StorageChain => {
let mut hashes = Vec::with_capacity(body.len());
for extrinsic in body {
let extrinsic = extrinsic.encode();
let hash = HashFor::<Block>::hash(&extrinsic);
transaction.set(columns::TRANSACTION, &hash.as_ref(), &extrinsic);
hashes.push(hash);
}
transaction.set_from_vec(columns::BODY, &lookup_key, hashes.encode());
},
}
}
if let Some(justification) = pending_block.justification {
transaction.set_from_vec(columns::JUSTIFICATION, &lookup_key, justification.encode());
@@ -1391,6 +1488,7 @@ impl<Block: BlockT> Backend<Block> {
}
}
self.prune_blocks(transaction, f_num)?;
let new_displaced = self.blockchain.leaves.write().finalize_height(f_num);
match displaced {
x @ &mut None => *x = Some(new_displaced),
@@ -1399,6 +1497,50 @@ impl<Block: BlockT> Backend<Block> {
Ok(())
}
fn prune_blocks(
&self,
transaction: &mut Transaction<DbHash>,
finalized: NumberFor<Block>,
) -> ClientResult<()> {
if let KeepBlocks::Some(keep_blocks) = self.keep_blocks {
// Always keep the last finalized block
let keep = std::cmp::max(keep_blocks, 1);
if finalized < keep.into() {
return Ok(())
}
let number = finalized.saturating_sub(keep.into());
match read_db(&*self.storage.db, columns::KEY_LOOKUP, columns::BODY, BlockId::<Block>::number(number))? {
Some(body) => {
debug!(target: "db", "Removing block #{}", number);
utils::remove_from_db(
transaction,
&*self.storage.db,
columns::KEY_LOOKUP,
columns::BODY,
BlockId::<Block>::number(number),
)?;
match self.transaction_storage {
TransactionStorageMode::BlockBody => {},
TransactionStorageMode::StorageChain => {
match Vec::<Block::Hash>::decode(&mut &body[..]) {
Ok(hashes) => {
for h in hashes {
transaction.remove(columns::TRANSACTION, h.as_ref());
}
}
Err(err) => return Err(sp_blockchain::Error::Backend(
format!("Error decoding body list: {}", err)
)),
}
}
}
}
None => return Ok(()),
}
}
Ok(())
}
}
fn apply_state_commit(transaction: &mut Transaction<DbHash>, commit: sc_state_db::CommitSet<Vec<u8>>) {
@@ -1804,6 +1946,17 @@ pub(crate) mod tests {
parent_hash: H256,
changes: Option<Vec<(Vec<u8>, Vec<u8>)>>,
extrinsics_root: H256,
) -> H256 {
insert_block(backend, number, parent_hash, changes, extrinsics_root, Vec::new())
}
pub fn insert_block(
backend: &Backend<Block>,
number: u64,
parent_hash: H256,
changes: Option<Vec<(Vec<u8>, Vec<u8>)>>,
extrinsics_root: H256,
body: Vec<ExtrinsicWrapper<u64>>,
) -> H256 {
use sp_runtime::testing::Digest;
@@ -1830,7 +1983,7 @@ pub(crate) mod tests {
};
let mut op = backend.begin_operation().unwrap();
backend.begin_state_operation(&mut op, block_id).unwrap();
op.set_block_data(header, Some(Vec::new()), None, NewBlockState::Best).unwrap();
op.set_block_data(header, Some(body), None, NewBlockState::Best).unwrap();
op.update_changes_trie((changes_trie_update, ChangesTrieCacheAction::Clear)).unwrap();
backend.commit_operation(op).unwrap();
@@ -1882,8 +2035,10 @@ pub(crate) mod tests {
let backend = Backend::<Block>::new(DatabaseSettings {
state_cache_size: 16777216,
state_cache_child_ratio: Some((50, 100)),
pruning: PruningMode::keep_blocks(1),
state_pruning: PruningMode::keep_blocks(1),
source: DatabaseSettingsSrc::Custom(backing),
keep_blocks: KeepBlocks::All,
transaction_storage: TransactionStorageMode::BlockBody,
}, 0).unwrap();
assert_eq!(backend.blockchain().info().best_number, 9);
for i in 0..10 {
@@ -2427,4 +2582,33 @@ pub(crate) mod tests {
assert_eq!(cht_root_1, cht_root_2);
assert_eq!(cht_root_2, cht_root_3);
}
#[test]
fn prune_blocks_on_finalize() {
for storage in &[TransactionStorageMode::BlockBody, TransactionStorageMode::StorageChain] {
let backend = Backend::<Block>::new_test_with_tx_storage(2, 0, *storage);
let mut blocks = Vec::new();
let mut prev_hash = Default::default();
for i in 0 .. 5 {
let hash = insert_block(&backend, i, prev_hash, None, Default::default(), vec![i.into()]);
blocks.push(hash);
prev_hash = hash;
}
{
let mut op = backend.begin_operation().unwrap();
backend.begin_state_operation(&mut op, BlockId::Hash(blocks[4])).unwrap();
for i in 1 .. 5 {
op.mark_finalized(BlockId::Hash(blocks[i]), None).unwrap();
}
backend.commit_operation(op).unwrap();
}
let bc = backend.blockchain();
assert_eq!(None, bc.body(BlockId::hash(blocks[0])).unwrap());
assert_eq!(None, bc.body(BlockId::hash(blocks[1])).unwrap());
assert_eq!(None, bc.body(BlockId::hash(blocks[2])).unwrap());
assert_eq!(Some(vec![3.into()]), bc.body(BlockId::hash(blocks[3])).unwrap());
assert_eq!(Some(vec![4.into()]), bc.body(BlockId::hash(blocks[4])).unwrap());
}
}
}
+33 -5
View File
@@ -24,21 +24,26 @@ use std::path::{Path, PathBuf};
use sp_runtime::traits::Block as BlockT;
use crate::utils::DatabaseType;
use kvdb_rocksdb::{Database, DatabaseConfig};
/// Version file name.
const VERSION_FILE_NAME: &'static str = "db_version";
/// Current db version.
const CURRENT_VERSION: u32 = 1;
const CURRENT_VERSION: u32 = 2;
/// Number of columns in v1.
const V1_NUM_COLUMNS: u32 = 11;
/// Upgrade database to current version.
pub fn upgrade_db<Block: BlockT>(db_path: &Path, _db_type: DatabaseType) -> sp_blockchain::Result<()> {
pub fn upgrade_db<Block: BlockT>(db_path: &Path, db_type: DatabaseType) -> sp_blockchain::Result<()> {
let is_empty = db_path.read_dir().map_or(true, |mut d| d.next().is_none());
if !is_empty {
let db_version = current_version(db_path)?;
match db_version {
0 => Err(sp_blockchain::Error::Backend(format!("Unsupported database version: {}", db_version)))?,
1 => (),
1 => migrate_1_to_2::<Block>(db_path, db_type)?,
CURRENT_VERSION => (),
_ => Err(sp_blockchain::Error::Backend(format!("Future database version: {}", db_version)))?,
}
}
@@ -46,6 +51,16 @@ pub fn upgrade_db<Block: BlockT>(db_path: &Path, _db_type: DatabaseType) -> sp_b
update_version(db_path)
}
/// Migration from version1 to version2:
/// 1) the number of columns has changed from 11 to 12;
/// 2) transactions column is added;
fn migrate_1_to_2<Block: BlockT>(db_path: &Path, _db_type: DatabaseType) -> sp_blockchain::Result<()> {
let db_path = db_path.to_str()
.ok_or_else(|| sp_blockchain::Error::Backend("Invalid database path".into()))?;
let db_cfg = DatabaseConfig::with_columns(V1_NUM_COLUMNS);
let db = Database::open(&db_cfg, db_path).map_err(db_err)?;
db.add_column().map_err(db_err)
}
/// Reads current database version from the file at given path.
/// If the file does not exist returns 0.
@@ -87,7 +102,7 @@ fn version_file_path(path: &Path) -> PathBuf {
#[cfg(test)]
mod tests {
use sc_state_db::PruningMode;
use crate::{DatabaseSettings, DatabaseSettingsSrc};
use crate::{DatabaseSettings, DatabaseSettingsSrc, KeepBlocks, TransactionStorageMode};
use crate::tests::Block;
use super::*;
@@ -103,8 +118,10 @@ mod tests {
crate::utils::open_database::<Block>(&DatabaseSettings {
state_cache_size: 0,
state_cache_child_ratio: None,
pruning: PruningMode::ArchiveAll,
state_pruning: PruningMode::ArchiveAll,
source: DatabaseSettingsSrc::RocksDb { path: db_path.to_owned(), cache_size: 128 },
keep_blocks: KeepBlocks::All,
transaction_storage: TransactionStorageMode::BlockBody,
}, DatabaseType::Full).map(|_| ())
}
@@ -122,4 +139,15 @@ mod tests {
open_database(db_dir.path()).unwrap();
assert_eq!(current_version(db_dir.path()).unwrap(), CURRENT_VERSION);
}
#[test]
fn upgrade_from_1_to_2_works() {
for version_from_file in &[None, Some(1)] {
let db_dir = tempfile::TempDir::new().unwrap();
let db_path = db_dir.path();
create_db(db_path, *version_from_file);
open_database(db_path).unwrap();
assert_eq!(current_version(db_path).unwrap(), CURRENT_VERSION);
}
}
}
+18 -1
View File
@@ -37,7 +37,7 @@ use crate::{DatabaseSettings, DatabaseSettingsSrc, Database, DbHash};
/// Number of columns in the db. Must be the same for both full && light dbs.
/// Otherwise RocksDb will fail to open database && check its type.
#[cfg(any(feature = "with-kvdb-rocksdb", feature = "with-parity-db", feature = "test-helpers", test))]
pub const NUM_COLUMNS: u32 = 11;
pub const NUM_COLUMNS: u32 = 12;
/// Meta column. The set of keys in the column is shared by full && light storages.
pub const COLUMN_META: u32 = 0;
@@ -327,6 +327,23 @@ pub fn read_db<Block>(
})
}
/// Remove database column entry for the given block.
pub fn remove_from_db<Block>(
transaction: &mut Transaction<DbHash>,
db: &dyn Database<DbHash>,
col_index: u32,
col: u32,
id: BlockId<Block>,
) -> sp_blockchain::Result<()>
where
Block: BlockT,
{
block_id_to_lookup_key(db, col_index, id).and_then(|key| match key {
Some(key) => Ok(transaction.remove(col, key.as_ref())),
None => Ok(()),
})
}
/// Read a header from the database.
pub fn read_header<Block: BlockT>(
db: &dyn Database<DbHash>,