mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-13 01:11:10 +00:00
Pin states in memory so that they are not pruned away while still referenced (#2761)
* State pinning in client * Canonicalization queue * Fixed prioritization queue * possible fix of "hash mismatch" * Check for pinned discarded states * Release state before finalization * Style * Style
This commit is contained in:
committed by
Gavin Wood
parent
6ce7c1c8c8
commit
3b26453047
@@ -37,7 +37,7 @@ use std::fmt;
|
||||
use parking_lot::RwLock;
|
||||
use parity_codec as codec;
|
||||
use codec::Codec;
|
||||
use std::collections::HashSet;
|
||||
use std::collections::{VecDeque, HashMap, hash_map::Entry};
|
||||
use noncanonical::NonCanonicalOverlay;
|
||||
use pruning::RefWindow;
|
||||
use log::trace;
|
||||
@@ -78,6 +78,8 @@ pub enum Error<E: fmt::Debug> {
|
||||
InvalidBlockNumber,
|
||||
/// Trying to insert block with unknown parent.
|
||||
InvalidParent,
|
||||
/// Canonicalization would discard pinned state.
|
||||
DiscardingPinned,
|
||||
}
|
||||
|
||||
impl<E: fmt::Debug> fmt::Debug for Error<E> {
|
||||
@@ -88,6 +90,7 @@ impl<E: fmt::Debug> fmt::Debug for Error<E> {
|
||||
Error::InvalidBlock => write!(f, "Trying to canonicalize invalid block"),
|
||||
Error::InvalidBlockNumber => write!(f, "Trying to insert block with invalid number"),
|
||||
Error::InvalidParent => write!(f, "Trying to insert block with unknown parent"),
|
||||
Error::DiscardingPinned => write!(f, "Trying to discard pinned state"),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -112,7 +115,7 @@ pub struct CommitSet<H: Hash> {
|
||||
}
|
||||
|
||||
/// Pruning constraints. If none are specified pruning is
|
||||
#[derive(Default, Debug, Clone)]
|
||||
#[derive(Default, Debug, Clone, Eq, PartialEq)]
|
||||
pub struct Constraints {
|
||||
/// Maximum blocks. Defaults to 0 when unspecified, effectively keeping only non-canonical states.
|
||||
pub max_blocks: Option<u32>,
|
||||
@@ -121,7 +124,7 @@ pub struct Constraints {
|
||||
}
|
||||
|
||||
/// Pruning mode.
|
||||
#[derive(Debug, Clone)]
|
||||
#[derive(Debug, Clone, Eq, PartialEq)]
|
||||
pub enum PruningMode {
|
||||
/// Maintain a pruning window.
|
||||
Constrained(Constraints),
|
||||
@@ -165,13 +168,14 @@ fn to_meta_key<S: Codec>(suffix: &[u8], data: &S) -> Vec<u8> {
|
||||
struct StateDbSync<BlockHash: Hash, Key: Hash> {
|
||||
mode: PruningMode,
|
||||
non_canonical: NonCanonicalOverlay<BlockHash, Key>,
|
||||
canonicalization_queue: VecDeque<BlockHash>,
|
||||
pruning: Option<RefWindow<BlockHash, Key>>,
|
||||
pinned: HashSet<BlockHash>,
|
||||
pinned: HashMap<BlockHash, u32>,
|
||||
}
|
||||
|
||||
impl<BlockHash: Hash, Key: Hash> StateDbSync<BlockHash, Key> {
|
||||
pub fn new<D: MetaDb>(mode: PruningMode, db: &D) -> Result<StateDbSync<BlockHash, Key>, Error<D::Error>> {
|
||||
trace!("StateDb settings: {:?}", mode);
|
||||
trace!(target: "state-db", "StateDb settings: {:?}", mode);
|
||||
let non_canonical: NonCanonicalOverlay<BlockHash, Key> = NonCanonicalOverlay::new(db)?;
|
||||
let pruning: Option<RefWindow<BlockHash, Key>> = match mode {
|
||||
PruningMode::Constrained(Constraints {
|
||||
@@ -186,6 +190,7 @@ impl<BlockHash: Hash, Key: Hash> StateDbSync<BlockHash, Key> {
|
||||
non_canonical,
|
||||
pruning,
|
||||
pinned: Default::default(),
|
||||
canonicalization_queue: Default::default(),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -206,21 +211,30 @@ impl<BlockHash: Hash, Key: Hash> StateDbSync<BlockHash, Key> {
|
||||
}
|
||||
|
||||
pub fn canonicalize_block<E: fmt::Debug>(&mut self, hash: &BlockHash) -> Result<CommitSet<Key>, Error<E>> {
|
||||
let mut commit = match self.mode {
|
||||
PruningMode::ArchiveAll => {
|
||||
CommitSet::default()
|
||||
},
|
||||
PruningMode::ArchiveCanonical => {
|
||||
let mut commit = self.non_canonical.canonicalize(hash)?;
|
||||
commit.data.deleted.clear();
|
||||
commit
|
||||
},
|
||||
PruningMode::Constrained(_) => {
|
||||
self.non_canonical.canonicalize(hash)?
|
||||
},
|
||||
};
|
||||
if let Some(ref mut pruning) = self.pruning {
|
||||
pruning.note_canonical(hash, &mut commit);
|
||||
let mut commit = CommitSet::default();
|
||||
if self.mode == PruningMode::ArchiveAll {
|
||||
return Ok(commit)
|
||||
}
|
||||
self.canonicalization_queue.push_back(hash.clone());
|
||||
while let Some(hash) = self.canonicalization_queue.front().cloned() {
|
||||
if self.pinned.contains_key(&hash) {
|
||||
break;
|
||||
}
|
||||
match self.non_canonical.canonicalize(&hash, &self.pinned, &mut commit) {
|
||||
Ok(()) => {
|
||||
self.canonicalization_queue.pop_front();
|
||||
if self.mode == PruningMode::ArchiveCanonical {
|
||||
commit.data.deleted.clear();
|
||||
}
|
||||
}
|
||||
Err(Error::DiscardingPinned) => {
|
||||
break;
|
||||
}
|
||||
Err(e) => return Err(e),
|
||||
};
|
||||
if let Some(ref mut pruning) = self.pruning {
|
||||
pruning.note_canonical(&hash, &mut commit);
|
||||
}
|
||||
}
|
||||
self.prune(&mut commit);
|
||||
Ok(commit)
|
||||
@@ -255,7 +269,7 @@ impl<BlockHash: Hash, Key: Hash> StateDbSync<BlockHash, Key> {
|
||||
}
|
||||
|
||||
let pinned = &self.pinned;
|
||||
if pruning.next_hash().map_or(false, |h| pinned.contains(&h)) {
|
||||
if pruning.next_hash().map_or(false, |h| pinned.contains_key(&h)) {
|
||||
break;
|
||||
}
|
||||
pruning.prune_one(commit);
|
||||
@@ -278,11 +292,23 @@ impl<BlockHash: Hash, Key: Hash> StateDbSync<BlockHash, Key> {
|
||||
}
|
||||
|
||||
pub fn pin(&mut self, hash: &BlockHash) {
|
||||
self.pinned.insert(hash.clone());
|
||||
trace!(target: "state-db", "Pinned block: {:?}", hash);
|
||||
*self.pinned.entry(hash.clone()).or_default() += 1;
|
||||
}
|
||||
|
||||
pub fn unpin(&mut self, hash: &BlockHash) {
|
||||
self.pinned.remove(hash);
|
||||
match self.pinned.entry(hash.clone()) {
|
||||
Entry::Occupied(mut entry) => {
|
||||
*entry.get_mut() -= 1;
|
||||
if *entry.get() == 0 {
|
||||
trace!(target: "state-db", "Unpinned block: {:?}", hash);
|
||||
entry.remove();
|
||||
} else {
|
||||
trace!(target: "state-db", "Releasing reference for {:?}", hash);
|
||||
}
|
||||
},
|
||||
Entry::Vacant(_) => {},
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get<D: NodeDb>(&self, key: &Key, db: &D) -> Result<Option<DBValue>, Error<D::Error>>
|
||||
|
||||
@@ -230,13 +230,20 @@ impl<BlockHash: Hash, Key: Hash> NonCanonicalOverlay<BlockHash, Key> {
|
||||
Ok(commit)
|
||||
}
|
||||
|
||||
fn discard_journals(&self, level_index: usize, discarded_journals: &mut Vec<Vec<u8>>, hash: &BlockHash) {
|
||||
fn discard_journals(
|
||||
&self,
|
||||
level_index: usize,
|
||||
discarded_journals: &mut Vec<Vec<u8>>,
|
||||
discarded_blocks: &mut Vec<BlockHash>,
|
||||
hash: &BlockHash
|
||||
) {
|
||||
if let Some(level) = self.levels.get(level_index) {
|
||||
level.iter().for_each(|overlay| {
|
||||
let parent = self.parents.get(&overlay.hash).expect("there is a parent entry for each entry in levels; qed").clone();
|
||||
if parent == *hash {
|
||||
discarded_journals.push(overlay.journal_key.clone());
|
||||
self.discard_journals(level_index + 1, discarded_journals, &overlay.hash);
|
||||
discarded_blocks.push(overlay.hash.clone());
|
||||
self.discard_journals(level_index + 1, discarded_journals, discarded_blocks, &overlay.hash);
|
||||
}
|
||||
});
|
||||
}
|
||||
@@ -268,7 +275,12 @@ impl<BlockHash: Hash, Key: Hash> NonCanonicalOverlay<BlockHash, Key> {
|
||||
|
||||
/// Select a top-level root and canonicalized it. Discards all sibling subtrees and the root.
|
||||
/// Returns a set of changes that need to be added to the DB.
|
||||
pub fn canonicalize<E: fmt::Debug>(&mut self, hash: &BlockHash) -> Result<CommitSet<Key>, Error<E>> {
|
||||
pub fn canonicalize<E: fmt::Debug>(
|
||||
&mut self,
|
||||
hash: &BlockHash,
|
||||
pinned: &HashMap<BlockHash, u32>,
|
||||
commit: &mut CommitSet<Key>,
|
||||
) -> Result<(), Error<E>> {
|
||||
trace!(target: "state-db", "Canonicalizing {:?}", hash);
|
||||
let level = self.levels.get(self.pending_canonicalizations.len()).ok_or_else(|| Error::InvalidBlock)?;
|
||||
let index = level
|
||||
@@ -276,26 +288,40 @@ impl<BlockHash: Hash, Key: Hash> NonCanonicalOverlay<BlockHash, Key> {
|
||||
.position(|overlay| overlay.hash == *hash)
|
||||
.ok_or_else(|| Error::InvalidBlock)?;
|
||||
|
||||
let mut commit = CommitSet::default();
|
||||
let mut discarded_journals = Vec::new();
|
||||
for (i, overlay) in level.into_iter().enumerate() {
|
||||
if i == index {
|
||||
// that's the one we need to canonicalize
|
||||
commit.data.inserted = overlay.inserted.iter()
|
||||
.map(|k| (k.clone(), self.values.get(k).expect("For each key in verlays there's a value in values").1.clone()))
|
||||
.collect();
|
||||
commit.data.deleted = overlay.deleted.clone();
|
||||
} else {
|
||||
self.discard_journals(self.pending_canonicalizations.len() + 1, &mut discarded_journals, &overlay.hash);
|
||||
let mut discarded_blocks = Vec::new();
|
||||
for (i, overlay) in level.iter().enumerate() {
|
||||
if i != index {
|
||||
self.discard_journals(
|
||||
self.pending_canonicalizations.len() + 1,
|
||||
&mut discarded_journals,
|
||||
&mut discarded_blocks,
|
||||
&overlay.hash
|
||||
);
|
||||
}
|
||||
discarded_journals.push(overlay.journal_key.clone());
|
||||
discarded_blocks.push(overlay.hash.clone());
|
||||
}
|
||||
|
||||
for hash in discarded_blocks.into_iter() {
|
||||
if pinned.contains_key(&hash) {
|
||||
trace!(target: "state-db", "Refusing to discard pinned state {:?}", hash);
|
||||
return Err(Error::DiscardingPinned)
|
||||
}
|
||||
}
|
||||
|
||||
// get the one we need to canonicalize
|
||||
let overlay = &level[index];
|
||||
commit.data.inserted.extend(overlay.inserted.iter()
|
||||
.map(|k| (k.clone(), self.values.get(k).expect("For each key in overlays there's a value in values").1.clone())));
|
||||
commit.data.deleted.extend(overlay.deleted.clone());
|
||||
|
||||
commit.meta.deleted.append(&mut discarded_journals);
|
||||
let canonicalized = (hash.clone(), self.front_block_number() + self.pending_canonicalizations.len() as u64);
|
||||
commit.meta.inserted.push((to_meta_key(LAST_CANONICAL, &()), canonicalized.encode()));
|
||||
trace!(target: "state-db", "Discarding {} records", commit.meta.deleted.len());
|
||||
self.pending_canonicalizations.push(hash.clone());
|
||||
Ok(commit)
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn apply_canonicalizations(&mut self) {
|
||||
@@ -385,10 +411,10 @@ impl<BlockHash: Hash, Key: Hash> NonCanonicalOverlay<BlockHash, Key> {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::io;
|
||||
use std::{collections::HashMap, io};
|
||||
use primitives::H256;
|
||||
use super::{NonCanonicalOverlay, to_journal_key};
|
||||
use crate::ChangeSet;
|
||||
use crate::{ChangeSet, CommitSet};
|
||||
use crate::test::{make_db, make_changeset};
|
||||
|
||||
fn contains(overlay: &NonCanonicalOverlay<H256, H256>, key: u64) -> bool {
|
||||
@@ -409,7 +435,8 @@ mod tests {
|
||||
fn canonicalize_empty_panics() {
|
||||
let db = make_db(&[]);
|
||||
let mut overlay = NonCanonicalOverlay::<H256, H256>::new(&db).unwrap();
|
||||
overlay.canonicalize::<io::Error>(&H256::default()).unwrap();
|
||||
let mut commit = CommitSet::default();
|
||||
overlay.canonicalize::<io::Error>(&H256::default(), &HashMap::default(), &mut commit).unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -453,7 +480,8 @@ mod tests {
|
||||
let db = make_db(&[]);
|
||||
let mut overlay = NonCanonicalOverlay::<H256, H256>::new(&db).unwrap();
|
||||
overlay.insert::<io::Error>(&h1, 1, &H256::default(), ChangeSet::default()).unwrap();
|
||||
overlay.canonicalize::<io::Error>(&h2).unwrap();
|
||||
let mut commit = CommitSet::default();
|
||||
overlay.canonicalize::<io::Error>(&h2, &HashMap::default(), &mut commit).unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -468,7 +496,8 @@ mod tests {
|
||||
assert_eq!(insertion.meta.inserted.len(), 2);
|
||||
assert_eq!(insertion.meta.deleted.len(), 0);
|
||||
db.commit(&insertion);
|
||||
let finalization = overlay.canonicalize::<io::Error>(&h1).unwrap();
|
||||
let mut finalization = CommitSet::default();
|
||||
overlay.canonicalize::<io::Error>(&h1, &HashMap::default(), &mut finalization).unwrap();
|
||||
assert_eq!(finalization.data.inserted.len(), changeset.inserted.len());
|
||||
assert_eq!(finalization.data.deleted.len(), changeset.deleted.len());
|
||||
assert_eq!(finalization.meta.inserted.len(), 1);
|
||||
@@ -501,7 +530,9 @@ mod tests {
|
||||
let mut overlay = NonCanonicalOverlay::<H256, H256>::new(&db).unwrap();
|
||||
db.commit(&overlay.insert::<io::Error>(&h1, 10, &H256::default(), make_changeset(&[3, 4], &[2])).unwrap());
|
||||
db.commit(&overlay.insert::<io::Error>(&h2, 11, &h1, make_changeset(&[5], &[3])).unwrap());
|
||||
db.commit(&overlay.canonicalize::<io::Error>(&h1).unwrap());
|
||||
let mut commit = CommitSet::default();
|
||||
overlay.canonicalize::<io::Error>(&h1, &HashMap::default(), &mut commit).unwrap();
|
||||
db.commit(&commit);
|
||||
overlay.apply_pending();
|
||||
assert_eq!(overlay.levels.len(), 1);
|
||||
|
||||
@@ -526,7 +557,9 @@ mod tests {
|
||||
assert!(contains(&overlay, 5));
|
||||
assert_eq!(overlay.levels.len(), 2);
|
||||
assert_eq!(overlay.parents.len(), 2);
|
||||
db.commit(&overlay.canonicalize::<io::Error>(&h1).unwrap());
|
||||
let mut commit = CommitSet::default();
|
||||
overlay.canonicalize::<io::Error>(&h1, &HashMap::default(), &mut commit).unwrap();
|
||||
db.commit(&commit);
|
||||
assert!(contains(&overlay, 5));
|
||||
assert_eq!(overlay.levels.len(), 2);
|
||||
assert_eq!(overlay.parents.len(), 2);
|
||||
@@ -535,7 +568,9 @@ mod tests {
|
||||
assert_eq!(overlay.parents.len(), 1);
|
||||
assert!(!contains(&overlay, 5));
|
||||
assert!(contains(&overlay, 7));
|
||||
db.commit(&overlay.canonicalize::<io::Error>(&h2).unwrap());
|
||||
let mut commit = CommitSet::default();
|
||||
overlay.canonicalize::<io::Error>(&h2, &HashMap::default(), &mut commit).unwrap();
|
||||
db.commit(&commit);
|
||||
overlay.apply_pending();
|
||||
assert_eq!(overlay.levels.len(), 0);
|
||||
assert_eq!(overlay.parents.len(), 0);
|
||||
@@ -552,7 +587,9 @@ mod tests {
|
||||
db.commit(&overlay.insert::<io::Error>(&h_1, 1, &H256::default(), c_1).unwrap());
|
||||
db.commit(&overlay.insert::<io::Error>(&h_2, 1, &H256::default(), c_2).unwrap());
|
||||
assert!(contains(&overlay, 1));
|
||||
db.commit(&overlay.canonicalize::<io::Error>(&h_1).unwrap());
|
||||
let mut commit = CommitSet::default();
|
||||
overlay.canonicalize::<io::Error>(&h_1, &HashMap::default(), &mut commit).unwrap();
|
||||
db.commit(&commit);
|
||||
assert!(contains(&overlay, 1));
|
||||
overlay.apply_pending();
|
||||
assert!(!contains(&overlay, 1));
|
||||
@@ -569,8 +606,10 @@ mod tests {
|
||||
db.commit(&overlay.insert::<io::Error>(&h1, 1, &H256::default(), changeset.clone()).unwrap());
|
||||
db.commit(&overlay.insert::<io::Error>(&h2, 2, &h1, changeset.clone()).unwrap());
|
||||
overlay.apply_pending();
|
||||
db.commit(&overlay.canonicalize::<io::Error>(&h1).unwrap());
|
||||
db.commit(&overlay.canonicalize::<io::Error>(&h2).unwrap());
|
||||
let mut commit = CommitSet::default();
|
||||
overlay.canonicalize::<io::Error>(&h1, &HashMap::default(), &mut commit).unwrap();
|
||||
overlay.canonicalize::<io::Error>(&h2, &HashMap::default(), &mut commit).unwrap();
|
||||
db.commit(&commit);
|
||||
db.commit(&overlay.insert::<io::Error>(&h3, 3, &h2, changeset.clone()).unwrap());
|
||||
overlay.apply_pending();
|
||||
assert_eq!(overlay.levels.len(), 1);
|
||||
@@ -639,7 +678,9 @@ mod tests {
|
||||
assert_eq!(overlay.last_canonicalized, overlay2.last_canonicalized);
|
||||
|
||||
// canonicalize 1. 2 and all its children should be discarded
|
||||
db.commit(&overlay.canonicalize::<io::Error>(&h_1).unwrap());
|
||||
let mut commit = CommitSet::default();
|
||||
overlay.canonicalize::<io::Error>(&h_1, &HashMap::default(), &mut commit).unwrap();
|
||||
db.commit(&commit);
|
||||
overlay.apply_pending();
|
||||
assert_eq!(overlay.levels.len(), 2);
|
||||
assert_eq!(overlay.parents.len(), 6);
|
||||
@@ -657,8 +698,15 @@ mod tests {
|
||||
assert!(db.get_meta(&to_journal_key(2, 2)).unwrap().is_none());
|
||||
assert!(db.get_meta(&to_journal_key(2, 3)).unwrap().is_none());
|
||||
|
||||
// check that discarding pinned state produces an error.
|
||||
let mut commit = CommitSet::default();
|
||||
let pinned = vec![(h_1_1_1, 1)].into_iter().collect();
|
||||
assert!(overlay.canonicalize::<io::Error>(&h_1_2, &pinned, &mut commit).is_err());
|
||||
|
||||
// canonicalize 1_2. 1_1 and all its children should be discarded
|
||||
db.commit(&overlay.canonicalize::<io::Error>(&h_1_2).unwrap());
|
||||
let mut commit = CommitSet::default();
|
||||
overlay.canonicalize::<io::Error>(&h_1_2, &HashMap::default(), &mut commit).unwrap();
|
||||
db.commit(&commit);
|
||||
overlay.apply_pending();
|
||||
assert_eq!(overlay.levels.len(), 1);
|
||||
assert_eq!(overlay.parents.len(), 3);
|
||||
@@ -673,7 +721,9 @@ mod tests {
|
||||
assert!(!overlay.have_block(&h_1_1_1));
|
||||
|
||||
// canonicalize 1_2_2
|
||||
db.commit(&overlay.canonicalize::<io::Error>(&h_1_2_2).unwrap());
|
||||
let mut commit = CommitSet::default();
|
||||
overlay.canonicalize::<io::Error>(&h_1_2_2, &HashMap::default(), &mut commit).unwrap();
|
||||
db.commit(&commit);
|
||||
overlay.apply_pending();
|
||||
assert_eq!(overlay.levels.len(), 0);
|
||||
assert_eq!(overlay.parents.len(), 0);
|
||||
|
||||
Reference in New Issue
Block a user