mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-09 19:01:08 +00:00
Synchronize state cache on finalization (#3246)
* Reorg test * Fixed informant misreporting reorgs * Update cache when reorg is caused by applying finality * Test for finality reorg * Simplified test * Typo Co-Authored-By: André Silva <andre.beat@gmail.com>
This commit is contained in:
committed by
André Silva
parent
1295260f2b
commit
1d5cd20c44
Generated
+1
@@ -4225,6 +4225,7 @@ name = "substrate-client"
|
||||
version = "2.0.0"
|
||||
dependencies = [
|
||||
"derive_more 0.14.1 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"env_logger 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"futures-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"hash-db 0.14.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
|
||||
@@ -48,15 +48,15 @@ where C: Components {
|
||||
});
|
||||
|
||||
let client = service.client();
|
||||
let mut last = {
|
||||
let mut last_best = {
|
||||
let info = client.info();
|
||||
Some((info.chain.best_number, info.chain.best_hash))
|
||||
};
|
||||
|
||||
let display_block_import = client.import_notification_stream().map(|v| Ok::<_, ()>(v)).compat().for_each(move |n| {
|
||||
// detect and log reorganizations.
|
||||
if let Some((ref last_num, ref last_hash)) = last {
|
||||
if n.header.parent_hash() != last_hash {
|
||||
if let Some((ref last_num, ref last_hash)) = last_best {
|
||||
if n.header.parent_hash() != last_hash && n.is_new_best {
|
||||
let tree_route = ::client::blockchain::tree_route(
|
||||
#[allow(deprecated)]
|
||||
client.backend().blockchain(),
|
||||
@@ -77,7 +77,9 @@ where C: Components {
|
||||
}
|
||||
}
|
||||
|
||||
last = Some((n.header.number().clone(), n.hash.clone()));
|
||||
if n.is_new_best {
|
||||
last_best = Some((n.header.number().clone(), n.hash.clone()));
|
||||
}
|
||||
|
||||
info!(target: "substrate", "Imported #{} ({})", n.header.number(), n.hash);
|
||||
Ok(())
|
||||
|
||||
@@ -28,6 +28,7 @@ inherents = { package = "substrate-inherents", path = "../inherents", default-fe
|
||||
sr-api-macros = { path = "../sr-api-macros" }
|
||||
|
||||
[dev-dependencies]
|
||||
env_logger = "0.6"
|
||||
test-client = { package = "substrate-test-runtime-client", path = "../test-runtime/client" }
|
||||
kvdb-memorydb = { git = "https://github.com/paritytech/parity-common", rev="b0317f649ab2c665b7987b8475878fc4d2e1f81d" }
|
||||
|
||||
|
||||
@@ -1108,21 +1108,24 @@ impl<Block: BlockT<Hash=H256>> Backend<Block> {
|
||||
None
|
||||
};
|
||||
|
||||
if let Some(set_head) = operation.set_head {
|
||||
let cache_update = if let Some(set_head) = operation.set_head {
|
||||
if let Some(header) = ::client::blockchain::HeaderBackend::header(&self.blockchain, set_head)? {
|
||||
let number = header.number();
|
||||
let hash = header.hash();
|
||||
|
||||
self.set_head_with_transaction(
|
||||
let (enacted, retracted) = self.set_head_with_transaction(
|
||||
&mut transaction,
|
||||
hash.clone(),
|
||||
(number.clone(), hash.clone())
|
||||
)?;
|
||||
meta_updates.push((hash, *number, true, false));
|
||||
Some((enacted, retracted))
|
||||
} else {
|
||||
return Err(client::error::Error::UnknownBlock(format!("Cannot set head {:?}", set_head)))
|
||||
}
|
||||
}
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let write_result = self.storage.db.write(transaction).map_err(db_err);
|
||||
|
||||
@@ -1152,6 +1155,10 @@ impl<Block: BlockT<Hash=H256>> Backend<Block> {
|
||||
);
|
||||
}
|
||||
|
||||
if let Some((enacted, retracted)) = cache_update {
|
||||
self.shared_cache.lock().sync(&enacted, &retracted);
|
||||
}
|
||||
|
||||
for (hash, number, is_best, is_finalized) in meta_updates {
|
||||
self.blockchain.update_meta(hash, number, is_best, is_finalized);
|
||||
}
|
||||
|
||||
@@ -151,6 +151,65 @@ impl<B: BlockT, H: Hasher> Cache<B, H> {
|
||||
+ self.lru_child_storage.used_size()
|
||||
// ignore small hashes storage and self.lru_hashes.used_size()
|
||||
}
|
||||
|
||||
/// Synchronize the shared cache with the best block state.
|
||||
/// This function updates the shared cache by removing entries
|
||||
/// that are invalidated by chain reorganization. It should be
|
||||
/// be called when chain reorg happens without importing a new block.
|
||||
pub fn sync(&mut self, enacted: &[B::Hash], retracted: &[B::Hash]) {
|
||||
trace!("Syncing shared cache, enacted = {:?}, retracted = {:?}", enacted, retracted);
|
||||
|
||||
// Purge changes from re-enacted and retracted blocks.
|
||||
// Filter out commiting block if any.
|
||||
let mut clear = false;
|
||||
for block in enacted {
|
||||
clear = clear || {
|
||||
if let Some(ref mut m) = self.modifications.iter_mut().find(|m| &m.hash == block) {
|
||||
trace!("Reverting enacted block {:?}", block);
|
||||
m.is_canon = true;
|
||||
for a in &m.storage {
|
||||
trace!("Reverting enacted key {:?}", a);
|
||||
self.lru_storage.remove(a);
|
||||
}
|
||||
for a in &m.child_storage {
|
||||
trace!("Reverting enacted child key {:?}", a);
|
||||
self.lru_child_storage.remove(a);
|
||||
}
|
||||
false
|
||||
} else {
|
||||
true
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
for block in retracted {
|
||||
clear = clear || {
|
||||
if let Some(ref mut m) = self.modifications.iter_mut().find(|m| &m.hash == block) {
|
||||
trace!("Retracting block {:?}", block);
|
||||
m.is_canon = false;
|
||||
for a in &m.storage {
|
||||
trace!("Retracted key {:?}", a);
|
||||
self.lru_storage.remove(a);
|
||||
}
|
||||
for a in &m.child_storage {
|
||||
trace!("Retracted child key {:?}", a);
|
||||
self.lru_child_storage.remove(a);
|
||||
}
|
||||
false
|
||||
} else {
|
||||
true
|
||||
}
|
||||
};
|
||||
}
|
||||
if clear {
|
||||
// We don't know anything about the block; clear everything
|
||||
trace!("Wiping cache");
|
||||
self.lru_storage.clear();
|
||||
self.lru_child_storage.clear();
|
||||
self.lru_hashes.clear();
|
||||
self.modifications.clear();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub type SharedCache<B, H> = Arc<Mutex<Cache<B, H>>>;
|
||||
@@ -247,58 +306,12 @@ impl<H: Hasher, B: BlockT> CacheChanges<H, B> {
|
||||
let is_best = is_best();
|
||||
trace!("Syncing cache, id = (#{:?}, {:?}), parent={:?}, best={}", commit_number, commit_hash, self.parent_hash, is_best);
|
||||
let cache = &mut *cache;
|
||||
|
||||
// Purge changes from re-enacted and retracted blocks.
|
||||
// Filter out commiting block if any.
|
||||
let mut clear = false;
|
||||
for block in enacted.iter().filter(|h| commit_hash.as_ref().map_or(true, |p| *h != p)) {
|
||||
clear = clear || {
|
||||
if let Some(ref mut m) = cache.modifications.iter_mut().find(|m| &m.hash == block) {
|
||||
trace!("Reverting enacted block {:?}", block);
|
||||
m.is_canon = true;
|
||||
for a in &m.storage {
|
||||
trace!("Reverting enacted key {:?}", a);
|
||||
cache.lru_storage.remove(a);
|
||||
}
|
||||
for a in &m.child_storage {
|
||||
trace!("Reverting enacted child key {:?}", a);
|
||||
cache.lru_child_storage.remove(a);
|
||||
}
|
||||
false
|
||||
} else {
|
||||
true
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
for block in retracted {
|
||||
clear = clear || {
|
||||
if let Some(ref mut m) = cache.modifications.iter_mut().find(|m| &m.hash == block) {
|
||||
trace!("Retracting block {:?}", block);
|
||||
m.is_canon = false;
|
||||
for a in &m.storage {
|
||||
trace!("Retracted key {:?}", a);
|
||||
cache.lru_storage.remove(a);
|
||||
}
|
||||
for a in &m.child_storage {
|
||||
trace!("Retracted child key {:?}", a);
|
||||
cache.lru_child_storage.remove(a);
|
||||
}
|
||||
false
|
||||
} else {
|
||||
true
|
||||
}
|
||||
};
|
||||
}
|
||||
if clear {
|
||||
// We don't know anything about the block; clear everything
|
||||
trace!("Wiping cache");
|
||||
cache.lru_storage.clear();
|
||||
cache.lru_child_storage.clear();
|
||||
cache.lru_hashes.clear();
|
||||
cache.modifications.clear();
|
||||
}
|
||||
|
||||
let enacted: Vec<_> = enacted
|
||||
.iter()
|
||||
.filter(|h| commit_hash.as_ref().map_or(true, |p| *h != p))
|
||||
.cloned()
|
||||
.collect();
|
||||
cache.sync(&enacted, retracted);
|
||||
// Propagate cache only if committing on top of the latest canonical state
|
||||
// blocks are ordered by number and only one block with a given number is marked as canonical
|
||||
// (contributed to canonical state cache)
|
||||
|
||||
@@ -2690,4 +2690,89 @@ pub(crate) mod tests {
|
||||
let id = BlockId::<Block>::Number(72340207214430721);
|
||||
client.header(&id).expect_err("invalid block number overflows u32");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn state_reverted_on_reorg() {
|
||||
let _ = env_logger::try_init();
|
||||
let client = test_client::new();
|
||||
|
||||
let current_balance = ||
|
||||
client.runtime_api().balance_of(
|
||||
&BlockId::number(client.current_height()), AccountKeyring::Alice.into()
|
||||
).unwrap();
|
||||
|
||||
// G -> A1 -> A2
|
||||
// \
|
||||
// -> B1
|
||||
let mut a1 = client.new_block_at(&BlockId::Number(0), Default::default()).unwrap();
|
||||
a1.push_transfer(Transfer {
|
||||
from: AccountKeyring::Alice.into(),
|
||||
to: AccountKeyring::Bob.into(),
|
||||
amount: 10,
|
||||
nonce: 0,
|
||||
}).unwrap();
|
||||
let a1 = a1.bake().unwrap();
|
||||
client.import(BlockOrigin::Own, a1.clone()).unwrap();
|
||||
|
||||
let mut b1 = client.new_block_at(&BlockId::Number(0), Default::default()).unwrap();
|
||||
b1.push_transfer(Transfer {
|
||||
from: AccountKeyring::Alice.into(),
|
||||
to: AccountKeyring::Ferdie.into(),
|
||||
amount: 50,
|
||||
nonce: 0,
|
||||
}).unwrap();
|
||||
let b1 = b1.bake().unwrap();
|
||||
// Reorg to B1
|
||||
client.import_as_best(BlockOrigin::Own, b1.clone()).unwrap();
|
||||
|
||||
assert_eq!(950, current_balance());
|
||||
let mut a2 = client.new_block_at(&BlockId::Hash(a1.hash()), Default::default()).unwrap();
|
||||
a2.push_transfer(Transfer {
|
||||
from: AccountKeyring::Alice.into(),
|
||||
to: AccountKeyring::Charlie.into(),
|
||||
amount: 10,
|
||||
nonce: 1,
|
||||
}).unwrap();
|
||||
// Re-org to A2
|
||||
client.import_as_best(BlockOrigin::Own, a2.bake().unwrap()).unwrap();
|
||||
assert_eq!(980, current_balance());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn state_reverted_on_set_head() {
|
||||
let _ = env_logger::try_init();
|
||||
let client = test_client::new();
|
||||
|
||||
let current_balance = ||
|
||||
client.runtime_api().balance_of(
|
||||
&BlockId::number(client.current_height()), AccountKeyring::Alice.into()
|
||||
).unwrap();
|
||||
|
||||
// G -> A1
|
||||
// \
|
||||
// -> B1
|
||||
let mut a1 = client.new_block_at(&BlockId::Number(0), Default::default()).unwrap();
|
||||
a1.push_transfer(Transfer {
|
||||
from: AccountKeyring::Alice.into(),
|
||||
to: AccountKeyring::Bob.into(),
|
||||
amount: 10,
|
||||
nonce: 0,
|
||||
}).unwrap();
|
||||
let a1 = a1.bake().unwrap();
|
||||
client.import(BlockOrigin::Own, a1.clone()).unwrap();
|
||||
|
||||
let mut b1 = client.new_block_at(&BlockId::Number(0), Default::default()).unwrap();
|
||||
b1.push_transfer(Transfer {
|
||||
from: AccountKeyring::Alice.into(),
|
||||
to: AccountKeyring::Ferdie.into(),
|
||||
amount: 50,
|
||||
nonce: 0,
|
||||
}).unwrap();
|
||||
let b1 = b1.bake().unwrap();
|
||||
client.import(BlockOrigin::Own, b1.clone()).unwrap();
|
||||
assert_eq!(990, current_balance());
|
||||
// Set B1 as new best
|
||||
client.set_head(BlockId::hash(b1.hash())).unwrap();
|
||||
assert_eq!(950, current_balance());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -292,8 +292,11 @@ impl<BlockHash: Hash, Key: Hash> StateDbSync<BlockHash, Key> {
|
||||
}
|
||||
|
||||
pub fn pin(&mut self, hash: &BlockHash) {
|
||||
trace!(target: "state-db", "Pinned block: {:?}", hash);
|
||||
*self.pinned.entry(hash.clone()).or_default() += 1;
|
||||
let refs = self.pinned.entry(hash.clone()).or_default();
|
||||
if *refs == 0 {
|
||||
trace!(target: "state-db", "Pinned block: {:?}", hash);
|
||||
}
|
||||
*refs += 1
|
||||
}
|
||||
|
||||
pub fn unpin(&mut self, hash: &BlockHash) {
|
||||
|
||||
@@ -34,6 +34,10 @@ pub trait ClientExt<Block: BlockT>: Sized {
|
||||
fn import(&self, origin: BlockOrigin, block: Block)
|
||||
-> Result<(), ConsensusError>;
|
||||
|
||||
/// Import a block and make it our best block if possible.
|
||||
fn import_as_best(&self, origin: BlockOrigin, block: Block)
|
||||
-> Result<(), ConsensusError>;
|
||||
|
||||
/// Import block with justification, finalizes block.
|
||||
fn import_justified(
|
||||
&self,
|
||||
@@ -78,6 +82,24 @@ impl<B, E, RA, Block> ClientExt<Block> for Client<B, E, Block, RA>
|
||||
BlockImport::import_block(&mut (&*self), import, HashMap::new()).map(|_| ())
|
||||
}
|
||||
|
||||
fn import_as_best(&self, origin: BlockOrigin, block: Block)
|
||||
-> Result<(), ConsensusError>
|
||||
{
|
||||
let (header, extrinsics) = block.deconstruct();
|
||||
let import = BlockImportParams {
|
||||
origin,
|
||||
header,
|
||||
justification: None,
|
||||
post_digests: vec![],
|
||||
body: Some(extrinsics),
|
||||
finalized: false,
|
||||
auxiliary: Vec::new(),
|
||||
fork_choice: ForkChoiceStrategy::Custom(true),
|
||||
};
|
||||
|
||||
BlockImport::import_block(&mut (&*self), import, HashMap::new()).map(|_| ())
|
||||
}
|
||||
|
||||
fn import_justified(
|
||||
&self,
|
||||
origin: BlockOrigin,
|
||||
|
||||
Reference in New Issue
Block a user