backend: Add only pruned blocks to unpin hashset and add a test

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
This commit is contained in:
Alexandru Vasile
2024-01-16 13:13:35 +02:00
parent 2a5ac613f4
commit a74fb4b83b
3 changed files with 97 additions and 36 deletions
+5 -2
View File
@@ -265,10 +265,13 @@ pub(super) mod test_utils {
}
/// A finalized event
pub fn ev_finalized(ns: impl IntoIterator<Item = u64>) -> FollowEvent<H256> {
pub fn ev_finalized(
ns: impl IntoIterator<Item = u64>,
pruned: impl IntoIterator<Item = u64>,
) -> FollowEvent<H256> {
FollowEvent::Finalized(Finalized {
finalized_block_hashes: ns.into_iter().map(H256::from_low_u64_le).collect(),
pruned_block_hashes: vec![],
pruned_block_hashes: pruned.into_iter().map(H256::from_low_u64_le).collect(),
})
}
}
@@ -424,7 +424,7 @@ mod test {
Ok(ev_initialized(0)),
Ok(ev_new_block(0, 1)),
Ok(ev_best_block(1)),
Ok(ev_finalized([1])),
Ok(ev_finalized([1], [])),
Err(Error::Other("ended".to_owned())),
]
},
@@ -465,7 +465,7 @@ mod test {
Ok(ev_initialized(0)),
Ok(ev_new_block(0, 1)),
Ok(ev_best_block(1)),
Ok(ev_finalized([1])),
Ok(ev_finalized([1], [])),
Ok(ev_new_block(1, 2)),
Ok(ev_new_block(2, 3)),
Err(Error::Other("ended".to_owned())),
@@ -517,7 +517,7 @@ mod test {
Ok(ev_best_block(1)),
Ok(ev_new_block(1, 2)),
Ok(ev_new_block(2, 3)),
Ok(ev_finalized([1])),
Ok(ev_finalized([1], [])),
Err(Error::Other("ended".to_owned())),
]
},
@@ -109,7 +109,8 @@ impl<Hash: BlockHash> Stream for FollowStreamUnpin<Hash> {
FollowStreamMsg::Event(FollowEvent::Initialized(details)) => {
// The first finalized block gets the starting block_num.
let rel_block_num = this.rel_block_num;
let block_ref = this.pin_block_at(rel_block_num, details.finalized_block_hash);
let block_ref =
this.pin_block_at(rel_block_num, details.finalized_block_hash, false);
FollowStreamMsg::Event(FollowEvent::Initialized(Initialized {
finalized_block_hash: block_ref,
@@ -126,9 +127,10 @@ impl<Hash: BlockHash> Stream for FollowStreamUnpin<Hash> {
.map(|p| p.rel_block_num)
.unwrap_or(this.rel_block_num);
let block_ref = this.pin_block_at(parent_rel_block_num + 1, details.block_hash);
let block_ref =
this.pin_block_at(parent_rel_block_num + 1, details.block_hash, false);
let parent_block_ref =
this.pin_block_at(parent_rel_block_num, details.parent_block_hash);
this.pin_block_at(parent_rel_block_num, details.parent_block_hash, false);
FollowStreamMsg::Event(FollowEvent::NewBlock(NewBlock {
block_hash: block_ref,
@@ -140,7 +142,8 @@ impl<Hash: BlockHash> Stream for FollowStreamUnpin<Hash> {
// We expect this block to already exist, so it'll keep its existing block_num,
// but worst case it'll just get the current finalized block_num + 1.
let rel_block_num = this.rel_block_num + 1;
let block_ref = this.pin_block_at(rel_block_num, details.best_block_hash);
let block_ref =
this.pin_block_at(rel_block_num, details.best_block_hash, false);
FollowStreamMsg::Event(FollowEvent::BestBlockChanged(BestBlockChanged {
best_block_hash: block_ref,
@@ -156,7 +159,7 @@ impl<Hash: BlockHash> Stream for FollowStreamUnpin<Hash> {
// but if they don't, we just increment the num from the last finalized block
// we saw, which should be accurate.
let rel_block_num = this.rel_block_num + idx + 1;
this.pin_block_at(rel_block_num, hash)
this.pin_block_at(rel_block_num, hash, false)
})
.collect();
@@ -170,17 +173,15 @@ impl<Hash: BlockHash> Stream for FollowStreamUnpin<Hash> {
.map(|hash| {
// We should know about these, too, and if not we set their age to last_finalized + 1
let rel_block_num = this.rel_block_num + 1;
this.pin_block_at(rel_block_num, hash)
this.pin_block_at(rel_block_num, hash, true)
})
.collect();
let pruned_hashes = pruned_block_refs.iter().map(|block| block.hash());
// At this point, we also check to see which blocks we should submit unpin events
// for. When we see a block hash as finalized, we know that it won't be reported again
// (except as a parent hash of a new block), so we can safely make an unpin call for it
// without worrying about the hash being returned again despite the block not being pinned.
this.unpin_blocks(pruned_hashes, cx.waker());
this.unpin_blocks(cx.waker());
FollowStreamMsg::Event(FollowEvent::Finalized(Finalized {
finalized_block_hashes: finalized_block_refs,
@@ -272,12 +273,18 @@ impl<Hash: BlockHash> FollowStreamUnpin<Hash> {
/// Pin a block, or return the reference to an already-pinned block. If the block has been registered to
/// be unpinned, we'll clear those flags, so that it won't be unpinned. If the unpin request has already
/// been sent though, then the block will be unpinned.
fn pin_block_at(&mut self, rel_block_num: usize, hash: Hash) -> BlockRef<Hash> {
fn pin_block_at(
&mut self,
rel_block_num: usize,
hash: Hash,
should_unpin: bool,
) -> BlockRef<Hash> {
let entry = self
.pinned
.entry(hash)
// Only if there's already an entry do we need to clear any unpin flags set against it.
.and_modify(|_| {
.and_modify(|entry| {
entry.block_ref.should_unpin = should_unpin;
self.unpin_flags.lock().unwrap().remove(&hash);
})
// If there's not an entry already, make one and return it.
@@ -288,6 +295,7 @@ impl<Hash: BlockHash> FollowStreamUnpin<Hash> {
hash,
unpin_flags: self.unpin_flags.clone(),
}),
should_unpin,
},
});
@@ -295,7 +303,7 @@ impl<Hash: BlockHash> FollowStreamUnpin<Hash> {
}
/// Unpin any blocks that are either too old, or have the unpin flag set and are in the list of pruned hashes.
fn unpin_blocks(&mut self, pruned_hashes: impl IntoIterator<Item = Hash>, waker: &Waker) {
fn unpin_blocks(&mut self, waker: &Waker) {
let unpin_flags = std::mem::take(&mut *self.unpin_flags.lock().unwrap());
let rel_block_num = self.rel_block_num;
@@ -307,18 +315,14 @@ impl<Hash: BlockHash> FollowStreamUnpin<Hash> {
let mut blocks_to_unpin = vec![];
for (hash, details) in &self.pinned {
if rel_block_num.saturating_sub(details.rel_block_num) >= self.max_block_life {
if rel_block_num.saturating_sub(details.rel_block_num) >= self.max_block_life
|| unpin_flags.contains(&hash)
{
// The block is too old, or it's been flagged to be unpinned (no refs to it left)
blocks_to_unpin.push(*hash);
}
}
for hash in pruned_hashes {
if unpin_flags.contains(&hash) {
blocks_to_unpin.push(hash);
}
}
// No need to call the waker etc if nothing to do:
if blocks_to_unpin.is_empty() {
return;
@@ -355,6 +359,7 @@ struct PinnedDetails<Hash: BlockHash> {
#[derive(Debug, Clone)]
pub struct BlockRef<Hash: BlockHash> {
inner: Arc<BlockRefInner<Hash>>,
should_unpin: bool,
}
#[derive(Debug)]
@@ -373,6 +378,7 @@ impl<Hash: BlockHash> BlockRef<Hash> {
hash,
unpin_flags: Default::default(),
}),
should_unpin: false,
}
}
@@ -400,7 +406,7 @@ impl<Hash: BlockHash> Drop for BlockRef<Hash> {
// only "external" one left and we should ask to unpin it now. if it's
// the only ref remaining, it means that it's already been unpinned, so
// nothing to do here anyway.
if Arc::strong_count(&self.inner) == 2 {
if self.should_unpin && Arc::strong_count(&self.inner) == 2 {
if let Ok(mut unpin_flags) = self.inner.unpin_flags.lock() {
unpin_flags.insert(self.inner.hash);
}
@@ -520,11 +526,11 @@ mod test {
|| {
[
Ok(ev_initialized(0)),
Ok(ev_finalized([1])),
Ok(ev_finalized([2])),
Ok(ev_finalized([3])),
Ok(ev_finalized([4])),
Ok(ev_finalized([5])),
Ok(ev_finalized([1], [])),
Ok(ev_finalized([2], [])),
Ok(ev_finalized([3], [])),
Ok(ev_finalized([4], [])),
Ok(ev_finalized([5], [])),
Err(Error::Other("ended".to_owned())),
]
},
@@ -569,8 +575,10 @@ mod test {
|| {
[
Ok(ev_initialized(0)),
Ok(ev_finalized([1])),
Ok(ev_finalized([2])),
Ok(ev_new_block(0, 1)),
Ok(ev_new_block(1, 2)),
Ok(ev_finalized([1], [])),
Ok(ev_finalized([2], [])),
Err(Error::Other("ended".to_owned())),
]
},
@@ -579,8 +587,13 @@ mod test {
let _r = follow_unpin.next().await.unwrap().unwrap();
let _i0 = follow_unpin.next().await.unwrap().unwrap();
let f1 = follow_unpin.next().await.unwrap().unwrap();
let n1 = follow_unpin.next().await.unwrap().unwrap();
drop(n1);
let n2 = follow_unpin.next().await.unwrap().unwrap();
drop(n2);
let f1 = follow_unpin.next().await.unwrap().unwrap();
// We don't care about block 1 any more; drop it. unpinning happens when the max_life is exceeded.
drop(f1);
@@ -592,6 +605,51 @@ mod test {
assert!(follow_unpin.is_pinned(&H256::from_low_u64_le(2)));
}
#[tokio::test]
async fn pruned_blocks_should_get_unpinned() {
let (mut follow_unpin, unpin_rx) = test_unpin_stream_getter(
|| {
[
Ok(ev_initialized(0)),
Ok(ev_new_block(0, 1)),
Ok(ev_new_block(1, 2)),
Ok(ev_new_block(1, 3)),
Ok(ev_finalized([1], [3])),
Ok(ev_finalized([2], [])),
Err(Error::Other("ended".to_owned())),
]
},
10,
);
let _r = follow_unpin.next().await.unwrap().unwrap();
let _i0 = follow_unpin.next().await.unwrap().unwrap();
let n1 = follow_unpin.next().await.unwrap().unwrap();
drop(n1);
let n2 = follow_unpin.next().await.unwrap().unwrap();
drop(n2);
let n3 = follow_unpin.next().await.unwrap().unwrap();
drop(n3);
let f1 = follow_unpin.next().await.unwrap().unwrap();
drop(f1);
let f2 = follow_unpin.next().await.unwrap().unwrap();
drop(f2);
let (hash, _) = unpin_rx
.try_recv()
.expect("unpin call should have happened");
assert_eq!(hash, H256::from_low_u64_le(3));
// Confirm that 0 and 2 are still pinned and that 1 isnt.
assert!(follow_unpin.is_pinned(&H256::from_low_u64_le(0)));
assert!(follow_unpin.is_pinned(&H256::from_low_u64_le(1)));
assert!(follow_unpin.is_pinned(&H256::from_low_u64_le(2)));
// Block 3 was pruned, so should be unpinned by the second finalized event.
assert!(!follow_unpin.is_pinned(&H256::from_low_u64_le(3)));
}
#[tokio::test]
async fn never_unpin_new_block_before_finalized() {
// Ensure that if we drop a new block; the pinning is still active until the block is finalized.
@@ -602,9 +660,9 @@ mod test {
Ok(ev_new_block(0, 1)),
Ok(ev_new_block(1, 2)),
Ok(ev_best_block(1)),
Ok(ev_finalized([1])),
Ok(ev_finalized([2])),
Ok(ev_finalized([3])),
Ok(ev_finalized([1], [])),
Ok(ev_finalized([2], [])),
Ok(ev_finalized([3], [])),
Err(Error::Other("ended".to_owned())),
]
},