Rework storage iterators (#13284)

* Rework storage iterators

* Make sure storage iteration is also accounted for when benchmarking

* Use `trie-db` from crates.io

* Appease clippy

* Bump `trie-bench` to 0.35.0

* Fix tests' compilation

* Update comment to clarify how `IterArgs::start_at` works

* Add extra tests

* Fix iterators on `Client` so that they behave as before

* Add extra `unwrap`s in tests

* More clippy fixes

* Come on clippy, give me a break already

* Rename `allow_missing` to `stop_on_incomplete_database`

* Add `#[inline]` to `with_recorder_and_cache`

* Use `with_recorder_and_cache` in `with_trie_db`; add doc comment

* Simplify code: use `with_trie_db` in `next_storage_key_from_root`

* Remove `expect`s in the benchmarking CLI

* Add extra doc comments

* Move `RawIter` before `TrieBackendEssence` (no code changes; just cut-paste)

* Remove a TODO in tests

* Update comment for `StorageIterator::was_complete`

* Update `trie-db` to 0.25.1
This commit is contained in:
Koute
2023-02-22 16:49:25 +09:00
committed by GitHub
parent 236bbbd5ef
commit f8e3bdad3d
27 changed files with 1097 additions and 742 deletions
@@ -19,24 +19,23 @@
//! from storage.
use crate::{
backend::Consolidate, debug, trie_backend::AsLocalTrieCache, warn, StorageKey, StorageValue,
backend::{Consolidate, IterArgs, StorageIterator},
trie_backend::AsLocalTrieCache,
warn, StorageKey, StorageValue,
};
use codec::Codec;
use hash_db::{self, AsHashDB, HashDB, HashDBRef, Hasher, Prefix};
#[cfg(feature = "std")]
use parking_lot::RwLock;
use sp_core::storage::{ChildInfo, ChildType, StateVersion};
#[cfg(not(feature = "std"))]
use sp_std::marker::PhantomData;
use sp_std::{boxed::Box, vec::Vec};
use sp_std::{boxed::Box, marker::PhantomData, vec::Vec};
#[cfg(feature = "std")]
use sp_trie::recorder::Recorder;
use sp_trie::{
child_delta_trie_root, delta_trie_root, empty_child_trie_root, read_child_trie_hash,
read_child_trie_value, read_trie_value,
trie_types::{TrieDBBuilder, TrieError},
DBValue, KeySpacedDB, NodeCodec, Trie, TrieCache, TrieDBIterator, TrieDBKeyIterator,
TrieRecorder,
DBValue, KeySpacedDB, NodeCodec, Trie, TrieCache, TrieDBRawIterator, TrieRecorder,
};
#[cfg(feature = "std")]
use std::{collections::HashMap, sync::Arc};
@@ -76,6 +75,109 @@ impl<H> Cache<H> {
}
}
enum IterState {
Pending,
FinishedComplete,
FinishedIncomplete,
}
/// A raw iterator over the storage.
pub struct RawIter<S, H, C>
where
H: Hasher,
{
stop_on_incomplete_database: bool,
root: H::Out,
child_info: Option<ChildInfo>,
trie_iter: TrieDBRawIterator<Layout<H>>,
state: IterState,
_phantom: PhantomData<(S, C)>,
}
impl<S, H, C> RawIter<S, H, C>
where
H: Hasher,
S: TrieBackendStorage<H>,
H::Out: Codec + Ord,
C: AsLocalTrieCache<H> + Send + Sync,
{
#[inline]
fn prepare<R>(
&mut self,
backend: &TrieBackendEssence<S, H, C>,
callback: impl FnOnce(
&sp_trie::TrieDB<Layout<H>>,
&mut TrieDBRawIterator<Layout<H>>,
) -> Option<core::result::Result<R, Box<TrieError<<H as Hasher>::Out>>>>,
) -> Option<Result<R>> {
if !matches!(self.state, IterState::Pending) {
return None
}
let result = backend.with_trie_db(self.root, self.child_info.as_ref(), |db| {
callback(&db, &mut self.trie_iter)
});
match result {
Some(Ok(key_value)) => Some(Ok(key_value)),
None => {
self.state = IterState::FinishedComplete;
None
},
Some(Err(error)) => {
self.state = IterState::FinishedIncomplete;
if matches!(*error, TrieError::IncompleteDatabase(_)) &&
self.stop_on_incomplete_database
{
None
} else {
Some(Err(format!("TrieDB iteration error: {}", error)))
}
},
}
}
}
impl<S, H, C> Default for RawIter<S, H, C>
where
H: Hasher,
{
fn default() -> Self {
Self {
stop_on_incomplete_database: false,
child_info: None,
root: Default::default(),
trie_iter: TrieDBRawIterator::empty(),
state: IterState::FinishedComplete,
_phantom: Default::default(),
}
}
}
impl<S, H, C> StorageIterator<H> for RawIter<S, H, C>
where
H: Hasher,
S: TrieBackendStorage<H>,
H::Out: Codec + Ord,
C: AsLocalTrieCache<H> + Send + Sync,
{
type Backend = crate::TrieBackend<S, H, C>;
type Error = crate::DefaultError;
#[inline]
fn next_key(&mut self, backend: &Self::Backend) -> Option<Result<StorageKey>> {
self.prepare(&backend.essence, |trie, trie_iter| trie_iter.next_key(&trie))
}
#[inline]
fn next_pair(&mut self, backend: &Self::Backend) -> Option<Result<(StorageKey, StorageValue)>> {
self.prepare(&backend.essence, |trie, trie_iter| trie_iter.next_item(&trie))
}
fn was_complete(&self) -> bool {
matches!(self.state, IterState::FinishedComplete)
}
}
/// Patricia trie-based pairs storage essence.
pub struct TrieBackendEssence<S: TrieBackendStorage<H>, H: Hasher, C> {
storage: S,
@@ -168,6 +270,7 @@ impl<S: TrieBackendStorage<H>, H: Hasher, C: AsLocalTrieCache<H>> TrieBackendEss
///
/// If the given `storage_root` is `None`, `self.root` will be used.
#[cfg(feature = "std")]
#[inline]
fn with_recorder_and_cache<R>(
&self,
storage_root: Option<H::Out>,
@@ -193,6 +296,7 @@ impl<S: TrieBackendStorage<H>, H: Hasher, C: AsLocalTrieCache<H>> TrieBackendEss
}
#[cfg(not(feature = "std"))]
#[inline]
fn with_recorder_and_cache<R>(
&self,
_: Option<H::Out>,
@@ -262,6 +366,31 @@ impl<S: TrieBackendStorage<H>, H: Hasher, C: AsLocalTrieCache<H> + Send + Sync>
where
H::Out: Codec + Ord,
{
/// Calls the given closure with a [`TrieDb`] constructed for the given
/// storage root and (optionally) child trie.
#[inline]
fn with_trie_db<R>(
&self,
root: H::Out,
child_info: Option<&ChildInfo>,
callback: impl FnOnce(&sp_trie::TrieDB<Layout<H>>) -> R,
) -> R {
let backend = self as &dyn HashDBRef<H, Vec<u8>>;
let db = child_info
.as_ref()
.map(|child_info| KeySpacedDB::new(backend, child_info.keyspace()));
let db = db.as_ref().map(|db| db as &dyn HashDBRef<H, Vec<u8>>).unwrap_or(backend);
self.with_recorder_and_cache(Some(root), |recorder, cache| {
let trie = TrieDBBuilder::<H>::new(db, &root)
.with_optional_recorder(recorder)
.with_optional_cache(cache)
.build();
callback(&trie)
})
}
/// Return the next key in the trie i.e. the minimum key that is strictly superior to `key` in
/// lexicographic order.
pub fn next_storage_key(&self, key: &[u8]) -> Result<Option<StorageKey>> {
@@ -316,21 +445,7 @@ where
child_info: Option<&ChildInfo>,
key: &[u8],
) -> Result<Option<StorageKey>> {
let dyn_eph: &dyn HashDBRef<_, _>;
let keyspace_eph;
if let Some(child_info) = child_info.as_ref() {
keyspace_eph = KeySpacedDB::new(self, child_info.keyspace());
dyn_eph = &keyspace_eph;
} else {
dyn_eph = self;
}
self.with_recorder_and_cache(Some(*root), |recorder, cache| {
let trie = TrieDBBuilder::<H>::new(dyn_eph, root)
.with_optional_recorder(recorder)
.with_optional_cache(cache)
.build();
self.with_trie_db(*root, child_info, |trie| {
let mut iter = trie.key_iter().map_err(|e| format!("TrieDB iteration error: {}", e))?;
// The key just after the one given in input, basically `key++0`.
@@ -429,246 +544,42 @@ where
})
}
/// Retrieve all entries keys of storage and call `f` for each of those keys.
/// Aborts as soon as `f` returns false.
///
/// Returns `true` when all keys were iterated.
pub fn apply_to_key_values_while(
&self,
child_info: Option<&ChildInfo>,
prefix: Option<&[u8]>,
start_at: Option<&[u8]>,
f: impl FnMut(Vec<u8>, Vec<u8>) -> bool,
allow_missing_nodes: bool,
) -> Result<bool> {
let root = if let Some(child_info) = child_info.as_ref() {
match self.child_root(child_info)? {
Some(child_root) => child_root,
None => return Ok(true),
}
/// Create a raw iterator over the storage.
pub fn raw_iter(&self, args: IterArgs) -> Result<RawIter<S, H, C>> {
let root = if let Some(child_info) = args.child_info.as_ref() {
let root = match self.child_root(&child_info)? {
Some(root) => root,
None => return Ok(Default::default()),
};
root
} else {
self.root
};
self.trie_iter_inner(&root, prefix, f, child_info, start_at, allow_missing_nodes)
}
/// Retrieve all entries keys of a storage and call `f` for each of those keys.
/// Aborts as soon as `f` returns false.
pub fn apply_to_keys_while<F: FnMut(&[u8]) -> bool>(
&self,
child_info: Option<&ChildInfo>,
prefix: Option<&[u8]>,
start_at: Option<&[u8]>,
f: F,
) {
let root = if let Some(child_info) = child_info.as_ref() {
match self.child_root(child_info) {
Ok(Some(v)) => v,
// If the child trie doesn't exist, there is no need to continue.
Ok(None) => return,
Err(e) => {
debug!(target: "trie", "Error while iterating child storage: {}", e);
return
},
}
} else {
self.root
};
self.trie_iter_key_inner(&root, prefix, f, child_info, start_at)
}
/// Execute given closure for all keys starting with prefix.
pub fn for_child_keys_with_prefix(
&self,
child_info: &ChildInfo,
prefix: &[u8],
mut f: impl FnMut(&[u8]),
) {
let root = match self.child_root(child_info) {
Ok(Some(v)) => v,
// If the child trie doesn't exist, there is no need to continue.
Ok(None) => return,
Err(e) => {
debug!(target: "trie", "Error while iterating child storage: {}", e);
return
},
};
self.trie_iter_key_inner(
&root,
Some(prefix),
|k| {
f(k);
true
},
Some(child_info),
None,
)
}
/// Execute given closure for all keys starting with prefix.
pub fn for_keys_with_prefix<F: FnMut(&[u8])>(&self, prefix: &[u8], mut f: F) {
self.trie_iter_key_inner(
&self.root,
Some(prefix),
|k| {
f(k);
true
},
None,
None,
)
}
fn trie_iter_key_inner<F: FnMut(&[u8]) -> bool>(
&self,
root: &H::Out,
maybe_prefix: Option<&[u8]>,
mut f: F,
child_info: Option<&ChildInfo>,
maybe_start_at: Option<&[u8]>,
) {
let mut iter = move |db| -> sp_std::result::Result<(), Box<TrieError<H::Out>>> {
self.with_recorder_and_cache(Some(*root), |recorder, cache| {
let trie = TrieDBBuilder::<H>::new(db, root)
.with_optional_recorder(recorder)
.with_optional_cache(cache)
.build();
let prefix = maybe_prefix.unwrap_or(&[]);
let iter = match maybe_start_at {
Some(start_at) =>
TrieDBKeyIterator::new_prefixed_then_seek(&trie, prefix, start_at),
None => TrieDBKeyIterator::new_prefixed(&trie, prefix),
}?;
for x in iter {
let key = x?;
debug_assert!(maybe_prefix
.as_ref()
.map(|prefix| key.starts_with(prefix))
.unwrap_or(true));
if !f(&key) {
break
}
}
Ok(())
})
};
let result = if let Some(child_info) = child_info {
let db = KeySpacedDB::new(self, child_info.keyspace());
iter(&db)
} else {
iter(self)
};
if let Err(e) = result {
debug!(target: "trie", "Error while iterating by prefix: {}", e);
if self.root == Default::default() {
// A special-case for an empty storage root.
return Ok(Default::default())
}
}
fn trie_iter_inner<F: FnMut(Vec<u8>, Vec<u8>) -> bool>(
&self,
root: &H::Out,
prefix: Option<&[u8]>,
mut f: F,
child_info: Option<&ChildInfo>,
start_at: Option<&[u8]>,
allow_missing_nodes: bool,
) -> Result<bool> {
let mut iter = move |db| -> sp_std::result::Result<bool, Box<TrieError<H::Out>>> {
self.with_recorder_and_cache(Some(*root), |recorder, cache| {
let trie = TrieDBBuilder::<H>::new(db, root)
.with_optional_recorder(recorder)
.with_optional_cache(cache)
.build();
let prefix = prefix.unwrap_or(&[]);
let iterator = if let Some(start_at) = start_at {
TrieDBIterator::new_prefixed_then_seek(&trie, prefix, start_at)?
let trie_iter = self
.with_trie_db(root, args.child_info.as_ref(), |db| {
let prefix = args.prefix.as_deref().unwrap_or(&[]);
if let Some(start_at) = args.start_at {
TrieDBRawIterator::new_prefixed_then_seek(db, prefix, &start_at)
} else {
TrieDBIterator::new_prefixed(&trie, prefix)?
};
for x in iterator {
let (key, value) = x?;
debug_assert!(key.starts_with(prefix));
if !f(key, value) {
return Ok(false)
}
TrieDBRawIterator::new_prefixed(db, prefix)
}
Ok(true)
})
};
.map_err(|e| format!("TrieDB iteration error: {}", e))?;
let result = if let Some(child_info) = child_info {
let db = KeySpacedDB::new(self, child_info.keyspace());
iter(&db)
} else {
iter(self)
};
match result {
Ok(completed) => Ok(completed),
Err(e) if matches!(*e, TrieError::IncompleteDatabase(_)) && allow_missing_nodes =>
Ok(false),
Err(e) => Err(format!("TrieDB iteration error: {}", e)),
}
}
/// Execute given closure for all key and values starting with prefix.
pub fn for_key_values_with_prefix<F: FnMut(&[u8], &[u8])>(&self, prefix: &[u8], mut f: F) {
let _ = self.trie_iter_inner(
&self.root,
Some(prefix),
|k, v| {
f(&k, &v);
true
},
None,
None,
false,
);
}
/// Returns all `(key, value)` pairs in the trie.
pub fn pairs(&self) -> Vec<(StorageKey, StorageValue)> {
let collect_all = || -> sp_std::result::Result<_, Box<TrieError<H::Out>>> {
self.with_recorder_and_cache(None, |recorder, cache| {
let trie = TrieDBBuilder::<H>::new(self, self.root())
.with_optional_cache(cache)
.with_optional_recorder(recorder)
.build();
let mut v = Vec::new();
for x in trie.iter()? {
let (key, value) = x?;
v.push((key.to_vec(), value.to_vec()));
}
Ok(v)
})
};
match collect_all() {
Ok(v) => v,
Err(e) => {
debug!(target: "trie", "Error extracting trie values: {}", e);
Vec::new()
},
}
}
/// Returns all keys that start with the given `prefix`.
pub fn keys(&self, prefix: &[u8]) -> Vec<StorageKey> {
let mut keys = Vec::new();
self.for_keys_with_prefix(prefix, |k| keys.push(k.to_vec()));
keys
Ok(RawIter {
stop_on_incomplete_database: args.stop_on_incomplete_database,
child_info: args.child_info,
root,
trie_iter,
state: IterState::Pending,
_phantom: Default::default(),
})
}
/// Return the storage root after applying the given `delta`.