From e777038418405f4c761b8240d7536e402b931299 Mon Sep 17 00:00:00 2001 From: cheme Date: Fri, 14 Jun 2019 11:25:20 +0200 Subject: [PATCH] Add storage cache for child trie and notification internals (#2639) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * child cache, and test failing notifications * fix tests and no listen child on top wildcard * remove useless method * bump impl version * Update core/client/src/notifications.rs Co-Authored-By: Tomasz Drwięga * Update core/client/src/notifications.rs Co-Authored-By: Tomasz Drwięga * Update core/client/src/notifications.rs Co-Authored-By: Tomasz Drwięga * Update core/client/src/notifications.rs Co-Authored-By: Tomasz Drwięga * factoring notification methods to remove some redundant code. * test child sub removal * HStorage implementation and some type alias. * Remove HStorage cache: does not fit * fix removal * Make cache use byte length (shared) instead of number of kv * Make use of hashes cache in rpc * applying ratio on different lru caches * Fix format * break a line * Remove per element overhead of lru cache. * typo --- substrate/Cargo.lock | 2 +- substrate/core/client/db/Cargo.toml | 2 +- substrate/core/client/db/src/lib.rs | 62 ++- substrate/core/client/db/src/storage_cache.rs | 355 +++++++++++++----- substrate/core/client/src/backend.rs | 14 +- substrate/core/client/src/client.rs | 77 +++- substrate/core/client/src/in_mem.rs | 8 +- substrate/core/client/src/lib.rs | 2 +- substrate/core/client/src/light/backend.rs | 11 +- substrate/core/client/src/notifications.rs | 291 +++++++++++--- substrate/core/rpc/src/state/mod.rs | 30 +- substrate/core/service/src/components.rs | 4 + substrate/core/service/src/config.rs | 3 + substrate/core/service/test/src/lib.rs | 1 + substrate/core/state-machine/src/backend.rs | 5 + .../state-machine/src/overlayed_changes.rs | 8 +- 16 files changed, 679 insertions(+), 196 deletions(-) diff --git a/substrate/Cargo.lock b/substrate/Cargo.lock index 6e30fb4a75..a2b315a559 100644 --- a/substrate/Cargo.lock +++ b/substrate/Cargo.lock @@ -4023,8 +4023,8 @@ dependencies = [ "kvdb 0.1.0 (git+https://github.com/paritytech/parity-common?rev=b0317f649ab2c665b7987b8475878fc4d2e1f81d)", "kvdb-memorydb 0.1.0 (git+https://github.com/paritytech/parity-common?rev=b0317f649ab2c665b7987b8475878fc4d2e1f81d)", "kvdb-rocksdb 0.1.4 (git+https://github.com/paritytech/parity-common?rev=b0317f649ab2c665b7987b8475878fc4d2e1f81d)", + "linked-hash-map 0.5.2 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", - "lru-cache 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", "parity-codec 3.5.1 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", "sr-primitives 2.0.0", diff --git a/substrate/core/client/db/Cargo.toml b/substrate/core/client/db/Cargo.toml index e40b1568f6..bfc7108db7 100644 --- a/substrate/core/client/db/Cargo.toml +++ b/substrate/core/client/db/Cargo.toml @@ -11,7 +11,7 @@ kvdb = { git = "https://github.com/paritytech/parity-common", rev="b0317f649ab2c # FIXME replace with release as soon as our rocksdb changes are released upstream https://github.com/paritytech/parity-common/issues/88 kvdb-rocksdb = { git = "https://github.com/paritytech/parity-common", rev="b0317f649ab2c665b7987b8475878fc4d2e1f81d", optional = true } kvdb-memorydb = { git = "https://github.com/paritytech/parity-common", rev="b0317f649ab2c665b7987b8475878fc4d2e1f81d" } -lru-cache = "0.1.1" +linked-hash-map = "0.5" hash-db = { version = "0.12" } primitives = { package = "substrate-primitives", path = "../../primitives" } runtime_primitives = { package = "sr-primitives", path = "../../sr-primitives" } diff --git a/substrate/core/client/db/src/lib.rs b/substrate/core/client/db/src/lib.rs index 674e8022dc..b487770fb3 100644 --- a/substrate/core/client/db/src/lib.rs +++ b/substrate/core/client/db/src/lib.rs @@ -38,6 +38,7 @@ use std::collections::HashMap; use client::backend::NewBlockState; use client::blockchain::HeaderBackend; use client::ExecutionStrategies; +use client::backend::{StorageCollection, ChildStorageCollection}; use parity_codec::{Decode, Encode}; use hash_db::Hasher; use kvdb::{KeyValueDB, DBTransaction}; @@ -69,6 +70,9 @@ use client::in_mem::Backend as InMemoryBackend; const CANONICALIZATION_DELAY: u64 = 4096; const MIN_BLOCKS_TO_KEEP_CHANGES_TRIES_FOR: u32 = 32768; +/// Default value for storage cache child ratio. +const DEFAULT_CHILD_RATIO: (usize, usize) = (1, 10); + /// DB-backed patricia trie state, transaction type is an overlay of changes to commit. pub type DbState = state_machine::TrieBackend>, Blake2Hasher>; @@ -169,6 +173,8 @@ pub struct DatabaseSettings { pub cache_size: Option, /// State cache size. pub state_cache_size: usize, + /// Ratio of cache size dedicated to child tries. + pub state_cache_child_ratio: Option<(usize, usize)>, /// Path to the database. pub path: PathBuf, /// Pruning mode. @@ -181,7 +187,10 @@ pub fn new_client( executor: E, genesis_storage: S, execution_strategies: ExecutionStrategies, -) -> Result, client::LocalCallExecutor, E>, Block, RA>, client::error::Error> +) -> Result< + client::Client, + client::LocalCallExecutor, E>, Block, RA>, client::error::Error +> where Block: BlockT, E: CodeExecutor + RuntimeInfo, @@ -363,7 +372,8 @@ impl client::blockchain::ProvideCache for BlockchainDb { old_state: CachingState, Block>, db_updates: PrefixedMemoryDB, - storage_updates: Vec<(Vec, Option>)>, + storage_updates: StorageCollection, + child_storage_updates: ChildStorageCollection, changes_trie_updates: MemoryDB, pending_block: Option>, aux_ops: Vec<(Vec, Option>)>, @@ -455,8 +465,13 @@ where Block: BlockT, Ok(()) } - fn update_storage(&mut self, update: Vec<(Vec, Option>)>) -> Result<(), client::error::Error> { + fn update_storage( + &mut self, + update: StorageCollection, + child_update: ChildStorageCollection, + ) -> Result<(), client::error::Error> { self.storage_updates = update; + self.child_storage_updates = child_update; Ok(()) } @@ -670,14 +685,14 @@ impl> Backend { #[cfg(feature = "kvdb-rocksdb")] fn new_inner(config: DatabaseSettings, canonicalization_delay: u64) -> Result { let db = crate::utils::open_database(&config, columns::META, "full")?; - Backend::from_kvdb(db as Arc<_>, config.pruning, canonicalization_delay, config.state_cache_size) + Backend::from_kvdb(db as Arc<_>, canonicalization_delay, &config) } #[cfg(not(feature = "kvdb-rocksdb"))] fn new_inner(config: DatabaseSettings, canonicalization_delay: u64) -> Result { log::warn!("Running without the RocksDB feature. The database will NOT be saved."); let db = Arc::new(kvdb_memorydb::create(crate::utils::NUM_COLUMNS)); - Backend::from_kvdb(db as Arc<_>, config.pruning, canonicalization_delay, config.state_cache_size) + Backend::from_kvdb(db as Arc<_>, canonicalization_delay, &config) } #[cfg(any(test, feature = "test-helpers"))] @@ -685,26 +700,36 @@ impl> Backend { use utils::NUM_COLUMNS; let db = Arc::new(::kvdb_memorydb::create(NUM_COLUMNS)); + Self::new_test_db(keep_blocks, canonicalization_delay, db as Arc<_>) + } + #[cfg(any(test, feature = "test-helpers"))] + pub fn new_test_db(keep_blocks: u32, canonicalization_delay: u64, db: Arc) -> Self { + + let db_setting = DatabaseSettings { + cache_size: None, + state_cache_size: 16777216, + state_cache_child_ratio: Some((50, 100)), + path: Default::default(), + pruning: PruningMode::keep_blocks(keep_blocks), + }; Backend::from_kvdb( - db as Arc<_>, - PruningMode::keep_blocks(keep_blocks), + db, canonicalization_delay, - 16777216, + &db_setting, ).expect("failed to create test-db") } fn from_kvdb( db: Arc, - pruning: PruningMode, canonicalization_delay: u64, - state_cache_size: usize + config: &DatabaseSettings ) -> Result { - let is_archive_pruning = pruning.is_archive(); + let is_archive_pruning = config.pruning.is_archive(); let blockchain = BlockchainDb::new(db.clone())?; let meta = blockchain.meta.clone(); let map_e = |e: state_db::Error| ::client::error::Error::from(format!("State database error: {:?}", e)); - let state_db: StateDb<_, _> = StateDb::new(pruning, &StateMetaDb(&*db)).map_err(map_e)?; + let state_db: StateDb<_, _> = StateDb::new(config.pruning.clone(), &StateMetaDb(&*db)).map_err(map_e)?; let storage_db = StorageDb { db: db.clone(), state_db, @@ -722,7 +747,10 @@ impl> Backend { changes_trie_config: Mutex::new(None), blockchain, canonicalization_delay, - shared_cache: new_shared_cache(state_cache_size), + shared_cache: new_shared_cache( + config.state_cache_size, + config.state_cache_child_ratio.unwrap_or(DEFAULT_CHILD_RATIO), + ), import_lock: Default::default(), }) } @@ -1094,6 +1122,7 @@ impl> Backend { &enacted, &retracted, operation.storage_updates, + operation.child_storage_updates, Some(hash), Some(number), || is_best, @@ -1200,6 +1229,7 @@ impl client::backend::Backend for Backend whe old_state, db_updates: PrefixedMemoryDB::default(), storage_updates: Default::default(), + child_storage_updates: Default::default(), changes_trie_updates: MemoryDB::default(), aux_ops: Vec::new(), finalized_blocks: Vec::new(), @@ -1282,7 +1312,7 @@ impl client::backend::Backend for Backend whe || client::error::Error::UnknownBlock( format!("Error reverting to {}. Block hash not found.", best)))?; - best -= One::one(); // prev block + best -= One::one(); // prev block let hash = self.blockchain.hash(best)?.ok_or_else( || client::error::Error::UnknownBlock( format!("Error reverting to {}. Block hash not found.", best)))?; @@ -1348,7 +1378,7 @@ impl client::backend::Backend for Backend whe fn destroy_state(&self, state: Self::State) -> Result<(), client::error::Error> { if let Some(hash) = state.cache.parent_hash.clone() { let is_best = || self.blockchain.meta.read().best_hash == hash; - state.release().sync_cache(&[], &[], vec![], None, None, is_best); + state.release().sync_cache(&[], &[], vec![], vec![], None, None, is_best); } Ok(()) } @@ -1473,7 +1503,7 @@ mod tests { db.storage.db.clone() }; - let backend = Backend::::from_kvdb(backing, PruningMode::keep_blocks(1), 0, 16777216).unwrap(); + let backend = Backend::::new_test_db(1, 0, backing); assert_eq!(backend.blockchain().info().best_number, 9); for i in 0..10 { assert!(backend.blockchain().hash(i).unwrap().is_some()) diff --git a/substrate/core/client/db/src/storage_cache.rs b/substrate/core/client/db/src/storage_cache.rs index cc4670866c..53e4594856 100644 --- a/substrate/core/client/db/src/storage_cache.rs +++ b/substrate/core/client/db/src/storage_cache.rs @@ -19,60 +19,158 @@ use std::collections::{VecDeque, HashSet, HashMap}; use std::sync::Arc; use parking_lot::{Mutex, RwLock, RwLockUpgradableReadGuard}; -use lru_cache::LruCache; +use linked_hash_map::{LinkedHashMap, Entry}; use hash_db::Hasher; use runtime_primitives::traits::{Block, Header}; use state_machine::{backend::Backend as StateBackend, TrieBackend}; use log::trace; - +use super::{StorageCollection, ChildStorageCollection}; +use std::hash::Hash as StdHash; const STATE_CACHE_BLOCKS: usize = 12; type StorageKey = Vec; +type ChildStorageKey = (Vec, Vec); type StorageValue = Vec; /// Shared canonical state cache. pub struct Cache { /// Storage cache. `None` indicates that key is known to be missing. - storage: LruCache>, + lru_storage: LRUMap>, /// Storage hashes cache. `None` indicates that key is known to be missing. - hashes: LruCache>, + lru_hashes: LRUMap>, + /// Storage cache for child trie. `None` indicates that key is known to be missing. + lru_child_storage: LRUMap>, /// Information on the modifications in recently committed blocks; specifically which keys /// changed in which block. Ordered by block number. modifications: VecDeque>, - /// Maximum cache size available, in Bytes. - shared_cache_size: usize, - /// Used storage size, in Bytes. - storage_used_size: usize, } +struct LRUMap(LinkedHashMap, usize, usize); + +/// Internal trait similar to `heapsize` but using +/// a simply estimation. +/// +/// This should not be made public, it is implementation +/// detail trait. If it need to become public please +/// consider using `malloc_size_of`. +trait EstimateSize { + /// Return a size estimation of additional size needed + /// to cache this struct (in bytes). + fn estimate_size(&self) -> usize; +} + +impl EstimateSize for Vec { + fn estimate_size(&self) -> usize { + self.capacity() + } +} + +impl EstimateSize for Option> { + fn estimate_size(&self) -> usize { + self.as_ref().map(|v|v.capacity()).unwrap_or(0) + } +} + +struct OptionHOut>(Option); + +impl> EstimateSize for OptionHOut { + fn estimate_size(&self) -> usize { + // capacity would be better + self.0.as_ref().map(|v|v.as_ref().len()).unwrap_or(0) + } +} + +impl EstimateSize for (T, T) { + fn estimate_size(&self) -> usize { + self.0.estimate_size() + self.1.estimate_size() + } +} + +impl LRUMap { + fn remove(&mut self, k: &K) { + let map = &mut self.0; + let storage_used_size = &mut self.1; + if let Some(v) = map.remove(k) { + *storage_used_size -= k.estimate_size(); + *storage_used_size -= v.estimate_size(); + } + } + + fn add(&mut self, k: K, v: V) { + let lmap = &mut self.0; + let storage_used_size = &mut self.1; + let limit = self.2; + let klen = k.estimate_size(); + *storage_used_size += v.estimate_size(); + // TODO assert k v size fit into limit?? to avoid insert remove? + match lmap.entry(k) { + Entry::Occupied(mut entry) => { + // note that in this case we are not running pure lru as + // it would require to remove first + *storage_used_size -= entry.get().estimate_size(); + entry.insert(v); + }, + Entry::Vacant(entry) => { + *storage_used_size += klen; + entry.insert(v); + }, + }; + + while *storage_used_size > limit { + if let Some((k,v)) = lmap.pop_front() { + *storage_used_size -= k.estimate_size(); + *storage_used_size -= v.estimate_size(); + } else { + // can happen fairly often as we get value from multiple lru + // and only remove from a single lru + break; + } + } + } + + fn get(&mut self, k: &Q) -> Option<&mut V> + where K: std::borrow::Borrow, + Q: StdHash + Eq { + self.0.get_refresh(k) + } + + fn used_size(&self) -> usize { + self.1 + } + fn clear(&mut self) { + self.0.clear(); + self.1 = 0; + } + +} + impl Cache { /// Returns the used memory size of the storage cache in bytes. pub fn used_storage_cache_size(&self) -> usize { - self.storage_used_size + self.lru_storage.used_size() + + self.lru_child_storage.used_size() + // ignore small hashes storage and self.lru_hashes.used_size() } } pub type SharedCache = Arc>>; -/// Create new shared cache instance with given max memory usage. -pub fn new_shared_cache(shared_cache_size: usize) -> SharedCache { - // we need to supply a max capacity to `LruCache`, but since - // we don't have any idea how large the size of each item - // that is stored will be we can't calculate the max amount - // of items properly from `shared_cache_size`. - // - // what we do instead is to supply `shared_cache_size` as the - // max upper bound capacity (this would only be reached if each - // item would be one byte). - // each time we store to the storage cache we verify the memory - // constraint and pop the lru item if space needs to be freed. +/// Fix lru storage size for hash (small 64ko). +const FIX_LRU_HASH_SIZE: usize = 65_536; +/// Create a new shared cache instance with given max memory usage. +pub fn new_shared_cache( + shared_cache_size: usize, + child_ratio: (usize, usize), +) -> SharedCache { + let top = child_ratio.1.saturating_sub(child_ratio.0); Arc::new(Mutex::new(Cache { - storage: LruCache::new(shared_cache_size), - hashes: LruCache::new(shared_cache_size), + lru_storage: LRUMap(LinkedHashMap::new(), 0, + shared_cache_size * top / child_ratio.1), + lru_hashes: LRUMap(LinkedHashMap::new(), 0, FIX_LRU_HASH_SIZE), + lru_child_storage: LRUMap(LinkedHashMap::new(), 0, + shared_cache_size * child_ratio.0 / child_ratio.1), modifications: VecDeque::new(), - shared_cache_size: shared_cache_size, - storage_used_size: 0, })) } @@ -87,6 +185,8 @@ struct BlockChanges { parent: B::Hash, /// A set of modified storage keys. storage: HashSet, + /// A set of modified child storage keys. + child_storage: HashSet, /// Block is part of the canonical chain. is_canon: bool, } @@ -97,6 +197,8 @@ struct LocalCache { storage: HashMap>, /// Storage hashes cache. `None` indicates that key is known to be missing. hashes: HashMap>, + /// Child storage cache. `None` indicates that key is known to be missing. + child_storage: HashMap>, } /// Cache changes. @@ -135,7 +237,8 @@ impl CacheChanges { &mut self, enacted: &[B::Hash], retracted: &[B::Hash], - changes: Vec<(StorageKey, Option)>, + changes: StorageCollection, + child_changes: ChildStorageCollection, commit_hash: Option, commit_number: Option<::Number>, is_best: F, @@ -155,7 +258,11 @@ impl CacheChanges { m.is_canon = true; for a in &m.storage { trace!("Reverting enacted key {:?}", a); - CacheChanges::::storage_remove(&mut cache.storage, a, &mut cache.storage_used_size); + cache.lru_storage.remove(a); + } + for a in &m.child_storage { + trace!("Reverting enacted child key {:?}", a); + cache.lru_child_storage.remove(a); } false } else { @@ -171,7 +278,11 @@ impl CacheChanges { m.is_canon = false; for a in &m.storage { trace!("Retracted key {:?}", a); - CacheChanges::::storage_remove(&mut cache.storage, a, &mut cache.storage_used_size); + cache.lru_storage.remove(a); + } + for a in &m.child_storage { + trace!("Retracted child key {:?}", a); + cache.lru_child_storage.remove(a); } false } else { @@ -182,7 +293,9 @@ impl CacheChanges { if clear { // We don't know anything about the block; clear everything trace!("Wiping cache"); - cache.storage.clear(); + cache.lru_storage.clear(); + cache.lru_child_storage.clear(); + cache.lru_hashes.clear(); cache.modifications.clear(); } @@ -192,12 +305,21 @@ impl CacheChanges { if let Some(_) = self.parent_hash { let mut local_cache = self.local_cache.write(); if is_best { - trace!("Committing {} local, {} hashes, {} modified entries", local_cache.storage.len(), local_cache.hashes.len(), changes.len()); + trace!( + "Committing {} local, {} hashes, {} modified root entries, {} modified child entries", + local_cache.storage.len(), + local_cache.hashes.len(), + changes.len(), + child_changes.iter().map(|v|v.1.len()).sum::(), + ); for (k, v) in local_cache.storage.drain() { - CacheChanges::::storage_insert(cache, k, v); + cache.lru_storage.add(k, v); + } + for (k, v) in local_cache.child_storage.drain() { + cache.lru_child_storage.add(k, v); } for (k, v) in local_cache.hashes.drain() { - cache.hashes.insert(k, v); + cache.lru_hashes.add(k, OptionHOut(v)); } } } @@ -210,16 +332,28 @@ impl CacheChanges { cache.modifications.pop_back(); } let mut modifications = HashSet::new(); - for (k, v) in changes.into_iter() { - modifications.insert(k.clone()); - if is_best { - cache.hashes.remove(&k); - CacheChanges::::storage_insert(cache, k, v); + let mut child_modifications = HashSet::new(); + child_changes.into_iter().for_each(|(sk, changes)| + for (k, v) in changes.into_iter() { + let k = (sk.clone(), k); + if is_best { + cache.lru_child_storage.add(k.clone(), v); + } + child_modifications.insert(k); } + ); + for (k, v) in changes.into_iter() { + if is_best { + cache.lru_hashes.remove(&k); + cache.lru_storage.add(k.clone(), v); + } + modifications.insert(k); } + // Save modified storage. These are ordered by the block number. let block_changes = BlockChanges { storage: modifications, + child_storage: child_modifications, number: *number, hash: hash.clone(), is_canon: is_best, @@ -238,32 +372,6 @@ impl CacheChanges { } } - fn storage_insert(cache: &mut Cache, k: StorageValue, v: Option) { - if let Some(v_) = &v { - while cache.storage_used_size + v_.len() > cache.shared_cache_size { - // pop until space constraint satisfied - match cache.storage.remove_lru() { - Some((_, Some(popped_v))) => - cache.storage_used_size = cache.storage_used_size - popped_v.len(), - Some((_, None)) => continue, - None => break, - }; - } - cache.storage_used_size = cache.storage_used_size + v_.len(); - } - cache.storage.insert(k, v); - } - - fn storage_remove( - storage: &mut LruCache>, - k: &StorageKey, - storage_used_size: &mut usize, - ) { - let v = storage.remove(k); - if let Some(Some(v_)) = v { - *storage_used_size = *storage_used_size - v_.len(); - } - } } impl, B: Block> CachingState { @@ -276,6 +384,7 @@ impl, B: Block> CachingState { local_cache: RwLock::new(LocalCache { storage: Default::default(), hashes: Default::default(), + child_storage: Default::default(), }), parent_hash: parent_hash, }, @@ -285,7 +394,8 @@ impl, B: Block> CachingState { /// Check if the key can be returned from cache by matching current block parent hash against canonical /// state and filtering out entries modified in later blocks. fn is_allowed( - key: &[u8], + key: Option<&[u8]>, + child_key: Option<&ChildStorageKey>, parent_hash: &Option, modifications: &VecDeque> @@ -314,9 +424,17 @@ impl, B: Block> CachingState { } parent = &m.parent; } - if m.storage.contains(key) { - trace!("Cache lookup skipped for {:?}: modified in a later block", key); - return false; + if let Some(key) = key { + if m.storage.contains(key) { + trace!("Cache lookup skipped for {:?}: modified in a later block", key); + return false; + } + } + if let Some(child_key) = child_key { + if m.child_storage.contains(child_key) { + trace!("Cache lookup skipped for {:?}: modified in a later block", child_key); + return false; + } } } trace!("Cache lookup skipped for {:?}: parent hash is unknown", key); @@ -330,19 +448,20 @@ impl, B: Block> CachingState { } impl, B:Block> StateBackend for CachingState { - type Error = S::Error; + type Error = S::Error; type Transaction = S::Transaction; type TrieBackendStorage = S::TrieBackendStorage; fn storage(&self, key: &[u8]) -> Result>, Self::Error> { let local_cache = self.cache.local_cache.upgradable_read(); + // Note that local cache makes that lru is not refreshed if let Some(entry) = local_cache.storage.get(key).cloned() { trace!("Found in local cache: {:?}", key); return Ok(entry) } let mut cache = self.cache.shared_cache.lock(); - if Self::is_allowed(key, &self.cache.parent_hash, &cache.modifications) { - if let Some(entry) = cache.storage.get_mut(key).map(|a| a.clone()) { + if Self::is_allowed(Some(key), None, &self.cache.parent_hash, &cache.modifications) { + if let Some(entry) = cache.lru_storage.get(key).map(|a| a.clone()) { trace!("Found in shared cache: {:?}", key); return Ok(entry) } @@ -360,8 +479,8 @@ impl, B:Block> StateBackend for CachingState, B:Block> StateBackend for CachingState Result>, Self::Error> { - self.state.child_storage(storage_key, key) + let key = (storage_key.to_vec(), key.to_vec()); + let local_cache = self.cache.local_cache.upgradable_read(); + if let Some(entry) = local_cache.child_storage.get(&key).cloned() { + trace!("Found in local cache: {:?}", key); + return Ok(entry) + } + let mut cache = self.cache.shared_cache.lock(); + if Self::is_allowed(None, Some(&key), &self.cache.parent_hash, &cache.modifications) { + if let Some(entry) = cache.lru_child_storage.get(&key).map(|a| a.clone()) { + trace!("Found in shared cache: {:?}", key); + return Ok(entry) + } + } + trace!("Cache miss: {:?}", key); + let value = self.state.child_storage(storage_key, &key.1[..])?; + RwLockUpgradableReadGuard::upgrade(local_cache).child_storage.insert(key, value.clone()); + Ok(value) } fn exists_storage(&self, key: &[u8]) -> Result { @@ -446,27 +581,27 @@ mod tests { let h3a = H256::random(); let h3b = H256::random(); - let shared = new_shared_cache::(256*1024); + let shared = new_shared_cache::(256*1024, (0,1)); // blocks [ 3a(c) 2a(c) 2b 1b 1a(c) 0 ] // state [ 5 5 4 3 2 2 ] let mut s = CachingState::new(InMemory::::default(), shared.clone(), Some(root_parent.clone())); - s.cache.sync_cache(&[], &[], vec![(key.clone(), Some(vec![2]))], Some(h0.clone()), Some(0), || true); + s.cache.sync_cache(&[], &[], vec![(key.clone(), Some(vec![2]))], vec![], Some(h0.clone()), Some(0), || true); let mut s = CachingState::new(InMemory::::default(), shared.clone(), Some(h0.clone())); - s.cache.sync_cache(&[], &[], vec![], Some(h1a.clone()), Some(1), || true); + s.cache.sync_cache(&[], &[], vec![], vec![], Some(h1a.clone()), Some(1), || true); let mut s = CachingState::new(InMemory::::default(), shared.clone(), Some(h0.clone())); - s.cache.sync_cache(&[], &[], vec![(key.clone(), Some(vec![3]))], Some(h1b.clone()), Some(1), || false); + s.cache.sync_cache(&[], &[], vec![(key.clone(), Some(vec![3]))], vec![], Some(h1b.clone()), Some(1), || false); let mut s = CachingState::new(InMemory::::default(), shared.clone(), Some(h1b.clone())); - s.cache.sync_cache(&[], &[], vec![(key.clone(), Some(vec![4]))], Some(h2b.clone()), Some(2), || false); + s.cache.sync_cache(&[], &[], vec![(key.clone(), Some(vec![4]))], vec![], Some(h2b.clone()), Some(2), || false); let mut s = CachingState::new(InMemory::::default(), shared.clone(), Some(h1a.clone())); - s.cache.sync_cache(&[], &[], vec![(key.clone(), Some(vec![5]))], Some(h2a.clone()), Some(2), || true); + s.cache.sync_cache(&[], &[], vec![(key.clone(), Some(vec![5]))], vec![], Some(h2a.clone()), Some(2), || true); let mut s = CachingState::new(InMemory::::default(), shared.clone(), Some(h2a.clone())); - s.cache.sync_cache(&[], &[], vec![], Some(h3a.clone()), Some(3), || true); + s.cache.sync_cache(&[], &[], vec![], vec![], Some(h3a.clone()), Some(3), || true); let s = CachingState::new(InMemory::::default(), shared.clone(), Some(h3a.clone())); assert_eq!(s.storage(&key).unwrap().unwrap(), vec![5]); @@ -487,9 +622,10 @@ mod tests { &[h1b.clone(), h2b.clone(), h3b.clone()], &[h1a.clone(), h2a.clone(), h3a.clone()], vec![], + vec![], Some(h3b.clone()), Some(3), - || true + || true, ); let s = CachingState::new(InMemory::::default(), shared.clone(), Some(h3a.clone())); assert!(s.storage(&key).unwrap().is_none()); @@ -498,34 +634,71 @@ mod tests { #[test] fn should_track_used_size_correctly() { let root_parent = H256::random(); - let shared = new_shared_cache::(5); + let shared = new_shared_cache::(109, ((109-36), 109)); let h0 = H256::random(); let mut s = CachingState::new(InMemory::::default(), shared.clone(), Some(root_parent.clone())); let key = H256::random()[..].to_vec(); - s.cache.sync_cache(&[], &[], vec![(key.clone(), Some(vec![1, 2, 3]))], Some(h0.clone()), Some(0), || true); - assert_eq!(shared.lock().used_storage_cache_size(), 3 /* bytes */); + let s_key = H256::random()[..].to_vec(); + s.cache.sync_cache( + &[], + &[], + vec![(key.clone(), Some(vec![1, 2, 3]))], + vec![], + Some(h0.clone()), + Some(0), + || true, + ); + // 32 key, 3 byte size + assert_eq!(shared.lock().used_storage_cache_size(), 35 /* bytes */); let key = H256::random()[..].to_vec(); - s.cache.sync_cache(&[], &[], vec![(key.clone(), Some(vec![1, 2]))], Some(h0.clone()), Some(0), || true); - assert_eq!(shared.lock().used_storage_cache_size(), 5 /* bytes */); + s.cache.sync_cache( + &[], + &[], + vec![], + vec![(s_key.clone(), vec![(key.clone(), Some(vec![1, 2]))])], + Some(h0.clone()), + Some(0), + || true, + ); + // 35 + (2 * 32) key, 2 byte size + assert_eq!(shared.lock().used_storage_cache_size(), 101 /* bytes */); } #[test] fn should_remove_lru_items_based_on_tracking_used_size() { let root_parent = H256::random(); - let shared = new_shared_cache::(5); + let shared = new_shared_cache::(36*3, (2,3)); let h0 = H256::random(); let mut s = CachingState::new(InMemory::::default(), shared.clone(), Some(root_parent.clone())); let key = H256::random()[..].to_vec(); - s.cache.sync_cache(&[], &[], vec![(key.clone(), Some(vec![1, 2, 3, 4]))], Some(h0.clone()), Some(0), || true); - assert_eq!(shared.lock().used_storage_cache_size(), 4 /* bytes */); + s.cache.sync_cache( + &[], + &[], + vec![(key.clone(), Some(vec![1, 2, 3, 4]))], + vec![], + Some(h0.clone()), + Some(0), + || true, + ); + // 32 key, 4 byte size + assert_eq!(shared.lock().used_storage_cache_size(), 36 /* bytes */); let key = H256::random()[..].to_vec(); - s.cache.sync_cache(&[], &[], vec![(key.clone(), Some(vec![1, 2]))], Some(h0.clone()), Some(0), || true); - assert_eq!(shared.lock().used_storage_cache_size(), 2 /* bytes */); + s.cache.sync_cache( + &[], + &[], + vec![(key.clone(), Some(vec![1, 2]))], + vec![], + Some(h0.clone()), + Some(0), + || true, + ); + // 32 key, 2 byte size + assert_eq!(shared.lock().used_storage_cache_size(), 34 /* bytes */); } } diff --git a/substrate/core/client/src/backend.rs b/substrate/core/client/src/backend.rs index 5028b9dde7..8860f61c47 100644 --- a/substrate/core/client/src/backend.rs +++ b/substrate/core/client/src/backend.rs @@ -28,6 +28,12 @@ use hash_db::Hasher; use trie::MemoryDB; use parking_lot::Mutex; +/// In memory array of storage values. +pub type StorageCollection = Vec<(Vec, Option>)>; + +/// In memory arrays of storage values for multiple child tries. +pub type ChildStorageCollection = Vec<(Vec, StorageCollection)>; + /// State of a new block. #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum NewBlockState { @@ -82,8 +88,12 @@ pub trait BlockImportOperation where fn update_db_storage(&mut self, update: >::Transaction) -> error::Result<()>; /// Inject storage data into the database replacing any existing data. fn reset_storage(&mut self, top: StorageOverlay, children: ChildrenStorageOverlay) -> error::Result; - /// Set top level storage changes. - fn update_storage(&mut self, update: Vec<(Vec, Option>)>) -> error::Result<()>; + /// Set storage changes. + fn update_storage( + &mut self, + update: StorageCollection, + child_update: ChildStorageCollection, + ) -> error::Result<()>; /// Inject changes trie data into the database. fn update_changes_trie(&mut self, update: MemoryDB) -> error::Result<()>; /// Insert auxiliary keys. Values are `None` if should be deleted. diff --git a/substrate/core/client/src/client.rs b/substrate/core/client/src/client.rs index 8336a90ca4..ea1dfe6a45 100644 --- a/substrate/core/client/src/client.rs +++ b/substrate/core/client/src/client.rs @@ -59,7 +59,10 @@ use state_machine::{ }; use hash_db::Hasher; -use crate::backend::{self, BlockImportOperation, PrunableStateChangesTrieStorage}; +use crate::backend::{ + self, BlockImportOperation, PrunableStateChangesTrieStorage, + StorageCollection, ChildStorageCollection +}; use crate::blockchain::{ self, Info as ChainInfo, Backend as ChainBackend, HeaderBackend as ChainHeaderBackend, ProvideCache, Cache, @@ -77,6 +80,7 @@ use substrate_telemetry::{telemetry, SUBSTRATE_INFO}; use log::{info, trace, warn}; + /// Type that implements `futures::Stream` of block import events. pub type ImportNotifications = mpsc::UnboundedReceiver>; @@ -133,7 +137,15 @@ pub struct Client where Block: BlockT { /// Client import operation, a wrapper for the backend. pub struct ClientImportOperation, B: backend::Backend> { op: B::BlockImportOperation, - notify_imported: Option<(Block::Hash, BlockOrigin, Block::Header, bool, Option, Option>)>>)>, + notify_imported: Option<( + Block::Hash, + BlockOrigin, + Block::Header, + bool, + Option<( + StorageCollection, + ChildStorageCollection, + )>)>, notify_finalized: Vec, } @@ -150,8 +162,10 @@ pub trait BlockchainEvents { /// Get storage changes event stream. /// /// Passing `None` as `filter_keys` subscribes to all storage changes. - fn storage_changes_notification_stream(&self, - filter_keys: Option<&[StorageKey]> + fn storage_changes_notification_stream( + &self, + filter_keys: Option<&[StorageKey]>, + child_filter_keys: Option<&[(StorageKey, Option>)]>, ) -> error::Result>; } @@ -351,6 +365,14 @@ impl Client where .map(StorageData)) } + /// Given a `BlockId` and a key, return the value under the hash in that block. + pub fn storage_hash(&self, id: &BlockId, key: &StorageKey) + -> error::Result> { + Ok(self.state_at(id)? + .storage_hash(&key.0).map_err(|e| error::Error::from_state(Box::new(e)))? + ) + } + /// Given a `BlockId`, a key prefix, and a child storage key, return the matching child storage keys. pub fn child_storage_keys( &self, @@ -378,6 +400,18 @@ impl Client where .map(StorageData)) } + /// Given a `BlockId`, a key and a child storage key, return the hash under the key in that block. + pub fn child_storage_hash( + &self, + id: &BlockId, + child_storage_key: &StorageKey, + key: &StorageKey + ) -> error::Result> { + Ok(self.state_at(id)? + .child_storage_hash(&child_storage_key.0, &key.0).map_err(|e| error::Error::from_state(Box::new(e)))? + ) + } + /// Get the code at a given block. pub fn code_at(&self, id: &BlockId) -> error::Result> { Ok(self.storage(id, &StorageKey(well_known_keys::CODE.to_vec()))? @@ -913,7 +947,7 @@ impl Client where operation.op.update_db_storage(storage_update)?; } if let Some(storage_changes) = storage_changes.clone() { - operation.op.update_storage(storage_changes)?; + operation.op.update_storage(storage_changes.0, storage_changes.1)?; } if let Some(Some(changes_update)) = changes_update { operation.op.update_changes_trie(changes_update)?; @@ -942,7 +976,10 @@ impl Client where ) -> error::Result<( Option>, Option>, - Option, Option>)>>, + Option<( + Vec<(Vec, Option>)>, + Vec<(Vec, Vec<(Vec, Option>)>)> + )> )> where E: CallExecutor + Send + Sync + Clone, @@ -985,7 +1022,9 @@ impl Client where overlay.commit_prospective(); - Ok((Some(storage_update), Some(changes_update), Some(overlay.into_committed().collect()))) + let (top, children) = overlay.into_committed(); + let children = children.map(|(sk, it)| (sk, it.collect())).collect(); + Ok((Some(storage_update), Some(changes_update), Some((top.collect(), children)))) }, None => Ok((None, None, None)) } @@ -1084,14 +1123,26 @@ impl Client where fn notify_imported( &self, - notify_import: (Block::Hash, BlockOrigin, Block::Header, bool, Option, Option>)>>), + notify_import: ( + Block::Hash, BlockOrigin, + Block::Header, + bool, + Option<( + Vec<(Vec, Option>)>, + Vec<(Vec, Vec<(Vec, Option>)>)>, + ) + >), ) -> error::Result<()> { let (hash, origin, header, is_new_best, storage_changes) = notify_import; if let Some(storage_changes) = storage_changes { // TODO [ToDr] How to handle re-orgs? Should we re-emit all storage changes? self.storage_notifications.lock() - .trigger(&hash, storage_changes.into_iter()); + .trigger( + &hash, + storage_changes.0.into_iter(), + storage_changes.1.into_iter().map(|(sk, v)| (sk, v.into_iter())), + ); } let notification = BlockImportNotification:: { @@ -1467,8 +1518,12 @@ where } /// Get storage changes event stream. - fn storage_changes_notification_stream(&self, filter_keys: Option<&[StorageKey]>) -> error::Result> { - Ok(self.storage_notifications.lock().listen(filter_keys)) + fn storage_changes_notification_stream( + &self, + filter_keys: Option<&[StorageKey]>, + child_filter_keys: Option<&[(StorageKey, Option>)]>, + ) -> error::Result> { + Ok(self.storage_notifications.lock().listen(filter_keys, child_filter_keys)) } } diff --git a/substrate/core/client/src/in_mem.rs b/substrate/core/client/src/in_mem.rs index dec10d40f7..54e1ac027b 100644 --- a/substrate/core/client/src/in_mem.rs +++ b/substrate/core/client/src/in_mem.rs @@ -33,7 +33,7 @@ use trie::MemoryDB; use consensus::well_known_cache_keys::Id as CacheKeyId; use crate::error; -use crate::backend::{self, NewBlockState}; +use crate::backend::{self, NewBlockState, StorageCollection, ChildStorageCollection}; use crate::light; use crate::leaves::LeafSet; use crate::blockchain::{self, BlockStatus, HeaderBackend}; @@ -515,7 +515,11 @@ where Ok(()) } - fn update_storage(&mut self, _update: Vec<(Vec, Option>)>) -> error::Result<()> { + fn update_storage( + &mut self, + _update: StorageCollection, + _child_update: ChildStorageCollection, + ) -> error::Result<()> { Ok(()) } diff --git a/substrate/core/client/src/lib.rs b/substrate/core/client/src/lib.rs index 574ec95dc0..8062fae500 100644 --- a/substrate/core/client/src/lib.rs +++ b/substrate/core/client/src/lib.rs @@ -59,7 +59,7 @@ pub use crate::client::{ new_in_mem, BlockBody, BlockStatus, ImportNotifications, FinalityNotifications, BlockchainEvents, BlockImportNotification, Client, ClientInfo, ExecutionStrategies, - LongestChain + LongestChain, }; #[cfg(feature = "std")] pub use crate::notifications::{StorageEventStream, StorageChangeSet}; diff --git a/substrate/core/client/src/light/backend.rs b/substrate/core/client/src/light/backend.rs index 60b4c66c17..f71366808e 100644 --- a/substrate/core/client/src/light/backend.rs +++ b/substrate/core/client/src/light/backend.rs @@ -26,7 +26,10 @@ use runtime_primitives::{generic::BlockId, Justification, StorageOverlay, Childr use state_machine::{Backend as StateBackend, TrieBackend, backend::InMemory as InMemoryState}; use runtime_primitives::traits::{Block as BlockT, NumberFor, Zero, Header}; use crate::in_mem::{self, check_genesis_storage}; -use crate::backend::{AuxStore, Backend as ClientBackend, BlockImportOperation, RemoteBackend, NewBlockState}; +use crate::backend::{ + AuxStore, Backend as ClientBackend, BlockImportOperation, RemoteBackend, NewBlockState, + StorageCollection, ChildStorageCollection, +}; use crate::blockchain::HeaderBackend as BlockchainHeaderBackend; use crate::error::{Error as ClientError, Result as ClientResult}; use crate::light::blockchain::{Blockchain, Storage as BlockchainStorage}; @@ -310,7 +313,11 @@ where Ok(()) } - fn update_storage(&mut self, _update: Vec<(Vec, Option>)>) -> ClientResult<()> { + fn update_storage( + &mut self, + _update: StorageCollection, + _child_update: ChildStorageCollection, + ) -> ClientResult<()> { // we're not storing anything locally => ignore changes Ok(()) } diff --git a/substrate/core/client/src/notifications.rs b/substrate/core/client/src/notifications.rs index 139238f343..931a40f20d 100644 --- a/substrate/core/client/src/notifications.rs +++ b/substrate/core/client/src/notifications.rs @@ -30,18 +30,39 @@ use runtime_primitives::traits::Block as BlockT; #[derive(Debug)] pub struct StorageChangeSet { changes: Arc)>>, + child_changes: Arc)>)>>, filter: Option>, + child_filters: Option>>>, } impl StorageChangeSet { /// Convert the change set into iterator over storage items. - pub fn iter<'a>(&'a self) -> impl Iterator)> + 'a { - self.changes + pub fn iter<'a>(&'a self) + -> impl Iterator, &'a StorageKey, Option<&'a StorageData>)> + 'a { + let top = self.changes .iter() .filter(move |&(key, _)| match self.filter { Some(ref filter) => filter.contains(key), None => true, }) + .map(move |(k,v)| (None, k, v.as_ref())); + let children = self.child_changes + .iter() + .filter_map(move |(sk, changes)| { + if let Some(cf) = self.child_filters.as_ref() { + if let Some(filter) = cf.get(sk) { + Some(changes + .iter() + .filter(move |&(key, _)| match filter { + Some(ref filter) => filter.contains(key), + None => true, + }) + .map(move |(k,v)| (Some(sk), k, v.as_ref()))) + } else { None } + } else { None } + }) + .flatten(); + top.chain(children) } } @@ -56,9 +77,14 @@ pub struct StorageNotifications { next_id: SubscriberId, wildcard_listeners: FnvHashSet, listeners: HashMap>, + child_listeners: HashMap>, + FnvHashSet + )>, sinks: FnvHashMap, Option>, + Option>>>, )>, } @@ -68,6 +94,7 @@ impl Default for StorageNotifications { next_id: Default::default(), wildcard_listeners: Default::default(), listeners: Default::default(), + child_listeners: Default::default(), sinks: Default::default(), } } @@ -78,16 +105,24 @@ impl StorageNotifications { /// /// Note the changes are going to be filtered by listener's filter key. /// In fact no event might be sent if clients are not interested in the changes. - pub fn trigger(&mut self, hash: &Block::Hash, changeset: impl Iterator, Option>)>) { + pub fn trigger( + &mut self, + hash: &Block::Hash, + changeset: impl Iterator, Option>)>, + child_changeset: impl Iterator< + Item=(Vec, impl Iterator, Option>)>) + >, + ) { let has_wildcard = !self.wildcard_listeners.is_empty(); // early exit if no listeners - if !has_wildcard && self.listeners.is_empty() { + if !has_wildcard && self.listeners.is_empty() && self.child_listeners.is_empty() { return; } let mut subscribers = self.wildcard_listeners.clone(); let mut changes = Vec::new(); + let mut child_changes = Vec::new(); // Collect subscribers and changes for (k, v) in changeset { @@ -102,21 +137,47 @@ impl StorageNotifications { changes.push((k, v.map(StorageData))); } } + for (sk, changeset) in child_changeset { + let sk = StorageKey(sk); + if let Some((cl, cw)) = self.child_listeners.get(&sk) { + let mut changes = Vec::new(); + for (k, v) in changeset { + let k = StorageKey(k); + let listeners = cl.get(&k); + + if let Some(ref listeners) = listeners { + subscribers.extend(listeners.iter()); + } + + subscribers.extend(cw.iter()); + + if !cw.is_empty() || listeners.is_some() { + changes.push((k, v.map(StorageData))); + } + } + if !changes.is_empty() { + child_changes.push((sk, changes)); + } + } + } // Don't send empty notifications - if changes.is_empty() { + if changes.is_empty() && child_changes.is_empty() { return; } let changes = Arc::new(changes); + let child_changes = Arc::new(child_changes); // Trigger the events for subscriber in subscribers { let should_remove = { - let &(ref sink, ref filter) = self.sinks.get(&subscriber) + let &(ref sink, ref filter, ref child_filters) = self.sinks.get(&subscriber) .expect("subscribers returned from self.listeners are always in self.sinks; qed"); sink.unbounded_send((hash.clone(), StorageChangeSet { changes: changes.clone(), + child_changes: child_changes.clone(), filter: filter.clone(), + child_filters: child_filters.clone(), })).is_err() }; @@ -126,53 +187,120 @@ impl StorageNotifications { } } - fn remove_subscriber(&mut self, subscriber: SubscriberId) { - if let Some((_, filters)) = self.sinks.remove(&subscriber) { - match filters { - None => { - self.wildcard_listeners.remove(&subscriber); - }, - Some(filters) => { - for key in filters { - let remove_key = match self.listeners.get_mut(&key) { - Some(ref mut set) => { - set.remove(&subscriber); - set.is_empty() - }, - None => false, - }; + fn remove_subscriber_from( + subscriber: &SubscriberId, + filters: &Option>, + listeners: &mut HashMap>, + wildcards: &mut FnvHashSet, + ){ + match filters { + None => { + wildcards.remove(subscriber); + }, + Some(filters) => { - if remove_key { - self.listeners.remove(&key); - } + for key in filters.iter() { + let remove_key = match listeners.get_mut(key) { + Some(ref mut set) => { + set.remove(subscriber); + set.is_empty() + }, + None => false, + }; + + if remove_key { + listeners.remove(key); } - }, + } } } } - /// Start listening for particular storage keys. - pub fn listen(&mut self, filter_keys: Option<&[StorageKey]>) -> StorageEventStream { - self.next_id += 1; + fn remove_subscriber(&mut self, subscriber: SubscriberId) { + if let Some((_, filters, child_filters)) = self.sinks.remove(&subscriber) { + Self::remove_subscriber_from( + &subscriber, + &filters, + &mut self.listeners, + &mut self.wildcard_listeners, + ); + if let Some(child_filters) = child_filters.as_ref() { + for (c_key, filters) in child_filters { - // add subscriber for every key - let keys = match filter_keys { + if let Some((listeners, wildcards)) = self.child_listeners.get_mut(&c_key) { + Self::remove_subscriber_from( + &subscriber, + &filters, + &mut *listeners, + &mut *wildcards, + ); + + if listeners.is_empty() && wildcards.is_empty() { + self.child_listeners.remove(&c_key); + } + } + } + } + } + } + + fn listen_from( + current_id: SubscriberId, + filter_keys: &Option>, + listeners: &mut HashMap>, + wildcards: &mut FnvHashSet, + ) -> Option> + { + match filter_keys { None => { - self.wildcard_listeners.insert(self.next_id); + wildcards.insert(current_id); None }, - Some(keys) => Some(keys.iter().map(|key| { - self.listeners + Some(keys) => Some(keys.as_ref().iter().map(|key| { + listeners .entry(key.clone()) .or_insert_with(Default::default) - .insert(self.next_id); + .insert(current_id); key.clone() }).collect()) - }; + } + } + + /// Start listening for particular storage keys. + pub fn listen( + &mut self, + filter_keys: Option<&[StorageKey]>, + filter_child_keys: Option<&[(StorageKey, Option>)]>, + ) -> StorageEventStream { + self.next_id += 1; + let current_id = self.next_id; + + // add subscriber for every key + let keys = Self::listen_from( + current_id, + &filter_keys, + &mut self.listeners, + &mut self.wildcard_listeners, + ); + let child_keys = filter_child_keys.map(|filter_child_keys| { + filter_child_keys.iter().map(|(c_key, o_keys)| { + let (c_listeners, c_wildcards) = self.child_listeners + .entry(c_key.clone()) + .or_insert_with(Default::default); + + (c_key.clone(), Self::listen_from( + current_id, + o_keys, + &mut *c_listeners, + &mut *c_wildcards, + )) + }).collect() + }); + // insert sink let (tx, rx) = mpsc::unbounded(); - self.sinks.insert(self.next_id, (tx, keys)); + self.sinks.insert(current_id, (tx, keys, child_keys)); rx } } @@ -182,13 +310,26 @@ mod tests { use runtime_primitives::testing::{H256 as Hash, Block as RawBlock, ExtrinsicWrapper}; use super::*; use futures::Stream; + use std::iter::{empty, Empty}; + + type TestChangeSet = ( + Vec<(StorageKey, Option)>, + Vec<(StorageKey, Vec<(StorageKey, Option)>)>, + ); #[cfg(test)] - impl From)>> for StorageChangeSet { - fn from(changes: Vec<(StorageKey, Option)>) -> Self { + impl From for StorageChangeSet { + fn from(changes: TestChangeSet) -> Self { + // warning hardcoded child trie wildcard to test upon + let child_filters = Some([ + (StorageKey(vec![4]), None), + (StorageKey(vec![5]), None), + ].into_iter().cloned().collect()); StorageChangeSet { - changes: Arc::new(changes), + changes: Arc::new(changes.0), + child_changes: Arc::new(changes.1), filter: None, + child_filters, } } } @@ -206,43 +347,73 @@ mod tests { fn triggering_change_should_notify_wildcard_listeners() { // given let mut notifications = StorageNotifications::::default(); - let mut recv = notifications.listen(None).wait(); + let child_filter = [(StorageKey(vec![4]), None)]; + let mut recv = notifications.listen(None, Some(&child_filter[..])).wait(); // when let changeset = vec![ (vec![2], Some(vec![3])), (vec![3], None), ]; - notifications.trigger(&Hash::from_low_u64_be(1), changeset.into_iter()); + let c_changeset_1 = vec![ + (vec![5], Some(vec![4])), + (vec![6], None), + ]; + let c_changeset = vec![(vec![4], c_changeset_1)]; + notifications.trigger( + &Hash::from_low_u64_be(1), + changeset.into_iter(), + c_changeset.into_iter().map(|(a,b)| (a, b.into_iter())), + ); // then - assert_eq!(recv.next().unwrap(), Ok((Hash::from_low_u64_be(1), vec![ + assert_eq!(recv.next().unwrap(), Ok((Hash::from_low_u64_be(1), (vec![ (StorageKey(vec![2]), Some(StorageData(vec![3]))), (StorageKey(vec![3]), None), - ].into()))); + ], vec![(StorageKey(vec![4]), vec![ + (StorageKey(vec![5]), Some(StorageData(vec![4]))), + (StorageKey(vec![6]), None), + ])]).into()))); } #[test] fn should_only_notify_interested_listeners() { // given let mut notifications = StorageNotifications::::default(); - let mut recv1 = notifications.listen(Some(&[StorageKey(vec![1])])).wait(); - let mut recv2 = notifications.listen(Some(&[StorageKey(vec![2])])).wait(); + let child_filter = [(StorageKey(vec![4]), Some(vec![StorageKey(vec![5])]))]; + let mut recv1 = notifications.listen(Some(&[StorageKey(vec![1])]), None).wait(); + let mut recv2 = notifications.listen(Some(&[StorageKey(vec![2])]), None).wait(); + let mut recv3 = notifications.listen(Some(&[]), Some(&child_filter)).wait(); // when let changeset = vec![ (vec![2], Some(vec![3])), (vec![1], None), ]; - notifications.trigger(&Hash::from_low_u64_be(1), changeset.into_iter()); + let c_changeset_1 = vec![ + (vec![5], Some(vec![4])), + (vec![6], None), + ]; + + let c_changeset = vec![(vec![4], c_changeset_1)]; + notifications.trigger( + &Hash::from_low_u64_be(1), + changeset.into_iter(), + c_changeset.into_iter().map(|(a,b)| (a, b.into_iter())), + ); // then - assert_eq!(recv1.next().unwrap(), Ok((Hash::from_low_u64_be(1), vec![ + assert_eq!(recv1.next().unwrap(), Ok((Hash::from_low_u64_be(1), (vec![ (StorageKey(vec![1]), None), - ].into()))); - assert_eq!(recv2.next().unwrap(), Ok((Hash::from_low_u64_be(1), vec![ + ], vec![]).into()))); + assert_eq!(recv2.next().unwrap(), Ok((Hash::from_low_u64_be(1), (vec![ (StorageKey(vec![2]), Some(StorageData(vec![3]))), - ].into()))); + ], vec![]).into()))); + assert_eq!(recv3.next().unwrap(), Ok((Hash::from_low_u64_be(1), (vec![], + vec![ + (StorageKey(vec![4]), vec![(StorageKey(vec![5]), Some(StorageData(vec![4])))]), + ]).into()))); + } #[test] @@ -250,11 +421,14 @@ mod tests { // given let mut notifications = StorageNotifications::::default(); { - let _recv1 = notifications.listen(Some(&[StorageKey(vec![1])])).wait(); - let _recv2 = notifications.listen(Some(&[StorageKey(vec![2])])).wait(); - let _recv3 = notifications.listen(None).wait(); + let child_filter = [(StorageKey(vec![4]), Some(vec![StorageKey(vec![5])]))]; + let _recv1 = notifications.listen(Some(&[StorageKey(vec![1])]), None).wait(); + let _recv2 = notifications.listen(Some(&[StorageKey(vec![2])]), None).wait(); + let _recv3 = notifications.listen(None, None).wait(); + let _recv4 = notifications.listen(None, Some(&child_filter)).wait(); assert_eq!(notifications.listeners.len(), 2); - assert_eq!(notifications.wildcard_listeners.len(), 1); + assert_eq!(notifications.wildcard_listeners.len(), 2); + assert_eq!(notifications.child_listeners.len(), 1); } // when @@ -262,11 +436,13 @@ mod tests { (vec![2], Some(vec![3])), (vec![1], None), ]; - notifications.trigger(&Hash::from_low_u64_be(1), changeset.into_iter()); + let c_changeset = empty::<(_, Empty<_>)>(); + notifications.trigger(&Hash::from_low_u64_be(1), changeset.into_iter(), c_changeset); // then assert_eq!(notifications.listeners.len(), 0); assert_eq!(notifications.wildcard_listeners.len(), 0); + assert_eq!(notifications.child_listeners.len(), 0); } #[test] @@ -274,11 +450,12 @@ mod tests { // given let mut recv = { let mut notifications = StorageNotifications::::default(); - let recv = notifications.listen(None).wait(); + let recv = notifications.listen(None, None).wait(); // when let changeset = vec![]; - notifications.trigger(&Hash::from_low_u64_be(1), changeset.into_iter()); + let c_changeset = empty::<(_, Empty<_>)>(); + notifications.trigger(&Hash::from_low_u64_be(1), changeset.into_iter(), c_changeset); recv }; diff --git a/substrate/core/rpc/src/state/mod.rs b/substrate/core/rpc/src/state/mod.rs index e52d318d9f..474dfe10ed 100644 --- a/substrate/core/rpc/src/state/mod.rs +++ b/substrate/core/rpc/src/state/mod.rs @@ -362,8 +362,9 @@ impl StateApi for State where } fn storage_hash(&self, key: StorageKey, block: Option) -> Result> { - use runtime_primitives::traits::{Hash, Header as HeaderT}; - Ok(self.storage(key, block)?.map(|x| ::Hashing::hash(&x.0))) + let block = self.unwrap_or_best(block)?; + trace!(target: "rpc", "Querying storage hash at {:?} for key {}", block, HexDisplay::from(&key.0)); + Ok(self.client.storage_hash(&BlockId::Hash(block), &key)?) } fn storage_size(&self, key: StorageKey, block: Option) -> Result> { @@ -398,11 +399,13 @@ impl StateApi for State where key: StorageKey, block: Option ) -> Result> { - use runtime_primitives::traits::{Hash, Header as HeaderT}; - Ok( - self.child_storage(child_storage_key, key, block)? - .map(|x| ::Hashing::hash(&x.0)) - ) + let block = self.unwrap_or_best(block)?; + trace!( + target: "rpc", "Querying child storage hash at {:?} for key {}", + block, + HexDisplay::from(&key.0), + ); + Ok(self.client.child_storage_hash(&BlockId::Hash(block), &child_storage_key, &key)?) } fn child_storage_size( @@ -439,7 +442,10 @@ impl StateApi for State where keys: Option> ) { let keys = Into::>>::into(keys); - let stream = match self.client.storage_changes_notification_stream(keys.as_ref().map(|x| &**x)) { + let stream = match self.client.storage_changes_notification_stream( + keys.as_ref().map(|x| &**x), + None + ) { Ok(stream) => stream, Err(err) => { let _ = subscriber.reject(error::Error::from(err).into()); @@ -466,7 +472,10 @@ impl StateApi for State where .map_err(|e| warn!("Error creating storage notification stream: {:?}", e)) .map(|(block, changes)| Ok(StorageChangeSet { block, - changes: changes.iter().cloned().collect(), + changes: changes.iter() + .filter_map(|(o_sk, k, v)| if o_sk.is_none() { + Some((k.clone(),v.cloned())) + } else { None }).collect(), })); sink @@ -488,7 +497,8 @@ impl StateApi for State where fn subscribe_runtime_version(&self, _meta: Self::Metadata, subscriber: Subscriber) { let stream = match self.client.storage_changes_notification_stream( - Some(&[StorageKey(storage::well_known_keys::CODE.to_vec())]) + Some(&[StorageKey(storage::well_known_keys::CODE.to_vec())]), + None, ) { Ok(stream) => stream, Err(err) => { diff --git a/substrate/core/service/src/components.rs b/substrate/core/service/src/components.rs index 0f5afb7772..c095bdb7f4 100644 --- a/substrate/core/service/src/components.rs +++ b/substrate/core/service/src/components.rs @@ -499,6 +499,8 @@ impl Components for FullComponents { let db_settings = client_db::DatabaseSettings { cache_size: config.database_cache_size.map(|u| u as usize), state_cache_size: config.state_cache_size, + state_cache_child_ratio: + config.state_cache_child_ratio.map(|v| (v, 100)), path: config.database_path.as_str().into(), pruning: config.pruning.clone(), }; @@ -591,6 +593,8 @@ impl Components for LightComponents { let db_settings = client_db::DatabaseSettings { cache_size: None, state_cache_size: config.state_cache_size, + state_cache_child_ratio: + config.state_cache_child_ratio.map(|v| (v, 100)), path: config.database_path.as_str().into(), pruning: config.pruning.clone(), }; diff --git a/substrate/core/service/src/config.rs b/substrate/core/service/src/config.rs index 5996ec837d..3a49630898 100644 --- a/substrate/core/service/src/config.rs +++ b/substrate/core/service/src/config.rs @@ -50,6 +50,8 @@ pub struct Configuration { pub database_cache_size: Option, /// Size of internal state cache in Bytes pub state_cache_size: usize, + /// Size in percent of cache size dedicated to child tries + pub state_cache_child_ratio: Option, /// Pruning settings. pub pruning: PruningMode, /// Additional key seeds. @@ -100,6 +102,7 @@ impl Configuration ( database_path: root.join("db").to_str().unwrap().into(), database_cache_size: None, state_cache_size: 16777216, + state_cache_child_ratio: None, pruning: Default::default(), keys: keys, chain_spec: (*spec).clone(), diff --git a/substrate/core/state-machine/src/backend.rs b/substrate/core/state-machine/src/backend.rs index 81529c6da3..c86c802bfb 100644 --- a/substrate/core/state-machine/src/backend.rs +++ b/substrate/core/state-machine/src/backend.rs @@ -51,6 +51,11 @@ pub trait Backend { /// Get keyed child storage or None if there is nothing associated. fn child_storage(&self, storage_key: &[u8], key: &[u8]) -> Result>, Self::Error>; + /// Get child keyed storage value hash or None if there is nothing associated. + fn child_storage_hash(&self, storage_key: &[u8], key: &[u8]) -> Result, Self::Error> { + self.child_storage(storage_key, key).map(|v| v.map(|v| H::hash(&v))) + } + /// true if a key exists in storage. fn exists_storage(&self, key: &[u8]) -> Result { Ok(self.storage(key)?.is_some()) diff --git a/substrate/core/state-machine/src/overlayed_changes.rs b/substrate/core/state-machine/src/overlayed_changes.rs index 663e4ff72e..7d6d6081bd 100644 --- a/substrate/core/state-machine/src/overlayed_changes.rs +++ b/substrate/core/state-machine/src/overlayed_changes.rs @@ -255,9 +255,13 @@ impl OverlayedChanges { /// /// Panics: /// Will panic if there are any uncommitted prospective changes. - pub fn into_committed(self) -> impl Iterator, Option>)> { + pub fn into_committed(self) -> ( + impl Iterator, Option>)>, + impl Iterator, impl Iterator, Option>)>)>, + ){ assert!(self.prospective.is_empty()); - self.committed.top.into_iter().map(|(k, v)| (k, v.value)) + (self.committed.top.into_iter().map(|(k, v)| (k, v.value)), + self.committed.children.into_iter().map(|(sk, v)| (sk, v.1.into_iter()))) } /// Inserts storage entry responsible for current extrinsic index.