mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-12 08:51:09 +00:00
Rework the trie cache (#12982)
* Rework the trie cache * Align `state-machine` tests * Bump `schnellru` to 0.1.1 * Fix off-by-one * Align to review comments * Bump `ahash` to 0.8.2 * Bump `schnellru` to 0.2.0 * Bump `schnellru` to 0.2.1 * Remove unnecessary bound * Remove unnecessary loop when calculating maximum memory usage * Remove unnecessary `mut`s
This commit is contained in:
Generated
+4
-4
@@ -8383,7 +8383,7 @@ dependencies = [
|
||||
name = "sc-finality-grandpa"
|
||||
version = "0.10.0-dev"
|
||||
dependencies = [
|
||||
"ahash 0.7.6",
|
||||
"ahash 0.8.2",
|
||||
"array-bytes",
|
||||
"assert_matches",
|
||||
"async-trait",
|
||||
@@ -8583,7 +8583,7 @@ dependencies = [
|
||||
name = "sc-network-gossip"
|
||||
version = "0.10.0-dev"
|
||||
dependencies = [
|
||||
"ahash 0.7.6",
|
||||
"ahash 0.8.2",
|
||||
"futures",
|
||||
"futures-timer",
|
||||
"libp2p",
|
||||
@@ -10352,18 +10352,18 @@ dependencies = [
|
||||
name = "sp-trie"
|
||||
version = "7.0.0"
|
||||
dependencies = [
|
||||
"ahash 0.7.6",
|
||||
"ahash 0.8.2",
|
||||
"array-bytes",
|
||||
"criterion",
|
||||
"hash-db",
|
||||
"hashbrown 0.12.3",
|
||||
"lazy_static",
|
||||
"lru",
|
||||
"memory-db",
|
||||
"nohash-hasher",
|
||||
"parity-scale-codec",
|
||||
"parking_lot 0.12.1",
|
||||
"scale-info",
|
||||
"schnellru",
|
||||
"sp-core",
|
||||
"sp-runtime",
|
||||
"sp-std",
|
||||
|
||||
@@ -110,7 +110,7 @@ impl<B: BlockT> BenchmarkingState<B> {
|
||||
proof_recorder_root: Cell::new(root),
|
||||
enable_tracking,
|
||||
// Enable the cache, but do not sync anything to the shared state.
|
||||
shared_trie_cache: SharedTrieCache::new(CacheSize::Maximum(0)),
|
||||
shared_trie_cache: SharedTrieCache::new(CacheSize::new(0)),
|
||||
};
|
||||
|
||||
state.add_whitelist_to_tracker();
|
||||
|
||||
@@ -1243,7 +1243,7 @@ impl<Block: BlockT> Backend<Block> {
|
||||
blocks_pruning: config.blocks_pruning,
|
||||
genesis_state: RwLock::new(None),
|
||||
shared_trie_cache: config.trie_cache_maximum_size.map(|maximum_size| {
|
||||
SharedTrieCache::new(sp_trie::cache::CacheSize::Maximum(maximum_size))
|
||||
SharedTrieCache::new(sp_trie::cache::CacheSize::new(maximum_size))
|
||||
}),
|
||||
};
|
||||
|
||||
|
||||
@@ -14,7 +14,7 @@ readme = "README.md"
|
||||
targets = ["x86_64-unknown-linux-gnu"]
|
||||
|
||||
[dependencies]
|
||||
ahash = "0.7.6"
|
||||
ahash = "0.8.2"
|
||||
array-bytes = "4.1"
|
||||
async-trait = "0.1.57"
|
||||
dyn-clone = "1.0"
|
||||
|
||||
@@ -14,7 +14,7 @@ readme = "README.md"
|
||||
targets = ["x86_64-unknown-linux-gnu"]
|
||||
|
||||
[dependencies]
|
||||
ahash = "0.7.6"
|
||||
ahash = "0.8.2"
|
||||
futures = "0.3.21"
|
||||
futures-timer = "3.0.1"
|
||||
libp2p = "0.50.0"
|
||||
|
||||
@@ -412,19 +412,19 @@ pub mod tests {
|
||||
fn $name() {
|
||||
let parameters = vec![
|
||||
(StateVersion::V0, None, None),
|
||||
(StateVersion::V0, Some(SharedCache::new(CacheSize::Unlimited)), None),
|
||||
(StateVersion::V0, Some(SharedCache::new(CacheSize::unlimited())), None),
|
||||
(StateVersion::V0, None, Some(Recorder::default())),
|
||||
(
|
||||
StateVersion::V0,
|
||||
Some(SharedCache::new(CacheSize::Unlimited)),
|
||||
Some(SharedCache::new(CacheSize::unlimited())),
|
||||
Some(Recorder::default()),
|
||||
),
|
||||
(StateVersion::V1, None, None),
|
||||
(StateVersion::V1, Some(SharedCache::new(CacheSize::Unlimited)), None),
|
||||
(StateVersion::V1, Some(SharedCache::new(CacheSize::unlimited())), None),
|
||||
(StateVersion::V1, None, Some(Recorder::default())),
|
||||
(
|
||||
StateVersion::V1,
|
||||
Some(SharedCache::new(CacheSize::Unlimited)),
|
||||
Some(SharedCache::new(CacheSize::unlimited())),
|
||||
Some(Recorder::default()),
|
||||
),
|
||||
];
|
||||
@@ -760,7 +760,7 @@ pub mod tests {
|
||||
.clone()
|
||||
.for_each(|i| assert_eq!(trie.storage(&[i]).unwrap().unwrap(), vec![i; size_content]));
|
||||
|
||||
for cache in [Some(SharedTrieCache::new(CacheSize::Unlimited)), None] {
|
||||
for cache in [Some(SharedTrieCache::new(CacheSize::unlimited())), None] {
|
||||
// Run multiple times to have different cache conditions.
|
||||
for i in 0..5 {
|
||||
if let Some(cache) = &cache {
|
||||
@@ -793,7 +793,7 @@ pub mod tests {
|
||||
proof_record_works_with_iter_inner(StateVersion::V1);
|
||||
}
|
||||
fn proof_record_works_with_iter_inner(state_version: StateVersion) {
|
||||
for cache in [Some(SharedTrieCache::new(CacheSize::Unlimited)), None] {
|
||||
for cache in [Some(SharedTrieCache::new(CacheSize::unlimited())), None] {
|
||||
// Run multiple times to have different cache conditions.
|
||||
for i in 0..5 {
|
||||
if let Some(cache) = &cache {
|
||||
@@ -870,7 +870,7 @@ pub mod tests {
|
||||
assert_eq!(in_memory.child_storage(child_info_2, &[i]).unwrap().unwrap(), vec![i])
|
||||
});
|
||||
|
||||
for cache in [Some(SharedTrieCache::new(CacheSize::Unlimited)), None] {
|
||||
for cache in [Some(SharedTrieCache::new(CacheSize::unlimited())), None] {
|
||||
// Run multiple times to have different cache conditions.
|
||||
for i in 0..5 {
|
||||
eprintln!("Running with cache {}, iteration {}", cache.is_some(), i);
|
||||
@@ -1002,7 +1002,7 @@ pub mod tests {
|
||||
nodes
|
||||
};
|
||||
|
||||
let cache = SharedTrieCache::<BlakeTwo256>::new(CacheSize::Unlimited);
|
||||
let cache = SharedTrieCache::<BlakeTwo256>::new(CacheSize::unlimited());
|
||||
{
|
||||
let local_cache = cache.local_cache();
|
||||
let mut trie_cache = local_cache.as_trie_db_cache(child_1_root);
|
||||
@@ -1093,7 +1093,7 @@ pub mod tests {
|
||||
|
||||
#[test]
|
||||
fn new_data_is_added_to_the_cache() {
|
||||
let shared_cache = SharedTrieCache::new(CacheSize::Unlimited);
|
||||
let shared_cache = SharedTrieCache::new(CacheSize::unlimited());
|
||||
let new_data = vec![
|
||||
(&b"new_data0"[..], Some(&b"0"[..])),
|
||||
(&b"new_data1"[..], Some(&b"1"[..])),
|
||||
@@ -1159,7 +1159,7 @@ pub mod tests {
|
||||
assert_eq!(in_memory.child_storage(child_info_1, &key).unwrap().unwrap(), child_trie_1_val);
|
||||
assert_eq!(in_memory.child_storage(child_info_2, &key).unwrap().unwrap(), child_trie_2_val);
|
||||
|
||||
for cache in [Some(SharedTrieCache::new(CacheSize::Unlimited)), None] {
|
||||
for cache in [Some(SharedTrieCache::new(CacheSize::unlimited())), None] {
|
||||
// Run multiple times to have different cache conditions.
|
||||
for i in 0..5 {
|
||||
eprintln!("Running with cache {}, iteration {}", cache.is_some(), i);
|
||||
|
||||
@@ -18,12 +18,11 @@ name = "bench"
|
||||
harness = false
|
||||
|
||||
[dependencies]
|
||||
ahash = { version = "0.7.6", optional = true }
|
||||
ahash = { version = "0.8.2", optional = true }
|
||||
codec = { package = "parity-scale-codec", version = "3.2.2", default-features = false }
|
||||
hashbrown = { version = "0.12.3", optional = true }
|
||||
hash-db = { version = "0.15.2", default-features = false }
|
||||
lazy_static = { version = "1.4.0", optional = true }
|
||||
lru = { version = "0.8.1", optional = true }
|
||||
memory-db = { version = "0.31.0", default-features = false }
|
||||
nohash-hasher = { version = "0.2.0", optional = true }
|
||||
parking_lot = { version = "0.12.1", optional = true }
|
||||
@@ -34,6 +33,7 @@ trie-db = { version = "0.24.0", default-features = false }
|
||||
trie-root = { version = "0.17.0", default-features = false }
|
||||
sp-core = { version = "7.0.0", default-features = false, path = "../core" }
|
||||
sp-std = { version = "5.0.0", default-features = false, path = "../std" }
|
||||
schnellru = { version = "0.2.1", optional = true }
|
||||
|
||||
[dev-dependencies]
|
||||
array-bytes = "4.1"
|
||||
@@ -50,7 +50,7 @@ std = [
|
||||
"hashbrown",
|
||||
"hash-db/std",
|
||||
"lazy_static",
|
||||
"lru",
|
||||
"schnellru",
|
||||
"memory-db/std",
|
||||
"nohash-hasher",
|
||||
"parking_lot",
|
||||
|
||||
+435
-147
@@ -36,13 +36,17 @@
|
||||
|
||||
use crate::{Error, NodeCodec};
|
||||
use hash_db::Hasher;
|
||||
use hashbrown::HashSet;
|
||||
use nohash_hasher::BuildNoHashHasher;
|
||||
use parking_lot::{Mutex, MutexGuard, RwLockReadGuard};
|
||||
use shared_cache::{SharedValueCache, ValueCacheKey};
|
||||
use parking_lot::{Mutex, MutexGuard};
|
||||
use schnellru::LruMap;
|
||||
use shared_cache::{ValueCacheKey, ValueCacheRef};
|
||||
use std::{
|
||||
collections::{hash_map::Entry as MapEntry, HashMap},
|
||||
sync::Arc,
|
||||
collections::HashMap,
|
||||
sync::{
|
||||
atomic::{AtomicU64, Ordering},
|
||||
Arc,
|
||||
},
|
||||
time::Duration,
|
||||
};
|
||||
use trie_db::{node::NodeOwned, CachedValue};
|
||||
|
||||
@@ -50,29 +54,267 @@ mod shared_cache;
|
||||
|
||||
pub use shared_cache::SharedTrieCache;
|
||||
|
||||
use self::shared_cache::{SharedTrieCacheInner, ValueCacheKeyHash};
|
||||
use self::shared_cache::ValueCacheKeyHash;
|
||||
|
||||
const LOG_TARGET: &str = "trie-cache";
|
||||
|
||||
/// The size of the cache.
|
||||
/// The maximum amount of time we'll wait trying to acquire the shared cache lock
|
||||
/// when the local cache is dropped and synchronized with the shared cache.
|
||||
///
|
||||
/// This is just a failsafe; normally this should never trigger.
|
||||
const SHARED_CACHE_WRITE_LOCK_TIMEOUT: Duration = Duration::from_millis(100);
|
||||
|
||||
/// The maximum number of existing keys in the shared cache that a single local cache
|
||||
/// can promote to the front of the LRU cache in one go.
|
||||
///
|
||||
/// If we have a big shared cache and the local cache hits all of those keys we don't
|
||||
/// want to spend forever bumping all of them.
|
||||
const SHARED_NODE_CACHE_MAX_PROMOTED_KEYS: u32 = 1792;
|
||||
/// Same as [`SHARED_NODE_CACHE_MAX_PROMOTED_KEYS`].
|
||||
const SHARED_VALUE_CACHE_MAX_PROMOTED_KEYS: u32 = 1792;
|
||||
|
||||
/// The maximum portion of the shared cache (in percent) that a single local
|
||||
/// cache can replace in one go.
|
||||
///
|
||||
/// We don't want a single local cache instance to have the ability to replace
|
||||
/// everything in the shared cache.
|
||||
const SHARED_NODE_CACHE_MAX_REPLACE_PERCENT: usize = 33;
|
||||
/// Same as [`SHARED_NODE_CACHE_MAX_REPLACE_PERCENT`].
|
||||
const SHARED_VALUE_CACHE_MAX_REPLACE_PERCENT: usize = 33;
|
||||
|
||||
/// The maximum inline capacity of the local cache, in bytes.
|
||||
///
|
||||
/// This is just an upper limit; since the maps are resized in powers of two
|
||||
/// their actual size will most likely not exactly match this.
|
||||
const LOCAL_NODE_CACHE_MAX_INLINE_SIZE: usize = 512 * 1024;
|
||||
/// Same as [`LOCAL_NODE_CACHE_MAX_INLINE_SIZE`].
|
||||
const LOCAL_VALUE_CACHE_MAX_INLINE_SIZE: usize = 512 * 1024;
|
||||
|
||||
/// The maximum size of the memory allocated on the heap by the local cache, in bytes.
|
||||
const LOCAL_NODE_CACHE_MAX_HEAP_SIZE: usize = 2 * 1024 * 1024;
|
||||
/// Same as [`LOCAL_NODE_CACHE_MAX_HEAP_SIZE`].
|
||||
const LOCAL_VALUE_CACHE_MAX_HEAP_SIZE: usize = 4 * 1024 * 1024;
|
||||
|
||||
/// The size of the shared cache.
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub enum CacheSize {
|
||||
/// Do not limit the cache size.
|
||||
Unlimited,
|
||||
/// Let the cache in maximum use the given amount of bytes.
|
||||
Maximum(usize),
|
||||
}
|
||||
pub struct CacheSize(usize);
|
||||
|
||||
impl CacheSize {
|
||||
/// Returns `true` if the `current_size` exceeds the allowed size.
|
||||
fn exceeds(&self, current_size: usize) -> bool {
|
||||
match self {
|
||||
Self::Unlimited => false,
|
||||
Self::Maximum(max) => *max < current_size,
|
||||
/// An unlimited cache size.
|
||||
pub const fn unlimited() -> Self {
|
||||
CacheSize(usize::MAX)
|
||||
}
|
||||
|
||||
/// A cache size `bytes` big.
|
||||
pub const fn new(bytes: usize) -> Self {
|
||||
CacheSize(bytes)
|
||||
}
|
||||
}
|
||||
|
||||
/// A limiter for the local node cache. This makes sure the local cache doesn't grow too big.
|
||||
#[derive(Default)]
|
||||
pub struct LocalNodeCacheLimiter {
|
||||
/// The current size (in bytes) of data allocated by this cache on the heap.
|
||||
///
|
||||
/// This doesn't include the size of the map itself.
|
||||
current_heap_size: usize,
|
||||
}
|
||||
|
||||
impl<H> schnellru::Limiter<H, NodeCached<H>> for LocalNodeCacheLimiter
|
||||
where
|
||||
H: AsRef<[u8]> + std::fmt::Debug,
|
||||
{
|
||||
type KeyToInsert<'a> = H;
|
||||
type LinkType = u32;
|
||||
|
||||
#[inline]
|
||||
fn is_over_the_limit(&self, length: usize) -> bool {
|
||||
// Only enforce the limit if there's more than one element to make sure
|
||||
// we can always add a new element to the cache.
|
||||
if length <= 1 {
|
||||
return false
|
||||
}
|
||||
|
||||
self.current_heap_size > LOCAL_NODE_CACHE_MAX_HEAP_SIZE
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn on_insert<'a>(
|
||||
&mut self,
|
||||
_length: usize,
|
||||
key: H,
|
||||
cached_node: NodeCached<H>,
|
||||
) -> Option<(H, NodeCached<H>)> {
|
||||
self.current_heap_size += cached_node.heap_size();
|
||||
Some((key, cached_node))
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn on_replace(
|
||||
&mut self,
|
||||
_length: usize,
|
||||
_old_key: &mut H,
|
||||
_new_key: H,
|
||||
old_node: &mut NodeCached<H>,
|
||||
new_node: &mut NodeCached<H>,
|
||||
) -> bool {
|
||||
debug_assert_eq!(_old_key.as_ref().len(), _new_key.as_ref().len());
|
||||
self.current_heap_size =
|
||||
self.current_heap_size + new_node.heap_size() - old_node.heap_size();
|
||||
true
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn on_removed(&mut self, _key: &mut H, cached_node: &mut NodeCached<H>) {
|
||||
self.current_heap_size -= cached_node.heap_size();
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn on_cleared(&mut self) {
|
||||
self.current_heap_size = 0;
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn on_grow(&mut self, new_memory_usage: usize) -> bool {
|
||||
new_memory_usage <= LOCAL_NODE_CACHE_MAX_INLINE_SIZE
|
||||
}
|
||||
}
|
||||
|
||||
/// A limiter for the local value cache. This makes sure the local cache doesn't grow too big.
|
||||
#[derive(Default)]
|
||||
pub struct LocalValueCacheLimiter {
|
||||
/// The current size (in bytes) of data allocated by this cache on the heap.
|
||||
///
|
||||
/// This doesn't include the size of the map itself.
|
||||
current_heap_size: usize,
|
||||
}
|
||||
|
||||
impl<H> schnellru::Limiter<ValueCacheKey<H>, CachedValue<H>> for LocalValueCacheLimiter
|
||||
where
|
||||
H: AsRef<[u8]>,
|
||||
{
|
||||
type KeyToInsert<'a> = ValueCacheRef<'a, H>;
|
||||
type LinkType = u32;
|
||||
|
||||
#[inline]
|
||||
fn is_over_the_limit(&self, length: usize) -> bool {
|
||||
// Only enforce the limit if there's more than one element to make sure
|
||||
// we can always add a new element to the cache.
|
||||
if length <= 1 {
|
||||
return false
|
||||
}
|
||||
|
||||
self.current_heap_size > LOCAL_VALUE_CACHE_MAX_HEAP_SIZE
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn on_insert(
|
||||
&mut self,
|
||||
_length: usize,
|
||||
key: Self::KeyToInsert<'_>,
|
||||
value: CachedValue<H>,
|
||||
) -> Option<(ValueCacheKey<H>, CachedValue<H>)> {
|
||||
self.current_heap_size += key.storage_key.len();
|
||||
Some((key.into(), value))
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn on_replace(
|
||||
&mut self,
|
||||
_length: usize,
|
||||
_old_key: &mut ValueCacheKey<H>,
|
||||
_new_key: ValueCacheRef<H>,
|
||||
_old_value: &mut CachedValue<H>,
|
||||
_new_value: &mut CachedValue<H>,
|
||||
) -> bool {
|
||||
debug_assert_eq!(_old_key.storage_key.len(), _new_key.storage_key.len());
|
||||
true
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn on_removed(&mut self, key: &mut ValueCacheKey<H>, _: &mut CachedValue<H>) {
|
||||
self.current_heap_size -= key.storage_key.len();
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn on_cleared(&mut self) {
|
||||
self.current_heap_size = 0;
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn on_grow(&mut self, new_memory_usage: usize) -> bool {
|
||||
new_memory_usage <= LOCAL_VALUE_CACHE_MAX_INLINE_SIZE
|
||||
}
|
||||
}
|
||||
|
||||
/// A struct to gather hit/miss stats to aid in debugging the performance of the cache.
|
||||
/// A struct to gather hit/miss stats to aid in debugging the performance of the cache.
///
/// The counters are atomics so that they can be incremented through a shared reference.
#[derive(Default)]
struct HitStats {
    /// Number of lookups that were answered by the shared cache.
    shared_hits: AtomicU64,
    /// Number of lookups that had to consult the shared cache.
    shared_fetch_attempts: AtomicU64,
    /// Number of lookups that were answered by the local cache.
    local_hits: AtomicU64,
    /// Number of lookups that consulted the local cache.
    local_fetch_attempts: AtomicU64,
}

impl std::fmt::Display for HitStats {
    /// Renders the counters as `local hit rate = P% [hits/attempts], shared hit rate = ...`.
    ///
    /// Prints `empty` only when no lookup was attempted at all. A cache that was
    /// queried but never hit is reported as `0% [0/N]` so that the misses stay visible.
    fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
        /// Hit rate in whole percent (truncated); `0` when nothing was attempted.
        fn percent(hits: u64, attempts: u64) -> u32 {
            if attempts == 0 {
                // Avoid computing `0.0 / 0.0` (NaN) and relying on the saturating cast.
                return 0
            }

            ((hits as f32 / attempts as f32) * 100.0) as u32
        }

        let shared_hits = self.shared_hits.load(Ordering::Relaxed);
        let shared_fetch_attempts = self.shared_fetch_attempts.load(Ordering::Relaxed);
        let local_hits = self.local_hits.load(Ordering::Relaxed);
        let local_fetch_attempts = self.local_fetch_attempts.load(Ordering::Relaxed);

        // The stats are only "empty" when neither cache was ever queried; checking
        // the number of hits here would hide a cache that only produced misses.
        if shared_fetch_attempts == 0 && local_fetch_attempts == 0 {
            return write!(fmt, "empty")
        }

        write!(
            fmt,
            "local hit rate = {}% [{}/{}], shared hit rate = {}% [{}/{}]",
            percent(local_hits, local_fetch_attempts),
            local_hits,
            local_fetch_attempts,
            percent(shared_hits, shared_fetch_attempts),
            shared_hits,
            shared_fetch_attempts
        )
    }
}
|
||||
|
||||
/// A struct to gather hit/miss stats for the node cache and the value cache.
|
||||
#[derive(Default)]
|
||||
struct TrieHitStats {
|
||||
node_cache: HitStats,
|
||||
value_cache: HitStats,
|
||||
}
|
||||
|
||||
/// An internal struct to store the cached trie nodes.
|
||||
pub(crate) struct NodeCached<H> {
|
||||
/// The cached node.
|
||||
pub node: NodeOwned<H>,
|
||||
/// Whether this node was fetched from the shared cache or not.
|
||||
pub is_from_shared_cache: bool,
|
||||
}
|
||||
|
||||
impl<H> NodeCached<H> {
|
||||
/// Returns the number of bytes allocated on the heap by this node.
|
||||
fn heap_size(&self) -> usize {
|
||||
self.node.size_in_bytes() - std::mem::size_of::<NodeOwned<H>>()
|
||||
}
|
||||
}
|
||||
|
||||
type NodeCacheMap<H> = LruMap<H, NodeCached<H>, LocalNodeCacheLimiter, schnellru::RandomState>;
|
||||
|
||||
type ValueCacheMap<H> = LruMap<
|
||||
ValueCacheKey<H>,
|
||||
CachedValue<H>,
|
||||
LocalValueCacheLimiter,
|
||||
BuildNoHashHasher<ValueCacheKey<H>>,
|
||||
>;
|
||||
|
||||
type ValueAccessSet =
|
||||
LruMap<ValueCacheKeyHash, (), schnellru::ByLength, BuildNoHashHasher<ValueCacheKeyHash>>;
|
||||
|
||||
/// The local trie cache.
|
||||
///
|
||||
/// This cache should be used per state instance created by the backend. One state instance is
|
||||
@@ -86,21 +328,13 @@ impl CacheSize {
|
||||
pub struct LocalTrieCache<H: Hasher> {
|
||||
/// The shared trie cache that created this instance.
|
||||
shared: SharedTrieCache<H>,
|
||||
|
||||
/// The local cache for the trie nodes.
|
||||
node_cache: Mutex<HashMap<H::Out, NodeOwned<H::Out>>>,
|
||||
/// Keeps track of all the trie nodes accessed in the shared cache.
|
||||
///
|
||||
/// This will be used to ensure that these nodes are brought to the front of the lru when this
|
||||
/// local instance is merged back to the shared cache.
|
||||
shared_node_cache_access: Mutex<HashSet<H::Out>>,
|
||||
node_cache: Mutex<NodeCacheMap<H::Out>>,
|
||||
|
||||
/// The local cache for the values.
|
||||
value_cache: Mutex<
|
||||
HashMap<
|
||||
ValueCacheKey<'static, H::Out>,
|
||||
CachedValue<H::Out>,
|
||||
BuildNoHashHasher<ValueCacheKey<'static, H::Out>>,
|
||||
>,
|
||||
>,
|
||||
value_cache: Mutex<ValueCacheMap<H::Out>>,
|
||||
|
||||
/// Keeps track of all values accessed in the shared cache.
|
||||
///
|
||||
/// This will be used to ensure that these nodes are brought to the front of the lru when this
|
||||
@@ -109,8 +343,9 @@ pub struct LocalTrieCache<H: Hasher> {
|
||||
/// as we only use this set to update the lru position it is fine, even if we bring the wrong
|
||||
/// value to the top. The important part is that we always get the correct value from the value
|
||||
/// cache for a given key.
|
||||
shared_value_cache_access:
|
||||
Mutex<HashSet<ValueCacheKeyHash, BuildNoHashHasher<ValueCacheKeyHash>>>,
|
||||
shared_value_cache_access: Mutex<ValueAccessSet>,
|
||||
|
||||
stats: TrieHitStats,
|
||||
}
|
||||
|
||||
impl<H: Hasher> LocalTrieCache<H> {
|
||||
@@ -118,19 +353,18 @@ impl<H: Hasher> LocalTrieCache<H> {
|
||||
///
|
||||
/// The given `storage_root` needs to be the storage root of the trie this cache is used for.
|
||||
pub fn as_trie_db_cache(&self, storage_root: H::Out) -> TrieCache<'_, H> {
|
||||
let shared_inner = self.shared.read_lock_inner();
|
||||
|
||||
let value_cache = ValueCache::ForStorageRoot {
|
||||
storage_root,
|
||||
local_value_cache: self.value_cache.lock(),
|
||||
shared_value_cache_access: self.shared_value_cache_access.lock(),
|
||||
buffered_value: None,
|
||||
};
|
||||
|
||||
TrieCache {
|
||||
shared_inner,
|
||||
shared_cache: self.shared.clone(),
|
||||
local_cache: self.node_cache.lock(),
|
||||
value_cache,
|
||||
shared_node_cache_access: self.shared_node_cache_access.lock(),
|
||||
stats: &self.stats,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -143,63 +377,89 @@ impl<H: Hasher> LocalTrieCache<H> {
|
||||
/// would break because of this.
|
||||
pub fn as_trie_db_mut_cache(&self) -> TrieCache<'_, H> {
|
||||
TrieCache {
|
||||
shared_inner: self.shared.read_lock_inner(),
|
||||
shared_cache: self.shared.clone(),
|
||||
local_cache: self.node_cache.lock(),
|
||||
value_cache: ValueCache::Fresh(Default::default()),
|
||||
shared_node_cache_access: self.shared_node_cache_access.lock(),
|
||||
stats: &self.stats,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<H: Hasher> Drop for LocalTrieCache<H> {
|
||||
fn drop(&mut self) {
|
||||
let mut shared_inner = self.shared.write_lock_inner();
|
||||
tracing::debug!(
|
||||
target: LOG_TARGET,
|
||||
"Local node trie cache dropped: {}",
|
||||
self.stats.node_cache
|
||||
);
|
||||
|
||||
shared_inner
|
||||
.node_cache_mut()
|
||||
.update(self.node_cache.lock().drain(), self.shared_node_cache_access.lock().drain());
|
||||
tracing::debug!(
|
||||
target: LOG_TARGET,
|
||||
"Local value trie cache dropped: {}",
|
||||
self.stats.value_cache
|
||||
);
|
||||
|
||||
shared_inner
|
||||
.value_cache_mut()
|
||||
.update(self.value_cache.lock().drain(), self.shared_value_cache_access.lock().drain());
|
||||
let mut shared_inner = match self.shared.write_lock_inner() {
|
||||
Some(inner) => inner,
|
||||
None => {
|
||||
tracing::warn!(
|
||||
target: LOG_TARGET,
|
||||
"Timeout while trying to acquire a write lock for the shared trie cache"
|
||||
);
|
||||
return
|
||||
},
|
||||
};
|
||||
|
||||
shared_inner.node_cache_mut().update(self.node_cache.get_mut().drain());
|
||||
|
||||
shared_inner.value_cache_mut().update(
|
||||
self.value_cache.get_mut().drain(),
|
||||
self.shared_value_cache_access.get_mut().drain().map(|(key, ())| key),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/// The abstraction of the value cache for the [`TrieCache`].
|
||||
enum ValueCache<'a, H> {
|
||||
enum ValueCache<'a, H: Hasher> {
|
||||
/// The value cache is fresh, aka not yet associated to any storage root.
|
||||
/// This is used for example when a new trie is being built, to cache new values.
|
||||
Fresh(HashMap<Arc<[u8]>, CachedValue<H>>),
|
||||
Fresh(HashMap<Arc<[u8]>, CachedValue<H::Out>>),
|
||||
/// The value cache is already bound to a specific storage root.
|
||||
ForStorageRoot {
|
||||
shared_value_cache_access: MutexGuard<
|
||||
'a,
|
||||
HashSet<ValueCacheKeyHash, nohash_hasher::BuildNoHashHasher<ValueCacheKeyHash>>,
|
||||
>,
|
||||
local_value_cache: MutexGuard<
|
||||
'a,
|
||||
HashMap<
|
||||
ValueCacheKey<'static, H>,
|
||||
CachedValue<H>,
|
||||
nohash_hasher::BuildNoHashHasher<ValueCacheKey<'static, H>>,
|
||||
>,
|
||||
>,
|
||||
storage_root: H,
|
||||
shared_value_cache_access: MutexGuard<'a, ValueAccessSet>,
|
||||
local_value_cache: MutexGuard<'a, ValueCacheMap<H::Out>>,
|
||||
storage_root: H::Out,
|
||||
// The shared value cache needs to be temporarily locked when reading from it
|
||||
// so we need to clone the value that is returned, but we need to be able to
|
||||
// return a reference to the value, so we just buffer it here.
|
||||
buffered_value: Option<CachedValue<H::Out>>,
|
||||
},
|
||||
}
|
||||
|
||||
impl<H: AsRef<[u8]> + std::hash::Hash + Eq + Clone + Copy> ValueCache<'_, H> {
|
||||
impl<H: Hasher> ValueCache<'_, H> {
|
||||
/// Get the value for the given `key`.
|
||||
fn get<'a>(
|
||||
&'a mut self,
|
||||
key: &[u8],
|
||||
shared_value_cache: &'a SharedValueCache<H>,
|
||||
) -> Option<&CachedValue<H>> {
|
||||
match self {
|
||||
Self::Fresh(map) => map.get(key),
|
||||
Self::ForStorageRoot { local_value_cache, shared_value_cache_access, storage_root } => {
|
||||
let key = ValueCacheKey::new_ref(key, *storage_root);
|
||||
shared_cache: &SharedTrieCache<H>,
|
||||
stats: &HitStats,
|
||||
) -> Option<&CachedValue<H::Out>> {
|
||||
stats.local_fetch_attempts.fetch_add(1, Ordering::Relaxed);
|
||||
|
||||
match self {
|
||||
Self::Fresh(map) =>
|
||||
if let Some(value) = map.get(key) {
|
||||
stats.local_hits.fetch_add(1, Ordering::Relaxed);
|
||||
Some(value)
|
||||
} else {
|
||||
None
|
||||
},
|
||||
Self::ForStorageRoot {
|
||||
local_value_cache,
|
||||
shared_value_cache_access,
|
||||
storage_root,
|
||||
buffered_value,
|
||||
} => {
|
||||
// We first need to look up in the local cache and then the shared cache.
|
||||
// It can happen that some value is cached in the shared cache, but the
|
||||
// weak reference of the data can not be upgraded anymore. This for example
|
||||
@@ -207,35 +467,39 @@ impl<H: AsRef<[u8]> + std::hash::Hash + Eq + Clone + Copy> ValueCache<'_, H> {
|
||||
//
|
||||
// So, the logic of the trie would lookup the data and the node and store both
|
||||
// in our local caches.
|
||||
local_value_cache
|
||||
.get(unsafe {
|
||||
// SAFETY
|
||||
//
|
||||
// We need to convert the lifetime to make the compiler happy. However, as
|
||||
// we only use the `key` to looking up the value this lifetime conversion is
|
||||
// safe.
|
||||
std::mem::transmute::<&ValueCacheKey<'_, H>, &ValueCacheKey<'static, H>>(
|
||||
&key,
|
||||
)
|
||||
})
|
||||
.or_else(|| {
|
||||
shared_value_cache.get(&key).map(|v| {
|
||||
shared_value_cache_access.insert(key.get_hash());
|
||||
v
|
||||
})
|
||||
})
|
||||
|
||||
let hash = ValueCacheKey::hash_data(key, storage_root);
|
||||
|
||||
if let Some(value) = local_value_cache
|
||||
.peek_by_hash(hash.raw(), |existing_key, _| {
|
||||
existing_key.is_eq(storage_root, key)
|
||||
}) {
|
||||
stats.local_hits.fetch_add(1, Ordering::Relaxed);
|
||||
|
||||
return Some(value)
|
||||
}
|
||||
|
||||
stats.shared_fetch_attempts.fetch_add(1, Ordering::Relaxed);
|
||||
if let Some(value) = shared_cache.peek_value_by_hash(hash, storage_root, key) {
|
||||
stats.shared_hits.fetch_add(1, Ordering::Relaxed);
|
||||
shared_value_cache_access.insert(hash, ());
|
||||
*buffered_value = Some(value.clone());
|
||||
return buffered_value.as_ref()
|
||||
}
|
||||
|
||||
None
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
/// Insert some new `value` under the given `key`.
|
||||
fn insert(&mut self, key: &[u8], value: CachedValue<H>) {
|
||||
fn insert(&mut self, key: &[u8], value: CachedValue<H::Out>) {
|
||||
match self {
|
||||
Self::Fresh(map) => {
|
||||
map.insert(key.into(), value);
|
||||
},
|
||||
Self::ForStorageRoot { local_value_cache, storage_root, .. } => {
|
||||
local_value_cache.insert(ValueCacheKey::new_value(key, *storage_root), value);
|
||||
local_value_cache.insert(ValueCacheRef::new(key, *storage_root), value);
|
||||
},
|
||||
}
|
||||
}
|
||||
@@ -247,10 +511,10 @@ impl<H: AsRef<[u8]> + std::hash::Hash + Eq + Clone + Copy> ValueCache<'_, H> {
|
||||
/// be merged back into the [`LocalTrieCache`] with [`Self::merge_into`] after all operations are
|
||||
/// done.
|
||||
pub struct TrieCache<'a, H: Hasher> {
|
||||
shared_inner: RwLockReadGuard<'a, SharedTrieCacheInner<H>>,
|
||||
shared_node_cache_access: MutexGuard<'a, HashSet<H::Out>>,
|
||||
local_cache: MutexGuard<'a, HashMap<H::Out, NodeOwned<H::Out>>>,
|
||||
value_cache: ValueCache<'a, H::Out>,
|
||||
shared_cache: SharedTrieCache<H>,
|
||||
local_cache: MutexGuard<'a, NodeCacheMap<H::Out>>,
|
||||
value_cache: ValueCache<'a, H>,
|
||||
stats: &'a TrieHitStats,
|
||||
}
|
||||
|
||||
impl<'a, H: Hasher> TrieCache<'a, H> {
|
||||
@@ -267,16 +531,11 @@ impl<'a, H: Hasher> TrieCache<'a, H> {
|
||||
let mut value_cache = local.value_cache.lock();
|
||||
let partial_hash = ValueCacheKey::hash_partial_data(&storage_root);
|
||||
|
||||
cache
|
||||
.into_iter()
|
||||
.map(|(k, v)| {
|
||||
let hash =
|
||||
ValueCacheKeyHash::from_hasher_and_storage_key(partial_hash.clone(), &k);
|
||||
(ValueCacheKey::Value { storage_key: k, storage_root, hash }, v)
|
||||
})
|
||||
.for_each(|(k, v)| {
|
||||
value_cache.insert(k, v);
|
||||
});
|
||||
cache.into_iter().for_each(|(k, v)| {
|
||||
let hash = ValueCacheKeyHash::from_hasher_and_storage_key(partial_hash.clone(), &k);
|
||||
let k = ValueCacheRef { storage_root, storage_key: &k, hash };
|
||||
value_cache.insert(k, v);
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -287,53 +546,85 @@ impl<'a, H: Hasher> trie_db::TrieCache<NodeCodec<H>> for TrieCache<'a, H> {
|
||||
hash: H::Out,
|
||||
fetch_node: &mut dyn FnMut() -> trie_db::Result<NodeOwned<H::Out>, H::Out, Error<H::Out>>,
|
||||
) -> trie_db::Result<&NodeOwned<H::Out>, H::Out, Error<H::Out>> {
|
||||
if let Some(res) = self.shared_inner.node_cache().get(&hash) {
|
||||
tracing::trace!(target: LOG_TARGET, ?hash, "Serving node from shared cache");
|
||||
self.shared_node_cache_access.insert(hash);
|
||||
return Ok(res)
|
||||
let mut is_local_cache_hit = true;
|
||||
self.stats.node_cache.local_fetch_attempts.fetch_add(1, Ordering::Relaxed);
|
||||
|
||||
// First try to grab the node from the local cache.
|
||||
let node = self.local_cache.get_or_insert_fallible(hash, || {
|
||||
is_local_cache_hit = false;
|
||||
|
||||
// It was not in the local cache; try the shared cache.
|
||||
self.stats.node_cache.shared_fetch_attempts.fetch_add(1, Ordering::Relaxed);
|
||||
if let Some(node) = self.shared_cache.peek_node(&hash) {
|
||||
self.stats.node_cache.shared_hits.fetch_add(1, Ordering::Relaxed);
|
||||
tracing::trace!(target: LOG_TARGET, ?hash, "Serving node from shared cache");
|
||||
|
||||
return Ok(NodeCached::<H::Out> { node: node.clone(), is_from_shared_cache: true })
|
||||
}
|
||||
|
||||
// It was not in the shared cache; try fetching it from the database.
|
||||
match fetch_node() {
|
||||
Ok(node) => {
|
||||
tracing::trace!(target: LOG_TARGET, ?hash, "Serving node from database");
|
||||
Ok(NodeCached::<H::Out> { node, is_from_shared_cache: false })
|
||||
},
|
||||
Err(error) => {
|
||||
tracing::trace!(target: LOG_TARGET, ?hash, "Serving node from database failed");
|
||||
Err(error)
|
||||
},
|
||||
}
|
||||
});
|
||||
|
||||
if is_local_cache_hit {
|
||||
tracing::trace!(target: LOG_TARGET, ?hash, "Serving node from local cache");
|
||||
self.stats.node_cache.local_hits.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
match self.local_cache.entry(hash) {
|
||||
MapEntry::Occupied(res) => {
|
||||
tracing::trace!(target: LOG_TARGET, ?hash, "Serving node from local cache");
|
||||
Ok(res.into_mut())
|
||||
},
|
||||
MapEntry::Vacant(vacant) => {
|
||||
let node = (*fetch_node)();
|
||||
|
||||
tracing::trace!(
|
||||
target: LOG_TARGET,
|
||||
?hash,
|
||||
fetch_successful = node.is_ok(),
|
||||
"Node not found, needed to fetch it."
|
||||
);
|
||||
|
||||
Ok(vacant.insert(node?))
|
||||
},
|
||||
}
|
||||
Ok(&node?
|
||||
.expect("you can always insert at least one element into the local cache; qed")
|
||||
.node)
|
||||
}
|
||||
|
||||
fn get_node(&mut self, hash: &H::Out) -> Option<&NodeOwned<H::Out>> {
|
||||
if let Some(node) = self.shared_inner.node_cache().get(hash) {
|
||||
tracing::trace!(target: LOG_TARGET, ?hash, "Getting node from shared cache");
|
||||
self.shared_node_cache_access.insert(*hash);
|
||||
return Some(node)
|
||||
let mut is_local_cache_hit = true;
|
||||
self.stats.node_cache.local_fetch_attempts.fetch_add(1, Ordering::Relaxed);
|
||||
|
||||
// First try to grab the node from the local cache.
|
||||
let cached_node = self.local_cache.get_or_insert_fallible(*hash, || {
|
||||
is_local_cache_hit = false;
|
||||
|
||||
// It was not in the local cache; try the shared cache.
|
||||
self.stats.node_cache.shared_fetch_attempts.fetch_add(1, Ordering::Relaxed);
|
||||
if let Some(node) = self.shared_cache.peek_node(&hash) {
|
||||
self.stats.node_cache.shared_hits.fetch_add(1, Ordering::Relaxed);
|
||||
tracing::trace!(target: LOG_TARGET, ?hash, "Serving node from shared cache");
|
||||
|
||||
Ok(NodeCached::<H::Out> { node: node.clone(), is_from_shared_cache: true })
|
||||
} else {
|
||||
tracing::trace!(target: LOG_TARGET, ?hash, "Serving node from cache failed");
|
||||
|
||||
Err(())
|
||||
}
|
||||
});
|
||||
|
||||
if is_local_cache_hit {
|
||||
tracing::trace!(target: LOG_TARGET, ?hash, "Serving node from local cache");
|
||||
self.stats.node_cache.local_hits.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
let res = self.local_cache.get(hash);
|
||||
|
||||
tracing::trace!(
|
||||
target: LOG_TARGET,
|
||||
?hash,
|
||||
found = res.is_some(),
|
||||
"Getting node from local cache"
|
||||
);
|
||||
|
||||
res
|
||||
match cached_node {
|
||||
Ok(Some(cached_node)) => Some(&cached_node.node),
|
||||
Ok(None) => {
|
||||
unreachable!(
|
||||
"you can always insert at least one element into the local cache; qed"
|
||||
);
|
||||
},
|
||||
Err(()) => None,
|
||||
}
|
||||
}
|
||||
|
||||
fn lookup_value_for_key(&mut self, key: &[u8]) -> Option<&CachedValue<H::Out>> {
|
||||
let res = self.value_cache.get(key, self.shared_inner.value_cache());
|
||||
let res = self.value_cache.get(key, &self.shared_cache, &self.stats.value_cache);
|
||||
|
||||
tracing::trace!(
|
||||
target: LOG_TARGET,
|
||||
@@ -352,7 +643,7 @@ impl<'a, H: Hasher> trie_db::TrieCache<NodeCodec<H>> for TrieCache<'a, H> {
|
||||
"Caching value for key",
|
||||
);
|
||||
|
||||
self.value_cache.insert(key.into(), data);
|
||||
self.value_cache.insert(key, data);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -369,7 +660,7 @@ mod tests {
|
||||
const TEST_DATA: &[(&[u8], &[u8])] =
|
||||
&[(b"key1", b"val1"), (b"key2", &[2; 64]), (b"key3", b"val3"), (b"key4", &[4; 64])];
|
||||
const CACHE_SIZE_RAW: usize = 1024 * 10;
|
||||
const CACHE_SIZE: CacheSize = CacheSize::Maximum(CACHE_SIZE_RAW);
|
||||
const CACHE_SIZE: CacheSize = CacheSize::new(CACHE_SIZE_RAW);
|
||||
|
||||
fn create_trie() -> (MemoryDB, TrieHash<Layout>) {
|
||||
let mut db = MemoryDB::default();
|
||||
@@ -418,7 +709,7 @@ mod tests {
|
||||
let fake_data = Bytes::from(&b"fake_data"[..]);
|
||||
|
||||
let local_cache = shared_cache.local_cache();
|
||||
shared_cache.write_lock_inner().value_cache_mut().lru.put(
|
||||
shared_cache.write_lock_inner().unwrap().value_cache_mut().lru.insert(
|
||||
ValueCacheKey::new_value(TEST_DATA[1].0, root),
|
||||
(fake_data.clone(), Default::default()).into(),
|
||||
);
|
||||
@@ -591,7 +882,7 @@ mod tests {
|
||||
.lru
|
||||
.iter()
|
||||
.map(|d| d.0)
|
||||
.all(|l| TEST_DATA.iter().any(|d| l.storage_key().unwrap() == d.0)));
|
||||
.all(|l| TEST_DATA.iter().any(|d| &*l.storage_key == d.0)));
|
||||
|
||||
// Run this in a loop. The first time we check that with the filled value cache,
|
||||
// the expected values are at the top of the LRU.
|
||||
@@ -617,7 +908,7 @@ mod tests {
|
||||
.iter()
|
||||
.take(2)
|
||||
.map(|d| d.0)
|
||||
.all(|l| { TEST_DATA.iter().take(2).any(|d| l.storage_key().unwrap() == d.0) }));
|
||||
.all(|l| { TEST_DATA.iter().take(2).any(|d| &*l.storage_key == d.0) }));
|
||||
|
||||
// Delete the value cache, so that we access the nodes.
|
||||
shared_cache.reset_value_cache();
|
||||
@@ -684,9 +975,6 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
let node_cache_size = shared_cache.read_lock_inner().node_cache().size_in_bytes;
|
||||
let value_cache_size = shared_cache.read_lock_inner().value_cache().size_in_bytes;
|
||||
|
||||
assert!(node_cache_size + value_cache_size < CACHE_SIZE_RAW);
|
||||
assert!(shared_cache.used_memory_size() < CACHE_SIZE_RAW);
|
||||
}
|
||||
}
|
||||
|
||||
+559
-381
File diff suppressed because it is too large
Load Diff
Reference in New Issue
Block a user