mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-12 17:01:09 +00:00
Reference counted uncanonicalised overlay (#1695)
* Refcounted uncanonicalised overlay * Removed into_iter * Using free functions
This commit is contained in:
@@ -21,7 +21,7 @@
|
||||
//! `revert_pending`
|
||||
|
||||
use std::fmt;
|
||||
use std::collections::{HashMap, VecDeque};
|
||||
use std::collections::{HashMap, VecDeque, hash_map::Entry};
|
||||
use super::{Error, DBValue, ChangeSet, CommitSet, MetaDb, Hash, to_meta_key};
|
||||
use crate::codec::{Decode, Encode};
|
||||
use parity_codec_derive::{Decode, Encode};
|
||||
@@ -37,6 +37,7 @@ pub struct NonCanonicalOverlay<BlockHash: Hash, Key: Hash> {
|
||||
parents: HashMap<BlockHash, BlockHash>,
|
||||
pending_canonicalizations: Vec<BlockHash>,
|
||||
pending_insertions: Vec<BlockHash>,
|
||||
values: HashMap<Key, (u32, DBValue)>, //ref counted
|
||||
}
|
||||
|
||||
#[derive(Encode, Decode)]
|
||||
@@ -55,10 +56,61 @@ fn to_journal_key(block: u64, index: u64) -> Vec<u8> {
|
||||
struct BlockOverlay<BlockHash: Hash, Key: Hash> {
|
||||
hash: BlockHash,
|
||||
journal_key: Vec<u8>,
|
||||
values: HashMap<Key, DBValue>,
|
||||
inserted: Vec<Key>,
|
||||
deleted: Vec<Key>,
|
||||
}
|
||||
|
||||
fn insert_values<Key: Hash>(values: &mut HashMap<Key, (u32, DBValue)>, inserted: Vec<(Key, DBValue)>) {
|
||||
for (k, v) in inserted {
|
||||
debug_assert!(values.get(&k).map_or(true, |(_, value)| *value == v));
|
||||
let (ref mut counter, _) = values.entry(k).or_insert_with(|| (0, v));
|
||||
*counter += 1;
|
||||
}
|
||||
}
|
||||
|
||||
fn discard_values<Key: Hash>(values: &mut HashMap<Key, (u32, DBValue)>, inserted: Vec<Key>) {
|
||||
for k in inserted {
|
||||
match values.entry(k) {
|
||||
Entry::Occupied(mut e) => {
|
||||
let (ref mut counter, _) = e.get_mut();
|
||||
*counter -= 1;
|
||||
if *counter == 0 {
|
||||
e.remove();
|
||||
}
|
||||
},
|
||||
Entry::Vacant(_) => {
|
||||
debug_assert!(false, "Trying to discard missing value");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn discard_descendants<BlockHash: Hash, Key: Hash>(
|
||||
levels: &mut VecDeque<Vec<BlockOverlay<BlockHash, Key>>>,
|
||||
mut values: &mut HashMap<Key, (u32, DBValue)>,
|
||||
index: usize,
|
||||
parents: &mut HashMap<BlockHash, BlockHash>,
|
||||
hash: &BlockHash,
|
||||
) {
|
||||
let mut discarded = Vec::new();
|
||||
if let Some(level) = levels.get_mut(index) {
|
||||
*level = level.drain(..).filter_map(|overlay| {
|
||||
let parent = parents.get(&overlay.hash).expect("there is a parent entry for each entry in levels; qed").clone();
|
||||
if parent == *hash {
|
||||
parents.remove(&overlay.hash);
|
||||
discarded.push(overlay.hash);
|
||||
discard_values(&mut values, overlay.inserted);
|
||||
None
|
||||
} else {
|
||||
Some(overlay)
|
||||
}
|
||||
}).collect();
|
||||
}
|
||||
for hash in discarded {
|
||||
discard_descendants(levels, values, index + 1, parents, &hash);
|
||||
}
|
||||
}
|
||||
|
||||
impl<BlockHash: Hash, Key: Hash> NonCanonicalOverlay<BlockHash, Key> {
|
||||
/// Creates a new instance. Does not expect any metadata to be present in the DB.
|
||||
pub fn new<D: MetaDb>(db: &D) -> Result<NonCanonicalOverlay<BlockHash, Key>, Error<D::Error>> {
|
||||
@@ -70,6 +122,7 @@ impl<BlockHash: Hash, Key: Hash> NonCanonicalOverlay<BlockHash, Key> {
|
||||
};
|
||||
let mut levels = VecDeque::new();
|
||||
let mut parents = HashMap::new();
|
||||
let mut values = HashMap::new();
|
||||
if let Some((ref hash, mut block)) = last_canonicalized {
|
||||
// read the journal
|
||||
trace!(target: "state-db", "Reading uncanonicalized journal. Last canonicalized #{} ({:?})", block, hash);
|
||||
@@ -83,13 +136,15 @@ impl<BlockHash: Hash, Key: Hash> NonCanonicalOverlay<BlockHash, Key> {
|
||||
match db.get_meta(&journal_key).map_err(|e| Error::Db(e))? {
|
||||
Some(record) => {
|
||||
let record: JournalRecord<BlockHash, Key> = Decode::decode(&mut record.as_slice()).ok_or(Error::Decoding)?;
|
||||
let inserted = record.inserted.iter().map(|(k, _)| k.clone()).collect();
|
||||
let overlay = BlockOverlay {
|
||||
hash: record.hash.clone(),
|
||||
journal_key,
|
||||
values: record.inserted.into_iter().collect(),
|
||||
inserted: inserted,
|
||||
deleted: record.deleted,
|
||||
};
|
||||
trace!(target: "state-db", "Uncanonicalized journal entry {}.{} ({} inserted, {} deleted)", block, index, overlay.values.len(), overlay.deleted.len());
|
||||
insert_values(&mut values, record.inserted);
|
||||
trace!(target: "state-db", "Uncanonicalized journal entry {}.{} ({} inserted, {} deleted)", block, index, overlay.inserted.len(), overlay.deleted.len());
|
||||
level.push(overlay);
|
||||
parents.insert(record.hash, record.parent_hash);
|
||||
index += 1;
|
||||
@@ -112,6 +167,7 @@ impl<BlockHash: Hash, Key: Hash> NonCanonicalOverlay<BlockHash, Key> {
|
||||
parents,
|
||||
pending_canonicalizations: Default::default(),
|
||||
pending_insertions: Default::default(),
|
||||
values: values,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -153,10 +209,11 @@ impl<BlockHash: Hash, Key: Hash> NonCanonicalOverlay<BlockHash, Key> {
|
||||
let index = level.len() as u64;
|
||||
let journal_key = to_journal_key(number, index);
|
||||
|
||||
let inserted = changeset.inserted.iter().map(|(k, _)| k.clone()).collect();
|
||||
let overlay = BlockOverlay {
|
||||
hash: hash.clone(),
|
||||
journal_key: journal_key.clone(),
|
||||
values: changeset.inserted.iter().cloned().collect(),
|
||||
inserted: inserted,
|
||||
deleted: changeset.deleted.clone(),
|
||||
};
|
||||
level.push(overlay);
|
||||
@@ -167,37 +224,13 @@ impl<BlockHash: Hash, Key: Hash> NonCanonicalOverlay<BlockHash, Key> {
|
||||
inserted: changeset.inserted,
|
||||
deleted: changeset.deleted,
|
||||
};
|
||||
commit.meta.inserted.push((journal_key, journal_record.encode()));
|
||||
trace!(target: "state-db", "Inserted uncanonicalized changeset {}.{} ({} inserted, {} deleted)", number, index, journal_record.inserted.len(), journal_record.deleted.len());
|
||||
let journal_record = journal_record.encode();
|
||||
commit.meta.inserted.push((journal_key, journal_record));
|
||||
insert_values(&mut self.values, journal_record.inserted);
|
||||
self.pending_insertions.push(hash.clone());
|
||||
Ok(commit)
|
||||
}
|
||||
|
||||
fn discard_descendants(
|
||||
levels: &mut VecDeque<Vec<BlockOverlay<BlockHash, Key>>>,
|
||||
index: usize,
|
||||
parents: &mut HashMap<BlockHash, BlockHash>,
|
||||
hash: &BlockHash,
|
||||
) {
|
||||
let mut discarded = Vec::new();
|
||||
if let Some(level) = levels.get_mut(index) {
|
||||
level.retain(|ref overlay| {
|
||||
let parent = parents.get(&overlay.hash).expect("there is a parent entry for each entry in levels; qed").clone();
|
||||
if parent == *hash {
|
||||
parents.remove(&overlay.hash);
|
||||
discarded.push(overlay.hash.clone());
|
||||
false
|
||||
} else {
|
||||
true
|
||||
}
|
||||
});
|
||||
}
|
||||
for hash in discarded.into_iter() {
|
||||
Self::discard_descendants(levels, index + 1, parents, &hash);
|
||||
}
|
||||
}
|
||||
|
||||
fn discard_journals(&self, level_index: usize, discarded_journals: &mut Vec<Vec<u8>>, hash: &BlockHash) {
|
||||
if let Some(level) = self.levels.get(level_index) {
|
||||
level.iter().for_each(|overlay| {
|
||||
@@ -249,7 +282,9 @@ impl<BlockHash: Hash, Key: Hash> NonCanonicalOverlay<BlockHash, Key> {
|
||||
for (i, overlay) in level.into_iter().enumerate() {
|
||||
if i == index {
|
||||
// that's the one we need to canonicalize
|
||||
commit.data.inserted = overlay.values.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
|
||||
commit.data.inserted = overlay.inserted.iter()
|
||||
.map(|k| (k.clone(), self.values.get(k).expect("For each key in verlays there's a value in values").1.clone()))
|
||||
.collect();
|
||||
commit.data.deleted = overlay.deleted.clone();
|
||||
} else {
|
||||
self.discard_journals(self.pending_canonicalizations.len() + 1, &mut discarded_journals, &overlay.hash);
|
||||
@@ -275,12 +310,13 @@ impl<BlockHash: Hash, Key: Hash> NonCanonicalOverlay<BlockHash, Key> {
|
||||
.position(|overlay| overlay.hash == hash)
|
||||
.expect("Hash validity is checked in `canonicalize`");
|
||||
|
||||
// discard unfinalized overlays
|
||||
// discard unfinalized overlays and values
|
||||
for (i, overlay) in level.into_iter().enumerate() {
|
||||
self.parents.remove(&overlay.hash);
|
||||
if i != index {
|
||||
Self::discard_descendants(&mut self.levels, 0, &mut self.parents, &overlay.hash);
|
||||
discard_descendants(&mut self.levels, &mut self.values, 0, &mut self.parents, &overlay.hash);
|
||||
}
|
||||
discard_values(&mut self.values, overlay.inserted);
|
||||
}
|
||||
}
|
||||
if let Some(hash) = last {
|
||||
@@ -291,12 +327,8 @@ impl<BlockHash: Hash, Key: Hash> NonCanonicalOverlay<BlockHash, Key> {
|
||||
|
||||
/// Get a value from the node overlay. This searches in every existing changeset.
|
||||
pub fn get(&self, key: &Key) -> Option<DBValue> {
|
||||
for level in self.levels.iter() {
|
||||
for overlay in level.iter() {
|
||||
if let Some(value) = overlay.values.get(&key) {
|
||||
return Some(value.clone());
|
||||
}
|
||||
}
|
||||
if let Some((_, value)) = self.values.get(&key) {
|
||||
return Some(value.clone());
|
||||
}
|
||||
None
|
||||
}
|
||||
@@ -314,6 +346,7 @@ impl<BlockHash: Hash, Key: Hash> NonCanonicalOverlay<BlockHash, Key> {
|
||||
for overlay in level.into_iter() {
|
||||
commit.meta.deleted.push(overlay.journal_key);
|
||||
self.parents.remove(&overlay.hash);
|
||||
discard_values(&mut self.values, overlay.inserted);
|
||||
}
|
||||
commit
|
||||
})
|
||||
@@ -329,7 +362,8 @@ impl<BlockHash: Hash, Key: Hash> NonCanonicalOverlay<BlockHash, Key> {
|
||||
level.last().expect("Hash is added in `insert` in reverse order").hash == hash)
|
||||
.expect("Hash is added in insert");
|
||||
|
||||
self.levels[level_index].pop();
|
||||
let overlay = self.levels[level_index].pop().expect("Empty levels are not allowed in self.levels");
|
||||
discard_values(&mut self.values, overlay.inserted);
|
||||
if self.levels[level_index].is_empty() {
|
||||
debug_assert_eq!(level_index, self.levels.len() - 1);
|
||||
self.levels.pop_back();
|
||||
@@ -509,6 +543,22 @@ mod tests {
|
||||
assert!(db.data_eq(&make_db(&[1, 4, 6, 7, 8])));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn insert_same_key() {
|
||||
let mut db = make_db(&[]);
|
||||
let (h_1, c_1) = (H256::random(), make_changeset(&[1], &[]));
|
||||
let (h_2, c_2) = (H256::random(), make_changeset(&[1], &[]));
|
||||
|
||||
let mut overlay = NonCanonicalOverlay::<H256, H256>::new(&db).unwrap();
|
||||
db.commit(&overlay.insert::<io::Error>(&h_1, 1, &H256::default(), c_1).unwrap());
|
||||
db.commit(&overlay.insert::<io::Error>(&h_2, 1, &H256::default(), c_2).unwrap());
|
||||
assert!(contains(&overlay, 1));
|
||||
db.commit(&overlay.canonicalize::<io::Error>(&h_1).unwrap());
|
||||
assert!(contains(&overlay, 1));
|
||||
overlay.apply_pending();
|
||||
assert!(!contains(&overlay, 1));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn insert_with_pending_canonicalization() {
|
||||
let h1 = H256::random();
|
||||
|
||||
Reference in New Issue
Block a user