Support reference-counting state backend. (#5769)

* Optimize pinning

* Ref counting state backend

* Style

Co-Authored-By: Wei Tang <hi@that.world>

* Update Cargo.lock

* Handle empty node

Co-authored-by: Wei Tang <hi@that.world>
This commit is contained in:
Arkadiy Paronyan
2020-04-27 12:24:50 +02:00
committed by GitHub
parent 636ddd95d2
commit 64ed36d093
11 changed files with 281 additions and 189 deletions
+18 -10
View File
@@ -201,9 +201,10 @@ struct StateDbSync<BlockHash: Hash, Key: Hash> {
impl<BlockHash: Hash + MallocSizeOf, Key: Hash + MallocSizeOf> StateDbSync<BlockHash, Key> {
fn new<D: MetaDb>(
mode: PruningMode,
ref_counting: bool,
db: &D,
) -> Result<StateDbSync<BlockHash, Key>, Error<D::Error>> {
trace!(target: "state-db", "StateDb settings: {:?}", mode);
trace!(target: "state-db", "StateDb settings: {:?}. Ref-counting: {}", mode, ref_counting);
// Check that settings match
Self::check_meta(&mode, db)?;
@@ -214,7 +215,7 @@ impl<BlockHash: Hash + MallocSizeOf, Key: Hash + MallocSizeOf> StateDbSync<Block
max_mem: Some(_),
..
}) => unimplemented!(),
PruningMode::Constrained(_) => Some(RefWindow::new(db)?),
PruningMode::Constrained(_) => Some(RefWindow::new(db, ref_counting)?),
PruningMode::ArchiveAll | PruningMode::ArchiveCanonical => None,
};
@@ -387,8 +388,11 @@ impl<BlockHash: Hash + MallocSizeOf, Key: Hash + MallocSizeOf> StateDbSync<Block
}
}
pub fn get<D: NodeDb>(&self, key: &Key, db: &D) -> Result<Option<DBValue>, Error<D::Error>>
where Key: AsRef<D::Key>
pub fn get<D: NodeDb, Q: ?Sized>(&self, key: &Q, db: &D) -> Result<Option<DBValue>, Error<D::Error>>
where
Q: AsRef<D::Key>,
Key: std::borrow::Borrow<Q>,
Q: std::hash::Hash + Eq,
{
if let Some(value) = self.non_canonical.get(key) {
return Ok(Some(value));
@@ -438,10 +442,11 @@ impl<BlockHash: Hash + MallocSizeOf, Key: Hash + MallocSizeOf> StateDb<BlockHash
/// Creates a new instance. Does not expect any metadata in the database.
pub fn new<D: MetaDb>(
mode: PruningMode,
ref_counting: bool,
db: &D,
) -> Result<StateDb<BlockHash, Key>, Error<D::Error>> {
Ok(StateDb {
db: RwLock::new(StateDbSync::new(mode, db)?)
db: RwLock::new(StateDbSync::new(mode, ref_counting, db)?)
})
}
@@ -475,8 +480,11 @@ impl<BlockHash: Hash + MallocSizeOf, Key: Hash + MallocSizeOf> StateDb<BlockHash
}
/// Get a value from non-canonical/pruning overlay or the backing DB.
pub fn get<D: NodeDb>(&self, key: &Key, db: &D) -> Result<Option<DBValue>, Error<D::Error>>
where Key: AsRef<D::Key>
pub fn get<D: NodeDb, Q: ?Sized>(&self, key: &Q, db: &D) -> Result<Option<DBValue>, Error<D::Error>>
where
Q: AsRef<D::Key>,
Key: std::borrow::Borrow<Q>,
Q: std::hash::Hash + Eq,
{
self.db.read().get(key, db)
}
@@ -523,7 +531,7 @@ mod tests {
fn make_test_db(settings: PruningMode) -> (TestDb, StateDb<H256, H256>) {
let mut db = make_db(&[91, 921, 922, 93, 94]);
let state_db = StateDb::new(settings, &db).unwrap();
let state_db = StateDb::new(settings, false, &db).unwrap();
db.commit(
&state_db
@@ -638,7 +646,7 @@ mod tests {
#[test]
fn detects_incompatible_mode() {
let mut db = make_db(&[]);
let state_db = StateDb::new(PruningMode::ArchiveAll, &db).unwrap();
let state_db = StateDb::new(PruningMode::ArchiveAll, false, &db).unwrap();
db.commit(
&state_db
.insert_block::<io::Error>(
@@ -650,7 +658,7 @@ mod tests {
.unwrap(),
);
let new_mode = PruningMode::Constrained(Constraints { max_blocks: Some(2), max_mem: None });
let state_db: Result<StateDb<H256, H256>, _> = StateDb::new(new_mode, &db);
let state_db: Result<StateDb<H256, H256>, _> = StateDb::new(new_mode, false, &db);
assert!(state_db.is_err());
}
}
+76 -41
View File
@@ -40,7 +40,7 @@ pub struct NonCanonicalOverlay<BlockHash: Hash, Key: Hash> {
values: HashMap<Key, (u32, DBValue)>, //ref counted
//would be deleted but kept around because block is pinned, ref counted.
pinned: HashMap<BlockHash, u32>,
pinned_insertions: HashMap<BlockHash, Vec<Key>>,
pinned_insertions: HashMap<BlockHash, (Vec<Key>, u32)>,
}
#[derive(Encode, Decode)]
@@ -90,25 +90,44 @@ fn discard_values<Key: Hash>(values: &mut HashMap<Key, (u32, DBValue)>, inserted
}
fn discard_descendants<BlockHash: Hash, Key: Hash>(
levels: &mut VecDeque<Vec<BlockOverlay<BlockHash, Key>>>,
levels: &mut (&mut [Vec<BlockOverlay<BlockHash, Key>>], &mut [Vec<BlockOverlay<BlockHash, Key>>]),
mut values: &mut HashMap<Key, (u32, DBValue)>,
index: usize,
parents: &mut HashMap<BlockHash, BlockHash>,
pinned: &HashMap<BlockHash, u32>,
pinned_insertions: &mut HashMap<BlockHash, Vec<Key>>,
pinned_insertions: &mut HashMap<BlockHash, (Vec<Key>, u32)>,
hash: &BlockHash,
) {
let mut discarded = Vec::new();
if let Some(level) = levels.get_mut(index) {
) -> u32 {
let (first, mut remainder) = if let Some((first, rest)) = levels.0.split_first_mut() {
(Some(first), (rest, &mut levels.1[..]))
} else {
if let Some((first, rest)) = levels.1.split_first_mut() {
(Some(first), (&mut levels.0[..], rest))
} else {
(None, (&mut levels.0[..], &mut levels.1[..]))
}
};
let mut pinned_children = 0;
if let Some(level) = first {
*level = level.drain(..).filter_map(|overlay| {
let parent = parents.get(&overlay.hash)
.expect("there is a parent entry for each entry in levels; qed");
if parent == hash {
discarded.push(overlay.hash.clone());
let mut num_pinned = discard_descendants(
&mut remainder,
values,
parents,
pinned,
pinned_insertions,
&overlay.hash
);
if pinned.contains_key(&overlay.hash) {
num_pinned += 1;
}
if num_pinned != 0 {
// save to be discarded later.
pinned_insertions.insert(overlay.hash.clone(), overlay.inserted);
pinned_insertions.insert(overlay.hash.clone(), (overlay.inserted, num_pinned));
pinned_children += num_pinned;
} else {
// discard immediately.
parents.remove(&overlay.hash);
@@ -120,9 +139,7 @@ fn discard_descendants<BlockHash: Hash, Key: Hash>(
}
}).collect();
}
for hash in discarded {
discard_descendants(levels, values, index + 1, parents, pinned, pinned_insertions, &hash);
}
pinned_children
}
impl<BlockHash: Hash, Key: Hash> NonCanonicalOverlay<BlockHash, Key> {
@@ -346,19 +363,23 @@ impl<BlockHash: Hash, Key: Hash> NonCanonicalOverlay<BlockHash, Key> {
// discard unfinalized overlays and values
for (i, overlay) in level.into_iter().enumerate() {
if i != index {
let mut pinned_children = if i != index {
discard_descendants(
&mut self.levels,
&mut self.levels.as_mut_slices(),
&mut self.values,
0,
&mut self.parents,
&self.pinned,
&mut self.pinned_insertions,
&overlay.hash,
);
}
)
} else {
0
};
if self.pinned.contains_key(&overlay.hash) {
self.pinned_insertions.insert(overlay.hash.clone(), overlay.inserted);
pinned_children += 1;
}
if pinned_children != 0 {
self.pinned_insertions.insert(overlay.hash.clone(), (overlay.inserted, pinned_children));
} else {
self.parents.remove(&overlay.hash);
discard_values(&mut self.values, overlay.inserted);
@@ -372,7 +393,11 @@ impl<BlockHash: Hash, Key: Hash> NonCanonicalOverlay<BlockHash, Key> {
}
/// Get a value from the node overlay. This searches in every existing changeset.
pub fn get(&self, key: &Key) -> Option<DBValue> {
pub fn get<Q: ?Sized>(&self, key: &Q) -> Option<DBValue>
where
Key: std::borrow::Borrow<Q>,
Q: std::hash::Hash + Eq,
{
if let Some((_, value)) = self.values.get(&key) {
return Some(value.clone());
}
@@ -435,37 +460,47 @@ impl<BlockHash: Hash, Key: Hash> NonCanonicalOverlay<BlockHash, Key> {
debug_assert!(false, "Trying to pin pending state");
return;
}
// Also pin all parents
let mut parent = Some(hash);
while let Some(hash) = parent {
let refs = self.pinned.entry(hash.clone()).or_default();
if *refs == 0 {
trace!(target: "state-db-pin", "Pinned non-canon block: {:?}", hash);
}
*refs += 1;
parent = self.parents.get(hash);
let refs = self.pinned.entry(hash.clone()).or_default();
if *refs == 0 {
trace!(target: "state-db-pin", "Pinned non-canon block: {:?}", hash);
}
*refs += 1;
}
/// Discard pinned state
pub fn unpin(&mut self, hash: &BlockHash) {
// Also unpin all parents
let mut parent = Some(hash.clone());
while let Some(hash) = parent {
parent = self.parents.get(&hash).cloned();
match self.pinned.entry(hash.clone()) {
Entry::Occupied(mut entry) => {
*entry.get_mut() -= 1;
if *entry.get() == 0 {
entry.remove();
if let Some(inserted) = self.pinned_insertions.remove(&hash) {
let removed = match self.pinned.entry(hash.clone()) {
Entry::Occupied(mut entry) => {
*entry.get_mut() -= 1;
if *entry.get() == 0 {
entry.remove();
true
} else {
false
}
},
Entry::Vacant(_) => false,
};
if removed {
let mut parent = Some(hash.clone());
while let Some(hash) = parent {
parent = self.parents.get(&hash).cloned();
match self.pinned_insertions.entry(hash.clone()) {
Entry::Occupied(mut entry) => {
entry.get_mut().1 -= 1;
if entry.get().1 == 0 {
let (inserted, _) = entry.remove();
trace!(target: "state-db-pin", "Discarding unpinned non-canon block: {:?}", hash);
discard_values(&mut self.values, inserted);
self.parents.remove(&hash);
true
} else {
false
}
}
},
Entry::Vacant(_) => {},
},
Entry::Vacant(_) => break,
};
}
}
}
+64 -23
View File
@@ -45,6 +45,10 @@ pub struct RefWindow<BlockHash: Hash, Key: Hash> {
/// Number of calls of `prune_one` after
/// last call `apply_pending` or `revert_pending`
pending_prunings: usize,
/// Keep track of re-inserted keys and do not delete them when pruning.
/// Setting this to false requires backend that supports reference
/// counting.
count_insertions: bool,
}
#[derive(Debug, PartialEq, Eq, parity_util_mem_derive::MallocSizeOf)]
@@ -66,7 +70,7 @@ fn to_journal_key(block: u64) -> Vec<u8> {
}
impl<BlockHash: Hash, Key: Hash> RefWindow<BlockHash, Key> {
pub fn new<D: MetaDb>(db: &D) -> Result<RefWindow<BlockHash, Key>, Error<D::Error>> {
pub fn new<D: MetaDb>(db: &D, count_insertions: bool) -> Result<RefWindow<BlockHash, Key>, Error<D::Error>> {
let last_pruned = db.get_meta(&to_meta_key(LAST_PRUNED, &()))
.map_err(|e| Error::Db(e))?;
let pending_number: u64 = match last_pruned {
@@ -80,6 +84,7 @@ impl<BlockHash: Hash, Key: Hash> RefWindow<BlockHash, Key> {
pending_number: pending_number,
pending_canonicalizations: 0,
pending_prunings: 0,
count_insertions,
};
// read the journal
trace!(target: "state-db", "Reading pruning journal. Pending #{}", pending_number);
@@ -99,17 +104,19 @@ impl<BlockHash: Hash, Key: Hash> RefWindow<BlockHash, Key> {
}
fn import<I: IntoIterator<Item=Key>>(&mut self, hash: &BlockHash, journal_key: Vec<u8>, inserted: I, deleted: Vec<Key>) {
// remove all re-inserted keys from death rows
for k in inserted {
if let Some(block) = self.death_index.remove(&k) {
self.death_rows[(block - self.pending_number) as usize].deleted.remove(&k);
if self.count_insertions {
// remove all re-inserted keys from death rows
for k in inserted {
if let Some(block) = self.death_index.remove(&k) {
self.death_rows[(block - self.pending_number) as usize].deleted.remove(&k);
}
}
}
// add new keys
let imported_block = self.pending_number + self.death_rows.len() as u64;
for k in deleted.iter() {
self.death_index.insert(k.clone(), imported_block);
// add new keys
let imported_block = self.pending_number + self.death_rows.len() as u64;
for k in deleted.iter() {
self.death_index.insert(k.clone(), imported_block);
}
}
self.death_rows.push_back(
DeathRow {
@@ -157,7 +164,11 @@ impl<BlockHash: Hash, Key: Hash> RefWindow<BlockHash, Key> {
/// Add a change set to the window. Creates a journal record and pushes it to `commit`
pub fn note_canonical(&mut self, hash: &BlockHash, commit: &mut CommitSet<Key>) {
trace!(target: "state-db", "Adding to pruning window: {:?} ({} inserted, {} deleted)", hash, commit.data.inserted.len(), commit.data.deleted.len());
let inserted = commit.data.inserted.iter().map(|(k, _)| k.clone()).collect();
let inserted = if self.count_insertions {
commit.data.inserted.iter().map(|(k, _)| k.clone()).collect()
} else {
Default::default()
};
let deleted = ::std::mem::replace(&mut commit.data.deleted, Vec::new());
let journal_record = JournalRecord {
hash: hash.clone(),
@@ -177,8 +188,10 @@ impl<BlockHash: Hash, Key: Hash> RefWindow<BlockHash, Key> {
for _ in 0 .. self.pending_prunings {
let pruned = self.death_rows.pop_front().expect("pending_prunings is always < death_rows.len()");
trace!(target: "state-db", "Applying pruning {:?} ({} deleted)", pruned.hash, pruned.deleted.len());
for k in pruned.deleted.iter() {
self.death_index.remove(&k);
if self.count_insertions {
for k in pruned.deleted.iter() {
self.death_index.remove(&k);
}
}
self.pending_number += 1;
}
@@ -192,8 +205,10 @@ impl<BlockHash: Hash, Key: Hash> RefWindow<BlockHash, Key> {
// We don't bother to track and revert that for now. This means that a few nodes might end up no being
// deleted in case transaction fails and `revert_pending` is called.
self.death_rows.truncate(self.death_rows.len() - self.pending_canonicalizations);
let new_max_block = self.death_rows.len() as u64 + self.pending_number;
self.death_index.retain(|_, block| *block < new_max_block);
if self.count_insertions {
let new_max_block = self.death_rows.len() as u64 + self.pending_number;
self.death_index.retain(|_, block| *block < new_max_block);
}
self.pending_canonicalizations = 0;
self.pending_prunings = 0;
}
@@ -207,7 +222,7 @@ mod tests {
use crate::test::{make_db, make_commit, TestDb};
fn check_journal(pruning: &RefWindow<H256, H256>, db: &TestDb) {
let restored: RefWindow<H256, H256> = RefWindow::new(db).unwrap();
let restored: RefWindow<H256, H256> = RefWindow::new(db, pruning.count_insertions).unwrap();
assert_eq!(pruning.pending_number, restored.pending_number);
assert_eq!(pruning.death_rows, restored.death_rows);
assert_eq!(pruning.death_index, restored.death_index);
@@ -216,7 +231,7 @@ mod tests {
#[test]
fn created_from_empty_db() {
let db = make_db(&[]);
let pruning: RefWindow<H256, H256> = RefWindow::new(&db).unwrap();
let pruning: RefWindow<H256, H256> = RefWindow::new(&db, true).unwrap();
assert_eq!(pruning.pending_number, 0);
assert!(pruning.death_rows.is_empty());
assert!(pruning.death_index.is_empty());
@@ -225,7 +240,7 @@ mod tests {
#[test]
fn prune_empty() {
let db = make_db(&[]);
let mut pruning: RefWindow<H256, H256> = RefWindow::new(&db).unwrap();
let mut pruning: RefWindow<H256, H256> = RefWindow::new(&db, true).unwrap();
let mut commit = CommitSet::default();
pruning.prune_one(&mut commit);
assert_eq!(pruning.pending_number, 0);
@@ -238,7 +253,7 @@ mod tests {
#[test]
fn prune_one() {
let mut db = make_db(&[1, 2, 3]);
let mut pruning: RefWindow<H256, H256> = RefWindow::new(&db).unwrap();
let mut pruning: RefWindow<H256, H256> = RefWindow::new(&db, true).unwrap();
let mut commit = make_commit(&[4, 5], &[1, 3]);
let h = H256::random();
pruning.note_canonical(&h, &mut commit);
@@ -267,7 +282,7 @@ mod tests {
#[test]
fn prune_two() {
let mut db = make_db(&[1, 2, 3]);
let mut pruning: RefWindow<H256, H256> = RefWindow::new(&db).unwrap();
let mut pruning: RefWindow<H256, H256> = RefWindow::new(&db, true).unwrap();
let mut commit = make_commit(&[4], &[1]);
pruning.note_canonical(&H256::random(), &mut commit);
db.commit(&commit);
@@ -295,7 +310,7 @@ mod tests {
#[test]
fn prune_two_pending() {
let mut db = make_db(&[1, 2, 3]);
let mut pruning: RefWindow<H256, H256> = RefWindow::new(&db).unwrap();
let mut pruning: RefWindow<H256, H256> = RefWindow::new(&db, true).unwrap();
let mut commit = make_commit(&[4], &[1]);
pruning.note_canonical(&H256::random(), &mut commit);
db.commit(&commit);
@@ -318,7 +333,7 @@ mod tests {
#[test]
fn reinserted_survives() {
let mut db = make_db(&[1, 2, 3]);
let mut pruning: RefWindow<H256, H256> = RefWindow::new(&db).unwrap();
let mut pruning: RefWindow<H256, H256> = RefWindow::new(&db, true).unwrap();
let mut commit = make_commit(&[], &[2]);
pruning.note_canonical(&H256::random(), &mut commit);
db.commit(&commit);
@@ -351,7 +366,7 @@ mod tests {
#[test]
fn reinserted_survive_pending() {
let mut db = make_db(&[1, 2, 3]);
let mut pruning: RefWindow<H256, H256> = RefWindow::new(&db).unwrap();
let mut pruning: RefWindow<H256, H256> = RefWindow::new(&db, true).unwrap();
let mut commit = make_commit(&[], &[2]);
pruning.note_canonical(&H256::random(), &mut commit);
db.commit(&commit);
@@ -377,4 +392,30 @@ mod tests {
pruning.apply_pending();
assert_eq!(pruning.pending_number, 3);
}
#[test]
fn reinserted_ignores() {
let mut db = make_db(&[1, 2, 3]);
let mut pruning: RefWindow<H256, H256> = RefWindow::new(&db, false).unwrap();
let mut commit = make_commit(&[], &[2]);
pruning.note_canonical(&H256::random(), &mut commit);
db.commit(&commit);
let mut commit = make_commit(&[2], &[]);
pruning.note_canonical(&H256::random(), &mut commit);
db.commit(&commit);
let mut commit = make_commit(&[], &[2]);
pruning.note_canonical(&H256::random(), &mut commit);
db.commit(&commit);
assert!(db.data_eq(&make_db(&[1, 2, 3])));
pruning.apply_pending();
check_journal(&pruning, &db);
let mut commit = CommitSet::default();
pruning.prune_one(&mut commit);
db.commit(&commit);
assert!(db.data_eq(&make_db(&[1, 3])));
assert!(pruning.death_index.is_empty());
}
}