mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-04-27 05:47:58 +00:00
Fixes storage_hash caching issue and enables better caching for Cumulus (#8518)
* Fixes `storage_hash` caching issue and enables better caching for Cumulus There was a caching issue with `storage_hash` that resulted in not reverting cached storage hashes when required. In Cumulus this resulted in nodes failing to import new blocks after a runtime upgrade, because they were using the old runtime version. Besides that, this pr optimizes for the Cumulus use case. In particular that we always import blocks first as non-best blocks and enact them later. In current version of the caching that would mean we would always throw away the complete cache of the latest imported block. Now, we always update the cache for the first block of a new block height. This enables us to use the cache if this block will enacted as best block later. If there is a fork and that is enacted as best, we revert all the changes to the cache. * Apply suggestions from code review Co-authored-by: Arkadiy Paronyan <arkady.paronyan@gmail.com> * Indentation * Update client/db/src/storage_cache.rs Co-authored-by: André Silva <123550+andresilva@users.noreply.github.com> Co-authored-by: Arkadiy Paronyan <arkady.paronyan@gmail.com> Co-authored-by: André Silva <123550+andresilva@users.noreply.github.com>
This commit is contained in:
@@ -25,7 +25,6 @@ use std::sync::Arc;
|
||||
use std::hash::Hash as StdHash;
|
||||
use parking_lot::{Mutex, RwLock, RwLockUpgradableReadGuard};
|
||||
use linked_hash_map::{LinkedHashMap, Entry};
|
||||
use hash_db::Hasher;
|
||||
use sp_runtime::traits::{Block as BlockT, Header, HashFor, NumberFor};
|
||||
use sp_core::hexdisplay::HexDisplay;
|
||||
use sp_core::storage::ChildInfo;
|
||||
@@ -50,6 +49,8 @@ pub struct Cache<B: BlockT> {
|
||||
lru_child_storage: LRUMap<ChildStorageKey, Option<StorageValue>>,
|
||||
/// Information on the modifications in recently committed blocks; specifically which keys
|
||||
/// changed in which block. Ordered by block number.
|
||||
///
|
||||
/// The latest committed block is always at the front.
|
||||
modifications: VecDeque<BlockChanges<B::Header>>,
|
||||
}
|
||||
|
||||
@@ -178,6 +179,7 @@ impl<B: BlockT> Cache<B> {
|
||||
for a in &m.storage {
|
||||
trace!("Reverting enacted key {:?}", HexDisplay::from(a));
|
||||
self.lru_storage.remove(a);
|
||||
self.lru_hashes.remove(a);
|
||||
}
|
||||
for a in &m.child_storage {
|
||||
trace!("Reverting enacted child key {:?}", a);
|
||||
@@ -218,6 +220,27 @@ impl<B: BlockT> Cache<B> {
|
||||
self.modifications.clear();
|
||||
}
|
||||
}
|
||||
|
||||
fn add_modifications(&mut self, block_changes: BlockChanges<B::Header>) {
|
||||
let insert_at = self.modifications.iter()
|
||||
.enumerate()
|
||||
.find(|(_, m)| m.number < block_changes.number)
|
||||
.map(|(i, _)| i);
|
||||
|
||||
trace!("Inserting modifications at {:?}", insert_at);
|
||||
if let Some(insert_at) = insert_at {
|
||||
self.modifications.insert(insert_at, block_changes);
|
||||
} else {
|
||||
self.modifications.push_back(block_changes);
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns if this is the first modification at the given block height.
|
||||
///
|
||||
/// If there already exists a modification for a higher block height, `false` is returned.
|
||||
fn has_no_modification_at_block_height(&self, number: NumberFor<B>) -> bool {
|
||||
self.modifications.get(0).map(|c| c.number < number).unwrap_or(true)
|
||||
}
|
||||
}
|
||||
|
||||
pub type SharedCache<B> = Arc<Mutex<Cache<B>>>;
|
||||
@@ -247,15 +270,15 @@ pub fn new_shared_cache<B: BlockT>(
|
||||
)
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
/// Accumulates a list of storage changed in a block.
|
||||
struct BlockChanges<B: Header> {
|
||||
#[derive(Debug)]
|
||||
struct BlockChanges<H: Header> {
|
||||
/// Block number.
|
||||
number: B::Number,
|
||||
number: H::Number,
|
||||
/// Block hash.
|
||||
hash: B::Hash,
|
||||
hash: H::Hash,
|
||||
/// Parent block hash.
|
||||
parent: B::Hash,
|
||||
parent: H::Hash,
|
||||
/// A set of modified storage keys.
|
||||
storage: HashSet<StorageKey>,
|
||||
/// A set of modified child storage keys.
|
||||
@@ -265,7 +288,7 @@ struct BlockChanges<B: Header> {
|
||||
}
|
||||
|
||||
/// Cached values specific to a state.
|
||||
struct LocalCache<H: Hasher> {
|
||||
struct LocalCache<B: BlockT> {
|
||||
/// Storage cache.
|
||||
///
|
||||
/// `None` indicates that key is known to be missing.
|
||||
@@ -273,19 +296,41 @@ struct LocalCache<H: Hasher> {
|
||||
/// Storage hashes cache.
|
||||
///
|
||||
/// `None` indicates that key is known to be missing.
|
||||
hashes: HashMap<StorageKey, Option<H::Out>>,
|
||||
hashes: HashMap<StorageKey, Option<B::Hash>>,
|
||||
/// Child storage cache.
|
||||
///
|
||||
/// `None` indicates that key is known to be missing.
|
||||
child_storage: HashMap<ChildStorageKey, Option<StorageValue>>,
|
||||
}
|
||||
|
||||
impl<B: BlockT> LocalCache<B> {
|
||||
/// Commit all cached values to the given shared `Cache`.
|
||||
///
|
||||
/// After calling this method, the internal state is reset.
|
||||
fn commit_to(&mut self, cache: &mut Cache<B>) {
|
||||
trace!(
|
||||
"Committing {} local, {} hashes to shared cache",
|
||||
self.storage.len(),
|
||||
self.hashes.len(),
|
||||
);
|
||||
for (k, v) in self.storage.drain() {
|
||||
cache.lru_storage.add(k, v);
|
||||
}
|
||||
for (k, v) in self.child_storage.drain() {
|
||||
cache.lru_child_storage.add(k, v);
|
||||
}
|
||||
for (k, v) in self.hashes.drain() {
|
||||
cache.lru_hashes.add(k, OptionHOut(v));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Cache changes.
|
||||
pub struct CacheChanges<B: BlockT> {
|
||||
/// Shared canonical state cache.
|
||||
shared_cache: SharedCache<B>,
|
||||
/// Local cache of values for this state.
|
||||
local_cache: RwLock<LocalCache<HashFor<B>>>,
|
||||
local_cache: RwLock<LocalCache<B>>,
|
||||
/// Hash of the block on top of which this instance was created or
|
||||
/// `None` if cache is disabled
|
||||
pub parent_hash: Option<B::Hash>,
|
||||
@@ -351,90 +396,105 @@ impl<B: BlockT> CacheChanges<B> {
|
||||
.cloned()
|
||||
.collect();
|
||||
|
||||
let has_no_modification_at_block_height = if let Some(num) = commit_number {
|
||||
cache.has_no_modification_at_block_height(num)
|
||||
} else {
|
||||
false
|
||||
};
|
||||
|
||||
let mut retracted = std::borrow::Cow::Borrowed(retracted);
|
||||
if let Some(commit_hash) = &commit_hash {
|
||||
if let Some(m) = cache.modifications.iter_mut().find(|m| &m.hash == commit_hash) {
|
||||
if m.is_canon != is_best {
|
||||
// Same block comitted twice with different state changes.
|
||||
// Treat it as reenacted/retracted.
|
||||
if is_best {
|
||||
enacted.push(commit_hash.clone());
|
||||
} else {
|
||||
retracted.to_mut().push(commit_hash.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
cache.sync(&enacted, &retracted);
|
||||
// Propagate cache only if committing on top of the latest canonical state
|
||||
// blocks are ordered by number and only one block with a given number is marked as canonical
|
||||
// (contributed to canonical state cache)
|
||||
if let Some(_) = self.parent_hash {
|
||||
let mut local_cache = self.local_cache.write();
|
||||
if is_best {
|
||||
trace!(
|
||||
"Committing {} local, {} hashes, {} modified root entries, {} modified child entries",
|
||||
local_cache.storage.len(),
|
||||
local_cache.hashes.len(),
|
||||
changes.len(),
|
||||
child_changes.iter().map(|v|v.1.len()).sum::<usize>(),
|
||||
);
|
||||
for (k, v) in local_cache.storage.drain() {
|
||||
cache.lru_storage.add(k, v);
|
||||
}
|
||||
for (k, v) in local_cache.child_storage.drain() {
|
||||
cache.lru_child_storage.add(k, v);
|
||||
}
|
||||
for (k, v) in local_cache.hashes.drain() {
|
||||
cache.lru_hashes.add(k, OptionHOut(v));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if let (
|
||||
Some(ref number), Some(ref hash), Some(ref parent))
|
||||
= (commit_number, commit_hash, self.parent_hash)
|
||||
let (update_cache, modification_index) = if let Some((i, m)) = commit_hash
|
||||
.as_ref()
|
||||
.and_then(|ch| cache.modifications.iter_mut().enumerate().find(|m| &m.1.hash == ch))
|
||||
{
|
||||
if cache.modifications.len() == STATE_CACHE_BLOCKS {
|
||||
cache.modifications.pop_back();
|
||||
}
|
||||
let mut modifications = HashSet::new();
|
||||
let mut child_modifications = HashSet::new();
|
||||
child_changes.into_iter().for_each(|(sk, changes)|
|
||||
for (k, v) in changes.into_iter() {
|
||||
let k = (sk.clone(), k);
|
||||
if is_best {
|
||||
cache.lru_child_storage.add(k.clone(), v);
|
||||
}
|
||||
child_modifications.insert(k);
|
||||
let res = if m.is_canon != is_best {
|
||||
if is_best && i == 0 {
|
||||
// The block was imported as the first block of a height as non-best block.
|
||||
// Now it is enacted as best block and we need to update the modifications with
|
||||
// these informations.
|
||||
m.is_canon = is_best;
|
||||
|
||||
// If this is the best block now and also the latest we have imported,
|
||||
// we only need to update the cache if there are any new changes.
|
||||
!changes.is_empty() || !child_changes.is_empty()
|
||||
} else if is_best {
|
||||
enacted.push(m.hash.clone());
|
||||
true
|
||||
} else {
|
||||
retracted.to_mut().push(m.hash.clone());
|
||||
true
|
||||
}
|
||||
);
|
||||
for (k, v) in changes.into_iter() {
|
||||
if is_best {
|
||||
cache.lru_hashes.remove(&k);
|
||||
cache.lru_storage.add(k.clone(), v);
|
||||
}
|
||||
modifications.insert(k);
|
||||
} else {
|
||||
true
|
||||
};
|
||||
|
||||
(res, Some(i))
|
||||
} else {
|
||||
(true, None)
|
||||
};
|
||||
|
||||
cache.sync(&enacted, &retracted);
|
||||
|
||||
if let (Some(ref parent_hash), true) = (self.parent_hash, update_cache) {
|
||||
let commit_to_shared_cache = is_best || has_no_modification_at_block_height;
|
||||
// Propagate cache only if committing on top of the latest canonical state
|
||||
// blocks are ordered by number and only one block with a given number is
|
||||
// marked as canonical (contributed to canonical state cache)
|
||||
if commit_to_shared_cache {
|
||||
self.local_cache.write().commit_to(cache);
|
||||
}
|
||||
|
||||
// Save modified storage. These are ordered by the block number in reverse.
|
||||
let block_changes = BlockChanges {
|
||||
storage: modifications,
|
||||
child_storage: child_modifications,
|
||||
number: *number,
|
||||
hash: hash.clone(),
|
||||
is_canon: is_best,
|
||||
parent: parent.clone(),
|
||||
};
|
||||
let insert_at = cache.modifications.iter()
|
||||
.enumerate()
|
||||
.find(|(_, m)| m.number < *number)
|
||||
.map(|(i, _)| i);
|
||||
trace!("Inserting modifications at {:?}", insert_at);
|
||||
if let Some(insert_at) = insert_at {
|
||||
cache.modifications.insert(insert_at, block_changes);
|
||||
} else {
|
||||
cache.modifications.push_back(block_changes);
|
||||
if let (Some(ref number), Some(hash)) = (commit_number, commit_hash) {
|
||||
if commit_to_shared_cache {
|
||||
trace!(
|
||||
"Committing {} modified root entries, {} modified child entries to shared cache",
|
||||
changes.len(),
|
||||
child_changes.iter().map(|v|v.1.len()).sum::<usize>(),
|
||||
);
|
||||
}
|
||||
|
||||
if cache.modifications.len() == STATE_CACHE_BLOCKS {
|
||||
cache.modifications.pop_back();
|
||||
}
|
||||
let mut modifications = HashSet::new();
|
||||
let mut child_modifications = HashSet::new();
|
||||
child_changes.into_iter().for_each(|(sk, changes)|
|
||||
for (k, v) in changes.into_iter() {
|
||||
let k = (sk.clone(), k);
|
||||
if commit_to_shared_cache {
|
||||
cache.lru_child_storage.add(k.clone(), v);
|
||||
}
|
||||
child_modifications.insert(k);
|
||||
}
|
||||
);
|
||||
for (k, v) in changes.into_iter() {
|
||||
if commit_to_shared_cache {
|
||||
cache.lru_hashes.remove(&k);
|
||||
cache.lru_storage.add(k.clone(), v);
|
||||
}
|
||||
modifications.insert(k);
|
||||
}
|
||||
|
||||
if let Some(modification_index) = modification_index {
|
||||
trace!("Modifying modifications at {}", modification_index);
|
||||
// Only modify the already stored block changes.
|
||||
let mut block_changes = &mut cache.modifications[modification_index];
|
||||
block_changes.is_canon = is_best;
|
||||
block_changes.storage.extend(modifications);
|
||||
block_changes.child_storage.extend(child_modifications);
|
||||
} else {
|
||||
// Save modified storage. These are ordered by the block number in reverse.
|
||||
let block_changes = BlockChanges {
|
||||
storage: modifications,
|
||||
child_storage: child_modifications,
|
||||
number: *number,
|
||||
hash,
|
||||
is_canon: is_best,
|
||||
parent: parent_hash.clone(),
|
||||
};
|
||||
|
||||
cache.add_modifications(block_changes);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -473,7 +533,10 @@ impl<S: StateBackend<HashFor<B>>, B: BlockT> CachingState<S, B> {
|
||||
) -> bool {
|
||||
let mut parent = match *parent_hash {
|
||||
None => {
|
||||
trace!("Cache lookup skipped for {:?}: no parent hash", key.as_ref().map(HexDisplay::from));
|
||||
trace!(
|
||||
"Cache lookup skipped for {:?}: no parent hash",
|
||||
key.as_ref().map(HexDisplay::from),
|
||||
);
|
||||
return false;
|
||||
}
|
||||
Some(ref parent) => parent,
|
||||
@@ -492,13 +555,19 @@ impl<S: StateBackend<HashFor<B>>, B: BlockT> CachingState<S, B> {
|
||||
}
|
||||
if let Some(key) = key {
|
||||
if m.storage.contains(key) {
|
||||
trace!("Cache lookup skipped for {:?}: modified in a later block", HexDisplay::from(&key));
|
||||
trace!(
|
||||
"Cache lookup skipped for {:?}: modified in a later block",
|
||||
HexDisplay::from(&key),
|
||||
);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
if let Some(child_key) = child_key {
|
||||
if m.child_storage.contains(child_key) {
|
||||
trace!("Cache lookup skipped for {:?}: modified in a later block", child_key);
|
||||
trace!(
|
||||
"Cache lookup skipped for {:?}: modified in a later block",
|
||||
child_key,
|
||||
);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
@@ -1378,7 +1447,7 @@ mod tests {
|
||||
false,
|
||||
);
|
||||
|
||||
assert_eq!(shared.lock().lru_storage.get(&key).unwrap(), &Some(vec![1]));
|
||||
assert_eq!(shared.lock().lru_storage.get(&key).unwrap(), &Some(vec![2]));
|
||||
|
||||
let mut s = CachingState::new(
|
||||
InMemoryBackend::<BlakeTwo256>::default(),
|
||||
@@ -1398,6 +1467,121 @@ mod tests {
|
||||
);
|
||||
assert_eq!(s.storage(&key).unwrap(), None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn import_multiple_forks_as_non_best_caches_first_fork() {
|
||||
sp_tracing::try_init_simple();
|
||||
|
||||
let root_parent = H256::random();
|
||||
let key = H256::random()[..].to_vec();
|
||||
let h1 = H256::random();
|
||||
let h2a = H256::random();
|
||||
let h2b = H256::random();
|
||||
let h2c = H256::random();
|
||||
|
||||
for (commit_as_best, cached_value) in vec![(h2a, Some(vec![2])), (h2b, None), (h2c, None)] {
|
||||
let shared = new_shared_cache::<Block>(256*1024, (0,1));
|
||||
|
||||
let mut s = CachingState::new(
|
||||
InMemoryBackend::<BlakeTwo256>::default(),
|
||||
shared.clone(),
|
||||
Some(root_parent),
|
||||
);
|
||||
s.cache.sync_cache(
|
||||
&[],
|
||||
&[],
|
||||
vec![(key.clone(), Some(vec![1]))],
|
||||
vec![],
|
||||
Some(h1),
|
||||
Some(1),
|
||||
true,
|
||||
);
|
||||
assert_eq!(shared.lock().lru_storage.get(&key).unwrap(), &Some(vec![1]));
|
||||
|
||||
{
|
||||
let mut s = CachingState::new(
|
||||
InMemoryBackend::<BlakeTwo256>::default(),
|
||||
shared.clone(),
|
||||
Some(h1),
|
||||
);
|
||||
|
||||
// commit all forks as non-best
|
||||
s.cache.sync_cache(
|
||||
&[],
|
||||
&[],
|
||||
vec![(key.clone(), Some(vec![2]))],
|
||||
vec![],
|
||||
Some(h2a),
|
||||
Some(2),
|
||||
false,
|
||||
);
|
||||
}
|
||||
|
||||
{
|
||||
let mut s = CachingState::new(
|
||||
InMemoryBackend::<BlakeTwo256>::default(),
|
||||
shared.clone(),
|
||||
Some(h1),
|
||||
);
|
||||
|
||||
s.cache.sync_cache(
|
||||
&[],
|
||||
&[],
|
||||
vec![(key.clone(), Some(vec![3]))],
|
||||
vec![],
|
||||
Some(h2b),
|
||||
Some(2),
|
||||
false,
|
||||
);
|
||||
}
|
||||
|
||||
{
|
||||
let mut s = CachingState::new(
|
||||
InMemoryBackend::<BlakeTwo256>::default(),
|
||||
shared.clone(),
|
||||
Some(h1),
|
||||
);
|
||||
|
||||
s.cache.sync_cache(
|
||||
&[],
|
||||
&[],
|
||||
vec![(key.clone(), Some(vec![4]))],
|
||||
vec![],
|
||||
Some(h2c),
|
||||
Some(2),
|
||||
false,
|
||||
);
|
||||
}
|
||||
|
||||
// We should have the value of the first block cached.
|
||||
assert_eq!(shared.lock().lru_storage.get(&key).unwrap(), &Some(vec![2]));
|
||||
|
||||
let mut s = CachingState::new(
|
||||
InMemoryBackend::<BlakeTwo256>::default(),
|
||||
shared.clone(),
|
||||
Some(h1),
|
||||
);
|
||||
|
||||
// commit again as best with no changes
|
||||
s.cache.sync_cache(
|
||||
&[],
|
||||
&[],
|
||||
vec![],
|
||||
vec![],
|
||||
Some(commit_as_best),
|
||||
Some(2),
|
||||
true,
|
||||
);
|
||||
|
||||
let s = CachingState::new(
|
||||
InMemoryBackend::<BlakeTwo256>::default(),
|
||||
shared.clone(),
|
||||
Some(commit_as_best),
|
||||
);
|
||||
|
||||
assert_eq!(s.storage(&key).unwrap(), cached_value);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
Reference in New Issue
Block a user