State-db refactoring (#12239)

* Prune discarded blocks immediately

* state-db refactoring part 1

* Some renames

* Get rid of pending state

* Revert "Prune discarded blocks immediately"

This reverts commit b60d98c569e8af18d99087da93f0911d4f24006e.

* Cleanup

* Make clippy happy

* Minor changes
This commit is contained in:
Arkadiy Paronyan
2022-11-08 11:58:02 +01:00
committed by GitHub
parent ec6a428ade
commit 617fa6c0ba
4 changed files with 181 additions and 557 deletions
+51 -194
View File
@@ -19,8 +19,6 @@
//! Canonicalization window.
//! Maintains trees of block overlays and allows discarding trees/roots
//! The overlays are added in `insert` and removed in `canonicalize`.
//! All pending changes are kept in memory until next call to `apply_pending` or
//! `revert_pending`
use super::{to_meta_key, ChangeSet, CommitSet, DBValue, Error, Hash, MetaDb, StateDbError};
use codec::{Decode, Encode};
@@ -37,8 +35,6 @@ pub struct NonCanonicalOverlay<BlockHash: Hash, Key: Hash> {
last_canonicalized: Option<(BlockHash, u64)>,
levels: VecDeque<OverlayLevel<BlockHash, Key>>,
parents: HashMap<BlockHash, BlockHash>,
pending_canonicalizations: Vec<BlockHash>,
pending_insertions: Vec<BlockHash>,
values: HashMap<Key, (u32, DBValue)>, // ref counted
// would be deleted but kept around because block is pinned, ref counted.
pinned: HashMap<BlockHash, u32>,
@@ -229,8 +225,6 @@ impl<BlockHash: Hash, Key: Hash> NonCanonicalOverlay<BlockHash, Key> {
last_canonicalized,
levels,
parents,
pending_canonicalizations: Default::default(),
pending_insertions: Default::default(),
pinned: Default::default(),
pinned_insertions: Default::default(),
values,
@@ -316,9 +310,8 @@ impl<BlockHash: Hash, Key: Hash> NonCanonicalOverlay<BlockHash, Key> {
deleted: changeset.deleted,
};
commit.meta.inserted.push((journal_key, journal_record.encode()));
trace!(target: "state-db", "Inserted uncanonicalized changeset {}.{} ({} inserted, {} deleted)", number, index, journal_record.inserted.len(), journal_record.deleted.len());
trace!(target: "state-db", "Inserted uncanonicalized changeset {}.{} {:?} ({} inserted, {} deleted)", number, index, hash, journal_record.inserted.len(), journal_record.deleted.len());
insert_values(&mut self.values, journal_record.inserted);
self.pending_insertions.push(hash.clone());
Ok(commit)
}
@@ -355,24 +348,7 @@ impl<BlockHash: Hash, Key: Hash> NonCanonicalOverlay<BlockHash, Key> {
}
pub fn last_canonicalized_block_number(&self) -> Option<u64> {
match self.last_canonicalized.as_ref().map(|&(_, n)| n) {
Some(n) => Some(n + self.pending_canonicalizations.len() as u64),
None if !self.pending_canonicalizations.is_empty() =>
Some(self.pending_canonicalizations.len() as u64),
_ => None,
}
}
pub fn last_canonicalized_hash(&self) -> Option<BlockHash> {
self.last_canonicalized.as_ref().map(|&(ref h, _)| h.clone())
}
pub fn top_level(&self) -> Vec<(BlockHash, u64)> {
let start = self.last_canonicalized_block_number().unwrap_or(0);
self.levels
.get(self.pending_canonicalizations.len())
.map(|level| level.blocks.iter().map(|r| (r.hash.clone(), start)).collect())
.unwrap_or_default()
self.last_canonicalized.as_ref().map(|&(_, n)| n)
}
/// Select a top-level root and canonicalized it. Discards all sibling subtrees and the root.
@@ -384,10 +360,10 @@ impl<BlockHash: Hash, Key: Hash> NonCanonicalOverlay<BlockHash, Key> {
commit: &mut CommitSet<Key>,
) -> Result<u64, StateDbError> {
trace!(target: "state-db", "Canonicalizing {:?}", hash);
let level = self
.levels
.get(self.pending_canonicalizations.len())
.ok_or(StateDbError::InvalidBlock)?;
let level = match self.levels.pop_front() {
Some(level) => level,
None => return Err(StateDbError::InvalidBlock),
};
let index = level
.blocks
.iter()
@@ -396,91 +372,63 @@ impl<BlockHash: Hash, Key: Hash> NonCanonicalOverlay<BlockHash, Key> {
let mut discarded_journals = Vec::new();
let mut discarded_blocks = Vec::new();
for (i, overlay) in level.blocks.iter().enumerate() {
if i != index {
for (i, overlay) in level.blocks.into_iter().enumerate() {
let mut pinned_children = 0;
// That's the one we need to canonicalize
if i == index {
commit.data.inserted.extend(overlay.inserted.iter().map(|k| {
(
k.clone(),
self.values
.get(k)
.expect("For each key in overlays there's a value in values")
.1
.clone(),
)
}));
commit.data.deleted.extend(overlay.deleted.clone());
} else {
// Discard this overlay
self.discard_journals(
self.pending_canonicalizations.len() + 1,
0,
&mut discarded_journals,
&mut discarded_blocks,
&overlay.hash,
);
pinned_children = discard_descendants(
&mut self.levels.as_mut_slices(),
&mut self.values,
&mut self.parents,
&self.pinned,
&mut self.pinned_insertions,
&overlay.hash,
);
}
if self.pinned.contains_key(&overlay.hash) {
pinned_children += 1;
}
if pinned_children != 0 {
self.pinned_insertions
.insert(overlay.hash.clone(), (overlay.inserted, pinned_children));
} else {
self.parents.remove(&overlay.hash);
discard_values(&mut self.values, overlay.inserted);
}
discarded_journals.push(overlay.journal_key.clone());
discarded_blocks.push(overlay.hash.clone());
}
// get the one we need to canonicalize
let overlay = &level.blocks[index];
commit.data.inserted.extend(overlay.inserted.iter().map(|k| {
(
k.clone(),
self.values
.get(k)
.expect("For each key in overlays there's a value in values")
.1
.clone(),
)
}));
commit.data.deleted.extend(overlay.deleted.clone());
commit.meta.deleted.append(&mut discarded_journals);
let canonicalized =
(hash.clone(), self.front_block_number() + self.pending_canonicalizations.len() as u64);
let canonicalized = (hash.clone(), self.front_block_number());
commit
.meta
.inserted
.push((to_meta_key(LAST_CANONICAL, &()), canonicalized.encode()));
trace!(target: "state-db", "Discarding {} records", commit.meta.deleted.len());
self.pending_canonicalizations.push(hash.clone());
Ok(canonicalized.1)
}
fn apply_canonicalizations(&mut self) {
let last = self.pending_canonicalizations.last().cloned();
let count = self.pending_canonicalizations.len() as u64;
for hash in self.pending_canonicalizations.drain(..) {
trace!(target: "state-db", "Post canonicalizing {:?}", hash);
let level =
self.levels.pop_front().expect("Hash validity is checked in `canonicalize`");
let index = level
.blocks
.iter()
.position(|overlay| overlay.hash == hash)
.expect("Hash validity is checked in `canonicalize`");
// discard unfinalized overlays and values
for (i, overlay) in level.blocks.into_iter().enumerate() {
let mut pinned_children = if i != index {
discard_descendants(
&mut self.levels.as_mut_slices(),
&mut self.values,
&mut self.parents,
&self.pinned,
&mut self.pinned_insertions,
&overlay.hash,
)
} else {
0
};
if self.pinned.contains_key(&overlay.hash) {
pinned_children += 1;
}
if pinned_children != 0 {
self.pinned_insertions
.insert(overlay.hash.clone(), (overlay.inserted, pinned_children));
} else {
self.parents.remove(&overlay.hash);
discard_values(&mut self.values, overlay.inserted);
}
}
}
if let Some(hash) = last {
let last_canonicalized = (
hash,
self.last_canonicalized.as_ref().map(|(_, n)| n + count).unwrap_or(count - 1),
);
self.last_canonicalized = Some(last_canonicalized);
}
let num = canonicalized.1;
self.last_canonicalized = Some(canonicalized);
Ok(num)
}
/// Get a value from the node overlay. This searches in every existing changeset.
@@ -494,8 +442,7 @@ impl<BlockHash: Hash, Key: Hash> NonCanonicalOverlay<BlockHash, Key> {
/// Check if the block is in the canonicalization queue.
pub fn have_block(&self, hash: &BlockHash) -> bool {
(self.parents.contains_key(hash) || self.pending_insertions.contains(hash)) &&
!self.pending_canonicalizations.contains(hash)
self.parents.contains_key(hash)
}
/// Revert a single level. Returns commit set that deletes the journal or `None` if not
@@ -543,50 +490,8 @@ impl<BlockHash: Hash, Key: Hash> NonCanonicalOverlay<BlockHash, Key> {
}
}
fn revert_insertions(&mut self) {
self.pending_insertions.reverse();
for hash in self.pending_insertions.drain(..) {
self.parents.remove(&hash);
// find a level. When iterating insertions backwards the hash is always last in the
// level.
let level_index = self
.levels
.iter()
.position(|level| {
level.blocks.last().expect("Hash is added in `insert` in reverse order").hash ==
hash
})
.expect("Hash is added in insert");
let overlay_index = self.levels[level_index].blocks.len() - 1;
let overlay = self.levels[level_index].remove(overlay_index);
discard_values(&mut self.values, overlay.inserted);
if self.levels[level_index].blocks.is_empty() {
debug_assert_eq!(level_index, self.levels.len() - 1);
self.levels.pop_back();
}
}
}
/// Apply all pending changes
pub fn apply_pending(&mut self) {
self.apply_canonicalizations();
self.pending_insertions.clear();
}
/// Revert all pending changes
pub fn revert_pending(&mut self) {
self.pending_canonicalizations.clear();
self.revert_insertions();
}
/// Pin state values in memory
pub fn pin(&mut self, hash: &BlockHash) {
if self.pending_insertions.contains(hash) {
// Pinning pending state is not implemented. Pending states
// won't be pruned for quite some time anyway, so it's not a big deal.
return
}
let refs = self.pinned.entry(hash.clone()).or_default();
if *refs == 0 {
trace!(target: "state-db-pin", "Pinned non-canon block: {:?}", hash);
@@ -779,7 +684,6 @@ mod tests {
let mut commit = CommitSet::default();
overlay.canonicalize(&h1, &mut commit).unwrap();
db.commit(&commit);
overlay.apply_pending();
assert_eq!(overlay.levels.len(), 1);
let overlay2 = NonCanonicalOverlay::<H256, H256>::new(&db).unwrap();
@@ -806,18 +710,13 @@ mod tests {
let mut commit = CommitSet::default();
overlay.canonicalize(&h1, &mut commit).unwrap();
db.commit(&commit);
assert!(contains(&overlay, 5));
assert_eq!(overlay.levels.len(), 2);
assert_eq!(overlay.parents.len(), 2);
overlay.apply_pending();
assert_eq!(overlay.levels.len(), 1);
assert_eq!(overlay.parents.len(), 1);
assert!(!contains(&overlay, 5));
assert!(contains(&overlay, 7));
assert_eq!(overlay.levels.len(), 1);
assert_eq!(overlay.parents.len(), 1);
let mut commit = CommitSet::default();
overlay.canonicalize(&h2, &mut commit).unwrap();
db.commit(&commit);
overlay.apply_pending();
assert_eq!(overlay.levels.len(), 0);
assert_eq!(overlay.parents.len(), 0);
assert!(db.data_eq(&make_db(&[1, 4, 6, 7, 8])));
@@ -836,13 +735,11 @@ mod tests {
let mut commit = CommitSet::default();
overlay.canonicalize(&h_1, &mut commit).unwrap();
db.commit(&commit);
assert!(contains(&overlay, 1));
overlay.apply_pending();
assert!(!contains(&overlay, 1));
}
#[test]
fn insert_with_pending_canonicalization() {
fn insert_and_canonicalize() {
let h1 = H256::random();
let h2 = H256::random();
let h3 = H256::random();
@@ -851,13 +748,11 @@ mod tests {
let changeset = make_changeset(&[], &[]);
db.commit(&overlay.insert(&h1, 1, &H256::default(), changeset.clone()).unwrap());
db.commit(&overlay.insert(&h2, 2, &h1, changeset.clone()).unwrap());
overlay.apply_pending();
let mut commit = CommitSet::default();
overlay.canonicalize(&h1, &mut commit).unwrap();
overlay.canonicalize(&h2, &mut commit).unwrap();
db.commit(&commit);
db.commit(&overlay.insert(&h3, 3, &h2, changeset.clone()).unwrap());
overlay.apply_pending();
assert_eq!(overlay.levels.len(), 1);
}
@@ -927,7 +822,6 @@ mod tests {
let mut commit = CommitSet::default();
overlay.canonicalize(&h_1, &mut commit).unwrap();
db.commit(&commit);
overlay.apply_pending();
assert_eq!(overlay.levels.len(), 2);
assert_eq!(overlay.parents.len(), 6);
assert!(!contains(&overlay, 1));
@@ -948,7 +842,6 @@ mod tests {
let mut commit = CommitSet::default();
overlay.canonicalize(&h_1_2, &mut commit).unwrap();
db.commit(&commit);
overlay.apply_pending();
assert_eq!(overlay.levels.len(), 1);
assert_eq!(overlay.parents.len(), 3);
assert!(!contains(&overlay, 11));
@@ -965,7 +858,6 @@ mod tests {
let mut commit = CommitSet::default();
overlay.canonicalize(&h_1_2_2, &mut commit).unwrap();
db.commit(&commit);
overlay.apply_pending();
assert_eq!(overlay.levels.len(), 0);
assert_eq!(overlay.parents.len(), 0);
assert!(db.data_eq(&make_db(&[1, 12, 122])));
@@ -994,31 +886,6 @@ mod tests {
assert!(overlay.revert_one().is_none());
}
#[test]
fn revert_pending_insertion() {
let h1 = H256::random();
let h2_1 = H256::random();
let h2_2 = H256::random();
let db = make_db(&[]);
let mut overlay = NonCanonicalOverlay::<H256, H256>::new(&db).unwrap();
let changeset1 = make_changeset(&[5, 6], &[2]);
let changeset2 = make_changeset(&[7, 8], &[5, 3]);
let changeset3 = make_changeset(&[9], &[]);
overlay.insert(&h1, 1, &H256::default(), changeset1).unwrap();
assert!(contains(&overlay, 5));
overlay.insert(&h2_1, 2, &h1, changeset2).unwrap();
overlay.insert(&h2_2, 2, &h1, changeset3).unwrap();
assert!(contains(&overlay, 7));
assert!(contains(&overlay, 5));
assert!(contains(&overlay, 9));
assert_eq!(overlay.levels.len(), 2);
assert_eq!(overlay.parents.len(), 3);
overlay.revert_pending();
assert!(!contains(&overlay, 5));
assert_eq!(overlay.levels.len(), 0);
assert_eq!(overlay.parents.len(), 0);
}
#[test]
fn keeps_pinned() {
let mut db = make_db(&[]);
@@ -1033,14 +900,12 @@ mod tests {
let mut overlay = NonCanonicalOverlay::<H256, H256>::new(&db).unwrap();
db.commit(&overlay.insert(&h_1, 1, &H256::default(), c_1).unwrap());
db.commit(&overlay.insert(&h_2, 1, &H256::default(), c_2).unwrap());
overlay.apply_pending();
overlay.pin(&h_1);
let mut commit = CommitSet::default();
overlay.canonicalize(&h_2, &mut commit).unwrap();
db.commit(&commit);
overlay.apply_pending();
assert!(contains(&overlay, 1));
overlay.unpin(&h_1);
assert!(!contains(&overlay, 1));
@@ -1064,14 +929,12 @@ mod tests {
db.commit(&overlay.insert(&h_1, 1, &H256::default(), c_1).unwrap());
db.commit(&overlay.insert(&h_2, 1, &H256::default(), c_2).unwrap());
db.commit(&overlay.insert(&h_3, 1, &H256::default(), c_3).unwrap());
overlay.apply_pending();
overlay.pin(&h_1);
let mut commit = CommitSet::default();
overlay.canonicalize(&h_3, &mut commit).unwrap();
db.commit(&commit);
overlay.apply_pending(); // 1_2 should be discarded, 1_1 is pinned
assert!(contains(&overlay, 1));
overlay.unpin(&h_1);
@@ -1094,14 +957,12 @@ mod tests {
db.commit(&overlay.insert(&h_11, 1, &H256::default(), c_11).unwrap());
db.commit(&overlay.insert(&h_12, 1, &H256::default(), c_12).unwrap());
db.commit(&overlay.insert(&h_21, 2, &h_11, c_21).unwrap());
overlay.apply_pending();
overlay.pin(&h_21);
let mut commit = CommitSet::default();
overlay.canonicalize(&h_12, &mut commit).unwrap();
db.commit(&commit);
overlay.apply_pending(); // 1_1 and 2_1 should be both pinned
assert!(contains(&overlay, 1));
overlay.unpin(&h_21);
@@ -1129,12 +990,10 @@ mod tests {
overlay.canonicalize(&root, &mut commit).unwrap();
overlay.canonicalize(&h2, &mut commit).unwrap(); // h11 should stay in the DB
db.commit(&commit);
overlay.apply_pending();
assert_eq!(overlay.levels.len(), 1);
assert!(contains(&overlay, 21));
assert!(!contains(&overlay, 11));
assert!(db.get_meta(&to_journal_key(12, 1)).unwrap().is_some());
assert!(db.get_meta(&to_journal_key(12, 0)).unwrap().is_none());
// Restore into a new overlay and check that journaled value exists.
let mut overlay = NonCanonicalOverlay::<H256, H256>::new(&db).unwrap();
@@ -1143,7 +1002,6 @@ mod tests {
let mut commit = CommitSet::default();
overlay.canonicalize(&h21, &mut commit).unwrap(); // h11 should stay in the DB
db.commit(&commit);
overlay.apply_pending();
assert!(!contains(&overlay, 21));
}
@@ -1167,7 +1025,6 @@ mod tests {
overlay.canonicalize(&root, &mut commit).unwrap();
overlay.canonicalize(&h2, &mut commit).unwrap(); // h11 should stay in the DB
db.commit(&commit);
overlay.apply_pending();
// add another block at top level. It should reuse journal index 0 of previously discarded
// block