Added client function to delete a recent block (#8533)

* Implemented recent block removal

* Apply suggestions from code review

Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>

Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>
This commit is contained in:
Arkadiy Paronyan
2021-05-05 19:16:34 +03:00
committed by GitHub
parent 7c3a3f79ba
commit d11e60510e
7 changed files with 295 additions and 49 deletions
+6
View File
@@ -475,6 +475,12 @@ pub trait Backend<Block: BlockT>: AuxStore + Send + Sync {
revert_finalized: bool,
) -> sp_blockchain::Result<(NumberFor<Block>, HashSet<Block::Hash>)>;
/// Discard non-best, unfinalized leaf block.
fn remove_leaf_block(
&self,
hash: &Block::Hash,
) -> sp_blockchain::Result<()>;
/// Insert auxiliary data into key-value store.
fn insert_aux<
'a,
+7
View File
@@ -770,6 +770,13 @@ impl<Block: BlockT> backend::Backend<Block> for Backend<Block> where Block::Hash
Ok((Zero::zero(), HashSet::new()))
}
fn remove_leaf_block(
&self,
_hash: &Block::Hash,
) -> sp_blockchain::Result<()> {
Ok(())
}
fn get_import_lock(&self) -> &RwLock<()> {
&self.import_lock
}
+2 -2
View File
@@ -216,8 +216,8 @@ impl<H, N> LeafSet<H, N> where
self.pending_removed.clear();
}
#[cfg(test)]
fn contains(&self, number: N, hash: H) -> bool {
/// Check if given block is a leaf.
pub fn contains(&self, number: N, hash: H) -> bool {
self.storage.get(&Reverse(number)).map_or(false, |hashes| hashes.contains(&hash))
}
+86
View File
@@ -623,6 +623,7 @@ impl<Block: BlockT> HeaderMetadata<Block> for BlockchainDb<Block> {
}
fn remove_header_metadata(&self, hash: Block::Hash) {
self.header_cache.lock().remove(&hash);
self.header_metadata_cache.remove_header_metadata(hash);
}
}
@@ -1972,6 +1973,59 @@ impl<Block: BlockT> sc_client_api::backend::Backend<Block> for Backend<Block> {
Ok((reverted, reverted_finalized))
}
fn remove_leaf_block(
&self,
hash: &Block::Hash,
) -> ClientResult<()> {
let best_hash = self.blockchain.info().best_hash;
if best_hash == *hash {
return Err(
sp_blockchain::Error::Backend(
format!("Can't remove best block {:?}", hash)
)
)
}
let hdr = self.blockchain.header_metadata(hash.clone())?;
if !self.have_state_at(&hash, hdr.number) {
return Err(
sp_blockchain::Error::UnknownBlock(
format!("State already discarded for {:?}", hash)
)
)
}
let mut leaves = self.blockchain.leaves.write();
if !leaves.contains(hdr.number, *hash) {
return Err(
sp_blockchain::Error::Backend(
format!("Can't remove non-leaf block {:?}", hash)
)
)
}
let mut transaction = Transaction::new();
if let Some(commit) = self.storage.state_db.remove(hash) {
apply_state_commit(&mut transaction, commit);
}
transaction.remove(columns::KEY_LOOKUP, hash.as_ref());
let changes_trie_cache_ops = self.changes_tries_storage.revert(
&mut transaction,
&cache::ComplexBlockId::new(
*hash,
hdr.number,
),
)?;
self.changes_tries_storage.post_commit(Some(changes_trie_cache_ops));
leaves.revert(hash.clone(), hdr.number);
leaves.prepare_transaction(&mut transaction, columns::META, meta_keys::LEAF_PREFIX);
self.storage.db.commit(transaction)?;
self.blockchain().remove_header_metadata(*hash);
Ok(())
}
fn blockchain(&self) -> &BlockchainDb<Block> {
&self.blockchain
}
@@ -3008,4 +3062,36 @@ pub(crate) mod tests {
}
}
}
#[test]
fn remove_leaf_block_works() {
let backend = Backend::<Block>::new_test_with_tx_storage(
2,
10,
TransactionStorageMode::StorageChain
);
let mut blocks = Vec::new();
let mut prev_hash = Default::default();
for i in 0 .. 2 {
let hash = insert_block(&backend, i, prev_hash, None, Default::default(), vec![i.into()], None);
blocks.push(hash);
prev_hash = hash;
}
// insert a fork at block 2, which becomes best block
let best_hash = insert_block(
&backend,
1,
blocks[0],
None,
sp_core::H256::random(),
vec![42.into()],
None
);
assert!(backend.remove_leaf_block(&best_hash).is_err());
assert!(backend.have_state_at(&prev_hash, 1));
backend.remove_leaf_block(&prev_hash).unwrap();
assert_eq!(None, backend.blockchain().header(BlockId::hash(prev_hash.clone())).unwrap());
assert!(!backend.have_state_at(&prev_hash, 1));
}
}
+7
View File
@@ -246,6 +246,13 @@ impl<S, Block> ClientBackend<Block> for Backend<S, HashFor<Block>>
Err(ClientError::NotAvailableOnLightClient)
}
fn remove_leaf_block(
&self,
_hash: &Block::Hash,
) -> ClientResult<()> {
Err(ClientError::NotAvailableOnLightClient)
}
fn get_import_lock(&self) -> &RwLock<()> {
&self.import_lock
}
+17
View File
@@ -364,6 +364,17 @@ impl<BlockHash: Hash + MallocSizeOf, Key: Hash + MallocSizeOf> StateDbSync<Block
}
}
fn remove(&mut self, hash: &BlockHash) -> Option<CommitSet<Key>> {
match self.mode {
PruningMode::ArchiveAll => {
Some(CommitSet::default())
},
PruningMode::ArchiveCanonical | PruningMode::Constrained(_) => {
self.non_canonical.remove(hash)
},
}
}
fn pin(&mut self, hash: &BlockHash) -> Result<(), PinError> {
match self.mode {
PruningMode::ArchiveAll => Ok(()),
@@ -509,6 +520,12 @@ impl<BlockHash: Hash + MallocSizeOf, Key: Hash + MallocSizeOf> StateDb<BlockHash
self.db.write().revert_one()
}
/// Remove specified non-canonical block.
/// Returns a database commit or `None` if not possible.
pub fn remove(&self, hash: &BlockHash) -> Option<CommitSet<Key>> {
self.db.write().remove(hash)
}
/// Returns last finalized block number.
pub fn best_canonical(&self) -> Option<u64> {
return self.db.read().best_canonical()
+170 -47
View File
@@ -36,7 +36,7 @@ const MAX_BLOCKS_PER_LEVEL: u64 = 32;
#[derive(parity_util_mem_derive::MallocSizeOf)]
pub struct NonCanonicalOverlay<BlockHash: Hash, Key: Hash> {
last_canonicalized: Option<(BlockHash, u64)>,
levels: VecDeque<Vec<BlockOverlay<BlockHash, Key>>>,
levels: VecDeque<OverlayLevel<BlockHash, Key>>,
parents: HashMap<BlockHash, BlockHash>,
pending_canonicalizations: Vec<BlockHash>,
pending_insertions: Vec<BlockHash>,
@@ -46,6 +46,36 @@ pub struct NonCanonicalOverlay<BlockHash: Hash, Key: Hash> {
pinned_insertions: HashMap<BlockHash, (Vec<Key>, u32)>,
}
#[derive(parity_util_mem_derive::MallocSizeOf)]
#[cfg_attr(test, derive(PartialEq, Debug))]
struct OverlayLevel<BlockHash: Hash, Key: Hash> {
blocks: Vec<BlockOverlay<BlockHash, Key>>,
used_indicies: u64, // Bitmask of available journal indicies.
}
impl<BlockHash: Hash, Key: Hash> OverlayLevel<BlockHash, Key> {
fn push(&mut self, overlay: BlockOverlay<BlockHash, Key>) {
self.used_indicies |= 1 << overlay.journal_index;
self.blocks.push(overlay)
}
fn available_index(&self) -> u64 {
self.used_indicies.trailing_ones() as u64
}
fn remove(&mut self, index: usize) -> BlockOverlay<BlockHash, Key> {
self.used_indicies &= !(1 << self.blocks[index].journal_index);
self.blocks.remove(index)
}
fn new() -> OverlayLevel<BlockHash, Key> {
OverlayLevel {
blocks: Vec::new(),
used_indicies: 0,
}
}
}
#[derive(Encode, Decode)]
struct JournalRecord<BlockHash: Hash, Key: Hash> {
hash: BlockHash,
@@ -62,6 +92,7 @@ fn to_journal_key(block: u64, index: u64) -> Vec<u8> {
#[derive(parity_util_mem_derive::MallocSizeOf)]
struct BlockOverlay<BlockHash: Hash, Key: Hash> {
hash: BlockHash,
journal_index: u64,
journal_key: Vec<u8>,
inserted: Vec<Key>,
deleted: Vec<Key>,
@@ -93,7 +124,7 @@ fn discard_values<Key: Hash>(values: &mut HashMap<Key, (u32, DBValue)>, inserted
}
fn discard_descendants<BlockHash: Hash, Key: Hash>(
levels: &mut (&mut [Vec<BlockOverlay<BlockHash, Key>>], &mut [Vec<BlockOverlay<BlockHash, Key>>]),
levels: &mut (&mut [OverlayLevel<BlockHash, Key>], &mut [OverlayLevel<BlockHash, Key>]),
mut values: &mut HashMap<Key, (u32, DBValue)>,
parents: &mut HashMap<BlockHash, BlockHash>,
pinned: &HashMap<BlockHash, u32>,
@@ -111,36 +142,32 @@ fn discard_descendants<BlockHash: Hash, Key: Hash>(
};
let mut pinned_children = 0;
if let Some(level) = first {
*level = level.drain(..).filter_map(|overlay| {
let parent = parents.get(&overlay.hash)
.expect("there is a parent entry for each entry in levels; qed");
if parent == hash {
let mut num_pinned = discard_descendants(
&mut remainder,
values,
parents,
pinned,
pinned_insertions,
&overlay.hash
);
if pinned.contains_key(&overlay.hash) {
num_pinned += 1;
}
if num_pinned != 0 {
// save to be discarded later.
pinned_insertions.insert(overlay.hash.clone(), (overlay.inserted, num_pinned));
pinned_children += num_pinned;
} else {
// discard immediately.
parents.remove(&overlay.hash);
discard_values(&mut values, overlay.inserted);
}
None
} else {
Some(overlay)
while let Some(i) = level.blocks.iter().position(|overlay| parents.get(&overlay.hash)
.expect("there is a parent entry for each entry in levels; qed")
== hash)
{
let overlay = level.remove(i);
let mut num_pinned = discard_descendants(
&mut remainder,
values,
parents,
pinned,
pinned_insertions,
&overlay.hash
);
if pinned.contains_key(&overlay.hash) {
num_pinned += 1;
}
}).collect();
if num_pinned != 0 {
// save to be discarded later.
pinned_insertions.insert(overlay.hash.clone(), (overlay.inserted, num_pinned));
pinned_children += num_pinned;
} else {
// discard immediately.
parents.remove(&overlay.hash);
discard_values(&mut values, overlay.inserted);
}
}
}
pinned_children
}
@@ -161,7 +188,7 @@ impl<BlockHash: Hash, Key: Hash> NonCanonicalOverlay<BlockHash, Key> {
let mut total: u64 = 0;
block += 1;
loop {
let mut level = Vec::new();
let mut level = OverlayLevel::new();
for index in 0 .. MAX_BLOCKS_PER_LEVEL {
let journal_key = to_journal_key(block, index);
if let Some(record) = db.get_meta(&journal_key).map_err(|e| Error::Db(e))? {
@@ -169,6 +196,7 @@ impl<BlockHash: Hash, Key: Hash> NonCanonicalOverlay<BlockHash, Key> {
let inserted = record.inserted.iter().map(|(k, _)| k.clone()).collect();
let overlay = BlockOverlay {
hash: record.hash.clone(),
journal_index: index,
journal_key,
inserted: inserted,
deleted: record.deleted,
@@ -187,7 +215,7 @@ impl<BlockHash: Hash, Key: Hash> NonCanonicalOverlay<BlockHash, Key> {
total += 1;
}
}
if level.is_empty() {
if level.blocks.is_empty() {
break;
}
levels.push_back(level);
@@ -235,23 +263,24 @@ impl<BlockHash: Hash, Key: Hash> NonCanonicalOverlay<BlockHash, Key> {
}
}
let level = if self.levels.is_empty() || number == front_block_number + self.levels.len() as u64 {
self.levels.push_back(Vec::new());
self.levels.push_back(OverlayLevel::new());
self.levels.back_mut().expect("can't be empty after insertion; qed")
} else {
self.levels.get_mut((number - front_block_number) as usize)
.expect("number is [front_block_number .. front_block_number + levels.len()) is asserted in precondition; qed")
};
if level.len() >= MAX_BLOCKS_PER_LEVEL as usize {
if level.blocks.len() >= MAX_BLOCKS_PER_LEVEL as usize {
return Err(Error::TooManySiblingBlocks);
}
let index = level.len() as u64;
let index = level.available_index();
let journal_key = to_journal_key(number, index);
let inserted = changeset.inserted.iter().map(|(k, _)| k.clone()).collect();
let overlay = BlockOverlay {
hash: hash.clone(),
journal_index: index,
journal_key: journal_key.clone(),
inserted: inserted,
deleted: changeset.deleted.clone(),
@@ -279,7 +308,7 @@ impl<BlockHash: Hash, Key: Hash> NonCanonicalOverlay<BlockHash, Key> {
hash: &BlockHash
) {
if let Some(level) = self.levels.get(level_index) {
level.iter().for_each(|overlay| {
level.blocks.iter().for_each(|overlay| {
let parent = self.parents.get(&overlay.hash).expect("there is a parent entry for each entry in levels; qed").clone();
if parent == *hash {
discarded_journals.push(overlay.journal_key.clone());
@@ -310,7 +339,7 @@ impl<BlockHash: Hash, Key: Hash> NonCanonicalOverlay<BlockHash, Key> {
let start = self.last_canonicalized_block_number().unwrap_or(0);
self.levels
.get(self.pending_canonicalizations.len())
.map(|level| level.iter().map(|r| (r.hash.clone(), start)).collect())
.map(|level| level.blocks.iter().map(|r| (r.hash.clone(), start)).collect())
.unwrap_or_default()
}
@@ -323,14 +352,14 @@ impl<BlockHash: Hash, Key: Hash> NonCanonicalOverlay<BlockHash, Key> {
) -> Result<(), Error<E>> {
trace!(target: "state-db", "Canonicalizing {:?}", hash);
let level = self.levels.get(self.pending_canonicalizations.len()).ok_or_else(|| Error::InvalidBlock)?;
let index = level
let index = level.blocks
.iter()
.position(|overlay| overlay.hash == *hash)
.ok_or_else(|| Error::InvalidBlock)?;
let mut discarded_journals = Vec::new();
let mut discarded_blocks = Vec::new();
for (i, overlay) in level.iter().enumerate() {
for (i, overlay) in level.blocks.iter().enumerate() {
if i != index {
self.discard_journals(
self.pending_canonicalizations.len() + 1,
@@ -344,7 +373,7 @@ impl<BlockHash: Hash, Key: Hash> NonCanonicalOverlay<BlockHash, Key> {
}
// get the one we need to canonicalize
let overlay = &level[index];
let overlay = &level.blocks[index];
commit.data.inserted.extend(overlay.inserted.iter()
.map(|k| (k.clone(), self.values.get(k).expect("For each key in overlays there's a value in values").1.clone())));
commit.data.deleted.extend(overlay.deleted.clone());
@@ -363,13 +392,13 @@ impl<BlockHash: Hash, Key: Hash> NonCanonicalOverlay<BlockHash, Key> {
for hash in self.pending_canonicalizations.drain(..) {
trace!(target: "state-db", "Post canonicalizing {:?}", hash);
let level = self.levels.pop_front().expect("Hash validity is checked in `canonicalize`");
let index = level
let index = level.blocks
.iter()
.position(|overlay| overlay.hash == hash)
.expect("Hash validity is checked in `canonicalize`");
// discard unfinalized overlays and values
for (i, overlay) in level.into_iter().enumerate() {
for (i, overlay) in level.blocks.into_iter().enumerate() {
let mut pinned_children = if i != index {
discard_descendants(
&mut self.levels.as_mut_slices(),
@@ -421,7 +450,7 @@ impl<BlockHash: Hash, Key: Hash> NonCanonicalOverlay<BlockHash, Key> {
pub fn revert_one(&mut self) -> Option<CommitSet<Key>> {
self.levels.pop_back().map(|level| {
let mut commit = CommitSet::default();
for overlay in level.into_iter() {
for overlay in level.blocks.into_iter() {
commit.meta.deleted.push(overlay.journal_key);
self.parents.remove(&overlay.hash);
discard_values(&mut self.values, overlay.inserted);
@@ -430,6 +459,36 @@ impl<BlockHash: Hash, Key: Hash> NonCanonicalOverlay<BlockHash, Key> {
})
}
/// Revert a single block. Returns commit set that deletes the journal or `None` if not possible.
pub fn remove(&mut self, hash: &BlockHash) -> Option<CommitSet<Key>> {
let mut commit = CommitSet::default();
let level_count = self.levels.len();
for (level_index, level) in self.levels.iter_mut().enumerate().rev() {
let index = match level.blocks.iter().position(|overlay| &overlay.hash == hash) {
Some(index) => index,
None => continue,
};
// Check that it does not have any children
if (level_index != level_count - 1) && self.parents.values().any(|h| h == hash) {
log::debug!(target: "state-db", "Trying to remove block {:?} with children", hash);
return None;
}
let overlay = level.remove(index);
commit.meta.deleted.push(overlay.journal_key);
self.parents.remove(&overlay.hash);
discard_values(&mut self.values, overlay.inserted);
break;
}
if self.levels.back().map_or(false, |l| l.blocks.is_empty()) {
self.levels.pop_back();
}
if !commit.meta.deleted.is_empty() {
Some(commit)
} else {
None
}
}
fn revert_insertions(&mut self) {
self.pending_insertions.reverse();
for hash in self.pending_insertions.drain(..) {
@@ -437,12 +496,13 @@ impl<BlockHash: Hash, Key: Hash> NonCanonicalOverlay<BlockHash, Key> {
// find a level. When iterating insertions backwards the hash is always last in the level.
let level_index =
self.levels.iter().position(|level|
level.last().expect("Hash is added in `insert` in reverse order").hash == hash)
level.blocks.last().expect("Hash is added in `insert` in reverse order").hash == hash)
.expect("Hash is added in insert");
let overlay = self.levels[level_index].pop().expect("Empty levels are not allowed in self.levels");
let overlay_index = self.levels[level_index].blocks.len() - 1;
let overlay = self.levels[level_index].remove(overlay_index);
discard_values(&mut self.values, overlay.inserted);
if self.levels[level_index].is_empty() {
if self.levels[level_index].blocks.is_empty() {
debug_assert_eq!(level_index, self.levels.len() - 1);
self.levels.pop_back();
}
@@ -1000,4 +1060,67 @@ mod tests {
overlay.apply_pending();
assert!(!contains(&overlay, 21));
}
#[test]
fn index_reuse() {
// This test discards a branch that is journaled under a non-zero index on level 1,
// making sure all journals are loaded for each level even if some of them are missing.
let root = H256::random();
let h1 = H256::random();
let h2 = H256::random();
let h11 = H256::random();
let h21 = H256::random();
let mut db = make_db(&[]);
let mut overlay = NonCanonicalOverlay::<H256, H256>::new(&db).unwrap();
db.commit(&overlay.insert::<io::Error>(&root, 10, &H256::default(), make_changeset(&[], &[])).unwrap());
db.commit(&overlay.insert::<io::Error>(&h1, 11, &root, make_changeset(&[1], &[])).unwrap());
db.commit(&overlay.insert::<io::Error>(&h2, 11, &root, make_changeset(&[2], &[])).unwrap());
db.commit(&overlay.insert::<io::Error>(&h11, 12, &h1, make_changeset(&[11], &[])).unwrap());
db.commit(&overlay.insert::<io::Error>(&h21, 12, &h2, make_changeset(&[21], &[])).unwrap());
let mut commit = CommitSet::default();
overlay.canonicalize::<io::Error>(&root, &mut commit).unwrap();
overlay.canonicalize::<io::Error>(&h2, &mut commit).unwrap(); // h11 should stay in the DB
db.commit(&commit);
overlay.apply_pending();
// add another block at top level. It should reuse journal index 0 of previously discarded block
let h22 = H256::random();
db.commit(&overlay.insert::<io::Error>(&h22, 12, &h2, make_changeset(&[22], &[])).unwrap());
assert_eq!(overlay.levels[0].blocks[0].journal_index, 1);
assert_eq!(overlay.levels[0].blocks[1].journal_index, 0);
// Restore into a new overlay and check that journaled value exists.
let overlay = NonCanonicalOverlay::<H256, H256>::new(&db).unwrap();
assert_eq!(overlay.parents.len(), 2);
assert!(contains(&overlay, 21));
assert!(contains(&overlay, 22));
}
#[test]
fn remove_works() {
let root = H256::random();
let h1 = H256::random();
let h2 = H256::random();
let h11 = H256::random();
let h21 = H256::random();
let mut db = make_db(&[]);
let mut overlay = NonCanonicalOverlay::<H256, H256>::new(&db).unwrap();
db.commit(&overlay.insert::<io::Error>(&root, 10, &H256::default(), make_changeset(&[], &[])).unwrap());
db.commit(&overlay.insert::<io::Error>(&h1, 11, &root, make_changeset(&[1], &[])).unwrap());
db.commit(&overlay.insert::<io::Error>(&h2, 11, &root, make_changeset(&[2], &[])).unwrap());
db.commit(&overlay.insert::<io::Error>(&h11, 12, &h1, make_changeset(&[11], &[])).unwrap());
db.commit(&overlay.insert::<io::Error>(&h21, 12, &h2, make_changeset(&[21], &[])).unwrap());
assert!(overlay.remove(&h1).is_none());
assert!(overlay.remove(&h2).is_none());
assert_eq!(overlay.levels.len(), 3);
db.commit(&overlay.remove(&h11).unwrap());
assert!(!contains(&overlay, 11));
db.commit(&overlay.remove(&h21).unwrap());
assert_eq!(overlay.levels.len(), 2);
db.commit(&overlay.remove(&h2).unwrap());
assert!(!contains(&overlay, 2));
}
}