Remove Backend::destroy_state (#5068)

* Remove `Backend::destroy_state`

This removes the `destroy_state` function of `Backend` and instead moves
the functionality into the `Drop` implementation of the state. This
makes it much easier to work with the state, as the user no longer needs
to call` destroy_state` manually. However, it requires that we switch
from `RwLock` to `ReentrantMutex` as while importing a block we maybe
need to lock again in `drop`.

* Bring back the `RwLock` and some other clean ups

* Fix compilation
This commit is contained in:
Gavin Wood
2020-03-05 17:01:52 +01:00
committed by GitHub
parent 26fc97f635
commit a8441ff968
7 changed files with 318 additions and 100 deletions
+61 -31
View File
@@ -80,7 +80,7 @@ use crate::changes_tries_storage::{DbChangesTrieStorage, DbChangesTrieStorageTra
use sc_client::leaves::{LeafSet, FinalizationDisplaced};
use sc_state_db::StateDb;
use sp_blockchain::{CachedHeaderMetadata, HeaderMetadata, HeaderMetadataCache};
use crate::storage_cache::{CachingState, SharedCache, new_shared_cache};
use crate::storage_cache::{CachingState, SyncingCachingState, SharedCache, new_shared_cache};
use crate::stats::StateUsageStats;
use log::{trace, debug, warn};
pub use sc_state_db::PruningMode;
@@ -523,7 +523,7 @@ impl<Block: BlockT> HeaderMetadata<Block> for BlockchainDb<Block> {
/// Database transaction
pub struct BlockImportOperation<Block: BlockT> {
old_state: CachingState<RefTrackingState<Block>, Block>,
old_state: SyncingCachingState<RefTrackingState<Block>, Block>,
db_updates: PrefixedMemoryDB<HashFor<Block>>,
storage_updates: StorageCollection,
child_storage_updates: ChildStorageCollection,
@@ -549,7 +549,7 @@ impl<Block: BlockT> BlockImportOperation<Block> {
}
impl<Block: BlockT> sc_client_api::backend::BlockImportOperation<Block> for BlockImportOperation<Block> {
type State = CachingState<RefTrackingState<Block>, Block>;
type State = SyncingCachingState<RefTrackingState<Block>, Block>;
fn state(&self) -> ClientResult<Option<&Self::State>> {
Ok(Some(&self.old_state))
@@ -755,10 +755,10 @@ pub struct Backend<Block: BlockT> {
blockchain: BlockchainDb<Block>,
canonicalization_delay: u64,
shared_cache: SharedCache<Block>,
import_lock: RwLock<()>,
import_lock: Arc<RwLock<()>>,
is_archive: bool,
io_stats: FrozenForDuration<(kvdb::IoStats, StateUsageInfo)>,
state_usage: StateUsageStats,
state_usage: Arc<StateUsageStats>,
}
impl<Block: BlockT> Backend<Block> {
@@ -830,7 +830,7 @@ impl<Block: BlockT> Backend<Block> {
import_lock: Default::default(),
is_archive: is_archive_pruning,
io_stats: FrozenForDuration::new(std::time::Duration::from_secs(1)),
state_usage: StateUsageStats::new(),
state_usage: Arc::new(StateUsageStats::new()),
})
}
@@ -1132,8 +1132,14 @@ impl<Block: BlockT> Backend<Block> {
self.state_usage.tally_writes(ops, bytes);
let number_u64 = number.saturated_into::<u64>();
let commit = self.storage.state_db.insert_block(&hash, number_u64, &pending_block.header.parent_hash(), changeset)
.map_err(|e: sc_state_db::Error<io::Error>| sp_blockchain::Error::from(format!("State database error: {:?}", e)))?;
let commit = self.storage.state_db.insert_block(
&hash,
number_u64,
&pending_block.header.parent_hash(),
changeset,
).map_err(|e: sc_state_db::Error<io::Error>|
sp_blockchain::Error::from(format!("State database error: {:?}", e))
)?;
apply_state_commit(&mut transaction, commit);
// Check if need to finalize. Genesis is always finalized instantly.
@@ -1161,7 +1167,8 @@ impl<Block: BlockT> Backend<Block> {
changes_trie_cache_ops,
)?);
self.state_usage.merge_sm(operation.old_state.usage_info());
let cache = operation.old_state.release(); // release state reference so that it can be finalized
// release state reference so that it can be finalized
let cache = operation.old_state.into_cache_changes();
if finalized {
// TODO: ensure best chain contains this block.
@@ -1189,9 +1196,20 @@ impl<Block: BlockT> Backend<Block> {
displaced_leaf
};
let mut children = children::read_children(&*self.storage.db, columns::META, meta_keys::CHILDREN_PREFIX, parent_hash)?;
let mut children = children::read_children(
&*self.storage.db,
columns::META,
meta_keys::CHILDREN_PREFIX,
parent_hash,
)?;
children.push(hash);
children::write_children(&mut transaction, columns::META, meta_keys::CHILDREN_PREFIX, parent_hash, children);
children::write_children(
&mut transaction,
columns::META,
meta_keys::CHILDREN_PREFIX,
parent_hash,
children,
);
meta_updates.push((hash, number, pending_block.leaf_state.is_best(), finalized));
@@ -1201,7 +1219,7 @@ impl<Block: BlockT> Backend<Block> {
};
let cache_update = if let Some(set_head) = operation.set_head {
if let Some(header) = ::sc_client::blockchain::HeaderBackend::header(&self.blockchain, set_head)? {
if let Some(header) = sc_client::blockchain::HeaderBackend::header(&self.blockchain, set_head)? {
let number = header.number();
let hash = header.hash();
@@ -1271,7 +1289,6 @@ impl<Block: BlockT> Backend<Block> {
Ok(())
}
// write stuff to a transaction after a new block is finalized.
// this canonicalizes finalized blocks. Fails if called with a block which
// was not a child of the last finalized block.
@@ -1359,11 +1376,13 @@ impl<Block> sc_client_api::backend::AuxStore for Backend<Block> where Block: Blo
impl<Block: BlockT> sc_client_api::backend::Backend<Block> for Backend<Block> {
type BlockImportOperation = BlockImportOperation<Block>;
type Blockchain = BlockchainDb<Block>;
type State = CachingState<RefTrackingState<Block>, Block>;
type State = SyncingCachingState<RefTrackingState<Block>, Block>;
type OffchainStorage = offchain::LocalStorage;
fn begin_operation(&self) -> ClientResult<Self::BlockImportOperation> {
let old_state = self.state_at(BlockId::Hash(Default::default()))?;
let mut old_state = self.state_at(BlockId::Hash(Default::default()))?;
old_state.disable_syncing();
Ok(BlockImportOperation {
pending_block: None,
old_state,
@@ -1386,13 +1405,13 @@ impl<Block: BlockT> sc_client_api::backend::Backend<Block> for Backend<Block> {
block: BlockId<Block>,
) -> ClientResult<()> {
operation.old_state = self.state_at(block)?;
operation.old_state.disable_syncing();
operation.commit_state = true;
Ok(())
}
fn commit_operation(&self, operation: Self::BlockImportOperation)
-> ClientResult<()>
{
fn commit_operation(&self, operation: Self::BlockImportOperation) -> ClientResult<()> {
let usage = operation.old_state.usage_info();
self.state_usage.merge_sm(usage);
@@ -1452,7 +1471,6 @@ impl<Block: BlockT> sc_client_api::backend::Backend<Block> for Backend<Block> {
Some(self.offchain_storage.clone())
}
fn usage_info(&self) -> Option<UsageInfo> {
let (io_stats, state_stats) = self.io_stats.take_or_else(||
(
@@ -1577,7 +1595,17 @@ impl<Block: BlockT> sc_client_api::backend::Backend<Block> for Backend<Block> {
let root = genesis_storage.0.clone();
let db_state = DbState::<Block>::new(Arc::new(genesis_storage), root);
let state = RefTrackingState::new(db_state, self.storage.clone(), None);
return Ok(CachingState::new(state, self.shared_cache.clone(), None));
let caching_state = CachingState::new(
state,
self.shared_cache.clone(),
None,
);
return Ok(SyncingCachingState::new(
caching_state,
self.state_usage.clone(),
self.blockchain.meta.clone(),
self.import_lock.clone(),
));
},
_ => {}
}
@@ -1600,7 +1628,17 @@ impl<Block: BlockT> sc_client_api::backend::Backend<Block> for Backend<Block> {
self.storage.clone(),
Some(hash.clone()),
);
Ok(CachingState::new(state, self.shared_cache.clone(), Some(hash)))
let caching_state = CachingState::new(
state,
self.shared_cache.clone(),
Some(hash),
);
Ok(SyncingCachingState::new(
caching_state,
self.state_usage.clone(),
self.blockchain.meta.clone(),
self.import_lock.clone(),
))
} else {
Err(
sp_blockchain::Error::UnknownBlock(
@@ -1635,17 +1673,8 @@ impl<Block: BlockT> sc_client_api::backend::Backend<Block> for Backend<Block> {
}
}
fn destroy_state(&self, state: Self::State) -> ClientResult<()> {
self.state_usage.merge_sm(state.usage_info());
if let Some(hash) = state.cache.parent_hash.clone() {
let is_best = self.blockchain.meta.read().best_hash == hash;
state.release().sync_cache(&[], &[], vec![], vec![], None, None, is_best);
}
Ok(())
}
fn get_import_lock(&self) -> &RwLock<()> {
&self.import_lock
&*self.import_lock
}
}
@@ -1844,6 +1873,7 @@ pub(crate) mod tests {
op.update_db_storage(overlay).unwrap();
header.state_root = root.into();
op.update_storage(storage, Vec::new()).unwrap();
op.set_block_data(
header,
Some(vec![]),
+13 -3
View File
@@ -59,8 +59,14 @@ impl StateUsageStats {
}
/// Tally one child key read.
pub fn tally_child_key_read(&self, key: &(Vec<u8>, Vec<u8>), val: Option<Vec<u8>>, cache: bool) -> Option<Vec<u8>> {
self.tally_read(key.0.len() as u64 + key.1.len() as u64 + val.as_ref().map(|x| x.len() as u64).unwrap_or(0), cache);
pub fn tally_child_key_read(
&self,
key: &(Vec<u8>, Vec<u8>),
val: Option<Vec<u8>>,
cache: bool,
) -> Option<Vec<u8>> {
let bytes = key.0.len() + key.1.len() + val.as_ref().map(|x| x.len()).unwrap_or(0);
self.tally_read(bytes as u64, cache);
val
}
@@ -80,11 +86,15 @@ impl StateUsageStats {
self.bytes_read_cache.fetch_add(info.cache_reads.bytes, AtomicOrdering::Relaxed);
}
/// Returns the collected `UsageInfo` and resets the internal state.
pub fn take(&self) -> sp_state_machine::UsageInfo {
use sp_state_machine::UsageUnit;
fn unit(ops: &AtomicU64, bytes: &AtomicU64) -> UsageUnit {
UsageUnit { ops: ops.swap(0, AtomicOrdering::Relaxed), bytes: bytes.swap(0, AtomicOrdering::Relaxed) }
UsageUnit {
ops: ops.swap(0, AtomicOrdering::Relaxed),
bytes: bytes.swap(0, AtomicOrdering::Relaxed),
}
}
sp_state_machine::UsageInfo {
+227 -20
View File
@@ -18,6 +18,7 @@
use std::collections::{VecDeque, HashSet, HashMap};
use std::sync::Arc;
use std::hash::Hash as StdHash;
use parking_lot::{Mutex, RwLock, RwLockUpgradableReadGuard};
use linked_hash_map::{LinkedHashMap, Entry};
use hash_db::Hasher;
@@ -29,8 +30,7 @@ use sp_state_machine::{
StorageCollection, ChildStorageCollection,
};
use log::trace;
use std::hash::Hash as StdHash;
use crate::stats::StateUsageStats;
use crate::{utils::Meta, stats::StateUsageStats};
const STATE_CACHE_BLOCKS: usize = 12;
@@ -296,16 +296,16 @@ pub struct CacheChanges<B: BlockT> {
/// For canonical instances local cache is accumulated and applied
/// in `sync_cache` along with the change overlay.
/// For non-canonical clones local cache and changes are dropped.
pub struct CachingState<S: StateBackend<HashFor<B>>, B: BlockT> {
pub struct CachingState<S, B: BlockT> {
/// Usage statistics
usage: StateUsageStats,
/// Backing state.
state: S,
/// Cache data.
pub cache: CacheChanges<B>,
cache: CacheChanges<B>,
}
impl<S: StateBackend<HashFor<B>>, B: BlockT> std::fmt::Debug for CachingState<S, B> {
impl<S, B: BlockT> std::fmt::Debug for CachingState<S, B> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Block {:?}", self.cache.parent_hash)
}
@@ -417,12 +417,15 @@ impl<B: BlockT> CacheChanges<B> {
}
}
}
}
impl<S: StateBackend<HashFor<B>>, B: BlockT> CachingState<S, B> {
/// Create a new instance wrapping generic State and shared cache.
pub fn new(state: S, shared_cache: SharedCache<B>, parent_hash: Option<B::Hash>) -> Self {
pub(crate) fn new(
state: S,
shared_cache: SharedCache<B>,
parent_hash: Option<B::Hash>,
) -> Self {
CachingState {
usage: StateUsageStats::new(),
state,
@@ -433,7 +436,7 @@ impl<S: StateBackend<HashFor<B>>, B: BlockT> CachingState<S, B> {
hashes: Default::default(),
child_storage: Default::default(),
}),
parent_hash: parent_hash,
parent_hash,
},
}
}
@@ -445,8 +448,7 @@ impl<S: StateBackend<HashFor<B>>, B: BlockT> CachingState<S, B> {
child_key: Option<&ChildStorageKey>,
parent_hash: &Option<B::Hash>,
modifications: &VecDeque<BlockChanges<B::Header>>
) -> bool
{
) -> bool {
let mut parent = match *parent_hash {
None => {
trace!("Cache lookup skipped for {:?}: no parent hash", key.as_ref().map(HexDisplay::from));
@@ -479,14 +481,12 @@ impl<S: StateBackend<HashFor<B>>, B: BlockT> CachingState<S, B> {
}
}
}
trace!("Cache lookup skipped for {:?}: parent hash is unknown", key.as_ref().map(HexDisplay::from));
trace!(
"Cache lookup skipped for {:?}: parent hash is unknown",
key.as_ref().map(HexDisplay::from),
);
false
}
/// Dispose state and return cache data.
pub fn release(self) -> CacheChanges<B> {
self.cache
}
}
impl<S: StateBackend<HashFor<B>>, B: BlockT> StateBackend<HashFor<B>> for CachingState<S, B> {
@@ -668,6 +668,213 @@ impl<S: StateBackend<HashFor<B>>, B: BlockT> StateBackend<HashFor<B>> for Cachin
}
}
/// Extended [`CachingState`] that will sync the caches on drop.
pub struct SyncingCachingState<S, Block: BlockT> {
/// The usage statistics of the backend. These will be updated on drop.
state_usage: Arc<StateUsageStats>,
/// Reference to the meta db.
meta: Arc<RwLock<Meta<NumberFor<Block>, Block::Hash>>>,
/// Mutex to lock get exlusive access to the backend.
lock: Arc<RwLock<()>>,
/// The wrapped caching state.
///
/// This is required to be a `Option`, because sometimes we want to extract
/// the cache changes and Rust does not allow to move fields from types that
/// implement `Drop`.
caching_state: Option<CachingState<S, Block>>,
/// Disable syncing of the cache. This is by default always `false`. However,
/// we need to disable syncing when this is a state in a
/// [`BlockImportOperation`](crate::BlockImportOperation). The import operation
/// takes care to sync the cache and more importantly we want to prevent a dead
/// lock.
disable_syncing: bool,
}
impl<S, B: BlockT> SyncingCachingState<S, B> {
/// Create new automatic syncing state.
pub fn new(
caching_state: CachingState<S, B>,
state_usage: Arc<StateUsageStats>,
meta: Arc<RwLock<Meta<NumberFor<B>, B::Hash>>>,
lock: Arc<RwLock<()>>,
) -> Self {
Self {
caching_state: Some(caching_state),
state_usage,
meta,
lock,
disable_syncing: false,
}
}
/// Returns the reference to the internal [`CachingState`].
fn caching_state(&self) -> &CachingState<S, B> {
self.caching_state
.as_ref()
.expect("`caching_state` is always valid for the lifetime of the object; qed")
}
/// Convert `Self` into the cache changes.
pub fn into_cache_changes(mut self) -> CacheChanges<B> {
self.caching_state
.take()
.expect("`caching_state` is always valid for the lifetime of the object; qed")
.cache
}
/// Disable syncing the cache on drop.
pub fn disable_syncing(&mut self) {
self.disable_syncing = true;
}
}
impl<S, B: BlockT> std::fmt::Debug for SyncingCachingState<S, B> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.caching_state().fmt(f)
}
}
impl<S: StateBackend<HashFor<B>>, B: BlockT> StateBackend<HashFor<B>> for SyncingCachingState<S, B> {
type Error = S::Error;
type Transaction = S::Transaction;
type TrieBackendStorage = S::TrieBackendStorage;
fn storage(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
self.caching_state().storage(key)
}
fn storage_hash(&self, key: &[u8]) -> Result<Option<B::Hash>, Self::Error> {
self.caching_state().storage_hash(key)
}
fn child_storage(
&self,
storage_key: &[u8],
child_info: ChildInfo,
key: &[u8],
) -> Result<Option<Vec<u8>>, Self::Error> {
self.caching_state().child_storage(storage_key, child_info, key)
}
fn exists_storage(&self, key: &[u8]) -> Result<bool, Self::Error> {
self.caching_state().exists_storage(key)
}
fn exists_child_storage(
&self,
storage_key: &[u8],
child_info: ChildInfo,
key: &[u8],
) -> Result<bool, Self::Error> {
self.caching_state().exists_child_storage(storage_key, child_info, key)
}
fn for_keys_in_child_storage<F: FnMut(&[u8])>(
&self,
storage_key: &[u8],
child_info: ChildInfo,
f: F,
) {
self.caching_state().for_keys_in_child_storage(storage_key, child_info, f)
}
fn next_storage_key(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
self.caching_state().next_storage_key(key)
}
fn next_child_storage_key(
&self,
storage_key: &[u8],
child_info: ChildInfo,
key: &[u8],
) -> Result<Option<Vec<u8>>, Self::Error> {
self.caching_state().next_child_storage_key(storage_key, child_info, key)
}
fn for_keys_with_prefix<F: FnMut(&[u8])>(&self, prefix: &[u8], f: F) {
self.caching_state().for_keys_with_prefix(prefix, f)
}
fn for_key_values_with_prefix<F: FnMut(&[u8], &[u8])>(&self, prefix: &[u8], f: F) {
self.caching_state().for_key_values_with_prefix(prefix, f)
}
fn for_child_keys_with_prefix<F: FnMut(&[u8])>(
&self,
storage_key: &[u8],
child_info: ChildInfo,
prefix: &[u8],
f: F,
) {
self.caching_state().for_child_keys_with_prefix(storage_key, child_info, prefix, f)
}
fn storage_root<I>(&self, delta: I) -> (B::Hash, Self::Transaction)
where
I: IntoIterator<Item=(Vec<u8>, Option<Vec<u8>>)>,
{
self.caching_state().storage_root(delta)
}
fn child_storage_root<I>(
&self,
storage_key: &[u8],
child_info: ChildInfo,
delta: I,
) -> (B::Hash, bool, Self::Transaction)
where
I: IntoIterator<Item=(Vec<u8>, Option<Vec<u8>>)>,
{
self.caching_state().child_storage_root(storage_key, child_info, delta)
}
fn pairs(&self) -> Vec<(Vec<u8>, Vec<u8>)> {
self.caching_state().pairs()
}
fn keys(&self, prefix: &[u8]) -> Vec<Vec<u8>> {
self.caching_state().keys(prefix)
}
fn child_keys(
&self,
storage_key: &[u8],
child_info: ChildInfo,
prefix: &[u8],
) -> Vec<Vec<u8>> {
self.caching_state().child_keys(storage_key, child_info, prefix)
}
fn as_trie_backend(&mut self) -> Option<&TrieBackend<Self::TrieBackendStorage, HashFor<B>>> {
self.caching_state
.as_mut()
.expect("`caching_state` is valid for the lifetime of the object; qed")
.as_trie_backend()
}
fn usage_info(&self) -> sp_state_machine::UsageInfo {
self.caching_state().usage_info()
}
}
impl<S, B: BlockT> Drop for SyncingCachingState<S, B> {
fn drop(&mut self) {
if self.disable_syncing {
return;
}
if let Some(mut caching_state) = self.caching_state.take() {
let _lock = self.lock.read();
self.state_usage.merge_sm(caching_state.usage.take());
if let Some(hash) = caching_state.cache.parent_hash.clone() {
let is_best = self.meta.read().best_hash == hash;
caching_state.cache.sync_cache(&[], &[], vec![], vec![], None, None, is_best);
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
@@ -1258,7 +1465,7 @@ mod qc {
CachingState::new(
InMemoryBackend::<BlakeTwo256>::default(),
self.shared.clone(),
Some(hash)
Some(hash),
)
}
@@ -1327,7 +1534,7 @@ mod qc {
let mut state = CachingState::new(
InMemoryBackend::<BlakeTwo256>::default(),
self.shared.clone(),
Some(parent)
Some(parent),
);
state.cache.sync_cache(
@@ -1366,7 +1573,7 @@ mod qc {
let mut state = CachingState::new(
InMemoryBackend::<BlakeTwo256>::default(),
self.shared.clone(),
Some(parent_hash)
Some(parent_hash),
);
state.cache.sync_cache(
@@ -1413,7 +1620,7 @@ mod qc {
let mut state = CachingState::new(
InMemoryBackend::<BlakeTwo256>::default(),
self.shared.clone(),
Some(fork_at)
Some(fork_at),
);
let height = pos as u64 + enacted.len() as u64 + 2;