State cache and other performance optimizations (#1345)

* State caching

* Better code caching

* Execution optimizaton

* More optimizations

* Updated wasmi

* Caching test

* Style

* Style

* Reverted some minor changes

* Style and typos

* Style and typos

* Removed panics on missing memory
This commit is contained in:
Arkadiy Paronyan
2019-01-08 15:13:13 +03:00
committed by Gav Wood
parent e0639c435b
commit b104c02eb6
26 changed files with 796 additions and 266 deletions
+1
View File
@@ -8,6 +8,7 @@ parking_lot = "0.7.1"
log = "0.4"
kvdb = { git = "https://github.com/paritytech/parity-common", rev="616b40150ded71f57f650067fcbc5c99d7c343e6" }
kvdb-rocksdb = { git = "https://github.com/paritytech/parity-common", rev="616b40150ded71f57f650067fcbc5c99d7c343e6" }
lru-cache = "0.1"
hash-db = { git = "https://github.com/paritytech/trie" }
substrate-primitives = { path = "../../primitives" }
sr-primitives = { path = "../../sr-primitives" }
+63 -25
View File
@@ -29,6 +29,7 @@ extern crate kvdb_rocksdb;
extern crate kvdb;
extern crate hash_db;
extern crate parking_lot;
extern crate lru_cache;
extern crate substrate_state_machine as state_machine;
extern crate substrate_primitives as primitives;
extern crate sr_primitives as runtime_primitives;
@@ -52,6 +53,7 @@ extern crate kvdb_memorydb;
pub mod light;
mod cache;
mod storage_cache;
mod utils;
use std::sync::Arc;
@@ -75,10 +77,12 @@ use state_machine::{CodeExecutor, DBValue, ExecutionStrategy};
use utils::{Meta, db_err, meta_keys, open_database, read_db, block_id_to_lookup_key, read_meta};
use client::LeafSet;
use state_db::StateDb;
use storage_cache::{CachingState, SharedCache, new_shared_cache};
pub use state_db::PruningMode;
const CANONICALIZATION_DELAY: u64 = 256;
const MIN_BLOCKS_TO_KEEP_CHANGES_TRIES_FOR: u64 = 32768;
const STATE_CACHE_SIZE_BYTES: usize = 16 * 1024 * 1024;
/// DB-backed patricia trie state, transaction type is an overlay of changes to commit.
pub type DbState = state_machine::TrieBackend<Arc<state_machine::Storage<Blake2Hasher>>, Blake2Hasher>;
@@ -270,8 +274,9 @@ impl<Block: BlockT> client::blockchain::Backend<Block> for BlockchainDb<Block> {
/// Database transaction
pub struct BlockImportOperation<Block: BlockT, H: Hasher> {
old_state: DbState,
updates: MemoryDB<H>,
old_state: CachingState<Blake2Hasher, DbState, Block>,
db_updates: MemoryDB<H>,
storage_updates: Vec<(Vec<u8>, Option<Vec<u8>>)>,
changes_trie_updates: MemoryDB<H>,
pending_block: Option<PendingBlock<Block>>,
aux_ops: Vec<(Vec<u8>, Option<Vec<u8>>)>,
@@ -292,7 +297,7 @@ impl<Block> client::backend::BlockImportOperation<Block, Blake2Hasher>
for BlockImportOperation<Block, Blake2Hasher>
where Block: BlockT<Hash=H256>,
{
type State = DbState;
type State = CachingState<Blake2Hasher, DbState, Block>;
fn state(&self) -> Result<Option<&Self::State>, client::error::Error> {
Ok(Some(&self.old_state))
@@ -319,8 +324,8 @@ where Block: BlockT<Hash=H256>,
// currently authorities are not cached on full nodes
}
fn update_storage(&mut self, update: MemoryDB<Blake2Hasher>) -> Result<(), client::error::Error> {
self.updates = update;
fn update_db_storage(&mut self, update: MemoryDB<Blake2Hasher>) -> Result<(), client::error::Error> {
self.db_updates = update;
Ok(())
}
@@ -349,7 +354,7 @@ where Block: BlockT<Hash=H256>,
let (root, update) = self.old_state.storage_root(top.into_iter().map(|(k, v)| (k, Some(v))));
transaction.consolidate(update);
self.updates = transaction;
self.db_updates = transaction;
Ok(root)
}
@@ -364,6 +369,11 @@ where Block: BlockT<Hash=H256>,
self.aux_ops = ops.into_iter().collect();
Ok(())
}
fn update_storage(&mut self, update: Vec<(Vec<u8>, Option<Vec<u8>>)>) -> Result<(), client::error::Error> {
self.storage_updates = update;
Ok(())
}
}
struct StorageDb<Block: BlockT> {
@@ -503,6 +513,7 @@ pub struct Backend<Block: BlockT> {
changes_tries_storage: DbChangesTrieStorage<Block>,
blockchain: BlockchainDb<Block>,
canonicalization_delay: u64,
shared_cache: SharedCache<Block, Blake2Hasher>,
}
impl<Block: BlockT> Backend<Block> {
@@ -550,6 +561,7 @@ impl<Block: BlockT> Backend<Block> {
changes_tries_storage,
blockchain,
canonicalization_delay,
shared_cache: new_shared_cache(STATE_CACHE_SIZE_BYTES),
})
}
@@ -669,7 +681,7 @@ impl<Block> client::backend::AuxStore for Backend<Block> where Block: BlockT<Has
impl<Block> client::backend::Backend<Block, Blake2Hasher> for Backend<Block> where Block: BlockT<Hash=H256> {
type BlockImportOperation = BlockImportOperation<Block, Blake2Hasher>;
type Blockchain = BlockchainDb<Block>;
type State = DbState;
type State = CachingState<Blake2Hasher, DbState, Block>;
type ChangesTrieStorage = DbChangesTrieStorage<Block>;
fn begin_operation(&self, block: BlockId<Block>) -> Result<Self::BlockImportOperation, client::error::Error> {
@@ -677,7 +689,8 @@ impl<Block> client::backend::Backend<Block, Blake2Hasher> for Backend<Block> whe
Ok(BlockImportOperation {
pending_block: None,
old_state: state,
updates: MemoryDB::default(),
db_updates: MemoryDB::default(),
storage_updates: Default::default(),
changes_trie_updates: MemoryDB::default(),
aux_ops: Vec::new(),
})
@@ -697,6 +710,9 @@ impl<Block> client::backend::Backend<Block, Blake2Hasher> for Backend<Block> whe
// blocks are keyed by number + hash.
let lookup_key = ::utils::number_and_hash_to_lookup_key(number, hash);
let mut enacted = Vec::default();
let mut retracted = Vec::default();
if pending_block.leaf_state.is_best() {
let meta = self.blockchain.meta.read();
@@ -710,10 +726,11 @@ impl<Block> client::backend::Backend<Block, Blake2Hasher> for Backend<Block> whe
// uncanonicalize: check safety violations and ensure the numbers no longer
// point to these block hashes in the key mapping.
for retracted in tree_route.retracted() {
if retracted.hash == meta.finalized_hash {
for r in tree_route.retracted() {
retracted.push(r.hash.clone());
if r.hash == meta.finalized_hash {
warn!("Potential safety failure: reverting finalized block {:?}",
(&retracted.number, &retracted.hash));
(&r.number, &r.hash));
return Err(::client::error::ErrorKind::NotInFinalizedChain.into());
}
@@ -721,17 +738,18 @@ impl<Block> client::backend::Backend<Block, Blake2Hasher> for Backend<Block> whe
::utils::remove_number_to_key_mapping(
&mut transaction,
columns::KEY_LOOKUP,
retracted.number
r.number
);
}
// canonicalize: set the number lookup to map to this block's hash.
for enacted in tree_route.enacted() {
for e in tree_route.enacted() {
enacted.push(e.hash.clone());
::utils::insert_number_to_key_mapping(
&mut transaction,
columns::KEY_LOOKUP,
enacted.number,
enacted.hash
e.number,
e.hash
);
}
}
@@ -766,7 +784,7 @@ impl<Block> client::backend::Backend<Block, Blake2Hasher> for Backend<Block> whe
}
let mut changeset: state_db::ChangeSet<H256> = state_db::ChangeSet::default();
for (key, (val, rc)) in operation.updates.drain() {
for (key, (val, rc)) in operation.db_updates.drain() {
if rc > 0 {
changeset.inserted.push((key, val.to_vec()));
} else if rc < 0 {
@@ -792,8 +810,8 @@ impl<Block> client::backend::Backend<Block, Blake2Hasher> for Backend<Block> whe
self.force_delayed_canonicalize(&mut transaction, hash, *pending_block.header.number())?
}
debug!(target: "db", "DB Commit {:?} ({}), best = {}", hash, number,
pending_block.leaf_state.is_best());
let is_best = pending_block.leaf_state.is_best();
debug!(target: "db", "DB Commit {:?} ({}), best = {}", hash, number, is_best);
{
let mut leaves = self.blockchain.leaves.write();
@@ -817,6 +835,16 @@ impl<Block> client::backend::Backend<Block, Blake2Hasher> for Backend<Block> whe
pending_block.leaf_state.is_best(),
finalized,
);
// sync canonical state cache
operation.old_state.sync_cache(
&enacted,
&retracted,
operation.storage_updates,
Some(hash),
Some(number),
|| is_best
);
}
Ok(())
}
@@ -898,7 +926,8 @@ impl<Block> client::backend::Backend<Block, Blake2Hasher> for Backend<Block> whe
BlockId::Hash(h) if h == Default::default() => {
let genesis_storage = DbGenesisStorage::new();
let root = genesis_storage.0.clone();
return Ok(DbState::new(Arc::new(genesis_storage), root));
let state = DbState::new(Arc::new(genesis_storage), root);
return Ok(CachingState::new(state, self.shared_cache.clone(), None));
},
_ => {}
}
@@ -906,12 +935,21 @@ impl<Block> client::backend::Backend<Block, Blake2Hasher> for Backend<Block> whe
match self.blockchain.header(block) {
Ok(Some(ref hdr)) if !self.storage.state_db.is_pruned(hdr.number().as_()) => {
let root = H256::from_slice(hdr.state_root().as_ref());
Ok(DbState::new(self.storage.clone(), root))
let state = DbState::new(self.storage.clone(), root);
Ok(CachingState::new(state, self.shared_cache.clone(), Some(hdr.hash())))
},
Err(e) => Err(e),
_ => Err(client::error::ErrorKind::UnknownBlock(format!("{:?}", block)).into()),
}
}
fn destroy_state(&self, mut state: Self::State) -> Result<(), client::error::Error> {
if let Some(hash) = state.parent_hash.clone() {
let is_best = || self.blockchain.meta.read().best_hash == hash;
state.sync_cache(&[], &[], vec![], None, None, is_best);
}
Ok(())
}
}
impl<Block> client::backend::LocalBackend<Block, Blake2Hasher> for Backend<Block>
@@ -1092,7 +1130,7 @@ mod tests {
];
let (root, overlay) = op.old_state.storage_root(storage.iter().cloned());
op.update_storage(overlay).unwrap();
op.update_db_storage(overlay).unwrap();
header.state_root = root.into();
op.set_block_data(
@@ -1138,7 +1176,7 @@ mod tests {
op.reset_storage(storage.iter().cloned().collect(), Default::default()).unwrap();
key = op.updates.insert(b"hello");
key = op.db_updates.insert(b"hello");
op.set_block_data(
header,
Some(vec![]),
@@ -1171,8 +1209,8 @@ mod tests {
).0.into();
let hash = header.hash();
op.updates.insert(b"hello");
op.updates.remove(&key);
op.db_updates.insert(b"hello");
op.db_updates.remove(&key);
op.set_block_data(
header,
Some(vec![]),
@@ -1204,7 +1242,7 @@ mod tests {
.map(|(x, y)| (x, Some(y)))
).0.into();
op.updates.remove(&key);
op.db_updates.remove(&key);
op.set_block_data(
header,
Some(vec![]),
@@ -0,0 +1,416 @@
// Copyright 2019 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
//! Global cache state.
use std::collections::{VecDeque, HashSet, HashMap};
use std::sync::Arc;
use parking_lot::{Mutex, RwLock, RwLockUpgradableReadGuard};
use lru_cache::LruCache;
use hash_db::Hasher;
use runtime_primitives::traits::{Block, Header};
use state_machine::{backend::Backend as StateBackend, TrieBackend};
const STATE_CACHE_BLOCKS: usize = 12;
type StorageKey = Vec<u8>;
type StorageValue = Vec<u8>;
/// Shared canonical state cache.
pub struct Cache<B: Block, H: Hasher> {
/// Storage cache. `None` indicates that key is known to be missing.
storage: LruCache<StorageKey, Option<StorageValue>>,
/// Storage hashes cache. `None` indicates that key is known to be missing.
hashes: LruCache<StorageKey, Option<H::Out>>,
/// Information on the modifications in recently committed blocks; specifically which keys
/// changed in which block. Ordered by block number.
modifications: VecDeque<BlockChanges<B::Header>>,
}
pub type SharedCache<B, H> = Arc<Mutex<Cache<B, H>>>;
/// Create new shared cache instance with given max memory usage.
pub fn new_shared_cache<B: Block, H: Hasher>(shared_cache_size: usize) -> SharedCache<B, H> {
let cache_items = shared_cache_size / 100; // Estimated average item size. TODO: more accurate tracking
Arc::new(Mutex::new(Cache {
storage: LruCache::new(cache_items),
hashes: LruCache::new(cache_items),
modifications: VecDeque::new(),
}))
}
#[derive(Debug)]
/// Accumulates a list of storage changed in a block.
struct BlockChanges<B: Header> {
/// Block number.
number: B::Number,
/// Block hash.
hash: B::Hash,
/// Parent block hash.
parent: B::Hash,
/// A set of modified storage keys.
storage: HashSet<StorageKey>,
/// Block is part of the canonical chain.
is_canon: bool,
}
/// Cached values specific to a state.
struct LocalCache<H: Hasher> {
/// Storage cache. `None` indicates that key is known to be missing.
storage: HashMap<StorageKey, Option<StorageValue>>,
/// Storage hashes cache. `None` indicates that key is known to be missing.
hashes: HashMap<StorageKey, Option<H::Out>>,
}
/// State abstraction.
/// Manages shared global state cache which reflects the canonical
/// state as it is on the disk.
/// A instance of `CachingState` may be created as canonical or not.
/// For canonical instances local cache is accumulated and applied
/// in `sync_cache` along with the change overlay.
/// For non-canonical clones local cache and changes are dropped.
pub struct CachingState<H: Hasher, S: StateBackend<H>, B: Block> {
/// Backing state.
state: S,
/// Shared canonical state cache.
shared_cache: SharedCache<B, H>,
/// Local cache of values for this state.
local_cache: RwLock<LocalCache<H>>,
/// Hash of the block on top of which this instance was created or
/// `None` if cache is disabled
pub parent_hash: Option<B::Hash>,
}
impl<H: Hasher, S: StateBackend<H>, B: Block> CachingState<H, S, B> {
/// Create a new instance wrapping generic State and shared cache.
pub fn new(state: S, shared_cache: SharedCache<B, H>, parent_hash: Option<B::Hash>) -> CachingState<H, S, B> {
CachingState {
state,
shared_cache,
local_cache: RwLock::new(LocalCache {
storage: Default::default(),
hashes: Default::default(),
}),
parent_hash: parent_hash,
}
}
/// Propagate local cache into the shared cache and synchonize
/// the shared cache with the best block state.
/// This function updates the shared cache by removing entries
/// that are invalidated by chain reorganization. `sync_cache`
/// should be called after the block has been committed and the
/// blockchain route has been calculated.
pub fn sync_cache<F: FnOnce() -> bool> (
&mut self,
enacted: &[B::Hash],
retracted: &[B::Hash],
changes: Vec<(StorageKey, Option<StorageValue>)>,
commit_hash: Option<B::Hash>,
commit_number: Option<<B::Header as Header>::Number>,
is_best: F,
) {
let mut cache = self.shared_cache.lock();
let is_best = is_best();
trace!("Syncing cache, id = (#{:?}, {:?}), parent={:?}, best={}", commit_number, commit_hash, self.parent_hash, is_best);
let cache = &mut *cache;
// Purge changes from re-enacted and retracted blocks.
// Filter out commiting block if any.
let mut clear = false;
for block in enacted.iter().filter(|h| commit_hash.as_ref().map_or(true, |p| *h != p)) {
clear = clear || {
if let Some(ref mut m) = cache.modifications.iter_mut().find(|m| &m.hash == block) {
trace!("Reverting enacted block {:?}", block);
m.is_canon = true;
for a in &m.storage {
trace!("Reverting enacted key {:?}", a);
cache.storage.remove(a);
}
false
} else {
true
}
};
}
for block in retracted {
clear = clear || {
if let Some(ref mut m) = cache.modifications.iter_mut().find(|m| &m.hash == block) {
trace!("Retracting block {:?}", block);
m.is_canon = false;
for a in &m.storage {
trace!("Retracted key {:?}", a);
cache.storage.remove(a);
}
false
} else {
true
}
};
}
if clear {
// We don't know anything about the block; clear everything
trace!("Wiping cache");
cache.storage.clear();
cache.modifications.clear();
}
// Propagate cache only if committing on top of the latest canonical state
// blocks are ordered by number and only one block with a given number is marked as canonical
// (contributed to canonical state cache)
if let Some(_) = self.parent_hash {
let mut local_cache = self.local_cache.write();
if is_best {
trace!("Committing {} local, {} hashes, {} modified entries", local_cache.storage.len(), local_cache.hashes.len(), changes.len());
for (k, v) in local_cache.storage.drain() {
cache.storage.insert(k, v);
}
for (k, v) in local_cache.hashes.drain() {
cache.hashes.insert(k, v);
}
}
}
if let (
Some(ref number), Some(ref hash), Some(ref parent))
= (commit_number, commit_hash, self.parent_hash)
{
if cache.modifications.len() == STATE_CACHE_BLOCKS {
cache.modifications.pop_back();
}
let mut modifications = HashSet::new();
for (k, v) in changes.into_iter() {
modifications.insert(k.clone());
if is_best {
cache.hashes.remove(&k);
cache.storage.insert(k, v);
}
}
// Save modified storage. These are ordered by the block number.
let block_changes = BlockChanges {
storage: modifications,
number: *number,
hash: hash.clone(),
is_canon: is_best,
parent: parent.clone(),
};
let insert_at = cache.modifications.iter()
.enumerate()
.find(|&(_, m)| m.number < *number)
.map(|(i, _)| i);
trace!("Inserting modifications at {:?}", insert_at);
if let Some(insert_at) = insert_at {
cache.modifications.insert(insert_at, block_changes);
} else {
cache.modifications.push_back(block_changes);
}
}
}
/// Check if the key can be returned from cache by matching current block parent hash against canonical
/// state and filtering out entries modified in later blocks.
fn is_allowed(
key: &[u8],
parent_hash: &Option<B::Hash>,
modifications:
&VecDeque<BlockChanges<B::Header>>
) -> bool
{
let mut parent = match *parent_hash {
None => {
trace!("Cache lookup skipped for {:?}: no parent hash", key);
return false;
}
Some(ref parent) => parent,
};
if modifications.is_empty() {
trace!("Cache lookup allowed for {:?}", key);
return true;
}
// Ignore all storage modified in later blocks
// Modifications contains block ordered by the number
// We search for our parent in that list first and then for
// all its parent until we hit the canonical block,
// checking against all the intermediate modifications.
for m in modifications {
if &m.hash == parent {
if m.is_canon {
return true;
}
parent = &m.parent;
}
if m.storage.contains(key) {
trace!("Cache lookup skipped for {:?}: modified in a later block", key);
return false;
}
}
trace!("Cache lookup skipped for {:?}: parent hash is unknown", key);
false
}
}
impl<H: Hasher, S: StateBackend<H>, B:Block> StateBackend<H> for CachingState<H, S, B> {
type Error = S::Error;
type Transaction = S::Transaction;
type TrieBackendStorage = S::TrieBackendStorage;
fn storage(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
let local_cache = self.local_cache.upgradable_read();
if let Some(entry) = local_cache.storage.get(key).cloned() {
trace!("Found in local cache: {:?}", key);
return Ok(entry)
}
let mut cache = self.shared_cache.lock();
if Self::is_allowed(key, &self.parent_hash, &cache.modifications) {
if let Some(entry) = cache.storage.get_mut(key).map(|a| a.clone()) {
trace!("Found in shared cache: {:?}", key);
return Ok(entry)
}
}
trace!("Cache miss: {:?}", key);
let value = self.state.storage(key)?;
RwLockUpgradableReadGuard::upgrade(local_cache).storage.insert(key.to_vec(), value.clone());
Ok(value)
}
fn storage_hash(&self, key: &[u8]) -> Result<Option<H::Out>, Self::Error> {
let local_cache = self.local_cache.upgradable_read();
if let Some(entry) = local_cache.hashes.get(key).cloned() {
trace!("Found hash in local cache: {:?}", key);
return Ok(entry)
}
let mut cache = self.shared_cache.lock();
if Self::is_allowed(key, &self.parent_hash, &cache.modifications) {
if let Some(entry) = cache.hashes.get_mut(key).map(|a| a.clone()) {
trace!("Found hash in shared cache: {:?}", key);
return Ok(entry)
}
}
trace!("Cache hash miss: {:?}", key);
let hash = self.state.storage_hash(key)?;
RwLockUpgradableReadGuard::upgrade(local_cache).hashes.insert(key.to_vec(), hash.clone());
Ok(hash)
}
fn child_storage(&self, storage_key: &[u8], key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
self.state.child_storage(storage_key, key)
}
fn exists_storage(&self, key: &[u8]) -> Result<bool, Self::Error> {
Ok(self.storage(key)?.is_some())
}
fn exists_child_storage(&self, storage_key: &[u8], key: &[u8]) -> Result<bool, Self::Error> {
self.state.exists_child_storage(storage_key, key)
}
fn for_keys_with_prefix<F: FnMut(&[u8])>(&self, prefix: &[u8], f: F) {
self.state.for_keys_with_prefix(prefix, f)
}
fn for_keys_in_child_storage<F: FnMut(&[u8])>(&self, storage_key: &[u8], f: F) {
self.state.for_keys_in_child_storage(storage_key, f)
}
fn storage_root<I>(&self, delta: I) -> (H::Out, Self::Transaction)
where
I: IntoIterator<Item=(Vec<u8>, Option<Vec<u8>>)>,
H::Out: Ord
{
self.state.storage_root(delta)
}
fn child_storage_root<I>(&self, storage_key: &[u8], delta: I) -> (Vec<u8>, bool, Self::Transaction)
where
I: IntoIterator<Item=(Vec<u8>, Option<Vec<u8>>)>,
H::Out: Ord
{
self.state.child_storage_root(storage_key, delta)
}
fn pairs(&self) -> Vec<(Vec<u8>, Vec<u8>)> {
self.state.pairs()
}
fn try_into_trie_backend(self) -> Option<TrieBackend<Self::TrieBackendStorage, H>> {
self.state.try_into_trie_backend()
}
}
#[cfg(test)]
mod tests {
use super::*;
use runtime_primitives::testing::{H256, Block as RawBlock, ExtrinsicWrapper};
use state_machine::backend::InMemory;
use primitives::Blake2Hasher;
type Block = RawBlock<ExtrinsicWrapper<u32>>;
#[test]
fn smoke() {
//init_log();
let root_parent = H256::random();
let key = H256::random()[..].to_vec();
let h0 = H256::random();
let h1a = H256::random();
let h1b = H256::random();
let h2a = H256::random();
let h2b = H256::random();
let h3a = H256::random();
let h3b = H256::random();
let shared = new_shared_cache::<Block, Blake2Hasher>(256*1024);
// blocks [ 3a(c) 2a(c) 2b 1b 1a(c) 0 ]
// state [ 5 5 4 3 2 2 ]
let mut s = CachingState::new(InMemory::<Blake2Hasher>::default(), shared.clone(), Some(root_parent.clone()));
s.sync_cache(&[], &[], vec![(key.clone(), Some(vec![2]))], Some(h0.clone()), Some(0), || true);
let mut s = CachingState::new(InMemory::<Blake2Hasher>::default(), shared.clone(), Some(h0.clone()));
s.sync_cache(&[], &[], vec![], Some(h1a.clone()), Some(1), || true);
let mut s = CachingState::new(InMemory::<Blake2Hasher>::default(), shared.clone(), Some(h0.clone()));
s.sync_cache(&[], &[], vec![(key.clone(), Some(vec![3]))], Some(h1b.clone()), Some(1), || false);
let mut s = CachingState::new(InMemory::<Blake2Hasher>::default(), shared.clone(), Some(h1b.clone()));
s.sync_cache(&[], &[], vec![(key.clone(), Some(vec![4]))], Some(h2b.clone()), Some(2), || false);
let mut s = CachingState::new(InMemory::<Blake2Hasher>::default(), shared.clone(), Some(h1a.clone()));
s.sync_cache(&[], &[], vec![(key.clone(), Some(vec![5]))], Some(h2a.clone()), Some(2), || true);
let mut s = CachingState::new(InMemory::<Blake2Hasher>::default(), shared.clone(), Some(h2a.clone()));
s.sync_cache(&[], &[], vec![], Some(h3a.clone()), Some(3), || true);
let s = CachingState::new(InMemory::<Blake2Hasher>::default(), shared.clone(), Some(h3a.clone()));
assert_eq!(s.storage(&key).unwrap().unwrap(), vec![5]);
let s = CachingState::new(InMemory::<Blake2Hasher>::default(), shared.clone(), Some(h1a.clone()));
assert!(s.storage(&key).unwrap().is_none());
let s = CachingState::new(InMemory::<Blake2Hasher>::default(), shared.clone(), Some(h2b.clone()));
assert!(s.storage(&key).unwrap().is_none());
let s = CachingState::new(InMemory::<Blake2Hasher>::default(), shared.clone(), Some(h1b.clone()));
assert!(s.storage(&key).unwrap().is_none());
// reorg to 3b
// blocks [ 3b(c) 3a 2a 2b(c) 1b 1a 0 ]
let mut s = CachingState::new(InMemory::<Blake2Hasher>::default(), shared.clone(), Some(h2b.clone()));
s.sync_cache(&[h1b.clone(), h2b.clone(), h3b.clone()], &[h1a.clone(), h2a.clone(), h3a.clone()], vec![], Some(h3b.clone()), Some(3), || true);
let s = CachingState::new(InMemory::<Blake2Hasher>::default(), shared.clone(), Some(h3a.clone()));
assert!(s.storage(&key).unwrap().is_none());
}
}
+7 -1
View File
@@ -68,9 +68,11 @@ pub trait BlockImportOperation<Block, H> where
/// has been used to check justification of this block).
fn update_authorities(&mut self, authorities: Vec<AuthorityIdFor<Block>>);
/// Inject storage data into the database.
fn update_storage(&mut self, update: <Self::State as StateBackend<H>>::Transaction) -> error::Result<()>;
fn update_db_storage(&mut self, update: <Self::State as StateBackend<H>>::Transaction) -> error::Result<()>;
/// Inject storage data into the database replacing any existing data.
fn reset_storage(&mut self, top: StorageMap, children: ChildrenStorageMap) -> error::Result<H::Out>;
/// Set top level storage changes.
fn update_storage(&mut self, update: Vec<(Vec<u8>, Option<Vec<u8>>)>) -> error::Result<()>;
/// Inject changes trie data into the database.
fn update_changes_trie(&mut self, update: MemoryDB<H>) -> error::Result<()>;
/// Update auxiliary keys. Values are `None` if should be deleted.
@@ -127,6 +129,10 @@ pub trait Backend<Block, H>: AuxStore + Send + Sync where
fn changes_trie_storage(&self) -> Option<&Self::ChangesTrieStorage>;
/// Returns state backend with post-state of given block.
fn state_at(&self, block: BlockId<Block>) -> error::Result<Self::State>;
/// Destroy state and save any useful data, such as cache.
fn destroy_state(&self, _state: Self::State) -> error::Result<()> {
Ok(())
}
/// Attempts to revert the chain by `n` blocks. Returns the number of blocks that were
/// successfully reverted.
fn revert(&self, n: NumberFor<Block>) -> error::Result<NumberFor<Block>>;
+37 -30
View File
@@ -24,22 +24,11 @@ use state_machine::{self, OverlayedChanges, Ext,
use executor::{RuntimeVersion, RuntimeInfo, NativeVersion};
use hash_db::Hasher;
use trie::MemoryDB;
use codec::Decode;
use primitives::{H256, Blake2Hasher};
use primitives::storage::well_known_keys;
use backend;
use error;
/// Information regarding the result of a call.
#[derive(Debug, Clone)]
pub struct CallResult {
/// The data that was returned from the call.
pub return_data: Vec<u8>,
/// The changes made to the state by the call.
pub changes: OverlayedChanges,
}
/// Method call executor.
pub trait CallExecutor<B, H>
where
@@ -58,7 +47,7 @@ where
id: &BlockId<B>,
method: &str,
call_data: &[u8],
) -> Result<CallResult, error::Error>;
) -> Result<Vec<u8>, error::Error>;
/// Execute a contextual call on top of state in a block of a given hash.
///
@@ -163,16 +152,22 @@ where
id: &BlockId<Block>,
method: &str,
call_data: &[u8],
) -> error::Result<CallResult> {
) -> error::Result<Vec<u8>> {
let mut changes = OverlayedChanges::default();
let (return_data, _, _) = self.call_at_state(
&self.backend.state_at(*id)?,
let state = self.backend.state_at(*id)?;
let return_data = state_machine::execute_using_consensus_failure_handler(
&state,
self.backend.changes_trie_storage(),
&mut changes,
&self.executor,
method,
call_data,
native_when_possible(),
)?;
Ok(CallResult { return_data, changes })
false,
)
.map(|(result, _, _)| result)?;
self.backend.destroy_state(state)?;
Ok(return_data)
}
fn contextual_call<
@@ -192,28 +187,40 @@ where
//TODO: Find a better way to prevent double block initialization
if method != "Core_initialise_block" && initialised_block.map(|id| id != *at).unwrap_or(true) {
let header = prepare_environment_block()?;
self.call_at_state(&state, changes, "Core_initialise_block", &header.encode(), manager.clone())?;
state_machine::execute_using_consensus_failure_handler(
&state,
self.backend.changes_trie_storage(),
changes,
&self.executor,
"Core_initialise_block",
&header.encode(),
manager.clone(),
false,
)?;
*initialised_block = Some(*at);
}
self.call_at_state(&state, changes, method, call_data, manager).map(|cr| cr.0)
let result = state_machine::execute_using_consensus_failure_handler(
&state,
self.backend.changes_trie_storage(),
changes,
&self.executor,
method,
call_data,
manager,
false,
)
.map(|(result, _, _)| result)?;
self.backend.destroy_state(state)?;
Ok(result)
}
fn runtime_version(&self, id: &BlockId<Block>) -> error::Result<RuntimeVersion> {
let mut overlay = OverlayedChanges::default();
let state = self.backend.state_at(*id)?;
use state_machine::Backend;
let code = state.storage(well_known_keys::CODE)
.map_err(|e| error::ErrorKind::Execution(Box::new(e)))?
.ok_or(error::ErrorKind::VersionInvalid)?
.to_vec();
let heap_pages = state.storage(well_known_keys::HEAP_PAGES)
.map_err(|e| error::ErrorKind::Execution(Box::new(e)))?
.and_then(|v| u64::decode(&mut &v[..]))
.unwrap_or(1024) as usize;
let mut ext = Ext::new(&mut overlay, &state, self.backend.changes_trie_storage());
self.executor.runtime_version(&mut ext, heap_pages, &code)
self.executor.runtime_version(&mut ext)
.ok_or(error::ErrorKind::VersionInvalid.into())
}
+8 -6
View File
@@ -230,7 +230,6 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where
let (genesis_storage, children_genesis_storage) = build_genesis_storage.build_storage()?;
let mut op = backend.begin_operation(BlockId::Hash(Default::default()))?;
let state_root = op.reset_storage(genesis_storage, children_genesis_storage)?;
let genesis_block = genesis::construct_genesis_block::<Block>(state_root.into());
info!("Initialising Genesis block/state (state: {}, header-hash: {})", genesis_block.header().state_root(), genesis_block.header().hash());
op.set_block_data(
@@ -284,8 +283,8 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where
match self.backend.blockchain().cache().and_then(|cache| cache.authorities_at(*id)) {
Some(cached_value) => Ok(cached_value),
None => self.executor.call(id, "Core_authorities", &[])
.and_then(|r| Vec::<AuthorityIdFor<Block>>::decode(&mut &r.return_data[..])
.ok_or(error::ErrorKind::InvalidAuthoritiesSet.into()))
.and_then(|r| Vec::<AuthorityIdFor<Block>>::decode(&mut &r[..])
.ok_or_else(|| error::ErrorKind::InvalidAuthoritiesSet.into()))
}
}
@@ -602,7 +601,7 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where
);
let (_, storage_update, changes_update) = r?;
overlay.commit_prospective();
(Some(storage_update), Some(changes_update), Some(overlay.into_committed()))
(Some(storage_update), Some(changes_update), Some(overlay.into_committed().collect()))
},
None => (None, None, None)
};
@@ -633,7 +632,10 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where
transaction.update_authorities(authorities);
}
if let Some(storage_update) = storage_update {
transaction.update_storage(storage_update)?;
transaction.update_db_storage(storage_update)?;
}
if let Some(storage_changes) = storage_changes.clone() {
transaction.update_storage(storage_changes)?;
}
if let Some(Some(changes_update)) = changes_update {
transaction.update_changes_trie(changes_update)?;
@@ -646,7 +648,7 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where
if let Some(storage_changes) = storage_changes {
// TODO [ToDr] How to handle re-orgs? Should we re-emit all storage changes?
self.storage_notifications.lock()
.trigger(&hash, storage_changes);
.trigger(&hash, storage_changes.into_iter());
}
if finalized {
+5 -1
View File
@@ -448,7 +448,7 @@ where
self.pending_authorities = Some(authorities);
}
fn update_storage(&mut self, update: <InMemory<H> as StateBackend<H>>::Transaction) -> error::Result<()> {
fn update_db_storage(&mut self, update: <InMemory<H> as StateBackend<H>>::Transaction) -> error::Result<()> {
self.new_state = Some(self.old_state.update(update));
Ok(())
}
@@ -491,6 +491,10 @@ where
self.aux = Some(ops.into_iter().collect());
Ok(())
}
fn update_storage(&mut self, _update: Vec<(Vec<u8>, Option<Vec<u8>>)>) -> error::Result<()> {
Ok(())
}
}
/// In-memory backend. Keeps all states and blocks in memory. Useful for testing.
+1 -1
View File
@@ -102,7 +102,7 @@ mod notifications;
#[cfg(feature = "std")]
pub use blockchain::Info as ChainInfo;
#[cfg(feature = "std")]
pub use call_executor::{CallResult, CallExecutor, LocalCallExecutor};
pub use call_executor::{CallExecutor, LocalCallExecutor};
#[cfg(feature = "std")]
pub use client::{
new_with_backend,
+6 -1
View File
@@ -188,7 +188,7 @@ where
self.authorities = Some(authorities);
}
fn update_storage(&mut self, _update: <Self::State as StateBackend<H>>::Transaction) -> ClientResult<()> {
fn update_db_storage(&mut self, _update: <Self::State as StateBackend<H>>::Transaction) -> ClientResult<()> {
// we're not storing anything locally => ignore changes
Ok(())
}
@@ -210,6 +210,11 @@ where
self.aux_ops = ops.into_iter().collect();
Ok(())
}
fn update_storage(&mut self, _update: Vec<(Vec<u8>, Option<Vec<u8>>)>) -> ClientResult<()> {
// we're not storing anything locally => ignore changes
Ok(())
}
}
impl<Block, S, F, H> StateBackend<H> for OnDemandState<Block, S, F>
@@ -31,7 +31,7 @@ use state_machine::{self, Backend as StateBackend, CodeExecutor, OverlayedChange
use hash_db::Hasher;
use blockchain::Backend as ChainBackend;
use call_executor::{CallExecutor, CallResult};
use call_executor::CallExecutor;
use error::{Error as ClientError, ErrorKind as ClientErrorKind, Result as ClientResult};
use light::fetcher::{Fetcher, RemoteCallRequest};
use executor::{RuntimeVersion, NativeVersion};
@@ -74,7 +74,7 @@ where
{
type Error = ClientError;
fn call(&self, id: &BlockId<Block>, method: &str, call_data: &[u8]) -> ClientResult<CallResult> {
fn call(&self, id: &BlockId<Block>, method: &str, call_data: &[u8]) -> ClientResult<Vec<u8>> {
let block_hash = self.blockchain.expect_block_hash_from_id(id)?;
let block_header = self.blockchain.expect_header(id.clone())?;
@@ -105,12 +105,12 @@ where
return Err(ClientErrorKind::NotAvailableOnLightClient.into());
}
self.call(at, method, call_data).map(|cr| cr.return_data)
self.call(at, method, call_data)
}
fn runtime_version(&self, id: &BlockId<Block>) -> ClientResult<RuntimeVersion> {
let call_result = self.call(id, "version", &[])?;
RuntimeVersion::decode(&mut call_result.return_data.as_slice())
RuntimeVersion::decode(&mut call_result.as_slice())
.ok_or_else(|| ClientErrorKind::VersionInvalid.into())
}
@@ -189,7 +189,7 @@ pub fn check_execution_proof<Header, E, H>(
executor: &E,
request: &RemoteCallRequest<Header>,
remote_proof: Vec<Vec<u8>>
) -> ClientResult<CallResult>
) -> ClientResult<Vec<u8>>
where
Header: HeaderT,
E: CodeExecutor<H>,
@@ -226,7 +226,7 @@ pub fn check_execution_proof<Header, E, H>(
&request.call_data,
)?;
Ok(CallResult { return_data: local_result, changes })
Ok(local_result)
}
#[cfg(test)]
@@ -273,7 +273,7 @@ mod tests {
retry_count: None,
}, remote_execution_proof).unwrap();
(remote_result, local_result.return_data)
(remote_result, local_result)
}
// prepare remote client
+5 -7
View File
@@ -28,7 +28,6 @@ use runtime_primitives::traits::{As, Block as BlockT, Header as HeaderT, NumberF
use state_machine::{CodeExecutor, ChangesTrieRootsStorage, ChangesTrieAnchorBlockId,
TrieBackend, read_proof_check, key_changes_proof_check, create_proof_check_backend_storage};
use call_executor::CallResult;
use cht;
use error::{Error as ClientError, ErrorKind as ClientErrorKind, Result as ClientResult};
use light::blockchain::{Blockchain, Storage as BlockchainStorage};
@@ -118,7 +117,7 @@ pub trait Fetcher<Block: BlockT>: Send + Sync {
/// Remote storage read future.
type RemoteReadResult: IntoFuture<Item=Option<Vec<u8>>, Error=ClientError>;
/// Remote call result future.
type RemoteCallResult: IntoFuture<Item=CallResult, Error=ClientError>;
type RemoteCallResult: IntoFuture<Item=Vec<u8>, Error=ClientError>;
/// Remote changes result future.
type RemoteChangesResult: IntoFuture<Item=Vec<(NumberFor<Block>, u32)>, Error=ClientError>;
@@ -156,7 +155,7 @@ pub trait FetchChecker<Block: BlockT>: Send + Sync {
&self,
request: &RemoteCallRequest<Block::Header>,
remote_proof: Vec<Vec<u8>>
) -> ClientResult<CallResult>;
) -> ClientResult<Vec<u8>>;
/// Check remote changes query proof.
fn check_changes_proof(
&self,
@@ -344,7 +343,7 @@ impl<E, Block, H, S, F> FetchChecker<Block> for LightDataChecker<E, H, Block, S,
&self,
request: &RemoteCallRequest<Block::Header>,
remote_proof: Vec<Vec<u8>>
) -> ClientResult<CallResult> {
) -> ClientResult<Vec<u8>> {
check_execution_proof::<_, _, H>(&self.executor, request, remote_proof)
}
@@ -392,7 +391,6 @@ pub mod tests {
use futures::future::{ok, err, FutureResult};
use parking_lot::Mutex;
use keyring::Keyring;
use call_executor::CallResult;
use client::tests::prepare_client_with_key_changes;
use executor::{self, NativeExecutionDispatch};
use error::Error as ClientError;
@@ -410,12 +408,12 @@ pub mod tests {
use state_machine::Backend;
use super::*;
pub type OkCallFetcher = Mutex<CallResult>;
pub type OkCallFetcher = Mutex<Vec<u8>>;
impl Fetcher<Block> for OkCallFetcher {
type RemoteHeaderResult = FutureResult<Header, ClientError>;
type RemoteReadResult = FutureResult<Option<Vec<u8>>, ClientError>;
type RemoteCallResult = FutureResult<CallResult, ClientError>;
type RemoteCallResult = FutureResult<Vec<u8>, ClientError>;
type RemoteChangesResult = FutureResult<Vec<(NumberFor<Block>, u32)>, ClientError>;
fn remote_header(&self, _request: RemoteHeaderRequest<Header>) -> Self::RemoteHeaderResult {