diff --git a/substrate/client/api/src/backend.rs b/substrate/client/api/src/backend.rs index 808ca7a870..d10e62cc54 100644 --- a/substrate/client/api/src/backend.rs +++ b/substrate/client/api/src/backend.rs @@ -420,11 +420,6 @@ pub trait Backend: AuxStore + Send + Sync { /// Returns state backend with post-state of given block. fn state_at(&self, block: BlockId) -> sp_blockchain::Result; - /// Destroy state and save any useful data, such as cache. - fn destroy_state(&self, _state: Self::State) -> sp_blockchain::Result<()> { - Ok(()) - } - /// Attempts to revert the chain by `n` blocks. If `revert_finalized` is set /// it will attempt to revert past any finalized block, this is unsafe and /// can potentially leave the node in an inconsistent state. diff --git a/substrate/client/block-builder/src/lib.rs b/substrate/client/block-builder/src/lib.rs index 9695fddf86..2666fd9cd7 100644 --- a/substrate/client/block-builder/src/lib.rs +++ b/substrate/client/block-builder/src/lib.rs @@ -224,17 +224,11 @@ where &state, changes_trie_state.as_ref(), parent_hash, - ); - - // We need to destroy the state, before we check if `storage_changes` is `Ok(_)` - { - let _lock = self.backend.get_import_lock().read(); - self.backend.destroy_state(state)?; - } + )?; Ok(BuiltBlock { block: ::new(header, self.extrinsics), - storage_changes: storage_changes?, + storage_changes, proof, }) } diff --git a/substrate/client/db/src/lib.rs b/substrate/client/db/src/lib.rs index eed1fc0e1d..746c73bea2 100644 --- a/substrate/client/db/src/lib.rs +++ b/substrate/client/db/src/lib.rs @@ -80,7 +80,7 @@ use crate::changes_tries_storage::{DbChangesTrieStorage, DbChangesTrieStorageTra use sc_client::leaves::{LeafSet, FinalizationDisplaced}; use sc_state_db::StateDb; use sp_blockchain::{CachedHeaderMetadata, HeaderMetadata, HeaderMetadataCache}; -use crate::storage_cache::{CachingState, SharedCache, new_shared_cache}; +use crate::storage_cache::{CachingState, SyncingCachingState, SharedCache, new_shared_cache}; use crate::stats::StateUsageStats; use log::{trace, debug, warn}; pub use sc_state_db::PruningMode; @@ -523,7 +523,7 @@ impl HeaderMetadata for BlockchainDb { /// Database transaction pub struct BlockImportOperation { - old_state: CachingState, Block>, + old_state: SyncingCachingState, Block>, db_updates: PrefixedMemoryDB>, storage_updates: StorageCollection, child_storage_updates: ChildStorageCollection, @@ -549,7 +549,7 @@ impl BlockImportOperation { } impl sc_client_api::backend::BlockImportOperation for BlockImportOperation { - type State = CachingState, Block>; + type State = SyncingCachingState, Block>; fn state(&self) -> ClientResult> { Ok(Some(&self.old_state)) @@ -755,10 +755,10 @@ pub struct Backend { blockchain: BlockchainDb, canonicalization_delay: u64, shared_cache: SharedCache, - import_lock: RwLock<()>, + import_lock: Arc>, is_archive: bool, io_stats: FrozenForDuration<(kvdb::IoStats, StateUsageInfo)>, - state_usage: StateUsageStats, + state_usage: Arc, } impl Backend { @@ -830,7 +830,7 @@ impl Backend { import_lock: Default::default(), is_archive: is_archive_pruning, io_stats: FrozenForDuration::new(std::time::Duration::from_secs(1)), - state_usage: StateUsageStats::new(), + state_usage: Arc::new(StateUsageStats::new()), }) } @@ -1132,8 +1132,14 @@ impl Backend { self.state_usage.tally_writes(ops, bytes); let number_u64 = number.saturated_into::(); - let commit = self.storage.state_db.insert_block(&hash, number_u64, &pending_block.header.parent_hash(), changeset) - .map_err(|e: sc_state_db::Error| sp_blockchain::Error::from(format!("State database error: {:?}", e)))?; + let commit = self.storage.state_db.insert_block( + &hash, + number_u64, + &pending_block.header.parent_hash(), + changeset, + ).map_err(|e: sc_state_db::Error| + sp_blockchain::Error::from(format!("State database error: {:?}", e)) + )?; apply_state_commit(&mut transaction, commit); // Check if need to finalize. Genesis is always finalized instantly. @@ -1161,7 +1167,8 @@ impl Backend { changes_trie_cache_ops, )?); self.state_usage.merge_sm(operation.old_state.usage_info()); - let cache = operation.old_state.release(); // release state reference so that it can be finalized + // release state reference so that it can be finalized + let cache = operation.old_state.into_cache_changes(); if finalized { // TODO: ensure best chain contains this block. @@ -1189,9 +1196,20 @@ impl Backend { displaced_leaf }; - let mut children = children::read_children(&*self.storage.db, columns::META, meta_keys::CHILDREN_PREFIX, parent_hash)?; + let mut children = children::read_children( + &*self.storage.db, + columns::META, + meta_keys::CHILDREN_PREFIX, + parent_hash, + )?; children.push(hash); - children::write_children(&mut transaction, columns::META, meta_keys::CHILDREN_PREFIX, parent_hash, children); + children::write_children( + &mut transaction, + columns::META, + meta_keys::CHILDREN_PREFIX, + parent_hash, + children, + ); meta_updates.push((hash, number, pending_block.leaf_state.is_best(), finalized)); @@ -1201,7 +1219,7 @@ impl Backend { }; let cache_update = if let Some(set_head) = operation.set_head { - if let Some(header) = ::sc_client::blockchain::HeaderBackend::header(&self.blockchain, set_head)? { + if let Some(header) = sc_client::blockchain::HeaderBackend::header(&self.blockchain, set_head)? { let number = header.number(); let hash = header.hash(); @@ -1271,7 +1289,6 @@ impl Backend { Ok(()) } - // write stuff to a transaction after a new block is finalized. // this canonicalizes finalized blocks. Fails if called with a block which // was not a child of the last finalized block. @@ -1359,11 +1376,13 @@ impl sc_client_api::backend::AuxStore for Backend where Block: Blo impl sc_client_api::backend::Backend for Backend { type BlockImportOperation = BlockImportOperation; type Blockchain = BlockchainDb; - type State = CachingState, Block>; + type State = SyncingCachingState, Block>; type OffchainStorage = offchain::LocalStorage; fn begin_operation(&self) -> ClientResult { - let old_state = self.state_at(BlockId::Hash(Default::default()))?; + let mut old_state = self.state_at(BlockId::Hash(Default::default()))?; + old_state.disable_syncing(); + Ok(BlockImportOperation { pending_block: None, old_state, @@ -1386,13 +1405,13 @@ impl sc_client_api::backend::Backend for Backend { block: BlockId, ) -> ClientResult<()> { operation.old_state = self.state_at(block)?; + operation.old_state.disable_syncing(); + operation.commit_state = true; Ok(()) } - fn commit_operation(&self, operation: Self::BlockImportOperation) - -> ClientResult<()> - { + fn commit_operation(&self, operation: Self::BlockImportOperation) -> ClientResult<()> { let usage = operation.old_state.usage_info(); self.state_usage.merge_sm(usage); @@ -1452,7 +1471,6 @@ impl sc_client_api::backend::Backend for Backend { Some(self.offchain_storage.clone()) } - fn usage_info(&self) -> Option { let (io_stats, state_stats) = self.io_stats.take_or_else(|| ( @@ -1577,7 +1595,17 @@ impl sc_client_api::backend::Backend for Backend { let root = genesis_storage.0.clone(); let db_state = DbState::::new(Arc::new(genesis_storage), root); let state = RefTrackingState::new(db_state, self.storage.clone(), None); - return Ok(CachingState::new(state, self.shared_cache.clone(), None)); + let caching_state = CachingState::new( + state, + self.shared_cache.clone(), + None, + ); + return Ok(SyncingCachingState::new( + caching_state, + self.state_usage.clone(), + self.blockchain.meta.clone(), + self.import_lock.clone(), + )); }, _ => {} } @@ -1600,7 +1628,17 @@ impl sc_client_api::backend::Backend for Backend { self.storage.clone(), Some(hash.clone()), ); - Ok(CachingState::new(state, self.shared_cache.clone(), Some(hash))) + let caching_state = CachingState::new( + state, + self.shared_cache.clone(), + Some(hash), + ); + Ok(SyncingCachingState::new( + caching_state, + self.state_usage.clone(), + self.blockchain.meta.clone(), + self.import_lock.clone(), + )) } else { Err( sp_blockchain::Error::UnknownBlock( @@ -1635,17 +1673,8 @@ impl sc_client_api::backend::Backend for Backend { } } - fn destroy_state(&self, state: Self::State) -> ClientResult<()> { - self.state_usage.merge_sm(state.usage_info()); - if let Some(hash) = state.cache.parent_hash.clone() { - let is_best = self.blockchain.meta.read().best_hash == hash; - state.release().sync_cache(&[], &[], vec![], vec![], None, None, is_best); - } - Ok(()) - } - fn get_import_lock(&self) -> &RwLock<()> { - &self.import_lock + &*self.import_lock } } @@ -1844,6 +1873,7 @@ pub(crate) mod tests { op.update_db_storage(overlay).unwrap(); header.state_root = root.into(); + op.update_storage(storage, Vec::new()).unwrap(); op.set_block_data( header, Some(vec![]), diff --git a/substrate/client/db/src/stats.rs b/substrate/client/db/src/stats.rs index 805a0f498f..1d6ed8e7f0 100644 --- a/substrate/client/db/src/stats.rs +++ b/substrate/client/db/src/stats.rs @@ -59,8 +59,14 @@ impl StateUsageStats { } /// Tally one child key read. - pub fn tally_child_key_read(&self, key: &(Vec, Vec), val: Option>, cache: bool) -> Option> { - self.tally_read(key.0.len() as u64 + key.1.len() as u64 + val.as_ref().map(|x| x.len() as u64).unwrap_or(0), cache); + pub fn tally_child_key_read( + &self, + key: &(Vec, Vec), + val: Option>, + cache: bool, + ) -> Option> { + let bytes = key.0.len() + key.1.len() + val.as_ref().map(|x| x.len()).unwrap_or(0); + self.tally_read(bytes as u64, cache); val } @@ -80,11 +86,15 @@ impl StateUsageStats { self.bytes_read_cache.fetch_add(info.cache_reads.bytes, AtomicOrdering::Relaxed); } + /// Returns the collected `UsageInfo` and resets the internal state. pub fn take(&self) -> sp_state_machine::UsageInfo { use sp_state_machine::UsageUnit; fn unit(ops: &AtomicU64, bytes: &AtomicU64) -> UsageUnit { - UsageUnit { ops: ops.swap(0, AtomicOrdering::Relaxed), bytes: bytes.swap(0, AtomicOrdering::Relaxed) } + UsageUnit { + ops: ops.swap(0, AtomicOrdering::Relaxed), + bytes: bytes.swap(0, AtomicOrdering::Relaxed), + } } sp_state_machine::UsageInfo { diff --git a/substrate/client/db/src/storage_cache.rs b/substrate/client/db/src/storage_cache.rs index 6f289369e2..2ac1ee3dbd 100644 --- a/substrate/client/db/src/storage_cache.rs +++ b/substrate/client/db/src/storage_cache.rs @@ -18,6 +18,7 @@ use std::collections::{VecDeque, HashSet, HashMap}; use std::sync::Arc; +use std::hash::Hash as StdHash; use parking_lot::{Mutex, RwLock, RwLockUpgradableReadGuard}; use linked_hash_map::{LinkedHashMap, Entry}; use hash_db::Hasher; @@ -29,8 +30,7 @@ use sp_state_machine::{ StorageCollection, ChildStorageCollection, }; use log::trace; -use std::hash::Hash as StdHash; -use crate::stats::StateUsageStats; +use crate::{utils::Meta, stats::StateUsageStats}; const STATE_CACHE_BLOCKS: usize = 12; @@ -296,16 +296,16 @@ pub struct CacheChanges { /// For canonical instances local cache is accumulated and applied /// in `sync_cache` along with the change overlay. /// For non-canonical clones local cache and changes are dropped. -pub struct CachingState>, B: BlockT> { +pub struct CachingState { /// Usage statistics usage: StateUsageStats, /// Backing state. state: S, /// Cache data. - pub cache: CacheChanges, + cache: CacheChanges, } -impl>, B: BlockT> std::fmt::Debug for CachingState { +impl std::fmt::Debug for CachingState { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "Block {:?}", self.cache.parent_hash) } @@ -417,12 +417,15 @@ impl CacheChanges { } } } - } impl>, B: BlockT> CachingState { /// Create a new instance wrapping generic State and shared cache. - pub fn new(state: S, shared_cache: SharedCache, parent_hash: Option) -> Self { + pub(crate) fn new( + state: S, + shared_cache: SharedCache, + parent_hash: Option, + ) -> Self { CachingState { usage: StateUsageStats::new(), state, @@ -433,7 +436,7 @@ impl>, B: BlockT> CachingState { hashes: Default::default(), child_storage: Default::default(), }), - parent_hash: parent_hash, + parent_hash, }, } } @@ -445,8 +448,7 @@ impl>, B: BlockT> CachingState { child_key: Option<&ChildStorageKey>, parent_hash: &Option, modifications: &VecDeque> - ) -> bool - { + ) -> bool { let mut parent = match *parent_hash { None => { trace!("Cache lookup skipped for {:?}: no parent hash", key.as_ref().map(HexDisplay::from)); @@ -479,14 +481,12 @@ impl>, B: BlockT> CachingState { } } } - trace!("Cache lookup skipped for {:?}: parent hash is unknown", key.as_ref().map(HexDisplay::from)); + trace!( + "Cache lookup skipped for {:?}: parent hash is unknown", + key.as_ref().map(HexDisplay::from), + ); false } - - /// Dispose state and return cache data. - pub fn release(self) -> CacheChanges { - self.cache - } } impl>, B: BlockT> StateBackend> for CachingState { @@ -668,6 +668,213 @@ impl>, B: BlockT> StateBackend> for Cachin } } +/// Extended [`CachingState`] that will sync the caches on drop. +pub struct SyncingCachingState { + /// The usage statistics of the backend. These will be updated on drop. + state_usage: Arc, + /// Reference to the meta db. + meta: Arc, Block::Hash>>>, + /// Mutex to lock get exlusive access to the backend. + lock: Arc>, + /// The wrapped caching state. + /// + /// This is required to be a `Option`, because sometimes we want to extract + /// the cache changes and Rust does not allow to move fields from types that + /// implement `Drop`. + caching_state: Option>, + /// Disable syncing of the cache. This is by default always `false`. However, + /// we need to disable syncing when this is a state in a + /// [`BlockImportOperation`](crate::BlockImportOperation). The import operation + /// takes care to sync the cache and more importantly we want to prevent a dead + /// lock. + disable_syncing: bool, +} + +impl SyncingCachingState { + /// Create new automatic syncing state. + pub fn new( + caching_state: CachingState, + state_usage: Arc, + meta: Arc, B::Hash>>>, + lock: Arc>, + ) -> Self { + Self { + caching_state: Some(caching_state), + state_usage, + meta, + lock, + disable_syncing: false, + } + } + + /// Returns the reference to the internal [`CachingState`]. + fn caching_state(&self) -> &CachingState { + self.caching_state + .as_ref() + .expect("`caching_state` is always valid for the lifetime of the object; qed") + } + + /// Convert `Self` into the cache changes. + pub fn into_cache_changes(mut self) -> CacheChanges { + self.caching_state + .take() + .expect("`caching_state` is always valid for the lifetime of the object; qed") + .cache + } + + /// Disable syncing the cache on drop. + pub fn disable_syncing(&mut self) { + self.disable_syncing = true; + } +} + +impl std::fmt::Debug for SyncingCachingState { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.caching_state().fmt(f) + } +} + +impl>, B: BlockT> StateBackend> for SyncingCachingState { + type Error = S::Error; + type Transaction = S::Transaction; + type TrieBackendStorage = S::TrieBackendStorage; + + fn storage(&self, key: &[u8]) -> Result>, Self::Error> { + self.caching_state().storage(key) + } + + fn storage_hash(&self, key: &[u8]) -> Result, Self::Error> { + self.caching_state().storage_hash(key) + } + + fn child_storage( + &self, + storage_key: &[u8], + child_info: ChildInfo, + key: &[u8], + ) -> Result>, Self::Error> { + self.caching_state().child_storage(storage_key, child_info, key) + } + + fn exists_storage(&self, key: &[u8]) -> Result { + self.caching_state().exists_storage(key) + } + + fn exists_child_storage( + &self, + storage_key: &[u8], + child_info: ChildInfo, + key: &[u8], + ) -> Result { + self.caching_state().exists_child_storage(storage_key, child_info, key) + } + + fn for_keys_in_child_storage( + &self, + storage_key: &[u8], + child_info: ChildInfo, + f: F, + ) { + self.caching_state().for_keys_in_child_storage(storage_key, child_info, f) + } + + fn next_storage_key(&self, key: &[u8]) -> Result>, Self::Error> { + self.caching_state().next_storage_key(key) + } + + fn next_child_storage_key( + &self, + storage_key: &[u8], + child_info: ChildInfo, + key: &[u8], + ) -> Result>, Self::Error> { + self.caching_state().next_child_storage_key(storage_key, child_info, key) + } + + fn for_keys_with_prefix(&self, prefix: &[u8], f: F) { + self.caching_state().for_keys_with_prefix(prefix, f) + } + + fn for_key_values_with_prefix(&self, prefix: &[u8], f: F) { + self.caching_state().for_key_values_with_prefix(prefix, f) + } + + fn for_child_keys_with_prefix( + &self, + storage_key: &[u8], + child_info: ChildInfo, + prefix: &[u8], + f: F, + ) { + self.caching_state().for_child_keys_with_prefix(storage_key, child_info, prefix, f) + } + + fn storage_root(&self, delta: I) -> (B::Hash, Self::Transaction) + where + I: IntoIterator, Option>)>, + { + self.caching_state().storage_root(delta) + } + + fn child_storage_root( + &self, + storage_key: &[u8], + child_info: ChildInfo, + delta: I, + ) -> (B::Hash, bool, Self::Transaction) + where + I: IntoIterator, Option>)>, + { + self.caching_state().child_storage_root(storage_key, child_info, delta) + } + + fn pairs(&self) -> Vec<(Vec, Vec)> { + self.caching_state().pairs() + } + + fn keys(&self, prefix: &[u8]) -> Vec> { + self.caching_state().keys(prefix) + } + + fn child_keys( + &self, + storage_key: &[u8], + child_info: ChildInfo, + prefix: &[u8], + ) -> Vec> { + self.caching_state().child_keys(storage_key, child_info, prefix) + } + + fn as_trie_backend(&mut self) -> Option<&TrieBackend>> { + self.caching_state + .as_mut() + .expect("`caching_state` is valid for the lifetime of the object; qed") + .as_trie_backend() + } + + fn usage_info(&self) -> sp_state_machine::UsageInfo { + self.caching_state().usage_info() + } +} + +impl Drop for SyncingCachingState { + fn drop(&mut self) { + if self.disable_syncing { + return; + } + + if let Some(mut caching_state) = self.caching_state.take() { + let _lock = self.lock.read(); + + self.state_usage.merge_sm(caching_state.usage.take()); + if let Some(hash) = caching_state.cache.parent_hash.clone() { + let is_best = self.meta.read().best_hash == hash; + caching_state.cache.sync_cache(&[], &[], vec![], vec![], None, None, is_best); + } + } + } +} + #[cfg(test)] mod tests { use super::*; @@ -1258,7 +1465,7 @@ mod qc { CachingState::new( InMemoryBackend::::default(), self.shared.clone(), - Some(hash) + Some(hash), ) } @@ -1327,7 +1534,7 @@ mod qc { let mut state = CachingState::new( InMemoryBackend::::default(), self.shared.clone(), - Some(parent) + Some(parent), ); state.cache.sync_cache( @@ -1366,7 +1573,7 @@ mod qc { let mut state = CachingState::new( InMemoryBackend::::default(), self.shared.clone(), - Some(parent_hash) + Some(parent_hash), ); state.cache.sync_cache( @@ -1413,7 +1620,7 @@ mod qc { let mut state = CachingState::new( InMemoryBackend::::default(), self.shared.clone(), - Some(fork_at) + Some(fork_at), ); let height = pos as u64 + enacted.len() as u64 + 2; diff --git a/substrate/client/src/call_executor.rs b/substrate/client/src/call_executor.rs index 1fdbfe981a..b5206d3c46 100644 --- a/substrate/client/src/call_executor.rs +++ b/substrate/client/src/call_executor.rs @@ -80,7 +80,6 @@ where let changes_trie = backend::changes_tries_state_at_block( id, self.backend.changes_trie_storage() )?; - // make sure to destroy state before exiting this function let state = self.backend.state_at(*id)?; let return_data = StateMachine::new( &state, @@ -93,12 +92,9 @@ where ).execute_using_consensus_failure_handler::<_, NeverNativeValue, fn() -> _>( strategy.get_manager(), None, - ); - { - let _lock = self.backend.get_import_lock().read(); - self.backend.destroy_state(state)?; - } - Ok(return_data?.into_encoded()) + )?; + + Ok(return_data.into_encoded()) } fn contextual_call< @@ -138,9 +134,8 @@ where let changes_trie_state = backend::changes_tries_state_at_block(at, self.backend.changes_trie_storage())?; let mut storage_transaction_cache = storage_transaction_cache.map(|c| c.borrow_mut()); - // make sure to destroy state before exiting this function let mut state = self.backend.state_at(*at)?; - let result = match recorder { + match recorder { Some(recorder) => state.as_trie_backend() .ok_or_else(|| Box::new(sp_state_machine::ExecutionError::UnableToGenerateProof) @@ -176,18 +171,15 @@ where ) .with_storage_transaction_cache(storage_transaction_cache.as_mut().map(|c| &mut **c)) .execute_using_consensus_failure_handler(execution_manager, native_call) - }; - { - let _lock = self.backend.get_import_lock().read(); - self.backend.destroy_state(state)?; - } - result.map_err(Into::into) + }.map_err(Into::into) } fn runtime_version(&self, id: &BlockId) -> sp_blockchain::Result { let mut overlay = OverlayedChanges::default(); - let changes_trie_state = backend::changes_tries_state_at_block(id, self.backend.changes_trie_storage())?; - // make sure to destroy state before exiting this function + let changes_trie_state = backend::changes_tries_state_at_block( + id, + self.backend.changes_trie_storage(), + )?; let state = self.backend.state_at(*id)?; let mut cache = StorageTransactionCache::::default(); let mut ext = Ext::new( @@ -197,12 +189,8 @@ where changes_trie_state, None, ); - let version = self.executor.runtime_version(&mut ext); - { - let _lock = self.backend.get_import_lock().read(); - self.backend.destroy_state(state)?; - } - version.map_err(|e| sp_blockchain::Error::VersionInvalid(format!("{:?}", e)).into()) + self.executor.runtime_version(&mut ext) + .map_err(|e| sp_blockchain::Error::VersionInvalid(format!("{:?}", e)).into()) } fn prove_at_trie_state>>( diff --git a/substrate/client/src/client.rs b/substrate/client/src/client.rs index c921d29701..699e3320ff 100644 --- a/substrate/client/src/client.rs +++ b/substrate/client/src/client.rs @@ -830,15 +830,7 @@ impl Client where &state, changes_trie_state.as_ref(), *parent_hash, - ); - - { - let _lock = self.backend.get_import_lock().read(); - self.backend.destroy_state(state)?; - } - - // Make sure to consume the error, only after we have destroyed the state. - let gen_storage_changes = gen_storage_changes?; + )?; if import_block.header.state_root() != &gen_storage_changes.transaction_storage_root @@ -1792,7 +1784,9 @@ where fn best_block_header(&self) -> sp_blockchain::Result<::Header> { let info = self.backend.blockchain().info(); let import_lock = self.backend.get_import_lock(); - let best_hash = self.backend.blockchain().best_containing(info.best_hash, None, import_lock)? + let best_hash = self.backend + .blockchain() + .best_containing(info.best_hash, None, import_lock)? .unwrap_or(info.best_hash); Ok(self.backend.blockchain().header(BlockId::Hash(best_hash))?