Revert storage cache optimization (#8535)

* Revert "Fixes `storage_hash` caching issue and enables better caching for Cumulus (#8518)"

This reverts commit 85eef08bf23453a06758acbb4b17068ca982b8a2.

* Fix reverting storage_hash

* Restore test
This commit is contained in:
Arkadiy Paronyan
2021-04-06 14:04:32 +03:00
committed by GitHub
parent bf8a1d8a1a
commit d51127f956
+133 -274
View File
@@ -25,6 +25,7 @@ use std::sync::Arc;
use std::hash::Hash as StdHash;
use parking_lot::{Mutex, RwLock, RwLockUpgradableReadGuard};
use linked_hash_map::{LinkedHashMap, Entry};
use hash_db::Hasher;
use sp_runtime::traits::{Block as BlockT, Header, HashFor, NumberFor};
use sp_core::hexdisplay::HexDisplay;
use sp_core::storage::ChildInfo;
@@ -49,8 +50,6 @@ pub struct Cache<B: BlockT> {
lru_child_storage: LRUMap<ChildStorageKey, Option<StorageValue>>,
/// Information on the modifications in recently committed blocks; specifically which keys
/// changed in which block. Ordered by block number.
///
/// The latest committed block is always at the front.
modifications: VecDeque<BlockChanges<B::Header>>,
}
@@ -200,6 +199,7 @@ impl<B: BlockT> Cache<B> {
for a in &m.storage {
trace!("Retracted key {:?}", HexDisplay::from(a));
self.lru_storage.remove(a);
self.lru_hashes.remove(a);
}
for a in &m.child_storage {
trace!("Retracted child key {:?}", a);
@@ -220,27 +220,6 @@ impl<B: BlockT> Cache<B> {
self.modifications.clear();
}
}
fn add_modifications(&mut self, block_changes: BlockChanges<B::Header>) {
let insert_at = self.modifications.iter()
.enumerate()
.find(|(_, m)| m.number < block_changes.number)
.map(|(i, _)| i);
trace!("Inserting modifications at {:?}", insert_at);
if let Some(insert_at) = insert_at {
self.modifications.insert(insert_at, block_changes);
} else {
self.modifications.push_back(block_changes);
}
}
/// Returns if this is the first modification at the given block height.
///
/// If there already exists a modification for a higher block height, `false` is returned.
fn has_no_modification_at_block_height(&self, number: NumberFor<B>) -> bool {
self.modifications.get(0).map(|c| c.number < number).unwrap_or(true)
}
}
pub type SharedCache<B> = Arc<Mutex<Cache<B>>>;
@@ -270,15 +249,15 @@ pub fn new_shared_cache<B: BlockT>(
)
}
/// Accumulates a list of storage changed in a block.
#[derive(Debug)]
struct BlockChanges<H: Header> {
/// Accumulates a list of storage changed in a block.
struct BlockChanges<B: Header> {
/// Block number.
number: H::Number,
number: B::Number,
/// Block hash.
hash: H::Hash,
hash: B::Hash,
/// Parent block hash.
parent: H::Hash,
parent: B::Hash,
/// A set of modified storage keys.
storage: HashSet<StorageKey>,
/// A set of modified child storage keys.
@@ -288,7 +267,7 @@ struct BlockChanges<H: Header> {
}
/// Cached values specific to a state.
struct LocalCache<B: BlockT> {
struct LocalCache<H: Hasher> {
/// Storage cache.
///
/// `None` indicates that key is known to be missing.
@@ -296,41 +275,19 @@ struct LocalCache<B: BlockT> {
/// Storage hashes cache.
///
/// `None` indicates that key is known to be missing.
hashes: HashMap<StorageKey, Option<B::Hash>>,
hashes: HashMap<StorageKey, Option<H::Out>>,
/// Child storage cache.
///
/// `None` indicates that key is known to be missing.
child_storage: HashMap<ChildStorageKey, Option<StorageValue>>,
}
impl<B: BlockT> LocalCache<B> {
/// Commit all cached values to the given shared `Cache`.
///
/// After calling this method, the internal state is reset.
fn commit_to(&mut self, cache: &mut Cache<B>) {
trace!(
"Committing {} local, {} hashes to shared cache",
self.storage.len(),
self.hashes.len(),
);
for (k, v) in self.storage.drain() {
cache.lru_storage.add(k, v);
}
for (k, v) in self.child_storage.drain() {
cache.lru_child_storage.add(k, v);
}
for (k, v) in self.hashes.drain() {
cache.lru_hashes.add(k, OptionHOut(v));
}
}
}
/// Cache changes.
pub struct CacheChanges<B: BlockT> {
/// Shared canonical state cache.
shared_cache: SharedCache<B>,
/// Local cache of values for this state.
local_cache: RwLock<LocalCache<B>>,
local_cache: RwLock<LocalCache<HashFor<B>>>,
/// Hash of the block on top of which this instance was created or
/// `None` if cache is disabled
pub parent_hash: Option<B::Hash>,
@@ -396,105 +353,90 @@ impl<B: BlockT> CacheChanges<B> {
.cloned()
.collect();
let has_no_modification_at_block_height = if let Some(num) = commit_number {
cache.has_no_modification_at_block_height(num)
} else {
false
};
let mut retracted = std::borrow::Cow::Borrowed(retracted);
let (update_cache, modification_index) = if let Some((i, m)) = commit_hash
.as_ref()
.and_then(|ch| cache.modifications.iter_mut().enumerate().find(|m| &m.1.hash == ch))
{
let res = if m.is_canon != is_best {
if is_best && i == 0 {
// The block was imported as the first block of a height as non-best block.
// Now it is enacted as best block and we need to update the modifications with
// these informations.
m.is_canon = is_best;
// If this is the best block now and also the latest we have imported,
// we only need to update the cache if there are any new changes.
!changes.is_empty() || !child_changes.is_empty()
} else if is_best {
enacted.push(m.hash.clone());
true
} else {
retracted.to_mut().push(m.hash.clone());
true
if let Some(commit_hash) = &commit_hash {
if let Some(m) = cache.modifications.iter_mut().find(|m| &m.hash == commit_hash) {
if m.is_canon != is_best {
// Same block comitted twice with different state changes.
// Treat it as reenacted/retracted.
if is_best {
enacted.push(commit_hash.clone());
} else {
retracted.to_mut().push(commit_hash.clone());
}
}
} else {
true
};
(res, Some(i))
} else {
(true, None)
};
}
}
cache.sync(&enacted, &retracted);
// Propagate cache only if committing on top of the latest canonical state
// blocks are ordered by number and only one block with a given number is marked as canonical
// (contributed to canonical state cache)
if let Some(_) = self.parent_hash {
let mut local_cache = self.local_cache.write();
if is_best {
trace!(
"Committing {} local, {} hashes, {} modified root entries, {} modified child entries",
local_cache.storage.len(),
local_cache.hashes.len(),
changes.len(),
child_changes.iter().map(|v|v.1.len()).sum::<usize>(),
);
for (k, v) in local_cache.storage.drain() {
cache.lru_storage.add(k, v);
}
for (k, v) in local_cache.child_storage.drain() {
cache.lru_child_storage.add(k, v);
}
for (k, v) in local_cache.hashes.drain() {
cache.lru_hashes.add(k, OptionHOut(v));
}
}
}
if let (Some(ref parent_hash), true) = (self.parent_hash, update_cache) {
let commit_to_shared_cache = is_best || has_no_modification_at_block_height;
// Propagate cache only if committing on top of the latest canonical state
// blocks are ordered by number and only one block with a given number is
// marked as canonical (contributed to canonical state cache)
if commit_to_shared_cache {
self.local_cache.write().commit_to(cache);
if let (
Some(ref number), Some(ref hash), Some(ref parent))
= (commit_number, commit_hash, self.parent_hash)
{
if cache.modifications.len() == STATE_CACHE_BLOCKS {
cache.modifications.pop_back();
}
let mut modifications = HashSet::new();
let mut child_modifications = HashSet::new();
child_changes.into_iter().for_each(|(sk, changes)|
for (k, v) in changes.into_iter() {
let k = (sk.clone(), k);
if is_best {
cache.lru_child_storage.add(k.clone(), v);
}
child_modifications.insert(k);
}
);
for (k, v) in changes.into_iter() {
if is_best {
cache.lru_hashes.remove(&k);
cache.lru_storage.add(k.clone(), v);
}
modifications.insert(k);
}
if let (Some(ref number), Some(hash)) = (commit_number, commit_hash) {
if commit_to_shared_cache {
trace!(
"Committing {} modified root entries, {} modified child entries to shared cache",
changes.len(),
child_changes.iter().map(|v|v.1.len()).sum::<usize>(),
);
}
if cache.modifications.len() == STATE_CACHE_BLOCKS {
cache.modifications.pop_back();
}
let mut modifications = HashSet::new();
let mut child_modifications = HashSet::new();
child_changes.into_iter().for_each(|(sk, changes)|
for (k, v) in changes.into_iter() {
let k = (sk.clone(), k);
if commit_to_shared_cache {
cache.lru_child_storage.add(k.clone(), v);
}
child_modifications.insert(k);
}
);
for (k, v) in changes.into_iter() {
if commit_to_shared_cache {
cache.lru_hashes.remove(&k);
cache.lru_storage.add(k.clone(), v);
}
modifications.insert(k);
}
if let Some(modification_index) = modification_index {
trace!("Modifying modifications at {}", modification_index);
// Only modify the already stored block changes.
let mut block_changes = &mut cache.modifications[modification_index];
block_changes.is_canon = is_best;
block_changes.storage.extend(modifications);
block_changes.child_storage.extend(child_modifications);
} else {
// Save modified storage. These are ordered by the block number in reverse.
let block_changes = BlockChanges {
storage: modifications,
child_storage: child_modifications,
number: *number,
hash,
is_canon: is_best,
parent: parent_hash.clone(),
};
cache.add_modifications(block_changes);
}
// Save modified storage. These are ordered by the block number in reverse.
let block_changes = BlockChanges {
storage: modifications,
child_storage: child_modifications,
number: *number,
hash: hash.clone(),
is_canon: is_best,
parent: parent.clone(),
};
let insert_at = cache.modifications.iter()
.enumerate()
.find(|(_, m)| m.number < *number)
.map(|(i, _)| i);
trace!("Inserting modifications at {:?}", insert_at);
if let Some(insert_at) = insert_at {
cache.modifications.insert(insert_at, block_changes);
} else {
cache.modifications.push_back(block_changes);
}
}
}
@@ -533,10 +475,7 @@ impl<S: StateBackend<HashFor<B>>, B: BlockT> CachingState<S, B> {
) -> bool {
let mut parent = match *parent_hash {
None => {
trace!(
"Cache lookup skipped for {:?}: no parent hash",
key.as_ref().map(HexDisplay::from),
);
trace!("Cache lookup skipped for {:?}: no parent hash", key.as_ref().map(HexDisplay::from));
return false;
}
Some(ref parent) => parent,
@@ -555,19 +494,13 @@ impl<S: StateBackend<HashFor<B>>, B: BlockT> CachingState<S, B> {
}
if let Some(key) = key {
if m.storage.contains(key) {
trace!(
"Cache lookup skipped for {:?}: modified in a later block",
HexDisplay::from(&key),
);
trace!("Cache lookup skipped for {:?}: modified in a later block", HexDisplay::from(&key));
return false;
}
}
if let Some(child_key) = child_key {
if m.child_storage.contains(child_key) {
trace!(
"Cache lookup skipped for {:?}: modified in a later block",
child_key,
);
trace!("Cache lookup skipped for {:?}: modified in a later block", child_key);
return false;
}
}
@@ -1254,6 +1187,47 @@ mod tests {
assert_eq!(s.storage(&key).unwrap().unwrap(), vec![2]);
}
#[test]
fn reverts_storage_hash() {
let root_parent = H256::random();
let key = H256::random()[..].to_vec();
let h1a = H256::random();
let h1b = H256::random();
let shared = new_shared_cache::<Block>(256*1024, (0,1));
let mut backend = InMemoryBackend::<BlakeTwo256>::default();
backend.insert(std::iter::once((None, vec![(key.clone(), Some(vec![1]))])));
let mut s = CachingState::new(
backend.clone(),
shared.clone(),
Some(root_parent),
);
s.cache.sync_cache(
&[],
&[],
vec![(key.clone(), Some(vec![2]))],
vec![],
Some(h1a),
Some(1),
true,
);
let mut s = CachingState::new(
backend.clone(),
shared.clone(),
Some(root_parent),
);
s.cache.sync_cache(&[], &[h1a], vec![], vec![], Some(h1b), Some(1), true);
let s = CachingState::new(
backend.clone(),
shared.clone(),
Some(h1b),
);
assert_eq!(s.storage_hash(&key).unwrap().unwrap(), BlakeTwo256::hash(&vec![1]));
}
#[test]
fn should_track_used_size_correctly() {
let root_parent = H256::random();
@@ -1447,7 +1421,7 @@ mod tests {
false,
);
assert_eq!(shared.lock().lru_storage.get(&key).unwrap(), &Some(vec![2]));
assert_eq!(shared.lock().lru_storage.get(&key).unwrap(), &Some(vec![1]));
let mut s = CachingState::new(
InMemoryBackend::<BlakeTwo256>::default(),
@@ -1467,121 +1441,6 @@ mod tests {
);
assert_eq!(s.storage(&key).unwrap(), None);
}
#[test]
fn import_multiple_forks_as_non_best_caches_first_fork() {
sp_tracing::try_init_simple();
let root_parent = H256::random();
let key = H256::random()[..].to_vec();
let h1 = H256::random();
let h2a = H256::random();
let h2b = H256::random();
let h2c = H256::random();
for (commit_as_best, cached_value) in vec![(h2a, Some(vec![2])), (h2b, None), (h2c, None)] {
let shared = new_shared_cache::<Block>(256*1024, (0,1));
let mut s = CachingState::new(
InMemoryBackend::<BlakeTwo256>::default(),
shared.clone(),
Some(root_parent),
);
s.cache.sync_cache(
&[],
&[],
vec![(key.clone(), Some(vec![1]))],
vec![],
Some(h1),
Some(1),
true,
);
assert_eq!(shared.lock().lru_storage.get(&key).unwrap(), &Some(vec![1]));
{
let mut s = CachingState::new(
InMemoryBackend::<BlakeTwo256>::default(),
shared.clone(),
Some(h1),
);
// commit all forks as non-best
s.cache.sync_cache(
&[],
&[],
vec![(key.clone(), Some(vec![2]))],
vec![],
Some(h2a),
Some(2),
false,
);
}
{
let mut s = CachingState::new(
InMemoryBackend::<BlakeTwo256>::default(),
shared.clone(),
Some(h1),
);
s.cache.sync_cache(
&[],
&[],
vec![(key.clone(), Some(vec![3]))],
vec![],
Some(h2b),
Some(2),
false,
);
}
{
let mut s = CachingState::new(
InMemoryBackend::<BlakeTwo256>::default(),
shared.clone(),
Some(h1),
);
s.cache.sync_cache(
&[],
&[],
vec![(key.clone(), Some(vec![4]))],
vec![],
Some(h2c),
Some(2),
false,
);
}
// We should have the value of the first block cached.
assert_eq!(shared.lock().lru_storage.get(&key).unwrap(), &Some(vec![2]));
let mut s = CachingState::new(
InMemoryBackend::<BlakeTwo256>::default(),
shared.clone(),
Some(h1),
);
// commit again as best with no changes
s.cache.sync_cache(
&[],
&[],
vec![],
vec![],
Some(commit_as_best),
Some(2),
true,
);
let s = CachingState::new(
InMemoryBackend::<BlakeTwo256>::default(),
shared.clone(),
Some(commit_as_best),
);
assert_eq!(s.storage(&key).unwrap(), cached_value);
}
}
}
#[cfg(test)]