mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-12 04:11:07 +00:00
Configurable state cache size and enforce exact state cache size (#2314)
* Allow configuring state cache size via cli * Track used cache size * Expose memory counter to informant * Enforce max cache size constraint exactly * Default to 64 MiB Co-Authored-By: cmichi <mich@elmueller.net> * Remove self as parameter
This commit is contained in:
committed by
Gavin Wood
parent
3f06fe32f3
commit
c6d15e2cea
@@ -76,6 +76,12 @@ pub fn start<C>(service: &Service<C>, exit: ::exit_future::Exit, handle: TaskExe
|
||||
TransferRateFormat(bandwidth_upload),
|
||||
);
|
||||
|
||||
let backend = (*client).backend();
|
||||
let used_state_cache_size = match backend.used_state_cache_size(){
|
||||
Some(size) => size,
|
||||
None => 0,
|
||||
};
|
||||
|
||||
// get cpu usage and memory usage of this process
|
||||
let (cpu_usage, memory) = if sys.refresh_process(self_pid) {
|
||||
let proc = sys.get_process(self_pid).expect("Above refresh_process succeeds, this should be Some(), qed");
|
||||
@@ -99,6 +105,7 @@ pub fn start<C>(service: &Service<C>, exit: ::exit_future::Exit, handle: TaskExe
|
||||
"finalized_hash" => ?info.chain.finalized_hash,
|
||||
"bandwidth_download" => bandwidth_download,
|
||||
"bandwidth_upload" => bandwidth_upload,
|
||||
"used_state_cache_size" => used_state_cache_size,
|
||||
);
|
||||
} else {
|
||||
warn!("Error getting best block information");
|
||||
|
||||
@@ -404,6 +404,7 @@ where
|
||||
config.database_path =
|
||||
db_path(&base_path, config.chain_spec.id()).to_string_lossy().into();
|
||||
config.database_cache_size = cli.database_cache_size;
|
||||
config.state_cache_size = cli.state_cache_size;
|
||||
config.pruning = match cli.pruning {
|
||||
Some(ref s) if s == "archive" => PruningMode::ArchiveAll,
|
||||
None => PruningMode::default(),
|
||||
|
||||
@@ -313,6 +313,10 @@ pub struct RunCmd {
|
||||
#[structopt(long = "db-cache", value_name = "MiB")]
|
||||
pub database_cache_size: Option<u32>,
|
||||
|
||||
/// Specify the state cache size
|
||||
#[structopt(long = "state-cache-size", value_name = "Bytes", default_value = "67108864")]
|
||||
pub state_cache_size: usize,
|
||||
|
||||
/// Listen to all RPC interfaces (default is local)
|
||||
#[structopt(long = "rpc-external")]
|
||||
pub rpc_external: bool,
|
||||
|
||||
@@ -65,7 +65,6 @@ use client::in_mem::Backend as InMemoryBackend;
|
||||
|
||||
const CANONICALIZATION_DELAY: u64 = 4096;
|
||||
const MIN_BLOCKS_TO_KEEP_CHANGES_TRIES_FOR: u64 = 32768;
|
||||
const STATE_CACHE_SIZE_BYTES: usize = 16 * 1024 * 1024;
|
||||
|
||||
/// DB-backed patricia trie state, transaction type is an overlay of changes to commit.
|
||||
pub type DbState = state_machine::TrieBackend<Arc<state_machine::Storage<Blake2Hasher>>, Blake2Hasher>;
|
||||
@@ -74,6 +73,8 @@ pub type DbState = state_machine::TrieBackend<Arc<state_machine::Storage<Blake2H
|
||||
pub struct DatabaseSettings {
|
||||
/// Cache size in bytes. If `None` default is used.
|
||||
pub cache_size: Option<usize>,
|
||||
/// State cache size.
|
||||
pub state_cache_size: usize,
|
||||
/// Path to the database.
|
||||
pub path: PathBuf,
|
||||
/// Pruning mode.
|
||||
@@ -543,7 +544,7 @@ impl<Block: BlockT<Hash=H256>> Backend<Block> {
|
||||
pub fn new(config: DatabaseSettings, canonicalization_delay: u64) -> Result<Self, client::error::Error> {
|
||||
let db = open_database(&config, columns::META, "full")?;
|
||||
|
||||
Backend::from_kvdb(db as Arc<_>, config.pruning, canonicalization_delay)
|
||||
Backend::from_kvdb(db as Arc<_>, config.pruning, canonicalization_delay, config.state_cache_size)
|
||||
}
|
||||
|
||||
#[cfg(any(test, feature = "test-helpers"))]
|
||||
@@ -556,10 +557,11 @@ impl<Block: BlockT<Hash=H256>> Backend<Block> {
|
||||
db as Arc<_>,
|
||||
PruningMode::keep_blocks(keep_blocks),
|
||||
canonicalization_delay,
|
||||
16777216,
|
||||
).expect("failed to create test-db")
|
||||
}
|
||||
|
||||
fn from_kvdb(db: Arc<KeyValueDB>, pruning: PruningMode, canonicalization_delay: u64) -> Result<Self, client::error::Error> {
|
||||
fn from_kvdb(db: Arc<KeyValueDB>, pruning: PruningMode, canonicalization_delay: u64, state_cache_size: usize) -> Result<Self, client::error::Error> {
|
||||
let is_archive_pruning = pruning.is_archive();
|
||||
let blockchain = BlockchainDb::new(db.clone())?;
|
||||
let meta = blockchain.meta.clone();
|
||||
@@ -582,7 +584,7 @@ impl<Block: BlockT<Hash=H256>> Backend<Block> {
|
||||
changes_trie_config: Mutex::new(None),
|
||||
blockchain,
|
||||
canonicalization_delay,
|
||||
shared_cache: new_shared_cache(STATE_CACHE_SIZE_BYTES),
|
||||
shared_cache: new_shared_cache(state_cache_size),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -1161,6 +1163,11 @@ impl<Block> client::backend::Backend<Block, Blake2Hasher> for Backend<Block> whe
|
||||
&self.blockchain
|
||||
}
|
||||
|
||||
fn used_state_cache_size(&self) -> Option<usize> {
|
||||
let used = (*&self.shared_cache).lock().used_storage_cache_size();
|
||||
Some(used)
|
||||
}
|
||||
|
||||
fn state_at(&self, block: BlockId<Block>) -> Result<Self::State, client::error::Error> {
|
||||
use client::blockchain::HeaderBackend as BcHeaderBackend;
|
||||
|
||||
@@ -1319,7 +1326,7 @@ mod tests {
|
||||
db.storage.db.clone()
|
||||
};
|
||||
|
||||
let backend = Backend::<Block>::from_kvdb(backing, PruningMode::keep_blocks(1), 0).unwrap();
|
||||
let backend = Backend::<Block>::from_kvdb(backing, PruningMode::keep_blocks(1), 0, 16777216).unwrap();
|
||||
assert_eq!(backend.blockchain().info().unwrap().best_number, 9);
|
||||
for i in 0..10 {
|
||||
assert!(backend.blockchain().hash(i).unwrap().is_some())
|
||||
|
||||
@@ -39,17 +39,40 @@ pub struct Cache<B: Block, H: Hasher> {
|
||||
/// Information on the modifications in recently committed blocks; specifically which keys
|
||||
/// changed in which block. Ordered by block number.
|
||||
modifications: VecDeque<BlockChanges<B::Header>>,
|
||||
/// Maximum cache size available, in Bytes.
|
||||
shared_cache_size: usize,
|
||||
/// Used storage size, in Bytes.
|
||||
storage_used_size: usize,
|
||||
}
|
||||
|
||||
impl<B: Block, H: Hasher> Cache<B, H> {
|
||||
/// Returns the used memory size of the storage cache in bytes.
|
||||
pub fn used_storage_cache_size(&self) -> usize {
|
||||
self.storage_used_size
|
||||
}
|
||||
}
|
||||
|
||||
pub type SharedCache<B, H> = Arc<Mutex<Cache<B, H>>>;
|
||||
|
||||
/// Create new shared cache instance with given max memory usage.
|
||||
pub fn new_shared_cache<B: Block, H: Hasher>(shared_cache_size: usize) -> SharedCache<B, H> {
|
||||
let cache_items = shared_cache_size / 100; // Guestimate, potentially inaccurate
|
||||
// we need to supply a max capacity to `LruCache`, but since
|
||||
// we don't have any idea how large the size of each item
|
||||
// that is stored will be we can't calculate the max amount
|
||||
// of items properly from `shared_cache_size`.
|
||||
//
|
||||
// what we do instead is to supply `shared_cache_size` as the
|
||||
// max upper bound capacity (this would only be reached if each
|
||||
// item would be one byte).
|
||||
// each time we store to the storage cache we verify the memory
|
||||
// constraint and pop the lru item if space needs to be freed.
|
||||
|
||||
Arc::new(Mutex::new(Cache {
|
||||
storage: LruCache::new(cache_items),
|
||||
hashes: LruCache::new(cache_items),
|
||||
storage: LruCache::new(shared_cache_size),
|
||||
hashes: LruCache::new(shared_cache_size),
|
||||
modifications: VecDeque::new(),
|
||||
shared_cache_size: shared_cache_size,
|
||||
storage_used_size: 0,
|
||||
}))
|
||||
}
|
||||
|
||||
@@ -109,6 +132,33 @@ impl<H: Hasher, S: StateBackend<H>, B: Block> CachingState<H, S, B> {
|
||||
}
|
||||
}
|
||||
|
||||
fn storage_insert(cache: &mut Cache<B, H>, k: StorageValue, v: Option<StorageValue>) {
|
||||
if let Some(v_) = &v {
|
||||
while cache.storage_used_size + v_.len() > cache.shared_cache_size {
|
||||
// pop until space constraint satisfied
|
||||
match cache.storage.remove_lru() {
|
||||
Some((_, Some(popped_v))) =>
|
||||
cache.storage_used_size = cache.storage_used_size - popped_v.len(),
|
||||
Some((_, None)) => continue,
|
||||
None => break,
|
||||
};
|
||||
}
|
||||
cache.storage_used_size = cache.storage_used_size + v_.len();
|
||||
}
|
||||
cache.storage.insert(k, v);
|
||||
}
|
||||
|
||||
fn storage_remove(
|
||||
storage: &mut LruCache<StorageKey, Option<StorageValue>>,
|
||||
k: &StorageKey,
|
||||
storage_used_size: &mut usize,
|
||||
) {
|
||||
let v = storage.remove(k);
|
||||
if let Some(Some(v_)) = v {
|
||||
*storage_used_size = *storage_used_size - v_.len();
|
||||
}
|
||||
}
|
||||
|
||||
/// Propagate local cache into the shared cache and synchronize
|
||||
/// the shared cache with the best block state.
|
||||
/// This function updates the shared cache by removing entries
|
||||
@@ -139,7 +189,7 @@ impl<H: Hasher, S: StateBackend<H>, B: Block> CachingState<H, S, B> {
|
||||
m.is_canon = true;
|
||||
for a in &m.storage {
|
||||
trace!("Reverting enacted key {:?}", a);
|
||||
cache.storage.remove(a);
|
||||
CachingState::<H, S, B>::storage_remove(&mut cache.storage, a, &mut cache.storage_used_size);
|
||||
}
|
||||
false
|
||||
} else {
|
||||
@@ -155,7 +205,7 @@ impl<H: Hasher, S: StateBackend<H>, B: Block> CachingState<H, S, B> {
|
||||
m.is_canon = false;
|
||||
for a in &m.storage {
|
||||
trace!("Retracted key {:?}", a);
|
||||
cache.storage.remove(a);
|
||||
CachingState::<H, S, B>::storage_remove(&mut cache.storage, a, &mut cache.storage_used_size);
|
||||
}
|
||||
false
|
||||
} else {
|
||||
@@ -178,7 +228,7 @@ impl<H: Hasher, S: StateBackend<H>, B: Block> CachingState<H, S, B> {
|
||||
if is_best {
|
||||
trace!("Committing {} local, {} hashes, {} modified entries", local_cache.storage.len(), local_cache.hashes.len(), changes.len());
|
||||
for (k, v) in local_cache.storage.drain() {
|
||||
cache.storage.insert(k, v);
|
||||
CachingState::<H, S, B>::storage_insert(cache, k, v);
|
||||
}
|
||||
for (k, v) in local_cache.hashes.drain() {
|
||||
cache.hashes.insert(k, v);
|
||||
@@ -198,7 +248,7 @@ impl<H: Hasher, S: StateBackend<H>, B: Block> CachingState<H, S, B> {
|
||||
modifications.insert(k.clone());
|
||||
if is_best {
|
||||
cache.hashes.remove(&k);
|
||||
cache.storage.insert(k, v);
|
||||
CachingState::<H, S, B>::storage_insert(cache, k, v);
|
||||
}
|
||||
}
|
||||
// Save modified storage. These are ordered by the block number.
|
||||
@@ -418,4 +468,38 @@ mod tests {
|
||||
let s = CachingState::new(InMemory::<Blake2Hasher>::default(), shared.clone(), Some(h3a.clone()));
|
||||
assert!(s.storage(&key).unwrap().is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn should_track_used_size_correctly() {
|
||||
let root_parent = H256::random();
|
||||
let shared = new_shared_cache::<Block, Blake2Hasher>(5);
|
||||
let h0 = H256::random();
|
||||
|
||||
let mut s = CachingState::new(InMemory::<Blake2Hasher>::default(), shared.clone(), Some(root_parent.clone()));
|
||||
|
||||
let key = H256::random()[..].to_vec();
|
||||
s.sync_cache(&[], &[], vec![(key.clone(), Some(vec![1, 2, 3]))], Some(h0.clone()), Some(0), || true);
|
||||
assert_eq!(shared.lock().used_storage_cache_size(), 3 /* bytes */);
|
||||
|
||||
let key = H256::random()[..].to_vec();
|
||||
s.sync_cache(&[], &[], vec![(key.clone(), Some(vec![1, 2]))], Some(h0.clone()), Some(0), || true);
|
||||
assert_eq!(shared.lock().used_storage_cache_size(), 5 /* bytes */);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn should_remove_lru_items_based_on_tracking_used_size() {
|
||||
let root_parent = H256::random();
|
||||
let shared = new_shared_cache::<Block, Blake2Hasher>(5);
|
||||
let h0 = H256::random();
|
||||
|
||||
let mut s = CachingState::new(InMemory::<Blake2Hasher>::default(), shared.clone(), Some(root_parent.clone()));
|
||||
|
||||
let key = H256::random()[..].to_vec();
|
||||
s.sync_cache(&[], &[], vec![(key.clone(), Some(vec![1, 2, 3, 4]))], Some(h0.clone()), Some(0), || true);
|
||||
assert_eq!(shared.lock().used_storage_cache_size(), 4 /* bytes */);
|
||||
|
||||
let key = H256::random()[..].to_vec();
|
||||
s.sync_cache(&[], &[], vec![(key.clone(), Some(vec![1, 2]))], Some(h0.clone()), Some(0), || true);
|
||||
assert_eq!(shared.lock().used_storage_cache_size(), 2 /* bytes */);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -141,6 +141,8 @@ pub trait Backend<Block, H>: AuxStore + Send + Sync where
|
||||
fn finalize_block(&self, block: BlockId<Block>, justification: Option<Justification>) -> error::Result<()>;
|
||||
/// Returns reference to blockchain backend.
|
||||
fn blockchain(&self) -> &Self::Blockchain;
|
||||
/// Returns the used state cache, if existent.
|
||||
fn used_state_cache_size(&self) -> Option<usize>;
|
||||
/// Returns reference to changes trie storage.
|
||||
fn changes_trie_storage(&self) -> Option<&Self::ChangesTrieStorage>;
|
||||
/// Returns true if state for given block is available.
|
||||
|
||||
@@ -651,6 +651,10 @@ where
|
||||
&self.blockchain
|
||||
}
|
||||
|
||||
fn used_state_cache_size(&self) -> Option<usize> {
|
||||
None
|
||||
}
|
||||
|
||||
fn changes_trie_storage(&self) -> Option<&Self::ChangesTrieStorage> {
|
||||
Some(&self.changes_trie_storage)
|
||||
}
|
||||
|
||||
@@ -183,6 +183,10 @@ impl<S, F, Block, H> ClientBackend<Block, H> for Backend<S, F, H> where
|
||||
&self.blockchain
|
||||
}
|
||||
|
||||
fn used_state_cache_size(&self) -> Option<usize> {
|
||||
None
|
||||
}
|
||||
|
||||
fn changes_trie_storage(&self) -> Option<&Self::ChangesTrieStorage> {
|
||||
None
|
||||
}
|
||||
|
||||
@@ -457,6 +457,7 @@ impl<Factory: ServiceFactory> Components for FullComponents<Factory> {
|
||||
{
|
||||
let db_settings = client_db::DatabaseSettings {
|
||||
cache_size: config.database_cache_size.map(|u| u as usize),
|
||||
state_cache_size: config.state_cache_size,
|
||||
path: config.database_path.as_str().into(),
|
||||
pruning: config.pruning.clone(),
|
||||
};
|
||||
@@ -532,6 +533,7 @@ impl<Factory: ServiceFactory> Components for LightComponents<Factory> {
|
||||
{
|
||||
let db_settings = client_db::DatabaseSettings {
|
||||
cache_size: None,
|
||||
state_cache_size: config.state_cache_size,
|
||||
path: config.database_path.as_str().into(),
|
||||
pruning: config.pruning.clone(),
|
||||
};
|
||||
|
||||
@@ -48,6 +48,8 @@ pub struct Configuration<C, G: Serialize + DeserializeOwned + BuildStorage> {
|
||||
pub database_path: String,
|
||||
/// Cache Size for internal database in MiB
|
||||
pub database_cache_size: Option<u32>,
|
||||
/// Size of internal state cache in Bytes
|
||||
pub state_cache_size: usize,
|
||||
/// Pruning settings.
|
||||
pub pruning: PruningMode,
|
||||
/// Additional key seeds.
|
||||
@@ -93,6 +95,7 @@ impl<C: Default, G: Serialize + DeserializeOwned + BuildStorage> Configuration<C
|
||||
keystore_path: Default::default(),
|
||||
database_path: Default::default(),
|
||||
database_cache_size: Default::default(),
|
||||
state_cache_size: Default::default(),
|
||||
keys: Default::default(),
|
||||
custom: Default::default(),
|
||||
pruning: PruningMode::default(),
|
||||
|
||||
@@ -112,6 +112,7 @@ fn node_config<F: ServiceFactory> (
|
||||
keystore_path: root.join("key").to_str().unwrap().into(),
|
||||
database_path: root.join("db").to_str().unwrap().into(),
|
||||
database_cache_size: None,
|
||||
state_cache_size: 16777216,
|
||||
pruning: Default::default(),
|
||||
keys: keys,
|
||||
chain_spec: (*spec).clone(),
|
||||
|
||||
Reference in New Issue
Block a user