mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-12 21:41:12 +00:00
State-db I/o metrics (#4562)
* add usage mod to sp-state-machine * State usage tracking finalized. * fix spaces * add license preamble * imporove output * review suggestions * update naming * merge fixes * Update client/db/src/light.rs Co-Authored-By: Gavin Wood <gavin@parity.io> Co-authored-by: Gavin Wood <github@gavwood.com>
This commit is contained in:
@@ -33,6 +33,7 @@ mod children;
|
||||
mod cache;
|
||||
mod storage_cache;
|
||||
mod utils;
|
||||
mod stats;
|
||||
|
||||
use std::sync::Arc;
|
||||
use std::path::PathBuf;
|
||||
@@ -63,13 +64,14 @@ use sp_runtime::traits::{
|
||||
use sc_executor::RuntimeInfo;
|
||||
use sp_state_machine::{
|
||||
DBValue, ChangesTrieTransaction, ChangesTrieCacheAction, ChangesTrieBuildCache,
|
||||
backend::Backend as StateBackend,
|
||||
backend::Backend as StateBackend, UsageInfo as StateUsageInfo,
|
||||
};
|
||||
use crate::utils::{Meta, db_err, meta_keys, read_db, read_meta};
|
||||
use sc_client::leaves::{LeafSet, FinalizationDisplaced};
|
||||
use sc_state_db::StateDb;
|
||||
use sp_blockchain::{CachedHeaderMetadata, HeaderMetadata, HeaderMetadataCache};
|
||||
use crate::storage_cache::{CachingState, SharedCache, new_shared_cache};
|
||||
use crate::stats::StateUsageStats;
|
||||
use log::{trace, debug, warn};
|
||||
pub use sc_state_db::PruningMode;
|
||||
|
||||
@@ -895,7 +897,8 @@ pub struct Backend<Block: BlockT> {
|
||||
shared_cache: SharedCache<Block>,
|
||||
import_lock: RwLock<()>,
|
||||
is_archive: bool,
|
||||
io_stats: FrozenForDuration<kvdb::IoStats>,
|
||||
io_stats: FrozenForDuration<(kvdb::IoStats, StateUsageInfo)>,
|
||||
state_usage: StateUsageStats,
|
||||
}
|
||||
|
||||
impl<Block: BlockT> Backend<Block> {
|
||||
@@ -963,7 +966,8 @@ impl<Block: BlockT> Backend<Block> {
|
||||
),
|
||||
import_lock: Default::default(),
|
||||
is_archive: is_archive_pruning,
|
||||
io_stats: FrozenForDuration::new(std::time::Duration::from_secs(1), kvdb::IoStats::empty()),
|
||||
io_stats: FrozenForDuration::new(std::time::Duration::from_secs(1), (kvdb::IoStats::empty(), StateUsageInfo::empty())),
|
||||
state_usage: StateUsageStats::new(),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -1257,13 +1261,23 @@ impl<Block: BlockT> Backend<Block> {
|
||||
|
||||
let finalized = if operation.commit_state {
|
||||
let mut changeset: sc_state_db::ChangeSet<Vec<u8>> = sc_state_db::ChangeSet::default();
|
||||
let mut ops: u64 = 0;
|
||||
let mut bytes: u64 = 0;
|
||||
for (key, (val, rc)) in operation.db_updates.drain() {
|
||||
if rc > 0 {
|
||||
ops += 1;
|
||||
bytes += key.len() as u64 + val.len() as u64;
|
||||
|
||||
changeset.inserted.push((key, val.to_vec()));
|
||||
} else if rc < 0 {
|
||||
ops += 1;
|
||||
bytes += key.len() as u64;
|
||||
|
||||
changeset.deleted.push(key);
|
||||
}
|
||||
}
|
||||
self.state_usage.tally_writes(ops, bytes);
|
||||
|
||||
let number_u64 = number.saturated_into::<u64>();
|
||||
let commit = self.storage.state_db.insert_block(&hash, number_u64, &pending_block.header.parent_hash(), changeset)
|
||||
.map_err(|e: sc_state_db::Error<io::Error>| sp_blockchain::Error::from(format!("State database error: {:?}", e)))?;
|
||||
@@ -1495,6 +1509,9 @@ impl<Block: BlockT> sc_client_api::backend::Backend<Block> for Backend<Block> {
|
||||
fn commit_operation(&self, operation: Self::BlockImportOperation)
|
||||
-> ClientResult<()>
|
||||
{
|
||||
let usage = operation.old_state.usage_info();
|
||||
self.state_usage.merge_sm(usage);
|
||||
|
||||
match self.try_commit_operation(operation) {
|
||||
Ok(_) => {
|
||||
self.storage.state_db.apply_pending();
|
||||
@@ -1548,8 +1565,14 @@ impl<Block: BlockT> sc_client_api::backend::Backend<Block> for Backend<Block> {
|
||||
Some(self.offchain_storage.clone())
|
||||
}
|
||||
|
||||
|
||||
fn usage_info(&self) -> Option<UsageInfo> {
|
||||
let io_stats = self.io_stats.take_or_else(|| self.storage.db.io_stats(kvdb::IoStatsKind::SincePrevious));
|
||||
let (io_stats, state_stats) = self.io_stats.take_or_else(||
|
||||
(
|
||||
self.storage.db.io_stats(kvdb::IoStatsKind::SincePrevious),
|
||||
self.state_usage.take(),
|
||||
)
|
||||
);
|
||||
let database_cache = parity_util_mem::malloc_size(&*self.storage.db);
|
||||
let state_cache = (*&self.shared_cache).lock().used_storage_cache_size();
|
||||
|
||||
@@ -1565,6 +1588,8 @@ impl<Block: BlockT> sc_client_api::backend::Backend<Block> for Backend<Block> {
|
||||
writes: io_stats.writes,
|
||||
reads: io_stats.reads,
|
||||
average_transaction_size: io_stats.avg_transaction_size() as u64,
|
||||
state_reads: state_stats.reads.ops,
|
||||
state_reads_cache: state_stats.cache_reads.ops,
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
@@ -580,6 +580,9 @@ impl<Block> LightBlockchainStorage<Block> for LightStorage<Block>
|
||||
writes: io_stats.writes,
|
||||
reads: io_stats.reads,
|
||||
average_transaction_size: io_stats.avg_transaction_size() as u64,
|
||||
// Light client does not track those
|
||||
state_reads: 0,
|
||||
state_reads_cache: 0,
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
@@ -0,0 +1,102 @@
|
||||
// Copyright 2017-2020 Parity Technologies (UK) Ltd.
|
||||
// This file is part of Substrate.
|
||||
|
||||
// Substrate is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// Substrate is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
//! Database usage statistics
|
||||
|
||||
use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
|
||||
|
||||
/// Accumulated usage statistics for state queries.
|
||||
pub struct StateUsageStats {
|
||||
started: std::time::Instant,
|
||||
reads: AtomicU64,
|
||||
bytes_read: AtomicU64,
|
||||
writes: AtomicU64,
|
||||
bytes_written: AtomicU64,
|
||||
reads_cache: AtomicU64,
|
||||
bytes_read_cache: AtomicU64,
|
||||
}
|
||||
|
||||
impl StateUsageStats {
|
||||
/// New empty usage stats.
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
started: std::time::Instant::now(),
|
||||
reads: 0.into(),
|
||||
bytes_read: 0.into(),
|
||||
writes: 0.into(),
|
||||
bytes_written: 0.into(),
|
||||
reads_cache: 0.into(),
|
||||
bytes_read_cache: 0.into(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Tally one read operation, of some length.
|
||||
pub fn tally_read(&self, data_bytes: u64, cache: bool) {
|
||||
self.reads.fetch_add(1, AtomicOrdering::Relaxed);
|
||||
self.bytes_read.fetch_add(data_bytes, AtomicOrdering::Relaxed);
|
||||
if cache {
|
||||
self.reads_cache.fetch_add(1, AtomicOrdering::Relaxed);
|
||||
self.bytes_read_cache.fetch_add(data_bytes, AtomicOrdering::Relaxed);
|
||||
}
|
||||
}
|
||||
|
||||
/// Tally one key read.
|
||||
pub fn tally_key_read(&self, key: &[u8], val: Option<&Vec<u8>>, cache: bool) {
|
||||
self.tally_read(key.len() as u64 + val.as_ref().map(|x| x.len() as u64).unwrap_or(0), cache);
|
||||
}
|
||||
|
||||
/// Tally one child key read.
|
||||
pub fn tally_child_key_read(&self, key: &(Vec<u8>, Vec<u8>), val: Option<Vec<u8>>, cache: bool) -> Option<Vec<u8>> {
|
||||
self.tally_read(key.0.len() as u64 + key.1.len() as u64 + val.as_ref().map(|x| x.len() as u64).unwrap_or(0), cache);
|
||||
val
|
||||
}
|
||||
|
||||
/// Tally some write operations, including their byte count.
|
||||
pub fn tally_writes(&self, ops: u64, data_bytes: u64) {
|
||||
self.writes.fetch_add(ops, AtomicOrdering::Relaxed);
|
||||
self.bytes_written.fetch_add(data_bytes, AtomicOrdering::Relaxed);
|
||||
}
|
||||
|
||||
/// Merge state machine usage info.
|
||||
pub fn merge_sm(&self, info: sp_state_machine::UsageInfo) {
|
||||
self.reads.fetch_add(info.reads.ops, AtomicOrdering::Relaxed);
|
||||
self.bytes_read.fetch_add(info.reads.bytes, AtomicOrdering::Relaxed);
|
||||
self.writes.fetch_add(info.writes.ops, AtomicOrdering::Relaxed);
|
||||
self.bytes_written.fetch_add(info.writes.bytes, AtomicOrdering::Relaxed);
|
||||
self.reads_cache.fetch_add(info.cache_reads.ops, AtomicOrdering::Relaxed);
|
||||
self.bytes_read_cache.fetch_add(info.cache_reads.bytes, AtomicOrdering::Relaxed);
|
||||
}
|
||||
|
||||
pub fn take(&self) -> sp_state_machine::UsageInfo {
|
||||
use sp_state_machine::UsageUnit;
|
||||
|
||||
fn unit(ops: &AtomicU64, bytes: &AtomicU64) -> UsageUnit {
|
||||
UsageUnit { ops: ops.swap(0, AtomicOrdering::Relaxed), bytes: bytes.swap(0, AtomicOrdering::Relaxed) }
|
||||
}
|
||||
|
||||
sp_state_machine::UsageInfo {
|
||||
reads: unit(&self.reads, &self.bytes_read),
|
||||
writes: unit(&self.writes, &self.bytes_written),
|
||||
cache_reads: unit(&self.reads_cache, &self.bytes_read_cache),
|
||||
// TODO: Proper tracking state of memory footprint here requires
|
||||
// imposing `MallocSizeOf` requirement on half of the codebase,
|
||||
// so it is an open question how to do it better
|
||||
memory: 0,
|
||||
started: self.started,
|
||||
span: self.started.elapsed(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -28,6 +28,7 @@ use sp_state_machine::{backend::Backend as StateBackend, TrieBackend};
|
||||
use log::trace;
|
||||
use sc_client_api::backend::{StorageCollection, ChildStorageCollection};
|
||||
use std::hash::Hash as StdHash;
|
||||
use crate::stats::StateUsageStats;
|
||||
|
||||
const STATE_CACHE_BLOCKS: usize = 12;
|
||||
|
||||
@@ -296,6 +297,8 @@ pub struct CacheChanges<B: BlockT> {
|
||||
/// in `sync_cache` along with the change overlay.
|
||||
/// For non-canonical clones local cache and changes are dropped.
|
||||
pub struct CachingState<S: StateBackend<HasherFor<B>>, B: BlockT> {
|
||||
/// Usage statistics
|
||||
usage: StateUsageStats,
|
||||
/// Backing state.
|
||||
state: S,
|
||||
/// Cache data.
|
||||
@@ -421,6 +424,7 @@ impl<S: StateBackend<HasherFor<B>>, B: BlockT> CachingState<S, B> {
|
||||
/// Create a new instance wrapping generic State and shared cache.
|
||||
pub fn new(state: S, shared_cache: SharedCache<B>, parent_hash: Option<B::Hash>) -> Self {
|
||||
CachingState {
|
||||
usage: StateUsageStats::new(),
|
||||
state,
|
||||
cache: CacheChanges {
|
||||
shared_cache,
|
||||
@@ -495,18 +499,22 @@ impl<S: StateBackend<HasherFor<B>>, B: BlockT> StateBackend<HasherFor<B>> for Ca
|
||||
// Note that local cache makes that lru is not refreshed
|
||||
if let Some(entry) = local_cache.storage.get(key).cloned() {
|
||||
trace!("Found in local cache: {:?}", HexDisplay::from(&key));
|
||||
self.usage.tally_key_read(key, entry.as_ref(), true);
|
||||
|
||||
return Ok(entry)
|
||||
}
|
||||
let mut cache = self.cache.shared_cache.lock();
|
||||
if Self::is_allowed(Some(key), None, &self.cache.parent_hash, &cache.modifications) {
|
||||
if let Some(entry) = cache.lru_storage.get(key).map(|a| a.clone()) {
|
||||
trace!("Found in shared cache: {:?}", HexDisplay::from(&key));
|
||||
self.usage.tally_key_read(key, entry.as_ref(), true);
|
||||
return Ok(entry)
|
||||
}
|
||||
}
|
||||
trace!("Cache miss: {:?}", HexDisplay::from(&key));
|
||||
let value = self.state.storage(key)?;
|
||||
RwLockUpgradableReadGuard::upgrade(local_cache).storage.insert(key.to_vec(), value.clone());
|
||||
self.usage.tally_key_read(key, value.as_ref(), false);
|
||||
Ok(value)
|
||||
}
|
||||
|
||||
@@ -539,17 +547,25 @@ impl<S: StateBackend<HasherFor<B>>, B: BlockT> StateBackend<HasherFor<B>> for Ca
|
||||
let local_cache = self.cache.local_cache.upgradable_read();
|
||||
if let Some(entry) = local_cache.child_storage.get(&key).cloned() {
|
||||
trace!("Found in local cache: {:?}", key);
|
||||
return Ok(entry)
|
||||
return Ok(
|
||||
self.usage.tally_child_key_read(&key, entry, true)
|
||||
)
|
||||
}
|
||||
let mut cache = self.cache.shared_cache.lock();
|
||||
if Self::is_allowed(None, Some(&key), &self.cache.parent_hash, &cache.modifications) {
|
||||
if let Some(entry) = cache.lru_child_storage.get(&key).map(|a| a.clone()) {
|
||||
trace!("Found in shared cache: {:?}", key);
|
||||
return Ok(entry)
|
||||
return Ok(
|
||||
self.usage.tally_child_key_read(&key, entry, true)
|
||||
)
|
||||
}
|
||||
}
|
||||
trace!("Cache miss: {:?}", key);
|
||||
let value = self.state.child_storage(storage_key, child_info, &key.1[..])?;
|
||||
|
||||
// just pass it through the usage counter
|
||||
let value = self.usage.tally_child_key_read(&key, value, false);
|
||||
|
||||
RwLockUpgradableReadGuard::upgrade(local_cache).child_storage.insert(key, value.clone());
|
||||
Ok(value)
|
||||
}
|
||||
@@ -646,6 +662,10 @@ impl<S: StateBackend<HasherFor<B>>, B: BlockT> StateBackend<HasherFor<B>> for Ca
|
||||
fn as_trie_backend(&mut self) -> Option<&TrieBackend<Self::TrieBackendStorage, HasherFor<B>>> {
|
||||
self.state.as_trie_backend()
|
||||
}
|
||||
|
||||
fn usage_info(&self) -> sp_state_machine::UsageInfo {
|
||||
self.usage.take()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
Reference in New Issue
Block a user