Transactional updates in the state-db (#1616)

This commit is contained in:
Arkadiy Paronyan
2019-01-30 16:58:25 +01:00
committed by Gav Wood
parent f98f9ac58a
commit 742f030ddd
7 changed files with 439 additions and 215 deletions
+158 -120
View File
@@ -625,119 +625,22 @@ impl<Block: BlockT<Hash=H256>> Backend<Block> {
};
trace!(target: "db", "Canonicalize block #{} ({:?})", new_canonical, hash);
let commit = self.storage.state_db.canonicalize_block(&hash);
let commit = self.storage.state_db.canonicalize_block(&hash)
.map_err(|e: state_db::Error<io::Error>| client::error::Error::from(format!("State database error: {:?}", e)))?;
apply_state_commit(transaction, commit);
};
Ok(())
}
// write stuff to a transaction after a new block is finalized.
// this canonicalizes finalized blocks. Fails if called with a block which
// was not a child of the last finalized block.
fn note_finalized(
&self,
transaction: &mut DBTransaction,
f_header: &Block::Header,
f_hash: Block::Hash,
) -> Result<(), client::error::Error> where
Block: BlockT<Hash=H256>,
{
let f_num = f_header.number().clone();
if f_num.as_() > self.storage.state_db.best_canonical() {
let parent_hash = f_header.parent_hash().clone();
let lookup_key = utils::number_and_hash_to_lookup_key(f_num, f_hash.clone());
transaction.put(columns::META, meta_keys::FINALIZED_BLOCK, &lookup_key);
let commit = self.storage.state_db.canonicalize_block(&f_hash);
apply_state_commit(transaction, commit);
// read config from genesis, since it is readonly atm
use client::backend::Backend;
let changes_trie_config: Option<ChangesTrieConfiguration> = self.state_at(BlockId::Hash(parent_hash))?
.storage(well_known_keys::CHANGES_TRIE_CONFIG)?
.and_then(|v| Decode::decode(&mut &*v));
self.changes_tries_storage.prune(changes_trie_config, transaction, f_hash, f_num);
}
Ok(())
}
}
fn apply_state_commit(transaction: &mut DBTransaction, commit: state_db::CommitSet<H256>) {
for (key, val) in commit.data.inserted.into_iter() {
transaction.put(columns::STATE, &key[..], &val);
}
for key in commit.data.deleted.into_iter() {
transaction.delete(columns::STATE, &key[..]);
}
for (key, val) in commit.meta.inserted.into_iter() {
transaction.put(columns::STATE_META, &key[..], &val);
}
for key in commit.meta.deleted.into_iter() {
transaction.delete(columns::STATE_META, &key[..]);
}
}
impl<Block> client::backend::AuxStore for Backend<Block> where Block: BlockT<Hash=H256> {
fn insert_aux<
'a,
'b: 'a,
'c: 'a,
I: IntoIterator<Item=&'a(&'c [u8], &'c [u8])>,
D: IntoIterator<Item=&'a &'b [u8]>,
>(&self, insert: I, delete: D) -> client::error::Result<()> {
let mut transaction = DBTransaction::new();
for (k, v) in insert {
transaction.put(columns::AUX, k, v);
}
for k in delete {
transaction.delete(columns::AUX, k);
}
self.storage.db.write(transaction).map_err(db_err)?;
Ok(())
}
fn get_aux(&self, key: &[u8]) -> Result<Option<Vec<u8>>, client::error::Error> {
Ok(self.storage.db.get(columns::AUX, key).map(|r| r.map(|v| v.to_vec())).map_err(db_err)?)
}
}
impl<Block> client::backend::Backend<Block, Blake2Hasher> for Backend<Block> where Block: BlockT<Hash=H256> {
type BlockImportOperation = BlockImportOperation<Block, Blake2Hasher>;
type Blockchain = BlockchainDb<Block>;
type State = CachingState<Blake2Hasher, DbState, Block>;
type ChangesTrieStorage = DbChangesTrieStorage<Block>;
fn begin_operation(&self) -> Result<Self::BlockImportOperation, client::error::Error> {
let old_state = self.state_at(BlockId::Hash(Default::default()))?;
Ok(BlockImportOperation {
pending_block: None,
old_state,
db_updates: MemoryDB::default(),
storage_updates: Default::default(),
changes_trie_updates: MemoryDB::default(),
aux_ops: Vec::new(),
finalized_blocks: Vec::new(),
})
}
fn begin_state_operation(&self, operation: &mut Self::BlockImportOperation, block: BlockId<Block>) -> Result<(), client::error::Error> {
operation.old_state = self.state_at(block)?;
Ok(())
}
fn commit_operation(&self, mut operation: Self::BlockImportOperation)
fn try_commit_operation(&self, mut operation: BlockImportOperation<Block, Blake2Hasher>)
-> Result<(), client::error::Error>
{
let mut transaction = DBTransaction::new();
operation.apply_aux(&mut transaction);
let mut meta_updates = Vec::new();
if !operation.finalized_blocks.is_empty() {
let mut meta_updates = Vec::new();
let mut last_finalized_hash = self.blockchain.meta.read().finalized_hash;
for (block, justification) in operation.finalized_blocks {
let block_hash = self.blockchain.expect_block_hash_from_id(&block)?;
@@ -752,10 +655,6 @@ impl<Block> client::backend::Backend<Block, Blake2Hasher> for Backend<Block> whe
)?);
last_finalized_hash = block_hash;
}
for (hash, number, is_best, is_finalized) in meta_updates {
self.blockchain.update_meta(hash, number, is_best, is_finalized);
}
}
if let Some(pending_block) = operation.pending_block {
@@ -851,23 +750,28 @@ impl<Block> client::backend::Backend<Block, Blake2Hasher> for Backend<Block> whe
let commit = self.storage.state_db.insert_block(&hash, number_u64, &pending_block.header.parent_hash(), changeset)
.map_err(|e: state_db::Error<io::Error>| client::error::Error::from(format!("State database error: {:?}", e)))?;
apply_state_commit(&mut transaction, commit);
self.changes_tries_storage.commit(&mut transaction, operation.changes_trie_updates);
let finalized = match pending_block.leaf_state {
NewBlockState::Final => true,
_ => false,
};
let header = &pending_block.header;
let is_best = pending_block.leaf_state.is_best();
let changes_trie_updates = operation.changes_trie_updates;
self.changes_tries_storage.commit(&mut transaction, changes_trie_updates);
if finalized {
// TODO: ensure best chain contains this block.
self.ensure_sequential_finalization(&pending_block.header, None)?;
self.note_finalized(&mut transaction, &pending_block.header, hash)?;
self.ensure_sequential_finalization(header, None)?;
self.note_finalized(&mut transaction, header, hash)?;
} else {
// canonicalize blocks which are old enough, regardless of finality.
self.force_delayed_canonicalize(&mut transaction, hash, *pending_block.header.number())?
self.force_delayed_canonicalize(&mut transaction, hash, *header.number())?
}
let is_best = pending_block.leaf_state.is_best();
debug!(target: "db", "DB Commit {:?} ({}), best = {}", hash, number, is_best);
{
@@ -886,10 +790,14 @@ impl<Block> client::backend::Backend<Block, Blake2Hasher> for Backend<Block> whe
drop(leaves);
}
for (hash, number, is_best, is_finalized) in meta_updates {
self.blockchain.update_meta(hash, number, is_best, is_finalized);
}
self.blockchain.update_meta(
hash.clone(),
number.clone(),
pending_block.leaf_state.is_best(),
is_best,
finalized,
);
@@ -902,25 +810,155 @@ impl<Block> client::backend::Backend<Block, Blake2Hasher> for Backend<Block> whe
Some(number),
|| is_best
);
} else {
// No pending block, just write the transaction and apply meta changes
self.storage.db.write(transaction).map_err(db_err)?;
for (hash, number, is_best, is_finalized) in meta_updates {
self.blockchain.update_meta(hash, number, is_best, is_finalized);
}
}
Ok(())
}
// write stuff to a transaction after a new block is finalized.
// this canonicalizes finalized blocks. Fails if called with a block which
// was not a child of the last finalized block.
fn note_finalized(
&self,
transaction: &mut DBTransaction,
f_header: &Block::Header,
f_hash: Block::Hash,
) -> Result<(), client::error::Error> where
Block: BlockT<Hash=H256>,
{
let f_num = f_header.number().clone();
if f_num.as_() > self.storage.state_db.best_canonical() {
let parent_hash = f_header.parent_hash().clone();
let lookup_key = utils::number_and_hash_to_lookup_key(f_num, f_hash.clone());
transaction.put(columns::META, meta_keys::FINALIZED_BLOCK, &lookup_key);
let commit = self.storage.state_db.canonicalize_block(&f_hash)
.map_err(|e: state_db::Error<io::Error>| client::error::Error::from(format!("State database error: {:?}", e)))?;
apply_state_commit(transaction, commit);
// read config from genesis, since it is readonly atm
use client::backend::Backend;
let changes_trie_config: Option<ChangesTrieConfiguration> = self.state_at(BlockId::Hash(parent_hash))?
.storage(well_known_keys::CHANGES_TRIE_CONFIG)?
.and_then(|v| Decode::decode(&mut &*v));
self.changes_tries_storage.prune(changes_trie_config, transaction, f_hash, f_num);
}
Ok(())
}
}
fn apply_state_commit(transaction: &mut DBTransaction, commit: state_db::CommitSet<H256>) {
for (key, val) in commit.data.inserted.into_iter() {
transaction.put(columns::STATE, &key[..], &val);
}
for key in commit.data.deleted.into_iter() {
transaction.delete(columns::STATE, &key[..]);
}
for (key, val) in commit.meta.inserted.into_iter() {
transaction.put(columns::STATE_META, &key[..], &val);
}
for key in commit.meta.deleted.into_iter() {
transaction.delete(columns::STATE_META, &key[..]);
}
}
impl<Block> client::backend::AuxStore for Backend<Block> where Block: BlockT<Hash=H256> {
fn insert_aux<
'a,
'b: 'a,
'c: 'a,
I: IntoIterator<Item=&'a(&'c [u8], &'c [u8])>,
D: IntoIterator<Item=&'a &'b [u8]>,
>(&self, insert: I, delete: D) -> client::error::Result<()> {
let mut transaction = DBTransaction::new();
for (k, v) in insert {
transaction.put(columns::AUX, k, v);
}
for k in delete {
transaction.delete(columns::AUX, k);
}
self.storage.db.write(transaction).map_err(db_err)?;
Ok(())
}
fn get_aux(&self, key: &[u8]) -> Result<Option<Vec<u8>>, client::error::Error> {
Ok(self.storage.db.get(columns::AUX, key).map(|r| r.map(|v| v.to_vec())).map_err(db_err)?)
}
}
impl<Block> client::backend::Backend<Block, Blake2Hasher> for Backend<Block> where Block: BlockT<Hash=H256> {
type BlockImportOperation = BlockImportOperation<Block, Blake2Hasher>;
type Blockchain = BlockchainDb<Block>;
type State = CachingState<Blake2Hasher, DbState, Block>;
type ChangesTrieStorage = DbChangesTrieStorage<Block>;
fn begin_operation(&self) -> Result<Self::BlockImportOperation, client::error::Error> {
let old_state = self.state_at(BlockId::Hash(Default::default()))?;
Ok(BlockImportOperation {
pending_block: None,
old_state,
db_updates: MemoryDB::default(),
storage_updates: Default::default(),
changes_trie_updates: MemoryDB::default(),
aux_ops: Vec::new(),
finalized_blocks: Vec::new(),
})
}
fn begin_state_operation(&self, operation: &mut Self::BlockImportOperation, block: BlockId<Block>) -> Result<(), client::error::Error> {
operation.old_state = self.state_at(block)?;
Ok(())
}
fn commit_operation(&self, operation: Self::BlockImportOperation)
-> Result<(), client::error::Error>
{
match self.try_commit_operation(operation) {
Ok(_) => {
self.storage.state_db.apply_pending();
Ok(())
},
e @ Err(_) => {
self.storage.state_db.revert_pending();
e
}
}
}
fn finalize_block(&self, block: BlockId<Block>, justification: Option<Justification>)
-> Result<(), client::error::Error>
{
let mut transaction = DBTransaction::new();
let hash = self.blockchain.expect_block_hash_from_id(&block)?;
let header = self.blockchain.expect_header(block)?;
let (hash, number, is_best, is_finalized) = self.finalize_block_with_transaction(
&mut transaction,
&hash,
&header,
None,
justification,
)?;
self.storage.db.write(transaction).map_err(db_err)?;
self.blockchain.update_meta(hash, number, is_best, is_finalized);
let commit = || {
let (hash, number, is_best, is_finalized) = self.finalize_block_with_transaction(
&mut transaction,
&hash,
&header,
None,
justification,
)?;
self.storage.db.write(transaction).map_err(db_err)?;
self.blockchain.update_meta(hash, number, is_best, is_finalized);
Ok(())
};
match commit() {
Ok(()) => self.storage.state_db.apply_pending(),
e @ Err(_) => {
self.storage.state_db.revert_pending();
return e;
}
}
Ok(())
}
+46 -13
View File
@@ -72,8 +72,12 @@ pub enum Error<E: fmt::Debug> {
Db(E),
/// `Codec` decoding error.
Decoding,
/// NonCanonical error.
NonCanonical,
/// Trying to canonicalize invalid block.
InvalidBlock,
/// Trying to insert block with invalid number.
InvalidBlockNumber,
/// Trying to insert block with unknown parent.
InvalidParent,
}
impl<E: fmt::Debug> fmt::Debug for Error<E> {
@@ -81,7 +85,9 @@ impl<E: fmt::Debug> fmt::Debug for Error<E> {
match self {
Error::Db(e) => e.fmt(f),
Error::Decoding => write!(f, "Error decoding slicable value"),
Error::NonCanonical => write!(f, "Error processing non-canonical data"),
Error::InvalidBlock => write!(f, "Trying to canonicalize invalid block"),
Error::InvalidBlockNumber => write!(f, "Trying to insert block with invalid number"),
Error::InvalidParent => write!(f, "Trying to insert block with unknown parent"),
}
}
}
@@ -205,27 +211,25 @@ impl<BlockHash: Hash, Key: Hash> StateDbSync<BlockHash, Key> {
}
}
pub fn canonicalize_block(&mut self, hash: &BlockHash) -> CommitSet<Key> {
// clear the temporary overlay from the previous canonicalization.
self.non_canonical.clear_overlay();
pub fn canonicalize_block<E: fmt::Debug>(&mut self, hash: &BlockHash) -> Result<CommitSet<Key>, Error<E>> {
let mut commit = match self.mode {
PruningMode::ArchiveAll => {
CommitSet::default()
},
PruningMode::ArchiveCanonical => {
let mut commit = self.non_canonical.canonicalize(hash);
let mut commit = self.non_canonical.canonicalize(hash)?;
commit.data.deleted.clear();
commit
},
PruningMode::Constrained(_) => {
self.non_canonical.canonicalize(hash)
self.non_canonical.canonicalize(hash)?
},
};
if let Some(ref mut pruning) = self.pruning {
pruning.note_canonical(hash, &mut commit);
}
self.prune(&mut commit);
commit
Ok(commit)
}
pub fn best_canonical(&self) -> u64 {
@@ -284,6 +288,20 @@ impl<BlockHash: Hash, Key: Hash> StateDbSync<BlockHash, Key> {
}
db.get(key).map_err(|e| Error::Db(e))
}
pub fn apply_pending(&mut self) {
self.non_canonical.apply_pending();
if let Some(pruning) = &mut self.pruning {
pruning.apply_pending();
}
}
pub fn revert_pending(&mut self) {
if let Some(pruning) = &mut self.pruning {
pruning.revert_pending();
}
self.non_canonical.revert_pending();
}
}
/// State DB maintenance. See module description.
@@ -306,7 +324,7 @@ impl<BlockHash: Hash, Key: Hash> StateDb<BlockHash, Key> {
}
/// Finalize a previously inserted block.
pub fn canonicalize_block(&self, hash: &BlockHash) -> CommitSet<Key> {
pub fn canonicalize_block<E: fmt::Debug>(&self, hash: &BlockHash) -> Result<CommitSet<Key>, Error<E>> {
self.db.write().canonicalize_block(hash)
}
@@ -341,6 +359,16 @@ impl<BlockHash: Hash, Key: Hash> StateDb<BlockHash, Key> {
pub fn is_pruned(&self, number: u64) -> bool {
return self.db.read().is_pruned(number)
}
/// Apply all pending changes
pub fn apply_pending(&self) {
self.db.write().apply_pending();
}
/// Revert all pending changes
pub fn revert_pending(&self) {
self.db.write().revert_pending();
}
}
#[cfg(test)]
@@ -394,7 +422,9 @@ mod tests {
)
.unwrap(),
);
db.commit(&state_db.canonicalize_block(&H256::from_low_u64_be(1)));
state_db.apply_pending();
db.commit(&state_db.canonicalize_block::<io::Error>(&H256::from_low_u64_be(1)).unwrap());
state_db.apply_pending();
db.commit(
&state_db
.insert_block::<io::Error>(
@@ -405,8 +435,11 @@ mod tests {
)
.unwrap(),
);
db.commit(&state_db.canonicalize_block(&H256::from_low_u64_be(21)));
db.commit(&state_db.canonicalize_block(&H256::from_low_u64_be(3)));
state_db.apply_pending();
db.commit(&state_db.canonicalize_block::<io::Error>(&H256::from_low_u64_be(21)).unwrap());
state_db.apply_pending();
db.commit(&state_db.canonicalize_block::<io::Error>(&H256::from_low_u64_be(3)).unwrap());
state_db.apply_pending();
(db, state_db)
}
+170 -65
View File
@@ -17,8 +17,8 @@
//! Canonicalization window.
//! Maintains trees of block overlays and allows discarding trees/roots
//! The overlays are added in `insert` and removed in `canonicalize`.
//! Last canonicalized overlay is kept in memory until next call to `canonicalize` or
//! `clear_overlay`
//! All pending changes are kept in memory until next call to `apply_pending` or
//! `revert_pending`
use std::fmt;
use std::collections::{HashMap, VecDeque};
@@ -35,7 +35,8 @@ pub struct NonCanonicalOverlay<BlockHash: Hash, Key: Hash> {
last_canonicalized: Option<(BlockHash, u64)>,
levels: VecDeque<Vec<BlockOverlay<BlockHash, Key>>>,
parents: HashMap<BlockHash, BlockHash>,
last_canonicalized_overlay: HashMap<Key, DBValue>,
pending_canonicalizations: Vec<BlockHash>,
pending_insertions: Vec<BlockHash>,
}
#[derive(Encode, Decode)]
@@ -109,39 +110,45 @@ impl<BlockHash: Hash, Key: Hash> NonCanonicalOverlay<BlockHash, Key> {
last_canonicalized,
levels,
parents,
last_canonicalized_overlay: Default::default(),
pending_canonicalizations: Default::default(),
pending_insertions: Default::default(),
})
}
/// Insert a new block into the overlay. If inserted on the second level or lover expects parent to be present in the window.
pub fn insert<E: fmt::Debug>(&mut self, hash: &BlockHash, number: u64, parent_hash: &BlockHash, changeset: ChangeSet<Key>) -> Result<CommitSet<Key>, Error<E>> {
let mut commit = CommitSet::default();
let front_block_number = self.pending_front_block_number();
if self.levels.is_empty() && self.last_canonicalized.is_none() {
if number < 1 {
return Err(Error::NonCanonical);
return Err(Error::InvalidBlockNumber);
}
// assume that parent was canonicalized
let last_canonicalized = (parent_hash.clone(), number - 1);
commit.meta.inserted.push((to_meta_key(LAST_CANONICAL, &()), last_canonicalized.encode()));
self.last_canonicalized = Some(last_canonicalized);
} else if self.last_canonicalized.is_some() {
if number < self.front_block_number() || number >= self.front_block_number() + self.levels.len() as u64 + 1 {
return Err(Error::NonCanonical);
if number < front_block_number || number >= front_block_number + self.levels.len() as u64 + 1 {
trace!(target: "state-db", "Failed to insert block {}, current is {} .. {})",
number,
front_block_number,
front_block_number + self.levels.len() as u64,
);
return Err(Error::InvalidBlockNumber);
}
// check for valid parent if inserting on second level or higher
if number == self.front_block_number() {
if number == front_block_number {
if !self.last_canonicalized.as_ref().map_or(false, |&(ref h, n)| h == parent_hash && n == number - 1) {
return Err(Error::NonCanonical);
return Err(Error::InvalidParent);
}
} else if !self.parents.contains_key(&parent_hash) {
return Err(Error::NonCanonical);
return Err(Error::InvalidParent);
}
}
let level = if self.levels.is_empty() || number == self.front_block_number() + self.levels.len() as u64 {
let level = if self.levels.is_empty() || number == front_block_number + self.levels.len() as u64 {
self.levels.push_back(Vec::new());
self.levels.back_mut().expect("can't be empty after insertion; qed")
} else {
let front_block_number = self.front_block_number();
self.levels.get_mut((number - front_block_number) as usize)
.expect("number is [front_block_number .. front_block_number + levels.len()) is asserted in precondition; qed")
};
@@ -166,85 +173,118 @@ impl<BlockHash: Hash, Key: Hash> NonCanonicalOverlay<BlockHash, Key> {
trace!(target: "state-db", "Inserted uncanonicalized changeset {}.{} ({} inserted, {} deleted)", number, index, journal_record.inserted.len(), journal_record.deleted.len());
let journal_record = journal_record.encode();
commit.meta.inserted.push((journal_key, journal_record));
self.pending_insertions.push(hash.clone());
Ok(commit)
}
fn discard(
levels: &mut [Vec<BlockOverlay<BlockHash, Key>>],
fn discard_descendants(
levels: &mut VecDeque<Vec<BlockOverlay<BlockHash, Key>>>,
index: usize,
parents: &mut HashMap<BlockHash, BlockHash>,
discarded_journals: &mut Vec<Vec<u8>>,
number: u64,
hash: &BlockHash,
) {
if let Some((level, sublevels)) = levels.split_first_mut() {
let mut discarded = Vec::new();
if let Some(level) = levels.get_mut(index) {
level.retain(|ref overlay| {
let parent = parents.get(&overlay.hash).expect("there is a parent entry for each entry in levels; qed").clone();
if parent == *hash {
parents.remove(&overlay.hash);
discarded_journals.push(overlay.journal_key.clone());
Self::discard(sublevels, parents, discarded_journals, number + 1, &overlay.hash);
discarded.push(overlay.hash.clone());
false
} else {
true
}
});
}
for hash in discarded.into_iter() {
Self::discard_descendants(levels, index + 1, parents, &hash);
}
}
fn discard_journals(&self, level_index: usize, discarded_journals: &mut Vec<Vec<u8>>, hash: &BlockHash) {
if let Some(level) = self.levels.get(level_index) {
level.iter().for_each(|overlay| {
let parent = self.parents.get(&overlay.hash).expect("there is a parent entry for each entry in levels; qed").clone();
if parent == *hash {
discarded_journals.push(overlay.journal_key.clone());
self.discard_journals(level_index + 1, discarded_journals, &overlay.hash);
}
});
}
}
fn front_block_number(&self) -> u64 {
self.last_canonicalized.as_ref().map(|&(_, n)| n + 1).unwrap_or(0)
}
fn pending_front_block_number(&self) -> u64 {
self.last_canonicalized
.as_ref()
.map(|&(_, n)| n + 1 + self.pending_canonicalizations.len() as u64)
.unwrap_or(0)
}
pub fn last_canonicalized_block_number(&self) -> u64 {
self.last_canonicalized.as_ref().map(|&(_, n)| n).unwrap_or(0)
}
/// This may be called when the last finalization commit was applied to the database.
pub fn clear_overlay(&mut self) {
self.last_canonicalized_overlay.clear();
}
/// Select a top-level root and canonicalized it. Discards all sibling subtrees and the root.
/// Returns a set of changes that need to be added to the DB.
pub fn canonicalize(&mut self, hash: &BlockHash) -> CommitSet<Key> {
pub fn canonicalize<E: fmt::Debug>(&mut self, hash: &BlockHash) -> Result<CommitSet<Key>, Error<E>> {
trace!(target: "state-db", "Canonicalizing {:?}", hash);
let level = self.levels.pop_front().expect("no blocks to canonicalize");
let index = level.iter().position(|overlay| overlay.hash == *hash)
.expect("attempting to canonicalize unknown block");
let level = self.levels.get(self.pending_canonicalizations.len()).ok_or_else(|| Error::InvalidBlock)?;
let index = level
.iter()
.position(|overlay| overlay.hash == *hash)
.ok_or_else(|| Error::InvalidBlock)?;
let mut commit = CommitSet::default();
let mut discarded_journals = Vec::new();
for (i, overlay) in level.into_iter().enumerate() {
self.parents.remove(&overlay.hash);
if i == index {
self.last_canonicalized_overlay = overlay.values;
// that's the one we need to canonicalize
commit.data.inserted = self.last_canonicalized_overlay.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
commit.data.deleted = overlay.deleted;
commit.data.inserted = overlay.values.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
commit.data.deleted = overlay.deleted.clone();
} else {
// borrow checker won't allow us to split out mutable references
// required for recursive processing. A more efficient implementation
// that does not require converting to vector is possible
let mut vec: Vec<_> = self.levels.drain(..).collect();
Self::discard(&mut vec, &mut self.parents, &mut discarded_journals, 0, &overlay.hash);
self.levels.extend(vec.into_iter());
self.discard_journals(self.pending_canonicalizations.len() + 1, &mut discarded_journals, &overlay.hash);
}
// cleanup journal entry
discarded_journals.push(overlay.journal_key);
discarded_journals.push(overlay.journal_key.clone());
}
commit.meta.deleted.append(&mut discarded_journals);
let last_canonicalized = (hash.clone(), self.front_block_number());
commit.meta.inserted.push((to_meta_key(LAST_CANONICAL, &()), last_canonicalized.encode()));
self.last_canonicalized = Some(last_canonicalized);
trace!(target: "state-db", "Discarded {} records", commit.meta.deleted.len());
commit
let canonicalized = (hash.clone(), self.front_block_number() + self.pending_canonicalizations.len() as u64);
commit.meta.inserted.push((to_meta_key(LAST_CANONICAL, &()), canonicalized.encode()));
trace!(target: "state-db", "Discarding {} records", commit.meta.deleted.len());
self.pending_canonicalizations.push(hash.clone());
Ok(commit)
}
fn apply_canonicalizations(&mut self) {
let last = self.pending_canonicalizations.last().cloned();
let count = self.pending_canonicalizations.len() as u64;
for hash in self.pending_canonicalizations.drain(..) {
trace!(target: "state-db", "Post canonicalizing {:?}", hash);
let level = self.levels.pop_front().expect("Hash validity is checked in `canonicalize`");
let index = level
.iter()
.position(|overlay| overlay.hash == hash)
.expect("Hash validity is checked in `canonicalize`");
// discard unfinalized overlays
for (i, overlay) in level.into_iter().enumerate() {
self.parents.remove(&overlay.hash);
if i != index {
Self::discard_descendants(&mut self.levels, 0, &mut self.parents, &overlay.hash);
}
}
}
if let Some(hash) = last {
let last_canonicalized = (hash, self.last_canonicalized_block_number() + count);
self.last_canonicalized = Some(last_canonicalized);
}
}
/// Get a value from the node overlay. This searches in every existing changeset.
pub fn get(&self, key: &Key) -> Option<DBValue> {
if let Some(value) = self.last_canonicalized_overlay.get(&key) {
return Some(value.clone());
}
for level in self.levels.iter() {
for overlay in level.iter() {
if let Some(value) = overlay.values.get(&key) {
@@ -266,14 +306,44 @@ impl<BlockHash: Hash, Key: Hash> NonCanonicalOverlay<BlockHash, Key> {
commit
})
}
fn revert_insertions(&mut self) {
self.pending_insertions.reverse();
for hash in self.pending_insertions.drain(..) {
self.parents.remove(&hash);
// find a level. When iterating insertions backwards the hash is always last in the level.
let level_index =
self.levels.iter().position(|level|
level.last().expect("Hash is added in `insert` in reverse order").hash == hash)
.expect("Hash is added in insert");
self.levels[level_index].pop();
if self.levels[level_index].is_empty() {
debug_assert_eq!(level_index, self.levels.len() - 1);
self.levels.pop_back();
}
}
}
/// Apply all pending changes
pub fn apply_pending(&mut self) {
self.apply_canonicalizations();
self.pending_insertions.clear();
}
/// Revert all pending changes
pub fn revert_pending(&mut self) {
self.pending_canonicalizations.clear();
self.revert_insertions();
}
}
#[cfg(test)]
mod tests {
use std::io;
use super::NonCanonicalOverlay;
use crate::ChangeSet;
use primitives::H256;
use super::{NonCanonicalOverlay, to_journal_key};
use crate::ChangeSet;
use crate::test::{make_db, make_changeset};
fn contains(overlay: &NonCanonicalOverlay<H256, H256>, key: u64) -> bool {
@@ -294,7 +364,7 @@ mod tests {
fn canonicalize_empty_panics() {
let db = make_db(&[]);
let mut overlay = NonCanonicalOverlay::<H256, H256>::new(&db).unwrap();
overlay.canonicalize(&H256::default());
overlay.canonicalize::<io::Error>(&H256::default()).unwrap();
}
#[test]
@@ -338,7 +408,7 @@ mod tests {
let db = make_db(&[]);
let mut overlay = NonCanonicalOverlay::<H256, H256>::new(&db).unwrap();
overlay.insert::<io::Error>(&h1, 1, &H256::default(), ChangeSet::default()).unwrap();
overlay.canonicalize(&h2);
overlay.canonicalize::<io::Error>(&h2).unwrap();
}
#[test]
@@ -353,7 +423,7 @@ mod tests {
assert_eq!(insertion.meta.inserted.len(), 2);
assert_eq!(insertion.meta.deleted.len(), 0);
db.commit(&insertion);
let finalization = overlay.canonicalize(&h1);
let finalization = overlay.canonicalize::<io::Error>(&h1).unwrap();
assert_eq!(finalization.data.inserted.len(), changeset.inserted.len());
assert_eq!(finalization.data.deleted.len(), changeset.deleted.len());
assert_eq!(finalization.meta.inserted.len(), 1);
@@ -386,7 +456,8 @@ mod tests {
let mut overlay = NonCanonicalOverlay::<H256, H256>::new(&db).unwrap();
db.commit(&overlay.insert::<io::Error>(&h1, 10, &H256::default(), make_changeset(&[3, 4], &[2])).unwrap());
db.commit(&overlay.insert::<io::Error>(&h2, 11, &h1, make_changeset(&[5], &[3])).unwrap());
db.commit(&overlay.canonicalize(&h1));
db.commit(&overlay.canonicalize::<io::Error>(&h1).unwrap());
overlay.apply_pending();
assert_eq!(overlay.levels.len(), 1);
let overlay2 = NonCanonicalOverlay::<H256, H256>::new(&db).unwrap();
@@ -410,15 +481,17 @@ mod tests {
assert!(contains(&overlay, 5));
assert_eq!(overlay.levels.len(), 2);
assert_eq!(overlay.parents.len(), 2);
db.commit(&overlay.canonicalize(&h1));
db.commit(&overlay.canonicalize::<io::Error>(&h1).unwrap());
assert!(contains(&overlay, 5));
assert_eq!(overlay.levels.len(), 2);
assert_eq!(overlay.parents.len(), 2);
overlay.apply_pending();
assert_eq!(overlay.levels.len(), 1);
assert_eq!(overlay.parents.len(), 1);
assert!(contains(&overlay, 5));
overlay.clear_overlay();
assert!(!contains(&overlay, 5));
assert!(contains(&overlay, 7));
db.commit(&overlay.canonicalize(&h2));
overlay.clear_overlay();
db.commit(&overlay.canonicalize::<io::Error>(&h2).unwrap());
overlay.apply_pending();
assert_eq!(overlay.levels.len(), 0);
assert_eq!(overlay.parents.len(), 0);
assert!(db.data_eq(&make_db(&[1, 4, 6, 7, 8])));
@@ -427,6 +500,7 @@ mod tests {
#[test]
fn complex_tree() {
use crate::MetaDb;
let mut db = make_db(&[]);
// - 1 - 1_1 - 1_1_1
@@ -487,8 +561,8 @@ mod tests {
assert_eq!(overlay.last_canonicalized, overlay2.last_canonicalized);
// canonicalize 1. 2 and all its children should be discarded
db.commit(&overlay.canonicalize(&h_1));
overlay.clear_overlay();
db.commit(&overlay.canonicalize::<io::Error>(&h_1).unwrap());
overlay.apply_pending();
assert_eq!(overlay.levels.len(), 2);
assert_eq!(overlay.parents.len(), 6);
assert!(!contains(&overlay, 1));
@@ -497,10 +571,17 @@ mod tests {
assert!(!contains(&overlay, 22));
assert!(!contains(&overlay, 211));
assert!(contains(&overlay, 111));
assert!(!contains(&overlay, 211));
// check that journals are deleted
assert!(db.get_meta(&to_journal_key(1, 0)).unwrap().is_none());
assert!(db.get_meta(&to_journal_key(1, 1)).unwrap().is_none());
assert!(db.get_meta(&to_journal_key(2, 1)).unwrap().is_some());
assert!(db.get_meta(&to_journal_key(2, 2)).unwrap().is_none());
assert!(db.get_meta(&to_journal_key(2, 3)).unwrap().is_none());
// canonicalize 1_2. 1_1 and all its children should be discarded
db.commit(&overlay.canonicalize(&h_1_2));
overlay.clear_overlay();
db.commit(&overlay.canonicalize::<io::Error>(&h_1_2).unwrap());
overlay.apply_pending();
assert_eq!(overlay.levels.len(), 1);
assert_eq!(overlay.parents.len(), 3);
assert!(!contains(&overlay, 11));
@@ -510,8 +591,8 @@ mod tests {
assert!(contains(&overlay, 123));
// canonicalize 1_2_2
db.commit(&overlay.canonicalize(&h_1_2_2));
overlay.clear_overlay();
db.commit(&overlay.canonicalize::<io::Error>(&h_1_2_2).unwrap());
overlay.apply_pending();
assert_eq!(overlay.levels.len(), 0);
assert_eq!(overlay.parents.len(), 0);
assert!(db.data_eq(&make_db(&[1, 12, 122])));
@@ -540,5 +621,29 @@ mod tests {
assert!(overlay.revert_one().is_none());
}
#[test]
fn revert_pending_insertion() {
let h1 = H256::random();
let h2_1 = H256::random();
let h2_2 = H256::random();
let db = make_db(&[]);
let mut overlay = NonCanonicalOverlay::<H256, H256>::new(&db).unwrap();
let changeset1 = make_changeset(&[5, 6], &[2]);
let changeset2 = make_changeset(&[7, 8], &[5, 3]);
let changeset3 = make_changeset(&[9], &[]);
overlay.insert::<io::Error>(&h1, 1, &H256::default(), changeset1).unwrap();
assert!(contains(&overlay, 5));
overlay.insert::<io::Error>(&h2_1, 2, &h1, changeset2).unwrap();
overlay.insert::<io::Error>(&h2_2, 2, &h1, changeset3).unwrap();
assert!(contains(&overlay, 7));
assert!(contains(&overlay, 5));
assert!(contains(&overlay, 9));
assert_eq!(overlay.levels.len(), 2);
assert_eq!(overlay.parents.len(), 3);
overlay.revert_pending();
assert!(!contains(&overlay, 5));
assert_eq!(overlay.levels.len(), 0);
assert_eq!(overlay.parents.len(), 0);
}
}
+64 -17
View File
@@ -23,10 +23,11 @@
//! The changes are journaled in the DB.
use std::collections::{HashMap, HashSet, VecDeque};
use std::mem;
use crate::codec::{Encode, Decode};
use parity_codec_derive::{Encode, Decode};
use crate::{CommitSet, Error, MetaDb, to_meta_key, Hash};
use log::trace;
use parity_codec_derive::{Encode, Decode};
use log::{trace, warn};
const LAST_PRUNED: &[u8] = b"last_pruned";
const PRUNING_JOURNAL: &[u8] = b"pruning_journal";
@@ -36,6 +37,8 @@ pub struct RefWindow<BlockHash: Hash, Key: Hash> {
death_rows: VecDeque<DeathRow<BlockHash, Key>>,
death_index: HashMap<Key, u64>,
pending_number: u64,
pending_records: Vec<(u64, JournalRecord<BlockHash, Key>)>,
pending_prunings: usize,
}
#[derive(Debug, PartialEq, Eq)]
@@ -69,6 +72,8 @@ impl<BlockHash: Hash, Key: Hash> RefWindow<BlockHash, Key> {
death_rows: Default::default(),
death_index: Default::default(),
pending_number: pending_number,
pending_records: Default::default(),
pending_prunings: 0,
};
// read the journal
trace!(target: "state-db", "Reading pruning journal. Pending #{}", pending_number);
@@ -110,11 +115,11 @@ impl<BlockHash: Hash, Key: Hash> RefWindow<BlockHash, Key> {
}
pub fn window_size(&self) -> u64 {
self.death_rows.len() as u64
(self.death_rows.len() + self.pending_records.len() - self.pending_prunings) as u64
}
pub fn next_hash(&self) -> Option<BlockHash> {
self.death_rows.front().map(|r| r.hash.clone())
self.death_rows.get(self.pending_prunings).map(|r| r.hash.clone())
}
pub fn mem_used(&self) -> usize {
@@ -122,20 +127,28 @@ impl<BlockHash: Hash, Key: Hash> RefWindow<BlockHash, Key> {
}
pub fn pending(&self) -> u64 {
self.pending_number
self.pending_number + self.pending_prunings as u64
}
/// Prune next block. Expects at least one block in the window. Adds changes to `commit`.
pub fn prune_one(&mut self, commit: &mut CommitSet<Key>) {
let pruned = self.death_rows.pop_front().expect("prune_one is only called with a non-empty window");
trace!(target: "state-db", "Pruning {:?} ({} deleted)", pruned.hash, pruned.deleted.len());
for k in pruned.deleted.iter() {
self.death_index.remove(&k);
if let Some(pruned) = self.death_rows.get(self.pending_prunings) {
trace!(target: "state-db", "Pruning {:?} ({} deleted)", pruned.hash, pruned.deleted.len());
let index = self.pending_number + self.pending_prunings as u64;
commit.data.deleted.extend(pruned.deleted.iter().cloned());
commit.meta.inserted.push((to_meta_key(LAST_PRUNED, &()), index.encode()));
commit.meta.deleted.push(pruned.journal_key.clone());
self.pending_prunings += 1;
} else if let Some((block, pruned)) = self.pending_records.get(self.pending_prunings - self.death_rows.len()) {
trace!(target: "state-db", "Pruning pending{:?} ({} deleted)", pruned.hash, pruned.deleted.len());
commit.data.deleted.extend(pruned.deleted.iter().cloned());
commit.meta.inserted.push((to_meta_key(LAST_PRUNED, &()), block.encode()));
let journal_key = to_journal_key(*block);
commit.meta.deleted.push(journal_key);
self.pending_prunings += 1;
} else {
warn!(target: "state-db", "Trying to prune when there's nothing to prune");
}
commit.data.deleted.extend(pruned.deleted.into_iter());
commit.meta.inserted.push((to_meta_key(LAST_PRUNED, &()), self.pending_number.encode()));
commit.meta.deleted.push(pruned.journal_key);
self.pending_number += 1;
}
/// Add a change set to the window. Creates a journal record and pushes it to `commit`
@@ -148,11 +161,34 @@ impl<BlockHash: Hash, Key: Hash> RefWindow<BlockHash, Key> {
inserted,
deleted,
};
let block = self.pending_number + self.window_size();
let block = self.pending_number + self.window_size() as u64;
let journal_key = to_journal_key(block);
commit.meta.inserted.push((journal_key.clone(), journal_record.encode()));
self.pending_records.push((block, journal_record));
}
self.import(hash, journal_key, journal_record.inserted.into_iter(), journal_record.deleted);
/// Apply all pending changes
pub fn apply_pending(&mut self) {
for (block, journal_record) in mem::replace(&mut self.pending_records, Default::default()).into_iter() {
trace!(target: "state-db", "Applying pruning window record: {}: {:?}", block, journal_record.hash);
let journal_key = to_journal_key(block);
self.import(&journal_record.hash, journal_key, journal_record.inserted.into_iter(), journal_record.deleted);
}
for _ in 0 .. self.pending_prunings {
let pruned = self.death_rows.pop_front().expect("pending_prunings is always < death_rows.len()");
trace!(target: "state-db", "Applying pruning {:?} ({} deleted)", pruned.hash, pruned.deleted.len());
for k in pruned.deleted.iter() {
self.death_index.remove(&k);
}
self.pending_number += 1;
}
self.pending_prunings = 0;
}
/// Revert all pending changes
pub fn revert_pending(&mut self) {
self.pending_records.clear();
self.pending_prunings = 0;
}
}
@@ -180,12 +216,16 @@ mod tests {
}
#[test]
#[should_panic]
fn prune_empty_panics() {
fn prune_empty() {
let db = make_db(&[]);
let mut pruning: RefWindow<H256, H256> = RefWindow::new(&db).unwrap();
let mut commit = CommitSet::default();
pruning.prune_one(&mut commit);
assert_eq!(pruning.pending_number, 0);
assert!(pruning.death_rows.is_empty());
assert!(pruning.death_index.is_empty());
assert!(pruning.pending_prunings == 0);
assert!(pruning.pending_records.is_empty());
}
#[test]
@@ -196,6 +236,7 @@ mod tests {
let h = H256::random();
pruning.note_canonical(&h, &mut commit);
db.commit(&commit);
pruning.apply_pending();
assert!(commit.data.deleted.is_empty());
assert_eq!(pruning.death_rows.len(), 1);
assert_eq!(pruning.death_index.len(), 2);
@@ -205,6 +246,7 @@ mod tests {
let mut commit = CommitSet::default();
pruning.prune_one(&mut commit);
db.commit(&commit);
pruning.apply_pending();
assert!(db.data_eq(&make_db(&[2, 4, 5])));
assert!(pruning.death_rows.is_empty());
assert!(pruning.death_index.is_empty());
@@ -221,6 +263,7 @@ mod tests {
let mut commit = make_commit(&[5], &[2]);
pruning.note_canonical(&H256::random(), &mut commit);
db.commit(&commit);
pruning.apply_pending();
assert!(db.data_eq(&make_db(&[1, 2, 3, 4, 5])));
check_journal(&pruning, &db);
@@ -228,10 +271,12 @@ mod tests {
let mut commit = CommitSet::default();
pruning.prune_one(&mut commit);
db.commit(&commit);
pruning.apply_pending();
assert!(db.data_eq(&make_db(&[2, 3, 4, 5])));
let mut commit = CommitSet::default();
pruning.prune_one(&mut commit);
db.commit(&commit);
pruning.apply_pending();
assert!(db.data_eq(&make_db(&[3, 4, 5])));
assert_eq!(pruning.pending_number, 2);
}
@@ -250,6 +295,7 @@ mod tests {
pruning.note_canonical(&H256::random(), &mut commit);
db.commit(&commit);
assert!(db.data_eq(&make_db(&[1, 2, 3])));
pruning.apply_pending();
check_journal(&pruning, &db);
@@ -264,6 +310,7 @@ mod tests {
pruning.prune_one(&mut commit);
db.commit(&commit);
assert!(db.data_eq(&make_db(&[1, 3])));
pruning.apply_pending();
assert_eq!(pruning.pending_number, 3);
}
}
+1
View File
@@ -46,6 +46,7 @@ impl HashDb for TestDb {
impl TestDb {
pub fn commit(&mut self, commit: &CommitSet<H256>) {
self.data.extend(commit.data.inserted.iter().cloned());
self.meta.extend(commit.meta.inserted.iter().cloned());
for k in commit.data.deleted.iter() {
self.data.remove(k);
}