Pruning changes tries (#856)

* changes trie pruning

* add comment

* do not prune changes tries on archive nodes
This commit is contained in:
Svyatoslav Nikolsky
2018-10-17 11:08:45 +03:00
committed by Gav Wood
parent 8bc5242c92
commit 9886d12c26
10 changed files with 536 additions and 44 deletions
+129 -14
View File
@@ -67,7 +67,8 @@ use hash_db::Hasher;
use kvdb::{KeyValueDB, DBTransaction};
use trie::MemoryDB;
use parking_lot::RwLock;
use primitives::{H256, AuthorityId, Blake2Hasher};
use primitives::{H256, AuthorityId, Blake2Hasher, ChangesTrieConfiguration};
use primitives::storage::well_known_keys;
use runtime_primitives::{generic::BlockId, Justification};
use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, As, NumberFor, Zero, Digest, DigestItem};
use runtime_primitives::BuildStorage;
@@ -80,6 +81,7 @@ use state_db::StateDb;
pub use state_db::PruningMode;
const CANONICALIZATION_DELAY: u64 = 256;
const MIN_BLOCKS_TO_KEEP_CHANGES_TRIES_FOR: u64 = 32768;
/// DB-backed patricia trie state, transaction type is an overlay of changes to commit.
pub type DbState = state_machine::TrieBackend<Arc<state_machine::Storage<Blake2Hasher>>, Blake2Hasher>;
@@ -360,9 +362,42 @@ impl state_machine::Storage<Blake2Hasher> for DbGenesisStorage {
pub struct DbChangesTrieStorage<Block: BlockT> {
db: Arc<KeyValueDB>,
min_blocks_to_keep: Option<u64>,
_phantom: ::std::marker::PhantomData<Block>,
}
impl<Block: BlockT> DbChangesTrieStorage<Block> {
/// Commit new changes trie.
pub fn commit(&self, tx: &mut DBTransaction, mut changes_trie: MemoryDB<Blake2Hasher>) {
for (key, (val, _)) in changes_trie.drain() {
tx.put(columns::CHANGES_TRIE, &key[..], &val);
}
}
/// Prune obsolete changes tries.
pub fn prune(&self, config: Option<ChangesTrieConfiguration>, tx: &mut DBTransaction, block: NumberFor<Block>) {
// never prune on archive nodes
let min_blocks_to_keep = match self.min_blocks_to_keep {
Some(min_blocks_to_keep) => min_blocks_to_keep,
None => return,
};
// read configuration from the database. it is OK to do it here (without checking tx for
// modifications), since config can't change
let config = match config {
Some(config) => config,
None => return,
};
state_machine::prune_changes_tries(
&config,
&*self,
min_blocks_to_keep,
block.as_(),
|node| tx.delete(columns::CHANGES_TRIE, node.as_ref()));
}
}
impl<Block: BlockT> state_machine::ChangesTrieRootsStorage<Blake2Hasher> for DbChangesTrieStorage<Block> {
fn root(&self, block: u64) -> Result<Option<H256>, String> {
Ok(read_db::<Block>(&*self.db, columns::HASH_LOOKUP, columns::HEADER, BlockId::Number(As::sa(block)))
@@ -389,7 +424,7 @@ impl<Block: BlockT> state_machine::ChangesTrieStorage<Blake2Hasher> for DbChange
/// Otherwise, trie nodes are kept only from some recent blocks.
pub struct Backend<Block: BlockT> {
storage: Arc<StorageDb<Block>>,
tries_change_storage: DbChangesTrieStorage<Block>,
changes_tries_storage: DbChangesTrieStorage<Block>,
blockchain: BlockchainDb<Block>,
canonicalization_delay: u64,
}
@@ -418,6 +453,7 @@ impl<Block: BlockT> Backend<Block> {
}
fn from_kvdb(db: Arc<KeyValueDB>, pruning: PruningMode, canonicalization_delay: u64) -> Result<Self, client::error::Error> {
let is_archive_pruning = pruning.is_archive();
let blockchain = BlockchainDb::new(db.clone())?;
let map_e = |e: state_db::Error<io::Error>| ::client::error::Error::from(format!("State database error: {:?}", e));
let state_db: StateDb<Block::Hash, H256> = StateDb::new(pruning, &StateMetaDb(&*db)).map_err(map_e)?;
@@ -425,14 +461,15 @@ impl<Block: BlockT> Backend<Block> {
db: db.clone(),
state_db,
};
let tries_change_storage = DbChangesTrieStorage {
let changes_tries_storage = DbChangesTrieStorage {
db,
min_blocks_to_keep: if is_archive_pruning { None } else { Some(MIN_BLOCKS_TO_KEEP_CHANGES_TRIES_FOR) },
_phantom: Default::default(),
};
Ok(Backend {
storage: Arc::new(storage_db),
tries_change_storage,
changes_tries_storage,
blockchain,
canonicalization_delay,
})
@@ -487,7 +524,8 @@ impl<Block: BlockT> Backend<Block> {
let f_num = f_header.number().clone();
if f_num.as_() > self.storage.state_db.best_canonical() {
if &meta.finalized_hash != f_header.parent_hash() {
let parent_hash = f_header.parent_hash().clone();
if meta.finalized_hash != parent_hash {
return Err(::client::error::ErrorKind::NonSequentialFinalization(
format!("Last finalized {:?} not parent of {:?}",
meta.finalized_hash, f_hash),
@@ -497,6 +535,13 @@ impl<Block: BlockT> Backend<Block> {
let commit = self.storage.state_db.canonicalize_block(&f_hash);
apply_state_commit(transaction, commit);
// read config from genesis, since it is readonly atm
use client::backend::Backend;
let changes_trie_config: Option<ChangesTrieConfiguration> = self.state_at(BlockId::Hash(parent_hash))?
.storage(well_known_keys::CHANGES_TRIE_CONFIG)?
.and_then(|v| Decode::decode(&mut &*v));
self.changes_tries_storage.prune(changes_trie_config, transaction, f_num);
}
Ok(())
@@ -518,12 +563,6 @@ fn apply_state_commit(transaction: &mut DBTransaction, commit: state_db::CommitS
}
}
fn apply_changes_trie_commit(transaction: &mut DBTransaction, mut commit: MemoryDB<Blake2Hasher>) {
for (key, (val, _)) in commit.drain() {
transaction.put(columns::CHANGES_TRIE, &key[..], &val);
}
}
impl<Block> client::backend::Backend<Block, Blake2Hasher> for Backend<Block> where Block: BlockT {
type BlockImportOperation = BlockImportOperation<Block, Blake2Hasher>;
type Blockchain = BlockchainDb<Block>;
@@ -618,7 +657,7 @@ impl<Block> client::backend::Backend<Block, Blake2Hasher> for Backend<Block> whe
let commit = self.storage.state_db.insert_block(&hash, number_u64, &pending_block.header.parent_hash(), changeset)
.map_err(|e: state_db::Error<io::Error>| client::error::Error::from(format!("State database error: {:?}", e)))?;
apply_state_commit(&mut transaction, commit);
apply_changes_trie_commit(&mut transaction, operation.changes_trie_updates);
self.changes_tries_storage.commit(&mut transaction, operation.changes_trie_updates);
let finalized = match pending_block.leaf_state {
NewBlockState::Final => true,
@@ -679,7 +718,7 @@ impl<Block> client::backend::Backend<Block, Blake2Hasher> for Backend<Block> whe
}
fn changes_trie_storage(&self) -> Option<&Self::ChangesTrieStorage> {
Some(&self.tries_change_storage)
Some(&self.changes_tries_storage)
}
fn revert(&self, n: NumberFor<Block>) -> Result<NumberFor<Block>, client::error::Error> {
@@ -1062,7 +1101,7 @@ mod tests {
let check_changes = |backend: &Backend<Block>, block: u64, changes: Vec<(Vec<u8>, Vec<u8>)>| {
let (changes_root, mut changes_trie_update) = prepare_changes(changes);
assert_eq!(backend.tries_change_storage.root(block), Ok(Some(changes_root)));
assert_eq!(backend.changes_tries_storage.root(block), Ok(Some(changes_root)));
for (key, (val, _)) in changes_trie_update.drain() {
assert_eq!(backend.changes_trie_storage().unwrap().get(&key), Ok(Some(val)));
@@ -1086,6 +1125,82 @@ mod tests {
check_changes(&backend, 2, changes2);
}
#[test]
fn changes_tries_are_pruned_on_finalization() {
let mut backend = Backend::<Block>::new_test(1000, 100);
backend.changes_tries_storage.min_blocks_to_keep = Some(8);
let config = ChangesTrieConfiguration {
digest_interval: 2,
digest_levels: 2,
};
// insert some blocks
let block0 = insert_header(&backend, 0, Default::default(), vec![(b"key_at_0".to_vec(), b"val_at_0".to_vec())], Default::default());
let block1 = insert_header(&backend, 1, block0, vec![(b"key_at_1".to_vec(), b"val_at_1".to_vec())], Default::default());
let block2 = insert_header(&backend, 2, block1, vec![(b"key_at_2".to_vec(), b"val_at_2".to_vec())], Default::default());
let block3 = insert_header(&backend, 3, block2, vec![(b"key_at_3".to_vec(), b"val_at_3".to_vec())], Default::default());
let block4 = insert_header(&backend, 4, block3, vec![(b"key_at_4".to_vec(), b"val_at_4".to_vec())], Default::default());
let block5 = insert_header(&backend, 5, block4, vec![(b"key_at_5".to_vec(), b"val_at_5".to_vec())], Default::default());
let block6 = insert_header(&backend, 6, block5, vec![(b"key_at_6".to_vec(), b"val_at_6".to_vec())], Default::default());
let block7 = insert_header(&backend, 7, block6, vec![(b"key_at_7".to_vec(), b"val_at_7".to_vec())], Default::default());
let block8 = insert_header(&backend, 8, block7, vec![(b"key_at_8".to_vec(), b"val_at_8".to_vec())], Default::default());
let block9 = insert_header(&backend, 9, block8, vec![(b"key_at_9".to_vec(), b"val_at_9".to_vec())], Default::default());
let block10 = insert_header(&backend, 10, block9, vec![(b"key_at_10".to_vec(), b"val_at_10".to_vec())], Default::default());
let block11 = insert_header(&backend, 11, block10, vec![(b"key_at_11".to_vec(), b"val_at_11".to_vec())], Default::default());
let _ = insert_header(&backend, 12, block11, vec![(b"key_at_12".to_vec(), b"val_at_12".to_vec())], Default::default());
// check that roots of all tries are in the columns::CHANGES_TRIE
fn read_changes_trie_root(backend: &Backend<Block>, num: u64) -> H256 {
backend.blockchain().header(BlockId::Number(num)).unwrap().unwrap().digest().logs().iter()
.find(|i| i.as_changes_trie_root().is_some()).unwrap().as_changes_trie_root().unwrap().clone()
}
let root1 = read_changes_trie_root(&backend, 1); assert_eq!(backend.changes_tries_storage.root(1).unwrap(), Some(root1));
let root2 = read_changes_trie_root(&backend, 2); assert_eq!(backend.changes_tries_storage.root(2).unwrap(), Some(root2));
let root3 = read_changes_trie_root(&backend, 3); assert_eq!(backend.changes_tries_storage.root(3).unwrap(), Some(root3));
let root4 = read_changes_trie_root(&backend, 4); assert_eq!(backend.changes_tries_storage.root(4).unwrap(), Some(root4));
let root5 = read_changes_trie_root(&backend, 5); assert_eq!(backend.changes_tries_storage.root(5).unwrap(), Some(root5));
let root6 = read_changes_trie_root(&backend, 6); assert_eq!(backend.changes_tries_storage.root(6).unwrap(), Some(root6));
let root7 = read_changes_trie_root(&backend, 7); assert_eq!(backend.changes_tries_storage.root(7).unwrap(), Some(root7));
let root8 = read_changes_trie_root(&backend, 8); assert_eq!(backend.changes_tries_storage.root(8).unwrap(), Some(root8));
let root9 = read_changes_trie_root(&backend, 9); assert_eq!(backend.changes_tries_storage.root(9).unwrap(), Some(root9));
let root10 = read_changes_trie_root(&backend, 10); assert_eq!(backend.changes_tries_storage.root(10).unwrap(), Some(root10));
let root11 = read_changes_trie_root(&backend, 11); assert_eq!(backend.changes_tries_storage.root(11).unwrap(), Some(root11));
let root12 = read_changes_trie_root(&backend, 12); assert_eq!(backend.changes_tries_storage.root(12).unwrap(), Some(root12));
// now simulate finalization of block#12, causing prune of tries at #1..#4
let mut tx = DBTransaction::new();
backend.changes_tries_storage.prune(Some(config.clone()), &mut tx, 12);
backend.storage.db.write(tx).unwrap();
assert!(backend.changes_tries_storage.get(&root1).unwrap().is_none());
assert!(backend.changes_tries_storage.get(&root2).unwrap().is_none());
assert!(backend.changes_tries_storage.get(&root3).unwrap().is_none());
assert!(backend.changes_tries_storage.get(&root4).unwrap().is_none());
assert!(backend.changes_tries_storage.get(&root5).unwrap().is_some());
assert!(backend.changes_tries_storage.get(&root6).unwrap().is_some());
assert!(backend.changes_tries_storage.get(&root7).unwrap().is_some());
assert!(backend.changes_tries_storage.get(&root8).unwrap().is_some());
// now simulate finalization of block#16, causing prune of tries at #5..#8
let mut tx = DBTransaction::new();
backend.changes_tries_storage.prune(Some(config.clone()), &mut tx, 16);
backend.storage.db.write(tx).unwrap();
assert!(backend.changes_tries_storage.get(&root5).unwrap().is_none());
assert!(backend.changes_tries_storage.get(&root6).unwrap().is_none());
assert!(backend.changes_tries_storage.get(&root7).unwrap().is_none());
assert!(backend.changes_tries_storage.get(&root8).unwrap().is_none());
// now "change" pruning mode to archive && simulate finalization of block#20
// => no changes tries are pruned, because we never prune in archive mode
backend.changes_tries_storage.min_blocks_to_keep = None;
let mut tx = DBTransaction::new();
backend.changes_tries_storage.prune(Some(config), &mut tx, 20);
backend.storage.db.write(tx).unwrap();
assert!(backend.changes_tries_storage.get(&root9).unwrap().is_some());
assert!(backend.changes_tries_storage.get(&root10).unwrap().is_some());
assert!(backend.changes_tries_storage.get(&root11).unwrap().is_some());
assert!(backend.changes_tries_storage.get(&root12).unwrap().is_some());
}
#[test]
fn tree_route_works() {
let backend = Backend::<Block>::new_test(1000, 100);