Add storage cache for child trie and notification internals (#2639)

* child cache, and test failing notifications

* fix tests and no listen child on top wildcard

* remove useless method

* bump impl version

* Update core/client/src/notifications.rs

Co-Authored-By: Tomasz Drwięga <tomusdrw@users.noreply.github.com>

* Update core/client/src/notifications.rs

Co-Authored-By: Tomasz Drwięga <tomusdrw@users.noreply.github.com>

* Update core/client/src/notifications.rs

Co-Authored-By: Tomasz Drwięga <tomusdrw@users.noreply.github.com>

* Update core/client/src/notifications.rs

Co-Authored-By: Tomasz Drwięga <tomusdrw@users.noreply.github.com>

* factoring notification methods to remove some redundant code.

* test child sub removal

* HStorage implementation and some type alias.

* Remove HStorage cache: does not fit

* fix removal

* Make cache use byte length (shared) instead of number of kv

* Make use of hashes cache in rpc

* applying ratio on different lru caches

* Fix format

* break a line

* Remove per element overhead of lru cache.

* typo
This commit is contained in:
cheme
2019-06-14 11:25:20 +02:00
committed by Gavin Wood
parent 5c3d1f82cd
commit e777038418
16 changed files with 679 additions and 196 deletions
+1 -1
View File
@@ -4023,8 +4023,8 @@ dependencies = [
"kvdb 0.1.0 (git+https://github.com/paritytech/parity-common?rev=b0317f649ab2c665b7987b8475878fc4d2e1f81d)", "kvdb 0.1.0 (git+https://github.com/paritytech/parity-common?rev=b0317f649ab2c665b7987b8475878fc4d2e1f81d)",
"kvdb-memorydb 0.1.0 (git+https://github.com/paritytech/parity-common?rev=b0317f649ab2c665b7987b8475878fc4d2e1f81d)", "kvdb-memorydb 0.1.0 (git+https://github.com/paritytech/parity-common?rev=b0317f649ab2c665b7987b8475878fc4d2e1f81d)",
"kvdb-rocksdb 0.1.4 (git+https://github.com/paritytech/parity-common?rev=b0317f649ab2c665b7987b8475878fc4d2e1f81d)", "kvdb-rocksdb 0.1.4 (git+https://github.com/paritytech/parity-common?rev=b0317f649ab2c665b7987b8475878fc4d2e1f81d)",
"linked-hash-map 0.5.2 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)",
"lru-cache 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
"parity-codec 3.5.1 (registry+https://github.com/rust-lang/crates.io-index)", "parity-codec 3.5.1 (registry+https://github.com/rust-lang/crates.io-index)",
"parking_lot 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)",
"sr-primitives 2.0.0", "sr-primitives 2.0.0",
+1 -1
View File
@@ -11,7 +11,7 @@ kvdb = { git = "https://github.com/paritytech/parity-common", rev="b0317f649ab2c
# FIXME replace with release as soon as our rocksdb changes are released upstream https://github.com/paritytech/parity-common/issues/88 # FIXME replace with release as soon as our rocksdb changes are released upstream https://github.com/paritytech/parity-common/issues/88
kvdb-rocksdb = { git = "https://github.com/paritytech/parity-common", rev="b0317f649ab2c665b7987b8475878fc4d2e1f81d", optional = true } kvdb-rocksdb = { git = "https://github.com/paritytech/parity-common", rev="b0317f649ab2c665b7987b8475878fc4d2e1f81d", optional = true }
kvdb-memorydb = { git = "https://github.com/paritytech/parity-common", rev="b0317f649ab2c665b7987b8475878fc4d2e1f81d" } kvdb-memorydb = { git = "https://github.com/paritytech/parity-common", rev="b0317f649ab2c665b7987b8475878fc4d2e1f81d" }
lru-cache = "0.1.1" linked-hash-map = "0.5"
hash-db = { version = "0.12" } hash-db = { version = "0.12" }
primitives = { package = "substrate-primitives", path = "../../primitives" } primitives = { package = "substrate-primitives", path = "../../primitives" }
runtime_primitives = { package = "sr-primitives", path = "../../sr-primitives" } runtime_primitives = { package = "sr-primitives", path = "../../sr-primitives" }
+45 -15
View File
@@ -38,6 +38,7 @@ use std::collections::HashMap;
use client::backend::NewBlockState; use client::backend::NewBlockState;
use client::blockchain::HeaderBackend; use client::blockchain::HeaderBackend;
use client::ExecutionStrategies; use client::ExecutionStrategies;
use client::backend::{StorageCollection, ChildStorageCollection};
use parity_codec::{Decode, Encode}; use parity_codec::{Decode, Encode};
use hash_db::Hasher; use hash_db::Hasher;
use kvdb::{KeyValueDB, DBTransaction}; use kvdb::{KeyValueDB, DBTransaction};
@@ -69,6 +70,9 @@ use client::in_mem::Backend as InMemoryBackend;
const CANONICALIZATION_DELAY: u64 = 4096; const CANONICALIZATION_DELAY: u64 = 4096;
const MIN_BLOCKS_TO_KEEP_CHANGES_TRIES_FOR: u32 = 32768; const MIN_BLOCKS_TO_KEEP_CHANGES_TRIES_FOR: u32 = 32768;
/// Default value for storage cache child ratio.
const DEFAULT_CHILD_RATIO: (usize, usize) = (1, 10);
/// DB-backed patricia trie state, transaction type is an overlay of changes to commit. /// DB-backed patricia trie state, transaction type is an overlay of changes to commit.
pub type DbState = state_machine::TrieBackend<Arc<dyn state_machine::Storage<Blake2Hasher>>, Blake2Hasher>; pub type DbState = state_machine::TrieBackend<Arc<dyn state_machine::Storage<Blake2Hasher>>, Blake2Hasher>;
@@ -169,6 +173,8 @@ pub struct DatabaseSettings {
pub cache_size: Option<usize>, pub cache_size: Option<usize>,
/// State cache size. /// State cache size.
pub state_cache_size: usize, pub state_cache_size: usize,
/// Ratio of cache size dedicated to child tries.
pub state_cache_child_ratio: Option<(usize, usize)>,
/// Path to the database. /// Path to the database.
pub path: PathBuf, pub path: PathBuf,
/// Pruning mode. /// Pruning mode.
@@ -181,7 +187,10 @@ pub fn new_client<E, S, Block, RA>(
executor: E, executor: E,
genesis_storage: S, genesis_storage: S,
execution_strategies: ExecutionStrategies, execution_strategies: ExecutionStrategies,
) -> Result<client::Client<Backend<Block>, client::LocalCallExecutor<Backend<Block>, E>, Block, RA>, client::error::Error> ) -> Result<
client::Client<Backend<Block>,
client::LocalCallExecutor<Backend<Block>, E>, Block, RA>, client::error::Error
>
where where
Block: BlockT<Hash=H256>, Block: BlockT<Hash=H256>,
E: CodeExecutor<Blake2Hasher> + RuntimeInfo, E: CodeExecutor<Blake2Hasher> + RuntimeInfo,
@@ -363,7 +372,8 @@ impl<Block: BlockT> client::blockchain::ProvideCache<Block> for BlockchainDb<Blo
pub struct BlockImportOperation<Block: BlockT, H: Hasher> { pub struct BlockImportOperation<Block: BlockT, H: Hasher> {
old_state: CachingState<Blake2Hasher, RefTrackingState<Block>, Block>, old_state: CachingState<Blake2Hasher, RefTrackingState<Block>, Block>,
db_updates: PrefixedMemoryDB<H>, db_updates: PrefixedMemoryDB<H>,
storage_updates: Vec<(Vec<u8>, Option<Vec<u8>>)>, storage_updates: StorageCollection,
child_storage_updates: ChildStorageCollection,
changes_trie_updates: MemoryDB<H>, changes_trie_updates: MemoryDB<H>,
pending_block: Option<PendingBlock<Block>>, pending_block: Option<PendingBlock<Block>>,
aux_ops: Vec<(Vec<u8>, Option<Vec<u8>>)>, aux_ops: Vec<(Vec<u8>, Option<Vec<u8>>)>,
@@ -455,8 +465,13 @@ where Block: BlockT<Hash=H256>,
Ok(()) Ok(())
} }
fn update_storage(&mut self, update: Vec<(Vec<u8>, Option<Vec<u8>>)>) -> Result<(), client::error::Error> { fn update_storage(
&mut self,
update: StorageCollection,
child_update: ChildStorageCollection,
) -> Result<(), client::error::Error> {
self.storage_updates = update; self.storage_updates = update;
self.child_storage_updates = child_update;
Ok(()) Ok(())
} }
@@ -670,14 +685,14 @@ impl<Block: BlockT<Hash=H256>> Backend<Block> {
#[cfg(feature = "kvdb-rocksdb")] #[cfg(feature = "kvdb-rocksdb")]
fn new_inner(config: DatabaseSettings, canonicalization_delay: u64) -> Result<Self, client::error::Error> { fn new_inner(config: DatabaseSettings, canonicalization_delay: u64) -> Result<Self, client::error::Error> {
let db = crate::utils::open_database(&config, columns::META, "full")?; let db = crate::utils::open_database(&config, columns::META, "full")?;
Backend::from_kvdb(db as Arc<_>, config.pruning, canonicalization_delay, config.state_cache_size) Backend::from_kvdb(db as Arc<_>, canonicalization_delay, &config)
} }
#[cfg(not(feature = "kvdb-rocksdb"))] #[cfg(not(feature = "kvdb-rocksdb"))]
fn new_inner(config: DatabaseSettings, canonicalization_delay: u64) -> Result<Self, client::error::Error> { fn new_inner(config: DatabaseSettings, canonicalization_delay: u64) -> Result<Self, client::error::Error> {
log::warn!("Running without the RocksDB feature. The database will NOT be saved."); log::warn!("Running without the RocksDB feature. The database will NOT be saved.");
let db = Arc::new(kvdb_memorydb::create(crate::utils::NUM_COLUMNS)); let db = Arc::new(kvdb_memorydb::create(crate::utils::NUM_COLUMNS));
Backend::from_kvdb(db as Arc<_>, config.pruning, canonicalization_delay, config.state_cache_size) Backend::from_kvdb(db as Arc<_>, canonicalization_delay, &config)
} }
#[cfg(any(test, feature = "test-helpers"))] #[cfg(any(test, feature = "test-helpers"))]
@@ -685,26 +700,36 @@ impl<Block: BlockT<Hash=H256>> Backend<Block> {
use utils::NUM_COLUMNS; use utils::NUM_COLUMNS;
let db = Arc::new(::kvdb_memorydb::create(NUM_COLUMNS)); let db = Arc::new(::kvdb_memorydb::create(NUM_COLUMNS));
Self::new_test_db(keep_blocks, canonicalization_delay, db as Arc<_>)
}
#[cfg(any(test, feature = "test-helpers"))]
pub fn new_test_db(keep_blocks: u32, canonicalization_delay: u64, db: Arc<KeyValueDB>) -> Self {
let db_setting = DatabaseSettings {
cache_size: None,
state_cache_size: 16777216,
state_cache_child_ratio: Some((50, 100)),
path: Default::default(),
pruning: PruningMode::keep_blocks(keep_blocks),
};
Backend::from_kvdb( Backend::from_kvdb(
db as Arc<_>, db,
PruningMode::keep_blocks(keep_blocks),
canonicalization_delay, canonicalization_delay,
16777216, &db_setting,
).expect("failed to create test-db") ).expect("failed to create test-db")
} }
fn from_kvdb( fn from_kvdb(
db: Arc<dyn KeyValueDB>, db: Arc<dyn KeyValueDB>,
pruning: PruningMode,
canonicalization_delay: u64, canonicalization_delay: u64,
state_cache_size: usize config: &DatabaseSettings
) -> Result<Self, client::error::Error> { ) -> Result<Self, client::error::Error> {
let is_archive_pruning = pruning.is_archive(); let is_archive_pruning = config.pruning.is_archive();
let blockchain = BlockchainDb::new(db.clone())?; let blockchain = BlockchainDb::new(db.clone())?;
let meta = blockchain.meta.clone(); let meta = blockchain.meta.clone();
let map_e = |e: state_db::Error<io::Error>| ::client::error::Error::from(format!("State database error: {:?}", e)); let map_e = |e: state_db::Error<io::Error>| ::client::error::Error::from(format!("State database error: {:?}", e));
let state_db: StateDb<_, _> = StateDb::new(pruning, &StateMetaDb(&*db)).map_err(map_e)?; let state_db: StateDb<_, _> = StateDb::new(config.pruning.clone(), &StateMetaDb(&*db)).map_err(map_e)?;
let storage_db = StorageDb { let storage_db = StorageDb {
db: db.clone(), db: db.clone(),
state_db, state_db,
@@ -722,7 +747,10 @@ impl<Block: BlockT<Hash=H256>> Backend<Block> {
changes_trie_config: Mutex::new(None), changes_trie_config: Mutex::new(None),
blockchain, blockchain,
canonicalization_delay, canonicalization_delay,
shared_cache: new_shared_cache(state_cache_size), shared_cache: new_shared_cache(
config.state_cache_size,
config.state_cache_child_ratio.unwrap_or(DEFAULT_CHILD_RATIO),
),
import_lock: Default::default(), import_lock: Default::default(),
}) })
} }
@@ -1094,6 +1122,7 @@ impl<Block: BlockT<Hash=H256>> Backend<Block> {
&enacted, &enacted,
&retracted, &retracted,
operation.storage_updates, operation.storage_updates,
operation.child_storage_updates,
Some(hash), Some(hash),
Some(number), Some(number),
|| is_best, || is_best,
@@ -1200,6 +1229,7 @@ impl<Block> client::backend::Backend<Block, Blake2Hasher> for Backend<Block> whe
old_state, old_state,
db_updates: PrefixedMemoryDB::default(), db_updates: PrefixedMemoryDB::default(),
storage_updates: Default::default(), storage_updates: Default::default(),
child_storage_updates: Default::default(),
changes_trie_updates: MemoryDB::default(), changes_trie_updates: MemoryDB::default(),
aux_ops: Vec::new(), aux_ops: Vec::new(),
finalized_blocks: Vec::new(), finalized_blocks: Vec::new(),
@@ -1348,7 +1378,7 @@ impl<Block> client::backend::Backend<Block, Blake2Hasher> for Backend<Block> whe
fn destroy_state(&self, state: Self::State) -> Result<(), client::error::Error> { fn destroy_state(&self, state: Self::State) -> Result<(), client::error::Error> {
if let Some(hash) = state.cache.parent_hash.clone() { if let Some(hash) = state.cache.parent_hash.clone() {
let is_best = || self.blockchain.meta.read().best_hash == hash; let is_best = || self.blockchain.meta.read().best_hash == hash;
state.release().sync_cache(&[], &[], vec![], None, None, is_best); state.release().sync_cache(&[], &[], vec![], vec![], None, None, is_best);
} }
Ok(()) Ok(())
} }
@@ -1473,7 +1503,7 @@ mod tests {
db.storage.db.clone() db.storage.db.clone()
}; };
let backend = Backend::<Block>::from_kvdb(backing, PruningMode::keep_blocks(1), 0, 16777216).unwrap(); let backend = Backend::<Block>::new_test_db(1, 0, backing);
assert_eq!(backend.blockchain().info().best_number, 9); assert_eq!(backend.blockchain().info().best_number, 9);
for i in 0..10 { for i in 0..10 {
assert!(backend.blockchain().hash(i).unwrap().is_some()) assert!(backend.blockchain().hash(i).unwrap().is_some())
+258 -85
View File
@@ -19,60 +19,158 @@
use std::collections::{VecDeque, HashSet, HashMap}; use std::collections::{VecDeque, HashSet, HashMap};
use std::sync::Arc; use std::sync::Arc;
use parking_lot::{Mutex, RwLock, RwLockUpgradableReadGuard}; use parking_lot::{Mutex, RwLock, RwLockUpgradableReadGuard};
use lru_cache::LruCache; use linked_hash_map::{LinkedHashMap, Entry};
use hash_db::Hasher; use hash_db::Hasher;
use runtime_primitives::traits::{Block, Header}; use runtime_primitives::traits::{Block, Header};
use state_machine::{backend::Backend as StateBackend, TrieBackend}; use state_machine::{backend::Backend as StateBackend, TrieBackend};
use log::trace; use log::trace;
use super::{StorageCollection, ChildStorageCollection};
use std::hash::Hash as StdHash;
const STATE_CACHE_BLOCKS: usize = 12; const STATE_CACHE_BLOCKS: usize = 12;
type StorageKey = Vec<u8>; type StorageKey = Vec<u8>;
type ChildStorageKey = (Vec<u8>, Vec<u8>);
type StorageValue = Vec<u8>; type StorageValue = Vec<u8>;
/// Shared canonical state cache. /// Shared canonical state cache.
pub struct Cache<B: Block, H: Hasher> { pub struct Cache<B: Block, H: Hasher> {
/// Storage cache. `None` indicates that key is known to be missing. /// Storage cache. `None` indicates that key is known to be missing.
storage: LruCache<StorageKey, Option<StorageValue>>, lru_storage: LRUMap<StorageKey, Option<StorageValue>>,
/// Storage hashes cache. `None` indicates that key is known to be missing. /// Storage hashes cache. `None` indicates that key is known to be missing.
hashes: LruCache<StorageKey, Option<H::Out>>, lru_hashes: LRUMap<StorageKey, OptionHOut<H::Out>>,
/// Storage cache for child trie. `None` indicates that key is known to be missing.
lru_child_storage: LRUMap<ChildStorageKey, Option<StorageValue>>,
/// Information on the modifications in recently committed blocks; specifically which keys /// Information on the modifications in recently committed blocks; specifically which keys
/// changed in which block. Ordered by block number. /// changed in which block. Ordered by block number.
modifications: VecDeque<BlockChanges<B::Header>>, modifications: VecDeque<BlockChanges<B::Header>>,
/// Maximum cache size available, in Bytes. }
shared_cache_size: usize,
/// Used storage size, in Bytes. struct LRUMap<K, V>(LinkedHashMap<K, V>, usize, usize);
storage_used_size: usize,
/// Internal trait similar to `heapsize` but using
/// a simply estimation.
///
/// This should not be made public, it is implementation
/// detail trait. If it need to become public please
/// consider using `malloc_size_of`.
trait EstimateSize {
/// Return a size estimation of additional size needed
/// to cache this struct (in bytes).
fn estimate_size(&self) -> usize;
}
impl EstimateSize for Vec<u8> {
fn estimate_size(&self) -> usize {
self.capacity()
}
}
impl EstimateSize for Option<Vec<u8>> {
fn estimate_size(&self) -> usize {
self.as_ref().map(|v|v.capacity()).unwrap_or(0)
}
}
struct OptionHOut<T: AsRef<[u8]>>(Option<T>);
impl<T: AsRef<[u8]>> EstimateSize for OptionHOut<T> {
fn estimate_size(&self) -> usize {
// capacity would be better
self.0.as_ref().map(|v|v.as_ref().len()).unwrap_or(0)
}
}
impl<T: EstimateSize> EstimateSize for (T, T) {
fn estimate_size(&self) -> usize {
self.0.estimate_size() + self.1.estimate_size()
}
}
impl<K: EstimateSize + Eq + StdHash, V: EstimateSize> LRUMap<K, V> {
fn remove(&mut self, k: &K) {
let map = &mut self.0;
let storage_used_size = &mut self.1;
if let Some(v) = map.remove(k) {
*storage_used_size -= k.estimate_size();
*storage_used_size -= v.estimate_size();
}
}
fn add(&mut self, k: K, v: V) {
let lmap = &mut self.0;
let storage_used_size = &mut self.1;
let limit = self.2;
let klen = k.estimate_size();
*storage_used_size += v.estimate_size();
// TODO assert k v size fit into limit?? to avoid insert remove?
match lmap.entry(k) {
Entry::Occupied(mut entry) => {
// note that in this case we are not running pure lru as
// it would require to remove first
*storage_used_size -= entry.get().estimate_size();
entry.insert(v);
},
Entry::Vacant(entry) => {
*storage_used_size += klen;
entry.insert(v);
},
};
while *storage_used_size > limit {
if let Some((k,v)) = lmap.pop_front() {
*storage_used_size -= k.estimate_size();
*storage_used_size -= v.estimate_size();
} else {
// can happen fairly often as we get value from multiple lru
// and only remove from a single lru
break;
}
}
}
fn get<Q:?Sized>(&mut self, k: &Q) -> Option<&mut V>
where K: std::borrow::Borrow<Q>,
Q: StdHash + Eq {
self.0.get_refresh(k)
}
fn used_size(&self) -> usize {
self.1
}
fn clear(&mut self) {
self.0.clear();
self.1 = 0;
}
} }
impl<B: Block, H: Hasher> Cache<B, H> { impl<B: Block, H: Hasher> Cache<B, H> {
/// Returns the used memory size of the storage cache in bytes. /// Returns the used memory size of the storage cache in bytes.
pub fn used_storage_cache_size(&self) -> usize { pub fn used_storage_cache_size(&self) -> usize {
self.storage_used_size self.lru_storage.used_size()
+ self.lru_child_storage.used_size()
// ignore small hashes storage and self.lru_hashes.used_size()
} }
} }
pub type SharedCache<B, H> = Arc<Mutex<Cache<B, H>>>; pub type SharedCache<B, H> = Arc<Mutex<Cache<B, H>>>;
/// Create new shared cache instance with given max memory usage. /// Fix lru storage size for hash (small 64ko).
pub fn new_shared_cache<B: Block, H: Hasher>(shared_cache_size: usize) -> SharedCache<B, H> { const FIX_LRU_HASH_SIZE: usize = 65_536;
// we need to supply a max capacity to `LruCache`, but since
// we don't have any idea how large the size of each item
// that is stored will be we can't calculate the max amount
// of items properly from `shared_cache_size`.
//
// what we do instead is to supply `shared_cache_size` as the
// max upper bound capacity (this would only be reached if each
// item would be one byte).
// each time we store to the storage cache we verify the memory
// constraint and pop the lru item if space needs to be freed.
/// Create a new shared cache instance with given max memory usage.
pub fn new_shared_cache<B: Block, H: Hasher>(
shared_cache_size: usize,
child_ratio: (usize, usize),
) -> SharedCache<B, H> {
let top = child_ratio.1.saturating_sub(child_ratio.0);
Arc::new(Mutex::new(Cache { Arc::new(Mutex::new(Cache {
storage: LruCache::new(shared_cache_size), lru_storage: LRUMap(LinkedHashMap::new(), 0,
hashes: LruCache::new(shared_cache_size), shared_cache_size * top / child_ratio.1),
lru_hashes: LRUMap(LinkedHashMap::new(), 0, FIX_LRU_HASH_SIZE),
lru_child_storage: LRUMap(LinkedHashMap::new(), 0,
shared_cache_size * child_ratio.0 / child_ratio.1),
modifications: VecDeque::new(), modifications: VecDeque::new(),
shared_cache_size: shared_cache_size,
storage_used_size: 0,
})) }))
} }
@@ -87,6 +185,8 @@ struct BlockChanges<B: Header> {
parent: B::Hash, parent: B::Hash,
/// A set of modified storage keys. /// A set of modified storage keys.
storage: HashSet<StorageKey>, storage: HashSet<StorageKey>,
/// A set of modified child storage keys.
child_storage: HashSet<ChildStorageKey>,
/// Block is part of the canonical chain. /// Block is part of the canonical chain.
is_canon: bool, is_canon: bool,
} }
@@ -97,6 +197,8 @@ struct LocalCache<H: Hasher> {
storage: HashMap<StorageKey, Option<StorageValue>>, storage: HashMap<StorageKey, Option<StorageValue>>,
/// Storage hashes cache. `None` indicates that key is known to be missing. /// Storage hashes cache. `None` indicates that key is known to be missing.
hashes: HashMap<StorageKey, Option<H::Out>>, hashes: HashMap<StorageKey, Option<H::Out>>,
/// Child storage cache. `None` indicates that key is known to be missing.
child_storage: HashMap<ChildStorageKey, Option<StorageValue>>,
} }
/// Cache changes. /// Cache changes.
@@ -135,7 +237,8 @@ impl<H: Hasher, B: Block> CacheChanges<H, B> {
&mut self, &mut self,
enacted: &[B::Hash], enacted: &[B::Hash],
retracted: &[B::Hash], retracted: &[B::Hash],
changes: Vec<(StorageKey, Option<StorageValue>)>, changes: StorageCollection,
child_changes: ChildStorageCollection,
commit_hash: Option<B::Hash>, commit_hash: Option<B::Hash>,
commit_number: Option<<B::Header as Header>::Number>, commit_number: Option<<B::Header as Header>::Number>,
is_best: F, is_best: F,
@@ -155,7 +258,11 @@ impl<H: Hasher, B: Block> CacheChanges<H, B> {
m.is_canon = true; m.is_canon = true;
for a in &m.storage { for a in &m.storage {
trace!("Reverting enacted key {:?}", a); trace!("Reverting enacted key {:?}", a);
CacheChanges::<H, B>::storage_remove(&mut cache.storage, a, &mut cache.storage_used_size); cache.lru_storage.remove(a);
}
for a in &m.child_storage {
trace!("Reverting enacted child key {:?}", a);
cache.lru_child_storage.remove(a);
} }
false false
} else { } else {
@@ -171,7 +278,11 @@ impl<H: Hasher, B: Block> CacheChanges<H, B> {
m.is_canon = false; m.is_canon = false;
for a in &m.storage { for a in &m.storage {
trace!("Retracted key {:?}", a); trace!("Retracted key {:?}", a);
CacheChanges::<H, B>::storage_remove(&mut cache.storage, a, &mut cache.storage_used_size); cache.lru_storage.remove(a);
}
for a in &m.child_storage {
trace!("Retracted child key {:?}", a);
cache.lru_child_storage.remove(a);
} }
false false
} else { } else {
@@ -182,7 +293,9 @@ impl<H: Hasher, B: Block> CacheChanges<H, B> {
if clear { if clear {
// We don't know anything about the block; clear everything // We don't know anything about the block; clear everything
trace!("Wiping cache"); trace!("Wiping cache");
cache.storage.clear(); cache.lru_storage.clear();
cache.lru_child_storage.clear();
cache.lru_hashes.clear();
cache.modifications.clear(); cache.modifications.clear();
} }
@@ -192,12 +305,21 @@ impl<H: Hasher, B: Block> CacheChanges<H, B> {
if let Some(_) = self.parent_hash { if let Some(_) = self.parent_hash {
let mut local_cache = self.local_cache.write(); let mut local_cache = self.local_cache.write();
if is_best { if is_best {
trace!("Committing {} local, {} hashes, {} modified entries", local_cache.storage.len(), local_cache.hashes.len(), changes.len()); trace!(
"Committing {} local, {} hashes, {} modified root entries, {} modified child entries",
local_cache.storage.len(),
local_cache.hashes.len(),
changes.len(),
child_changes.iter().map(|v|v.1.len()).sum::<usize>(),
);
for (k, v) in local_cache.storage.drain() { for (k, v) in local_cache.storage.drain() {
CacheChanges::<H, B>::storage_insert(cache, k, v); cache.lru_storage.add(k, v);
}
for (k, v) in local_cache.child_storage.drain() {
cache.lru_child_storage.add(k, v);
} }
for (k, v) in local_cache.hashes.drain() { for (k, v) in local_cache.hashes.drain() {
cache.hashes.insert(k, v); cache.lru_hashes.add(k, OptionHOut(v));
} }
} }
} }
@@ -210,16 +332,28 @@ impl<H: Hasher, B: Block> CacheChanges<H, B> {
cache.modifications.pop_back(); cache.modifications.pop_back();
} }
let mut modifications = HashSet::new(); let mut modifications = HashSet::new();
let mut child_modifications = HashSet::new();
child_changes.into_iter().for_each(|(sk, changes)|
for (k, v) in changes.into_iter() { for (k, v) in changes.into_iter() {
modifications.insert(k.clone()); let k = (sk.clone(), k);
if is_best { if is_best {
cache.hashes.remove(&k); cache.lru_child_storage.add(k.clone(), v);
CacheChanges::<H, B>::storage_insert(cache, k, v);
} }
child_modifications.insert(k);
} }
);
for (k, v) in changes.into_iter() {
if is_best {
cache.lru_hashes.remove(&k);
cache.lru_storage.add(k.clone(), v);
}
modifications.insert(k);
}
// Save modified storage. These are ordered by the block number. // Save modified storage. These are ordered by the block number.
let block_changes = BlockChanges { let block_changes = BlockChanges {
storage: modifications, storage: modifications,
child_storage: child_modifications,
number: *number, number: *number,
hash: hash.clone(), hash: hash.clone(),
is_canon: is_best, is_canon: is_best,
@@ -238,32 +372,6 @@ impl<H: Hasher, B: Block> CacheChanges<H, B> {
} }
} }
fn storage_insert(cache: &mut Cache<B, H>, k: StorageValue, v: Option<StorageValue>) {
if let Some(v_) = &v {
while cache.storage_used_size + v_.len() > cache.shared_cache_size {
// pop until space constraint satisfied
match cache.storage.remove_lru() {
Some((_, Some(popped_v))) =>
cache.storage_used_size = cache.storage_used_size - popped_v.len(),
Some((_, None)) => continue,
None => break,
};
}
cache.storage_used_size = cache.storage_used_size + v_.len();
}
cache.storage.insert(k, v);
}
fn storage_remove(
storage: &mut LruCache<StorageKey, Option<StorageValue>>,
k: &StorageKey,
storage_used_size: &mut usize,
) {
let v = storage.remove(k);
if let Some(Some(v_)) = v {
*storage_used_size = *storage_used_size - v_.len();
}
}
} }
impl<H: Hasher, S: StateBackend<H>, B: Block> CachingState<H, S, B> { impl<H: Hasher, S: StateBackend<H>, B: Block> CachingState<H, S, B> {
@@ -276,6 +384,7 @@ impl<H: Hasher, S: StateBackend<H>, B: Block> CachingState<H, S, B> {
local_cache: RwLock::new(LocalCache { local_cache: RwLock::new(LocalCache {
storage: Default::default(), storage: Default::default(),
hashes: Default::default(), hashes: Default::default(),
child_storage: Default::default(),
}), }),
parent_hash: parent_hash, parent_hash: parent_hash,
}, },
@@ -285,7 +394,8 @@ impl<H: Hasher, S: StateBackend<H>, B: Block> CachingState<H, S, B> {
/// Check if the key can be returned from cache by matching current block parent hash against canonical /// Check if the key can be returned from cache by matching current block parent hash against canonical
/// state and filtering out entries modified in later blocks. /// state and filtering out entries modified in later blocks.
fn is_allowed( fn is_allowed(
key: &[u8], key: Option<&[u8]>,
child_key: Option<&ChildStorageKey>,
parent_hash: &Option<B::Hash>, parent_hash: &Option<B::Hash>,
modifications: modifications:
&VecDeque<BlockChanges<B::Header>> &VecDeque<BlockChanges<B::Header>>
@@ -314,11 +424,19 @@ impl<H: Hasher, S: StateBackend<H>, B: Block> CachingState<H, S, B> {
} }
parent = &m.parent; parent = &m.parent;
} }
if let Some(key) = key {
if m.storage.contains(key) { if m.storage.contains(key) {
trace!("Cache lookup skipped for {:?}: modified in a later block", key); trace!("Cache lookup skipped for {:?}: modified in a later block", key);
return false; return false;
} }
} }
if let Some(child_key) = child_key {
if m.child_storage.contains(child_key) {
trace!("Cache lookup skipped for {:?}: modified in a later block", child_key);
return false;
}
}
}
trace!("Cache lookup skipped for {:?}: parent hash is unknown", key); trace!("Cache lookup skipped for {:?}: parent hash is unknown", key);
false false
} }
@@ -336,13 +454,14 @@ impl<H: Hasher, S: StateBackend<H>, B:Block> StateBackend<H> for CachingState<H,
fn storage(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> { fn storage(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
let local_cache = self.cache.local_cache.upgradable_read(); let local_cache = self.cache.local_cache.upgradable_read();
// Note that local cache makes that lru is not refreshed
if let Some(entry) = local_cache.storage.get(key).cloned() { if let Some(entry) = local_cache.storage.get(key).cloned() {
trace!("Found in local cache: {:?}", key); trace!("Found in local cache: {:?}", key);
return Ok(entry) return Ok(entry)
} }
let mut cache = self.cache.shared_cache.lock(); let mut cache = self.cache.shared_cache.lock();
if Self::is_allowed(key, &self.cache.parent_hash, &cache.modifications) { if Self::is_allowed(Some(key), None, &self.cache.parent_hash, &cache.modifications) {
if let Some(entry) = cache.storage.get_mut(key).map(|a| a.clone()) { if let Some(entry) = cache.lru_storage.get(key).map(|a| a.clone()) {
trace!("Found in shared cache: {:?}", key); trace!("Found in shared cache: {:?}", key);
return Ok(entry) return Ok(entry)
} }
@@ -360,8 +479,8 @@ impl<H: Hasher, S: StateBackend<H>, B:Block> StateBackend<H> for CachingState<H,
return Ok(entry) return Ok(entry)
} }
let mut cache = self.cache.shared_cache.lock(); let mut cache = self.cache.shared_cache.lock();
if Self::is_allowed(key, &self.cache.parent_hash, &cache.modifications) { if Self::is_allowed(Some(key), None, &self.cache.parent_hash, &cache.modifications) {
if let Some(entry) = cache.hashes.get_mut(key).map(|a| a.clone()) { if let Some(entry) = cache.lru_hashes.get(key).map(|a| a.0.clone()) {
trace!("Found hash in shared cache: {:?}", key); trace!("Found hash in shared cache: {:?}", key);
return Ok(entry) return Ok(entry)
} }
@@ -373,7 +492,23 @@ impl<H: Hasher, S: StateBackend<H>, B:Block> StateBackend<H> for CachingState<H,
} }
fn child_storage(&self, storage_key: &[u8], key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> { fn child_storage(&self, storage_key: &[u8], key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
self.state.child_storage(storage_key, key) let key = (storage_key.to_vec(), key.to_vec());
let local_cache = self.cache.local_cache.upgradable_read();
if let Some(entry) = local_cache.child_storage.get(&key).cloned() {
trace!("Found in local cache: {:?}", key);
return Ok(entry)
}
let mut cache = self.cache.shared_cache.lock();
if Self::is_allowed(None, Some(&key), &self.cache.parent_hash, &cache.modifications) {
if let Some(entry) = cache.lru_child_storage.get(&key).map(|a| a.clone()) {
trace!("Found in shared cache: {:?}", key);
return Ok(entry)
}
}
trace!("Cache miss: {:?}", key);
let value = self.state.child_storage(storage_key, &key.1[..])?;
RwLockUpgradableReadGuard::upgrade(local_cache).child_storage.insert(key, value.clone());
Ok(value)
} }
fn exists_storage(&self, key: &[u8]) -> Result<bool, Self::Error> { fn exists_storage(&self, key: &[u8]) -> Result<bool, Self::Error> {
@@ -446,27 +581,27 @@ mod tests {
let h3a = H256::random(); let h3a = H256::random();
let h3b = H256::random(); let h3b = H256::random();
let shared = new_shared_cache::<Block, Blake2Hasher>(256*1024); let shared = new_shared_cache::<Block, Blake2Hasher>(256*1024, (0,1));
// blocks [ 3a(c) 2a(c) 2b 1b 1a(c) 0 ] // blocks [ 3a(c) 2a(c) 2b 1b 1a(c) 0 ]
// state [ 5 5 4 3 2 2 ] // state [ 5 5 4 3 2 2 ]
let mut s = CachingState::new(InMemory::<Blake2Hasher>::default(), shared.clone(), Some(root_parent.clone())); let mut s = CachingState::new(InMemory::<Blake2Hasher>::default(), shared.clone(), Some(root_parent.clone()));
s.cache.sync_cache(&[], &[], vec![(key.clone(), Some(vec![2]))], Some(h0.clone()), Some(0), || true); s.cache.sync_cache(&[], &[], vec![(key.clone(), Some(vec![2]))], vec![], Some(h0.clone()), Some(0), || true);
let mut s = CachingState::new(InMemory::<Blake2Hasher>::default(), shared.clone(), Some(h0.clone())); let mut s = CachingState::new(InMemory::<Blake2Hasher>::default(), shared.clone(), Some(h0.clone()));
s.cache.sync_cache(&[], &[], vec![], Some(h1a.clone()), Some(1), || true); s.cache.sync_cache(&[], &[], vec![], vec![], Some(h1a.clone()), Some(1), || true);
let mut s = CachingState::new(InMemory::<Blake2Hasher>::default(), shared.clone(), Some(h0.clone())); let mut s = CachingState::new(InMemory::<Blake2Hasher>::default(), shared.clone(), Some(h0.clone()));
s.cache.sync_cache(&[], &[], vec![(key.clone(), Some(vec![3]))], Some(h1b.clone()), Some(1), || false); s.cache.sync_cache(&[], &[], vec![(key.clone(), Some(vec![3]))], vec![], Some(h1b.clone()), Some(1), || false);
let mut s = CachingState::new(InMemory::<Blake2Hasher>::default(), shared.clone(), Some(h1b.clone())); let mut s = CachingState::new(InMemory::<Blake2Hasher>::default(), shared.clone(), Some(h1b.clone()));
s.cache.sync_cache(&[], &[], vec![(key.clone(), Some(vec![4]))], Some(h2b.clone()), Some(2), || false); s.cache.sync_cache(&[], &[], vec![(key.clone(), Some(vec![4]))], vec![], Some(h2b.clone()), Some(2), || false);
let mut s = CachingState::new(InMemory::<Blake2Hasher>::default(), shared.clone(), Some(h1a.clone())); let mut s = CachingState::new(InMemory::<Blake2Hasher>::default(), shared.clone(), Some(h1a.clone()));
s.cache.sync_cache(&[], &[], vec![(key.clone(), Some(vec![5]))], Some(h2a.clone()), Some(2), || true); s.cache.sync_cache(&[], &[], vec![(key.clone(), Some(vec![5]))], vec![], Some(h2a.clone()), Some(2), || true);
let mut s = CachingState::new(InMemory::<Blake2Hasher>::default(), shared.clone(), Some(h2a.clone())); let mut s = CachingState::new(InMemory::<Blake2Hasher>::default(), shared.clone(), Some(h2a.clone()));
s.cache.sync_cache(&[], &[], vec![], Some(h3a.clone()), Some(3), || true); s.cache.sync_cache(&[], &[], vec![], vec![], Some(h3a.clone()), Some(3), || true);
let s = CachingState::new(InMemory::<Blake2Hasher>::default(), shared.clone(), Some(h3a.clone())); let s = CachingState::new(InMemory::<Blake2Hasher>::default(), shared.clone(), Some(h3a.clone()));
assert_eq!(s.storage(&key).unwrap().unwrap(), vec![5]); assert_eq!(s.storage(&key).unwrap().unwrap(), vec![5]);
@@ -487,9 +622,10 @@ mod tests {
&[h1b.clone(), h2b.clone(), h3b.clone()], &[h1b.clone(), h2b.clone(), h3b.clone()],
&[h1a.clone(), h2a.clone(), h3a.clone()], &[h1a.clone(), h2a.clone(), h3a.clone()],
vec![], vec![],
vec![],
Some(h3b.clone()), Some(h3b.clone()),
Some(3), Some(3),
|| true || true,
); );
let s = CachingState::new(InMemory::<Blake2Hasher>::default(), shared.clone(), Some(h3a.clone())); let s = CachingState::new(InMemory::<Blake2Hasher>::default(), shared.clone(), Some(h3a.clone()));
assert!(s.storage(&key).unwrap().is_none()); assert!(s.storage(&key).unwrap().is_none());
@@ -498,34 +634,71 @@ mod tests {
#[test] #[test]
fn should_track_used_size_correctly() { fn should_track_used_size_correctly() {
let root_parent = H256::random(); let root_parent = H256::random();
let shared = new_shared_cache::<Block, Blake2Hasher>(5); let shared = new_shared_cache::<Block, Blake2Hasher>(109, ((109-36), 109));
let h0 = H256::random(); let h0 = H256::random();
let mut s = CachingState::new(InMemory::<Blake2Hasher>::default(), shared.clone(), Some(root_parent.clone())); let mut s = CachingState::new(InMemory::<Blake2Hasher>::default(), shared.clone(), Some(root_parent.clone()));
let key = H256::random()[..].to_vec(); let key = H256::random()[..].to_vec();
s.cache.sync_cache(&[], &[], vec![(key.clone(), Some(vec![1, 2, 3]))], Some(h0.clone()), Some(0), || true); let s_key = H256::random()[..].to_vec();
assert_eq!(shared.lock().used_storage_cache_size(), 3 /* bytes */); s.cache.sync_cache(
&[],
&[],
vec![(key.clone(), Some(vec![1, 2, 3]))],
vec![],
Some(h0.clone()),
Some(0),
|| true,
);
// 32 key, 3 byte size
assert_eq!(shared.lock().used_storage_cache_size(), 35 /* bytes */);
let key = H256::random()[..].to_vec(); let key = H256::random()[..].to_vec();
s.cache.sync_cache(&[], &[], vec![(key.clone(), Some(vec![1, 2]))], Some(h0.clone()), Some(0), || true); s.cache.sync_cache(
assert_eq!(shared.lock().used_storage_cache_size(), 5 /* bytes */); &[],
&[],
vec![],
vec![(s_key.clone(), vec![(key.clone(), Some(vec![1, 2]))])],
Some(h0.clone()),
Some(0),
|| true,
);
// 35 + (2 * 32) key, 2 byte size
assert_eq!(shared.lock().used_storage_cache_size(), 101 /* bytes */);
} }
#[test] #[test]
fn should_remove_lru_items_based_on_tracking_used_size() { fn should_remove_lru_items_based_on_tracking_used_size() {
let root_parent = H256::random(); let root_parent = H256::random();
let shared = new_shared_cache::<Block, Blake2Hasher>(5); let shared = new_shared_cache::<Block, Blake2Hasher>(36*3, (2,3));
let h0 = H256::random(); let h0 = H256::random();
let mut s = CachingState::new(InMemory::<Blake2Hasher>::default(), shared.clone(), Some(root_parent.clone())); let mut s = CachingState::new(InMemory::<Blake2Hasher>::default(), shared.clone(), Some(root_parent.clone()));
let key = H256::random()[..].to_vec(); let key = H256::random()[..].to_vec();
s.cache.sync_cache(&[], &[], vec![(key.clone(), Some(vec![1, 2, 3, 4]))], Some(h0.clone()), Some(0), || true); s.cache.sync_cache(
assert_eq!(shared.lock().used_storage_cache_size(), 4 /* bytes */); &[],
&[],
vec![(key.clone(), Some(vec![1, 2, 3, 4]))],
vec![],
Some(h0.clone()),
Some(0),
|| true,
);
// 32 key, 4 byte size
assert_eq!(shared.lock().used_storage_cache_size(), 36 /* bytes */);
let key = H256::random()[..].to_vec(); let key = H256::random()[..].to_vec();
s.cache.sync_cache(&[], &[], vec![(key.clone(), Some(vec![1, 2]))], Some(h0.clone()), Some(0), || true); s.cache.sync_cache(
assert_eq!(shared.lock().used_storage_cache_size(), 2 /* bytes */); &[],
&[],
vec![(key.clone(), Some(vec![1, 2]))],
vec![],
Some(h0.clone()),
Some(0),
|| true,
);
// 32 key, 2 byte size
assert_eq!(shared.lock().used_storage_cache_size(), 34 /* bytes */);
} }
} }
+12 -2
View File
@@ -28,6 +28,12 @@ use hash_db::Hasher;
use trie::MemoryDB; use trie::MemoryDB;
use parking_lot::Mutex; use parking_lot::Mutex;
/// In memory array of storage values.
pub type StorageCollection = Vec<(Vec<u8>, Option<Vec<u8>>)>;
/// In memory arrays of storage values for multiple child tries.
pub type ChildStorageCollection = Vec<(Vec<u8>, StorageCollection)>;
/// State of a new block. /// State of a new block.
#[derive(Debug, Clone, Copy, PartialEq, Eq)] #[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NewBlockState { pub enum NewBlockState {
@@ -82,8 +88,12 @@ pub trait BlockImportOperation<Block, H> where
fn update_db_storage(&mut self, update: <Self::State as StateBackend<H>>::Transaction) -> error::Result<()>; fn update_db_storage(&mut self, update: <Self::State as StateBackend<H>>::Transaction) -> error::Result<()>;
/// Inject storage data into the database replacing any existing data. /// Inject storage data into the database replacing any existing data.
fn reset_storage(&mut self, top: StorageOverlay, children: ChildrenStorageOverlay) -> error::Result<H::Out>; fn reset_storage(&mut self, top: StorageOverlay, children: ChildrenStorageOverlay) -> error::Result<H::Out>;
/// Set top level storage changes. /// Set storage changes.
fn update_storage(&mut self, update: Vec<(Vec<u8>, Option<Vec<u8>>)>) -> error::Result<()>; fn update_storage(
&mut self,
update: StorageCollection,
child_update: ChildStorageCollection,
) -> error::Result<()>;
/// Inject changes trie data into the database. /// Inject changes trie data into the database.
fn update_changes_trie(&mut self, update: MemoryDB<H>) -> error::Result<()>; fn update_changes_trie(&mut self, update: MemoryDB<H>) -> error::Result<()>;
/// Insert auxiliary keys. Values are `None` if should be deleted. /// Insert auxiliary keys. Values are `None` if should be deleted.
+66 -11
View File
@@ -59,7 +59,10 @@ use state_machine::{
}; };
use hash_db::Hasher; use hash_db::Hasher;
use crate::backend::{self, BlockImportOperation, PrunableStateChangesTrieStorage}; use crate::backend::{
self, BlockImportOperation, PrunableStateChangesTrieStorage,
StorageCollection, ChildStorageCollection
};
use crate::blockchain::{ use crate::blockchain::{
self, Info as ChainInfo, Backend as ChainBackend, self, Info as ChainInfo, Backend as ChainBackend,
HeaderBackend as ChainHeaderBackend, ProvideCache, Cache, HeaderBackend as ChainHeaderBackend, ProvideCache, Cache,
@@ -77,6 +80,7 @@ use substrate_telemetry::{telemetry, SUBSTRATE_INFO};
use log::{info, trace, warn}; use log::{info, trace, warn};
/// Type that implements `futures::Stream` of block import events. /// Type that implements `futures::Stream` of block import events.
pub type ImportNotifications<Block> = mpsc::UnboundedReceiver<BlockImportNotification<Block>>; pub type ImportNotifications<Block> = mpsc::UnboundedReceiver<BlockImportNotification<Block>>;
@@ -133,7 +137,15 @@ pub struct Client<B, E, Block, RA> where Block: BlockT {
/// Client import operation, a wrapper for the backend. /// Client import operation, a wrapper for the backend.
pub struct ClientImportOperation<Block: BlockT, H: Hasher<Out=Block::Hash>, B: backend::Backend<Block, H>> { pub struct ClientImportOperation<Block: BlockT, H: Hasher<Out=Block::Hash>, B: backend::Backend<Block, H>> {
op: B::BlockImportOperation, op: B::BlockImportOperation,
notify_imported: Option<(Block::Hash, BlockOrigin, Block::Header, bool, Option<Vec<(Vec<u8>, Option<Vec<u8>>)>>)>, notify_imported: Option<(
Block::Hash,
BlockOrigin,
Block::Header,
bool,
Option<(
StorageCollection,
ChildStorageCollection,
)>)>,
notify_finalized: Vec<Block::Hash>, notify_finalized: Vec<Block::Hash>,
} }
@@ -150,8 +162,10 @@ pub trait BlockchainEvents<Block: BlockT> {
/// Get storage changes event stream. /// Get storage changes event stream.
/// ///
/// Passing `None` as `filter_keys` subscribes to all storage changes. /// Passing `None` as `filter_keys` subscribes to all storage changes.
fn storage_changes_notification_stream(&self, fn storage_changes_notification_stream(
filter_keys: Option<&[StorageKey]> &self,
filter_keys: Option<&[StorageKey]>,
child_filter_keys: Option<&[(StorageKey, Option<Vec<StorageKey>>)]>,
) -> error::Result<StorageEventStream<Block::Hash>>; ) -> error::Result<StorageEventStream<Block::Hash>>;
} }
@@ -351,6 +365,14 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where
.map(StorageData)) .map(StorageData))
} }
/// Given a `BlockId` and a key, return the value under the hash in that block.
pub fn storage_hash(&self, id: &BlockId<Block>, key: &StorageKey)
-> error::Result<Option<Block::Hash>> {
Ok(self.state_at(id)?
.storage_hash(&key.0).map_err(|e| error::Error::from_state(Box::new(e)))?
)
}
/// Given a `BlockId`, a key prefix, and a child storage key, return the matching child storage keys. /// Given a `BlockId`, a key prefix, and a child storage key, return the matching child storage keys.
pub fn child_storage_keys( pub fn child_storage_keys(
&self, &self,
@@ -378,6 +400,18 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where
.map(StorageData)) .map(StorageData))
} }
/// Given a `BlockId`, a key and a child storage key, return the hash under the key in that block.
pub fn child_storage_hash(
&self,
id: &BlockId<Block>,
child_storage_key: &StorageKey,
key: &StorageKey
) -> error::Result<Option<Block::Hash>> {
Ok(self.state_at(id)?
.child_storage_hash(&child_storage_key.0, &key.0).map_err(|e| error::Error::from_state(Box::new(e)))?
)
}
/// Get the code at a given block. /// Get the code at a given block.
pub fn code_at(&self, id: &BlockId<Block>) -> error::Result<Vec<u8>> { pub fn code_at(&self, id: &BlockId<Block>) -> error::Result<Vec<u8>> {
Ok(self.storage(id, &StorageKey(well_known_keys::CODE.to_vec()))? Ok(self.storage(id, &StorageKey(well_known_keys::CODE.to_vec()))?
@@ -913,7 +947,7 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where
operation.op.update_db_storage(storage_update)?; operation.op.update_db_storage(storage_update)?;
} }
if let Some(storage_changes) = storage_changes.clone() { if let Some(storage_changes) = storage_changes.clone() {
operation.op.update_storage(storage_changes)?; operation.op.update_storage(storage_changes.0, storage_changes.1)?;
} }
if let Some(Some(changes_update)) = changes_update { if let Some(Some(changes_update)) = changes_update {
operation.op.update_changes_trie(changes_update)?; operation.op.update_changes_trie(changes_update)?;
@@ -942,7 +976,10 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where
) -> error::Result<( ) -> error::Result<(
Option<StorageUpdate<B, Block>>, Option<StorageUpdate<B, Block>>,
Option<Option<ChangesUpdate>>, Option<Option<ChangesUpdate>>,
Option<Vec<(Vec<u8>, Option<Vec<u8>>)>>, Option<(
Vec<(Vec<u8>, Option<Vec<u8>>)>,
Vec<(Vec<u8>, Vec<(Vec<u8>, Option<Vec<u8>>)>)>
)>
)> )>
where where
E: CallExecutor<Block, Blake2Hasher> + Send + Sync + Clone, E: CallExecutor<Block, Blake2Hasher> + Send + Sync + Clone,
@@ -985,7 +1022,9 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where
overlay.commit_prospective(); overlay.commit_prospective();
Ok((Some(storage_update), Some(changes_update), Some(overlay.into_committed().collect()))) let (top, children) = overlay.into_committed();
let children = children.map(|(sk, it)| (sk, it.collect())).collect();
Ok((Some(storage_update), Some(changes_update), Some((top.collect(), children))))
}, },
None => Ok((None, None, None)) None => Ok((None, None, None))
} }
@@ -1084,14 +1123,26 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where
fn notify_imported( fn notify_imported(
&self, &self,
notify_import: (Block::Hash, BlockOrigin, Block::Header, bool, Option<Vec<(Vec<u8>, Option<Vec<u8>>)>>), notify_import: (
Block::Hash, BlockOrigin,
Block::Header,
bool,
Option<(
Vec<(Vec<u8>, Option<Vec<u8>>)>,
Vec<(Vec<u8>, Vec<(Vec<u8>, Option<Vec<u8>>)>)>,
)
>),
) -> error::Result<()> { ) -> error::Result<()> {
let (hash, origin, header, is_new_best, storage_changes) = notify_import; let (hash, origin, header, is_new_best, storage_changes) = notify_import;
if let Some(storage_changes) = storage_changes { if let Some(storage_changes) = storage_changes {
// TODO [ToDr] How to handle re-orgs? Should we re-emit all storage changes? // TODO [ToDr] How to handle re-orgs? Should we re-emit all storage changes?
self.storage_notifications.lock() self.storage_notifications.lock()
.trigger(&hash, storage_changes.into_iter()); .trigger(
&hash,
storage_changes.0.into_iter(),
storage_changes.1.into_iter().map(|(sk, v)| (sk, v.into_iter())),
);
} }
let notification = BlockImportNotification::<Block> { let notification = BlockImportNotification::<Block> {
@@ -1467,8 +1518,12 @@ where
} }
/// Get storage changes event stream. /// Get storage changes event stream.
fn storage_changes_notification_stream(&self, filter_keys: Option<&[StorageKey]>) -> error::Result<StorageEventStream<Block::Hash>> { fn storage_changes_notification_stream(
Ok(self.storage_notifications.lock().listen(filter_keys)) &self,
filter_keys: Option<&[StorageKey]>,
child_filter_keys: Option<&[(StorageKey, Option<Vec<StorageKey>>)]>,
) -> error::Result<StorageEventStream<Block::Hash>> {
Ok(self.storage_notifications.lock().listen(filter_keys, child_filter_keys))
} }
} }
+6 -2
View File
@@ -33,7 +33,7 @@ use trie::MemoryDB;
use consensus::well_known_cache_keys::Id as CacheKeyId; use consensus::well_known_cache_keys::Id as CacheKeyId;
use crate::error; use crate::error;
use crate::backend::{self, NewBlockState}; use crate::backend::{self, NewBlockState, StorageCollection, ChildStorageCollection};
use crate::light; use crate::light;
use crate::leaves::LeafSet; use crate::leaves::LeafSet;
use crate::blockchain::{self, BlockStatus, HeaderBackend}; use crate::blockchain::{self, BlockStatus, HeaderBackend};
@@ -515,7 +515,11 @@ where
Ok(()) Ok(())
} }
fn update_storage(&mut self, _update: Vec<(Vec<u8>, Option<Vec<u8>>)>) -> error::Result<()> { fn update_storage(
&mut self,
_update: StorageCollection,
_child_update: ChildStorageCollection,
) -> error::Result<()> {
Ok(()) Ok(())
} }
+1 -1
View File
@@ -59,7 +59,7 @@ pub use crate::client::{
new_in_mem, new_in_mem,
BlockBody, BlockStatus, ImportNotifications, FinalityNotifications, BlockchainEvents, BlockBody, BlockStatus, ImportNotifications, FinalityNotifications, BlockchainEvents,
BlockImportNotification, Client, ClientInfo, ExecutionStrategies, BlockImportNotification, Client, ClientInfo, ExecutionStrategies,
LongestChain LongestChain,
}; };
#[cfg(feature = "std")] #[cfg(feature = "std")]
pub use crate::notifications::{StorageEventStream, StorageChangeSet}; pub use crate::notifications::{StorageEventStream, StorageChangeSet};
+9 -2
View File
@@ -26,7 +26,10 @@ use runtime_primitives::{generic::BlockId, Justification, StorageOverlay, Childr
use state_machine::{Backend as StateBackend, TrieBackend, backend::InMemory as InMemoryState}; use state_machine::{Backend as StateBackend, TrieBackend, backend::InMemory as InMemoryState};
use runtime_primitives::traits::{Block as BlockT, NumberFor, Zero, Header}; use runtime_primitives::traits::{Block as BlockT, NumberFor, Zero, Header};
use crate::in_mem::{self, check_genesis_storage}; use crate::in_mem::{self, check_genesis_storage};
use crate::backend::{AuxStore, Backend as ClientBackend, BlockImportOperation, RemoteBackend, NewBlockState}; use crate::backend::{
AuxStore, Backend as ClientBackend, BlockImportOperation, RemoteBackend, NewBlockState,
StorageCollection, ChildStorageCollection,
};
use crate::blockchain::HeaderBackend as BlockchainHeaderBackend; use crate::blockchain::HeaderBackend as BlockchainHeaderBackend;
use crate::error::{Error as ClientError, Result as ClientResult}; use crate::error::{Error as ClientError, Result as ClientResult};
use crate::light::blockchain::{Blockchain, Storage as BlockchainStorage}; use crate::light::blockchain::{Blockchain, Storage as BlockchainStorage};
@@ -310,7 +313,11 @@ where
Ok(()) Ok(())
} }
fn update_storage(&mut self, _update: Vec<(Vec<u8>, Option<Vec<u8>>)>) -> ClientResult<()> { fn update_storage(
&mut self,
_update: StorageCollection,
_child_update: ChildStorageCollection,
) -> ClientResult<()> {
// we're not storing anything locally => ignore changes // we're not storing anything locally => ignore changes
Ok(()) Ok(())
} }
+227 -50
View File
@@ -30,18 +30,39 @@ use runtime_primitives::traits::Block as BlockT;
#[derive(Debug)] #[derive(Debug)]
pub struct StorageChangeSet { pub struct StorageChangeSet {
changes: Arc<Vec<(StorageKey, Option<StorageData>)>>, changes: Arc<Vec<(StorageKey, Option<StorageData>)>>,
child_changes: Arc<Vec<(StorageKey, Vec<(StorageKey, Option<StorageData>)>)>>,
filter: Option<HashSet<StorageKey>>, filter: Option<HashSet<StorageKey>>,
child_filters: Option<HashMap<StorageKey, Option<HashSet<StorageKey>>>>,
} }
impl StorageChangeSet { impl StorageChangeSet {
/// Convert the change set into iterator over storage items. /// Convert the change set into iterator over storage items.
pub fn iter<'a>(&'a self) -> impl Iterator<Item=&'a (StorageKey, Option<StorageData>)> + 'a { pub fn iter<'a>(&'a self)
self.changes -> impl Iterator<Item=(Option<&'a StorageKey>, &'a StorageKey, Option<&'a StorageData>)> + 'a {
let top = self.changes
.iter() .iter()
.filter(move |&(key, _)| match self.filter { .filter(move |&(key, _)| match self.filter {
Some(ref filter) => filter.contains(key), Some(ref filter) => filter.contains(key),
None => true, None => true,
}) })
.map(move |(k,v)| (None, k, v.as_ref()));
let children = self.child_changes
.iter()
.filter_map(move |(sk, changes)| {
if let Some(cf) = self.child_filters.as_ref() {
if let Some(filter) = cf.get(sk) {
Some(changes
.iter()
.filter(move |&(key, _)| match filter {
Some(ref filter) => filter.contains(key),
None => true,
})
.map(move |(k,v)| (Some(sk), k, v.as_ref())))
} else { None }
} else { None }
})
.flatten();
top.chain(children)
} }
} }
@@ -56,9 +77,14 @@ pub struct StorageNotifications<Block: BlockT> {
next_id: SubscriberId, next_id: SubscriberId,
wildcard_listeners: FnvHashSet<SubscriberId>, wildcard_listeners: FnvHashSet<SubscriberId>,
listeners: HashMap<StorageKey, FnvHashSet<SubscriberId>>, listeners: HashMap<StorageKey, FnvHashSet<SubscriberId>>,
child_listeners: HashMap<StorageKey, (
HashMap<StorageKey, FnvHashSet<SubscriberId>>,
FnvHashSet<SubscriberId>
)>,
sinks: FnvHashMap<SubscriberId, ( sinks: FnvHashMap<SubscriberId, (
mpsc::UnboundedSender<(Block::Hash, StorageChangeSet)>, mpsc::UnboundedSender<(Block::Hash, StorageChangeSet)>,
Option<HashSet<StorageKey>>, Option<HashSet<StorageKey>>,
Option<HashMap<StorageKey, Option<HashSet<StorageKey>>>>,
)>, )>,
} }
@@ -68,6 +94,7 @@ impl<Block: BlockT> Default for StorageNotifications<Block> {
next_id: Default::default(), next_id: Default::default(),
wildcard_listeners: Default::default(), wildcard_listeners: Default::default(),
listeners: Default::default(), listeners: Default::default(),
child_listeners: Default::default(),
sinks: Default::default(), sinks: Default::default(),
} }
} }
@@ -78,16 +105,24 @@ impl<Block: BlockT> StorageNotifications<Block> {
/// ///
/// Note the changes are going to be filtered by listener's filter key. /// Note the changes are going to be filtered by listener's filter key.
/// In fact no event might be sent if clients are not interested in the changes. /// In fact no event might be sent if clients are not interested in the changes.
pub fn trigger(&mut self, hash: &Block::Hash, changeset: impl Iterator<Item=(Vec<u8>, Option<Vec<u8>>)>) { pub fn trigger(
&mut self,
hash: &Block::Hash,
changeset: impl Iterator<Item=(Vec<u8>, Option<Vec<u8>>)>,
child_changeset: impl Iterator<
Item=(Vec<u8>, impl Iterator<Item=(Vec<u8>, Option<Vec<u8>>)>)
>,
) {
let has_wildcard = !self.wildcard_listeners.is_empty(); let has_wildcard = !self.wildcard_listeners.is_empty();
// early exit if no listeners // early exit if no listeners
if !has_wildcard && self.listeners.is_empty() { if !has_wildcard && self.listeners.is_empty() && self.child_listeners.is_empty() {
return; return;
} }
let mut subscribers = self.wildcard_listeners.clone(); let mut subscribers = self.wildcard_listeners.clone();
let mut changes = Vec::new(); let mut changes = Vec::new();
let mut child_changes = Vec::new();
// Collect subscribers and changes // Collect subscribers and changes
for (k, v) in changeset { for (k, v) in changeset {
@@ -102,21 +137,47 @@ impl<Block: BlockT> StorageNotifications<Block> {
changes.push((k, v.map(StorageData))); changes.push((k, v.map(StorageData)));
} }
} }
for (sk, changeset) in child_changeset {
let sk = StorageKey(sk);
if let Some((cl, cw)) = self.child_listeners.get(&sk) {
let mut changes = Vec::new();
for (k, v) in changeset {
let k = StorageKey(k);
let listeners = cl.get(&k);
if let Some(ref listeners) = listeners {
subscribers.extend(listeners.iter());
}
subscribers.extend(cw.iter());
if !cw.is_empty() || listeners.is_some() {
changes.push((k, v.map(StorageData)));
}
}
if !changes.is_empty() {
child_changes.push((sk, changes));
}
}
}
// Don't send empty notifications // Don't send empty notifications
if changes.is_empty() { if changes.is_empty() && child_changes.is_empty() {
return; return;
} }
let changes = Arc::new(changes); let changes = Arc::new(changes);
let child_changes = Arc::new(child_changes);
// Trigger the events // Trigger the events
for subscriber in subscribers { for subscriber in subscribers {
let should_remove = { let should_remove = {
let &(ref sink, ref filter) = self.sinks.get(&subscriber) let &(ref sink, ref filter, ref child_filters) = self.sinks.get(&subscriber)
.expect("subscribers returned from self.listeners are always in self.sinks; qed"); .expect("subscribers returned from self.listeners are always in self.sinks; qed");
sink.unbounded_send((hash.clone(), StorageChangeSet { sink.unbounded_send((hash.clone(), StorageChangeSet {
changes: changes.clone(), changes: changes.clone(),
child_changes: child_changes.clone(),
filter: filter.clone(), filter: filter.clone(),
child_filters: child_filters.clone(),
})).is_err() })).is_err()
}; };
@@ -126,53 +187,120 @@ impl<Block: BlockT> StorageNotifications<Block> {
} }
} }
fn remove_subscriber(&mut self, subscriber: SubscriberId) { fn remove_subscriber_from(
if let Some((_, filters)) = self.sinks.remove(&subscriber) { subscriber: &SubscriberId,
filters: &Option<HashSet<StorageKey>>,
listeners: &mut HashMap<StorageKey, FnvHashSet<SubscriberId>>,
wildcards: &mut FnvHashSet<SubscriberId>,
){
match filters { match filters {
None => { None => {
self.wildcard_listeners.remove(&subscriber); wildcards.remove(subscriber);
}, },
Some(filters) => { Some(filters) => {
for key in filters {
let remove_key = match self.listeners.get_mut(&key) { for key in filters.iter() {
let remove_key = match listeners.get_mut(key) {
Some(ref mut set) => { Some(ref mut set) => {
set.remove(&subscriber); set.remove(subscriber);
set.is_empty() set.is_empty()
}, },
None => false, None => false,
}; };
if remove_key { if remove_key {
self.listeners.remove(&key); listeners.remove(key);
} }
} }
}
}
}
fn remove_subscriber(&mut self, subscriber: SubscriberId) {
if let Some((_, filters, child_filters)) = self.sinks.remove(&subscriber) {
Self::remove_subscriber_from(
&subscriber,
&filters,
&mut self.listeners,
&mut self.wildcard_listeners,
);
if let Some(child_filters) = child_filters.as_ref() {
for (c_key, filters) in child_filters {
if let Some((listeners, wildcards)) = self.child_listeners.get_mut(&c_key) {
Self::remove_subscriber_from(
&subscriber,
&filters,
&mut *listeners,
&mut *wildcards,
);
if listeners.is_empty() && wildcards.is_empty() {
self.child_listeners.remove(&c_key);
}
}
}
}
}
}
fn listen_from(
current_id: SubscriberId,
filter_keys: &Option<impl AsRef<[StorageKey]>>,
listeners: &mut HashMap<StorageKey, FnvHashSet<SubscriberId>>,
wildcards: &mut FnvHashSet<SubscriberId>,
) -> Option<HashSet<StorageKey>>
{
match filter_keys {
None => {
wildcards.insert(current_id);
None
}, },
} Some(keys) => Some(keys.as_ref().iter().map(|key| {
listeners
.entry(key.clone())
.or_insert_with(Default::default)
.insert(current_id);
key.clone()
}).collect())
} }
} }
/// Start listening for particular storage keys. /// Start listening for particular storage keys.
pub fn listen(&mut self, filter_keys: Option<&[StorageKey]>) -> StorageEventStream<Block::Hash> { pub fn listen(
&mut self,
filter_keys: Option<&[StorageKey]>,
filter_child_keys: Option<&[(StorageKey, Option<Vec<StorageKey>>)]>,
) -> StorageEventStream<Block::Hash> {
self.next_id += 1; self.next_id += 1;
let current_id = self.next_id;
// add subscriber for every key // add subscriber for every key
let keys = match filter_keys { let keys = Self::listen_from(
None => { current_id,
self.wildcard_listeners.insert(self.next_id); &filter_keys,
None &mut self.listeners,
}, &mut self.wildcard_listeners,
Some(keys) => Some(keys.iter().map(|key| { );
self.listeners let child_keys = filter_child_keys.map(|filter_child_keys| {
.entry(key.clone()) filter_child_keys.iter().map(|(c_key, o_keys)| {
.or_insert_with(Default::default) let (c_listeners, c_wildcards) = self.child_listeners
.insert(self.next_id); .entry(c_key.clone())
key.clone() .or_insert_with(Default::default);
}).collect())
}; (c_key.clone(), Self::listen_from(
current_id,
o_keys,
&mut *c_listeners,
&mut *c_wildcards,
))
}).collect()
});
// insert sink // insert sink
let (tx, rx) = mpsc::unbounded(); let (tx, rx) = mpsc::unbounded();
self.sinks.insert(self.next_id, (tx, keys)); self.sinks.insert(current_id, (tx, keys, child_keys));
rx rx
} }
} }
@@ -182,13 +310,26 @@ mod tests {
use runtime_primitives::testing::{H256 as Hash, Block as RawBlock, ExtrinsicWrapper}; use runtime_primitives::testing::{H256 as Hash, Block as RawBlock, ExtrinsicWrapper};
use super::*; use super::*;
use futures::Stream; use futures::Stream;
use std::iter::{empty, Empty};
type TestChangeSet = (
Vec<(StorageKey, Option<StorageData>)>,
Vec<(StorageKey, Vec<(StorageKey, Option<StorageData>)>)>,
);
#[cfg(test)] #[cfg(test)]
impl From<Vec<(StorageKey, Option<StorageData>)>> for StorageChangeSet { impl From<TestChangeSet> for StorageChangeSet {
fn from(changes: Vec<(StorageKey, Option<StorageData>)>) -> Self { fn from(changes: TestChangeSet) -> Self {
// warning hardcoded child trie wildcard to test upon
let child_filters = Some([
(StorageKey(vec![4]), None),
(StorageKey(vec![5]), None),
].into_iter().cloned().collect());
StorageChangeSet { StorageChangeSet {
changes: Arc::new(changes), changes: Arc::new(changes.0),
child_changes: Arc::new(changes.1),
filter: None, filter: None,
child_filters,
} }
} }
} }
@@ -206,43 +347,73 @@ mod tests {
fn triggering_change_should_notify_wildcard_listeners() { fn triggering_change_should_notify_wildcard_listeners() {
// given // given
let mut notifications = StorageNotifications::<Block>::default(); let mut notifications = StorageNotifications::<Block>::default();
let mut recv = notifications.listen(None).wait(); let child_filter = [(StorageKey(vec![4]), None)];
let mut recv = notifications.listen(None, Some(&child_filter[..])).wait();
// when // when
let changeset = vec![ let changeset = vec![
(vec![2], Some(vec![3])), (vec![2], Some(vec![3])),
(vec![3], None), (vec![3], None),
]; ];
notifications.trigger(&Hash::from_low_u64_be(1), changeset.into_iter()); let c_changeset_1 = vec![
(vec![5], Some(vec![4])),
(vec![6], None),
];
let c_changeset = vec![(vec![4], c_changeset_1)];
notifications.trigger(
&Hash::from_low_u64_be(1),
changeset.into_iter(),
c_changeset.into_iter().map(|(a,b)| (a, b.into_iter())),
);
// then // then
assert_eq!(recv.next().unwrap(), Ok((Hash::from_low_u64_be(1), vec![ assert_eq!(recv.next().unwrap(), Ok((Hash::from_low_u64_be(1), (vec![
(StorageKey(vec![2]), Some(StorageData(vec![3]))), (StorageKey(vec![2]), Some(StorageData(vec![3]))),
(StorageKey(vec![3]), None), (StorageKey(vec![3]), None),
].into()))); ], vec![(StorageKey(vec![4]), vec![
(StorageKey(vec![5]), Some(StorageData(vec![4]))),
(StorageKey(vec![6]), None),
])]).into())));
} }
#[test] #[test]
fn should_only_notify_interested_listeners() { fn should_only_notify_interested_listeners() {
// given // given
let mut notifications = StorageNotifications::<Block>::default(); let mut notifications = StorageNotifications::<Block>::default();
let mut recv1 = notifications.listen(Some(&[StorageKey(vec![1])])).wait(); let child_filter = [(StorageKey(vec![4]), Some(vec![StorageKey(vec![5])]))];
let mut recv2 = notifications.listen(Some(&[StorageKey(vec![2])])).wait(); let mut recv1 = notifications.listen(Some(&[StorageKey(vec![1])]), None).wait();
let mut recv2 = notifications.listen(Some(&[StorageKey(vec![2])]), None).wait();
let mut recv3 = notifications.listen(Some(&[]), Some(&child_filter)).wait();
// when // when
let changeset = vec![ let changeset = vec![
(vec![2], Some(vec![3])), (vec![2], Some(vec![3])),
(vec![1], None), (vec![1], None),
]; ];
notifications.trigger(&Hash::from_low_u64_be(1), changeset.into_iter()); let c_changeset_1 = vec![
(vec![5], Some(vec![4])),
(vec![6], None),
];
let c_changeset = vec![(vec![4], c_changeset_1)];
notifications.trigger(
&Hash::from_low_u64_be(1),
changeset.into_iter(),
c_changeset.into_iter().map(|(a,b)| (a, b.into_iter())),
);
// then // then
assert_eq!(recv1.next().unwrap(), Ok((Hash::from_low_u64_be(1), vec![ assert_eq!(recv1.next().unwrap(), Ok((Hash::from_low_u64_be(1), (vec![
(StorageKey(vec![1]), None), (StorageKey(vec![1]), None),
].into()))); ], vec![]).into())));
assert_eq!(recv2.next().unwrap(), Ok((Hash::from_low_u64_be(1), vec![ assert_eq!(recv2.next().unwrap(), Ok((Hash::from_low_u64_be(1), (vec![
(StorageKey(vec![2]), Some(StorageData(vec![3]))), (StorageKey(vec![2]), Some(StorageData(vec![3]))),
].into()))); ], vec![]).into())));
assert_eq!(recv3.next().unwrap(), Ok((Hash::from_low_u64_be(1), (vec![],
vec![
(StorageKey(vec![4]), vec![(StorageKey(vec![5]), Some(StorageData(vec![4])))]),
]).into())));
} }
#[test] #[test]
@@ -250,11 +421,14 @@ mod tests {
// given // given
let mut notifications = StorageNotifications::<Block>::default(); let mut notifications = StorageNotifications::<Block>::default();
{ {
let _recv1 = notifications.listen(Some(&[StorageKey(vec![1])])).wait(); let child_filter = [(StorageKey(vec![4]), Some(vec![StorageKey(vec![5])]))];
let _recv2 = notifications.listen(Some(&[StorageKey(vec![2])])).wait(); let _recv1 = notifications.listen(Some(&[StorageKey(vec![1])]), None).wait();
let _recv3 = notifications.listen(None).wait(); let _recv2 = notifications.listen(Some(&[StorageKey(vec![2])]), None).wait();
let _recv3 = notifications.listen(None, None).wait();
let _recv4 = notifications.listen(None, Some(&child_filter)).wait();
assert_eq!(notifications.listeners.len(), 2); assert_eq!(notifications.listeners.len(), 2);
assert_eq!(notifications.wildcard_listeners.len(), 1); assert_eq!(notifications.wildcard_listeners.len(), 2);
assert_eq!(notifications.child_listeners.len(), 1);
} }
// when // when
@@ -262,11 +436,13 @@ mod tests {
(vec![2], Some(vec![3])), (vec![2], Some(vec![3])),
(vec![1], None), (vec![1], None),
]; ];
notifications.trigger(&Hash::from_low_u64_be(1), changeset.into_iter()); let c_changeset = empty::<(_, Empty<_>)>();
notifications.trigger(&Hash::from_low_u64_be(1), changeset.into_iter(), c_changeset);
// then // then
assert_eq!(notifications.listeners.len(), 0); assert_eq!(notifications.listeners.len(), 0);
assert_eq!(notifications.wildcard_listeners.len(), 0); assert_eq!(notifications.wildcard_listeners.len(), 0);
assert_eq!(notifications.child_listeners.len(), 0);
} }
#[test] #[test]
@@ -274,11 +450,12 @@ mod tests {
// given // given
let mut recv = { let mut recv = {
let mut notifications = StorageNotifications::<Block>::default(); let mut notifications = StorageNotifications::<Block>::default();
let recv = notifications.listen(None).wait(); let recv = notifications.listen(None, None).wait();
// when // when
let changeset = vec![]; let changeset = vec![];
notifications.trigger(&Hash::from_low_u64_be(1), changeset.into_iter()); let c_changeset = empty::<(_, Empty<_>)>();
notifications.trigger(&Hash::from_low_u64_be(1), changeset.into_iter(), c_changeset);
recv recv
}; };
+20 -10
View File
@@ -362,8 +362,9 @@ impl<B, E, Block, RA> StateApi<Block::Hash> for State<B, E, Block, RA> where
} }
fn storage_hash(&self, key: StorageKey, block: Option<Block::Hash>) -> Result<Option<Block::Hash>> { fn storage_hash(&self, key: StorageKey, block: Option<Block::Hash>) -> Result<Option<Block::Hash>> {
use runtime_primitives::traits::{Hash, Header as HeaderT}; let block = self.unwrap_or_best(block)?;
Ok(self.storage(key, block)?.map(|x| <Block::Header as HeaderT>::Hashing::hash(&x.0))) trace!(target: "rpc", "Querying storage hash at {:?} for key {}", block, HexDisplay::from(&key.0));
Ok(self.client.storage_hash(&BlockId::Hash(block), &key)?)
} }
fn storage_size(&self, key: StorageKey, block: Option<Block::Hash>) -> Result<Option<u64>> { fn storage_size(&self, key: StorageKey, block: Option<Block::Hash>) -> Result<Option<u64>> {
@@ -398,11 +399,13 @@ impl<B, E, Block, RA> StateApi<Block::Hash> for State<B, E, Block, RA> where
key: StorageKey, key: StorageKey,
block: Option<Block::Hash> block: Option<Block::Hash>
) -> Result<Option<Block::Hash>> { ) -> Result<Option<Block::Hash>> {
use runtime_primitives::traits::{Hash, Header as HeaderT}; let block = self.unwrap_or_best(block)?;
Ok( trace!(
self.child_storage(child_storage_key, key, block)? target: "rpc", "Querying child storage hash at {:?} for key {}",
.map(|x| <Block::Header as HeaderT>::Hashing::hash(&x.0)) block,
) HexDisplay::from(&key.0),
);
Ok(self.client.child_storage_hash(&BlockId::Hash(block), &child_storage_key, &key)?)
} }
fn child_storage_size( fn child_storage_size(
@@ -439,7 +442,10 @@ impl<B, E, Block, RA> StateApi<Block::Hash> for State<B, E, Block, RA> where
keys: Option<Vec<StorageKey>> keys: Option<Vec<StorageKey>>
) { ) {
let keys = Into::<Option<Vec<_>>>::into(keys); let keys = Into::<Option<Vec<_>>>::into(keys);
let stream = match self.client.storage_changes_notification_stream(keys.as_ref().map(|x| &**x)) { let stream = match self.client.storage_changes_notification_stream(
keys.as_ref().map(|x| &**x),
None
) {
Ok(stream) => stream, Ok(stream) => stream,
Err(err) => { Err(err) => {
let _ = subscriber.reject(error::Error::from(err).into()); let _ = subscriber.reject(error::Error::from(err).into());
@@ -466,7 +472,10 @@ impl<B, E, Block, RA> StateApi<Block::Hash> for State<B, E, Block, RA> where
.map_err(|e| warn!("Error creating storage notification stream: {:?}", e)) .map_err(|e| warn!("Error creating storage notification stream: {:?}", e))
.map(|(block, changes)| Ok(StorageChangeSet { .map(|(block, changes)| Ok(StorageChangeSet {
block, block,
changes: changes.iter().cloned().collect(), changes: changes.iter()
.filter_map(|(o_sk, k, v)| if o_sk.is_none() {
Some((k.clone(),v.cloned()))
} else { None }).collect(),
})); }));
sink sink
@@ -488,7 +497,8 @@ impl<B, E, Block, RA> StateApi<Block::Hash> for State<B, E, Block, RA> where
fn subscribe_runtime_version(&self, _meta: Self::Metadata, subscriber: Subscriber<RuntimeVersion>) { fn subscribe_runtime_version(&self, _meta: Self::Metadata, subscriber: Subscriber<RuntimeVersion>) {
let stream = match self.client.storage_changes_notification_stream( let stream = match self.client.storage_changes_notification_stream(
Some(&[StorageKey(storage::well_known_keys::CODE.to_vec())]) Some(&[StorageKey(storage::well_known_keys::CODE.to_vec())]),
None,
) { ) {
Ok(stream) => stream, Ok(stream) => stream,
Err(err) => { Err(err) => {
+4
View File
@@ -499,6 +499,8 @@ impl<Factory: ServiceFactory> Components for FullComponents<Factory> {
let db_settings = client_db::DatabaseSettings { let db_settings = client_db::DatabaseSettings {
cache_size: config.database_cache_size.map(|u| u as usize), cache_size: config.database_cache_size.map(|u| u as usize),
state_cache_size: config.state_cache_size, state_cache_size: config.state_cache_size,
state_cache_child_ratio:
config.state_cache_child_ratio.map(|v| (v, 100)),
path: config.database_path.as_str().into(), path: config.database_path.as_str().into(),
pruning: config.pruning.clone(), pruning: config.pruning.clone(),
}; };
@@ -591,6 +593,8 @@ impl<Factory: ServiceFactory> Components for LightComponents<Factory> {
let db_settings = client_db::DatabaseSettings { let db_settings = client_db::DatabaseSettings {
cache_size: None, cache_size: None,
state_cache_size: config.state_cache_size, state_cache_size: config.state_cache_size,
state_cache_child_ratio:
config.state_cache_child_ratio.map(|v| (v, 100)),
path: config.database_path.as_str().into(), path: config.database_path.as_str().into(),
pruning: config.pruning.clone(), pruning: config.pruning.clone(),
}; };
+3
View File
@@ -50,6 +50,8 @@ pub struct Configuration<C, G: Serialize + DeserializeOwned + BuildStorage> {
pub database_cache_size: Option<u32>, pub database_cache_size: Option<u32>,
/// Size of internal state cache in Bytes /// Size of internal state cache in Bytes
pub state_cache_size: usize, pub state_cache_size: usize,
/// Size in percent of cache size dedicated to child tries
pub state_cache_child_ratio: Option<usize>,
/// Pruning settings. /// Pruning settings.
pub pruning: PruningMode, pub pruning: PruningMode,
/// Additional key seeds. /// Additional key seeds.
@@ -100,6 +102,7 @@ impl<C: Default, G: Serialize + DeserializeOwned + BuildStorage> Configuration<C
database_path: Default::default(), database_path: Default::default(),
database_cache_size: Default::default(), database_cache_size: Default::default(),
state_cache_size: Default::default(), state_cache_size: Default::default(),
state_cache_child_ratio: Default::default(),
keys: Default::default(), keys: Default::default(),
custom: Default::default(), custom: Default::default(),
pruning: PruningMode::default(), pruning: PruningMode::default(),
+1
View File
@@ -139,6 +139,7 @@ fn node_config<F: ServiceFactory> (
database_path: root.join("db").to_str().unwrap().into(), database_path: root.join("db").to_str().unwrap().into(),
database_cache_size: None, database_cache_size: None,
state_cache_size: 16777216, state_cache_size: 16777216,
state_cache_child_ratio: None,
pruning: Default::default(), pruning: Default::default(),
keys: keys, keys: keys,
chain_spec: (*spec).clone(), chain_spec: (*spec).clone(),
@@ -51,6 +51,11 @@ pub trait Backend<H: Hasher> {
/// Get keyed child storage or None if there is nothing associated. /// Get keyed child storage or None if there is nothing associated.
fn child_storage(&self, storage_key: &[u8], key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error>; fn child_storage(&self, storage_key: &[u8], key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error>;
/// Get child keyed storage value hash or None if there is nothing associated.
fn child_storage_hash(&self, storage_key: &[u8], key: &[u8]) -> Result<Option<H::Out>, Self::Error> {
self.child_storage(storage_key, key).map(|v| v.map(|v| H::hash(&v)))
}
/// true if a key exists in storage. /// true if a key exists in storage.
fn exists_storage(&self, key: &[u8]) -> Result<bool, Self::Error> { fn exists_storage(&self, key: &[u8]) -> Result<bool, Self::Error> {
Ok(self.storage(key)?.is_some()) Ok(self.storage(key)?.is_some())
@@ -255,9 +255,13 @@ impl OverlayedChanges {
/// ///
/// Panics: /// Panics:
/// Will panic if there are any uncommitted prospective changes. /// Will panic if there are any uncommitted prospective changes.
pub fn into_committed(self) -> impl Iterator<Item=(Vec<u8>, Option<Vec<u8>>)> { pub fn into_committed(self) -> (
impl Iterator<Item=(Vec<u8>, Option<Vec<u8>>)>,
impl Iterator<Item=(Vec<u8>, impl Iterator<Item=(Vec<u8>, Option<Vec<u8>>)>)>,
){
assert!(self.prospective.is_empty()); assert!(self.prospective.is_empty());
self.committed.top.into_iter().map(|(k, v)| (k, v.value)) (self.committed.top.into_iter().map(|(k, v)| (k, v.value)),
self.committed.children.into_iter().map(|(sk, v)| (sk, v.1.into_iter())))
} }
/// Inserts storage entry responsible for current extrinsic index. /// Inserts storage entry responsible for current extrinsic index.