From a74fb4b83b5aefd707f978c145d6c904717d4a34 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 16 Jan 2024 13:13:35 +0200 Subject: [PATCH] backend: Add only pruned blocks to unpin hashset and add a test Signed-off-by: Alexandru Vasile --- subxt/src/backend/unstable/follow_stream.rs | 7 +- .../backend/unstable/follow_stream_driver.rs | 6 +- .../backend/unstable/follow_stream_unpin.rs | 120 +++++++++++++----- 3 files changed, 97 insertions(+), 36 deletions(-) diff --git a/subxt/src/backend/unstable/follow_stream.rs b/subxt/src/backend/unstable/follow_stream.rs index dbd2616328..ea8253c958 100644 --- a/subxt/src/backend/unstable/follow_stream.rs +++ b/subxt/src/backend/unstable/follow_stream.rs @@ -265,10 +265,13 @@ pub(super) mod test_utils { } /// A finalized event - pub fn ev_finalized(ns: impl IntoIterator) -> FollowEvent { + pub fn ev_finalized( + ns: impl IntoIterator, + pruned: impl IntoIterator, + ) -> FollowEvent { FollowEvent::Finalized(Finalized { finalized_block_hashes: ns.into_iter().map(H256::from_low_u64_le).collect(), - pruned_block_hashes: vec![], + pruned_block_hashes: pruned.into_iter().map(H256::from_low_u64_le).collect(), }) } } diff --git a/subxt/src/backend/unstable/follow_stream_driver.rs b/subxt/src/backend/unstable/follow_stream_driver.rs index 028065f941..fdca29723a 100644 --- a/subxt/src/backend/unstable/follow_stream_driver.rs +++ b/subxt/src/backend/unstable/follow_stream_driver.rs @@ -424,7 +424,7 @@ mod test { Ok(ev_initialized(0)), Ok(ev_new_block(0, 1)), Ok(ev_best_block(1)), - Ok(ev_finalized([1])), + Ok(ev_finalized([1], [])), Err(Error::Other("ended".to_owned())), ] }, @@ -465,7 +465,7 @@ mod test { Ok(ev_initialized(0)), Ok(ev_new_block(0, 1)), Ok(ev_best_block(1)), - Ok(ev_finalized([1])), + Ok(ev_finalized([1], [])), Ok(ev_new_block(1, 2)), Ok(ev_new_block(2, 3)), Err(Error::Other("ended".to_owned())), @@ -517,7 +517,7 @@ mod test { Ok(ev_best_block(1)), Ok(ev_new_block(1, 2)), Ok(ev_new_block(2, 3)), - Ok(ev_finalized([1])), + Ok(ev_finalized([1], [])), Err(Error::Other("ended".to_owned())), ] }, diff --git a/subxt/src/backend/unstable/follow_stream_unpin.rs b/subxt/src/backend/unstable/follow_stream_unpin.rs index d4e9e7c431..985ef6ff82 100644 --- a/subxt/src/backend/unstable/follow_stream_unpin.rs +++ b/subxt/src/backend/unstable/follow_stream_unpin.rs @@ -109,7 +109,8 @@ impl Stream for FollowStreamUnpin { FollowStreamMsg::Event(FollowEvent::Initialized(details)) => { // The first finalized block gets the starting block_num. let rel_block_num = this.rel_block_num; - let block_ref = this.pin_block_at(rel_block_num, details.finalized_block_hash); + let block_ref = + this.pin_block_at(rel_block_num, details.finalized_block_hash, false); FollowStreamMsg::Event(FollowEvent::Initialized(Initialized { finalized_block_hash: block_ref, @@ -126,9 +127,10 @@ impl Stream for FollowStreamUnpin { .map(|p| p.rel_block_num) .unwrap_or(this.rel_block_num); - let block_ref = this.pin_block_at(parent_rel_block_num + 1, details.block_hash); + let block_ref = + this.pin_block_at(parent_rel_block_num + 1, details.block_hash, false); let parent_block_ref = - this.pin_block_at(parent_rel_block_num, details.parent_block_hash); + this.pin_block_at(parent_rel_block_num, details.parent_block_hash, false); FollowStreamMsg::Event(FollowEvent::NewBlock(NewBlock { block_hash: block_ref, @@ -140,7 +142,8 @@ impl Stream for FollowStreamUnpin { // We expect this block to already exist, so it'll keep its existing block_num, // but worst case it'll just get the current finalized block_num + 1. let rel_block_num = this.rel_block_num + 1; - let block_ref = this.pin_block_at(rel_block_num, details.best_block_hash); + let block_ref = + this.pin_block_at(rel_block_num, details.best_block_hash, false); FollowStreamMsg::Event(FollowEvent::BestBlockChanged(BestBlockChanged { best_block_hash: block_ref, @@ -156,7 +159,7 @@ impl Stream for FollowStreamUnpin { // but if they don't, we just increment the num from the last finalized block // we saw, which should be accurate. let rel_block_num = this.rel_block_num + idx + 1; - this.pin_block_at(rel_block_num, hash) + this.pin_block_at(rel_block_num, hash, false) }) .collect(); @@ -170,17 +173,15 @@ impl Stream for FollowStreamUnpin { .map(|hash| { // We should know about these, too, and if not we set their age to last_finalized + 1 let rel_block_num = this.rel_block_num + 1; - this.pin_block_at(rel_block_num, hash) + this.pin_block_at(rel_block_num, hash, true) }) .collect(); - let pruned_hashes = pruned_block_refs.iter().map(|block| block.hash()); - // At this point, we also check to see which blocks we should submit unpin events // for. When we see a block hash as finalized, we know that it won't be reported again // (except as a parent hash of a new block), so we can safely make an unpin call for it // without worrying about the hash being returned again despite the block not being pinned. - this.unpin_blocks(pruned_hashes, cx.waker()); + this.unpin_blocks(cx.waker()); FollowStreamMsg::Event(FollowEvent::Finalized(Finalized { finalized_block_hashes: finalized_block_refs, @@ -272,12 +273,18 @@ impl FollowStreamUnpin { /// Pin a block, or return the reference to an already-pinned block. If the block has been registered to /// be unpinned, we'll clear those flags, so that it won't be unpinned. If the unpin request has already /// been sent though, then the block will be unpinned. - fn pin_block_at(&mut self, rel_block_num: usize, hash: Hash) -> BlockRef { + fn pin_block_at( + &mut self, + rel_block_num: usize, + hash: Hash, + should_unpin: bool, + ) -> BlockRef { let entry = self .pinned .entry(hash) // Only if there's already an entry do we need to clear any unpin flags set against it. - .and_modify(|_| { + .and_modify(|entry| { + entry.block_ref.should_unpin = should_unpin; self.unpin_flags.lock().unwrap().remove(&hash); }) // If there's not an entry already, make one and return it. @@ -288,6 +295,7 @@ impl FollowStreamUnpin { hash, unpin_flags: self.unpin_flags.clone(), }), + should_unpin, }, }); @@ -295,7 +303,7 @@ impl FollowStreamUnpin { } /// Unpin any blocks that are either too old, or have the unpin flag set and are in the list of pruned hashes. - fn unpin_blocks(&mut self, pruned_hashes: impl IntoIterator, waker: &Waker) { + fn unpin_blocks(&mut self, waker: &Waker) { let unpin_flags = std::mem::take(&mut *self.unpin_flags.lock().unwrap()); let rel_block_num = self.rel_block_num; @@ -307,18 +315,14 @@ impl FollowStreamUnpin { let mut blocks_to_unpin = vec![]; for (hash, details) in &self.pinned { - if rel_block_num.saturating_sub(details.rel_block_num) >= self.max_block_life { + if rel_block_num.saturating_sub(details.rel_block_num) >= self.max_block_life + || unpin_flags.contains(&hash) + { // The block is too old, or it's been flagged to be unpinned (no refs to it left) blocks_to_unpin.push(*hash); } } - for hash in pruned_hashes { - if unpin_flags.contains(&hash) { - blocks_to_unpin.push(hash); - } - } - // No need to call the waker etc if nothing to do: if blocks_to_unpin.is_empty() { return; @@ -355,6 +359,7 @@ struct PinnedDetails { #[derive(Debug, Clone)] pub struct BlockRef { inner: Arc>, + should_unpin: bool, } #[derive(Debug)] @@ -373,6 +378,7 @@ impl BlockRef { hash, unpin_flags: Default::default(), }), + should_unpin: false, } } @@ -400,7 +406,7 @@ impl Drop for BlockRef { // only "external" one left and we should ask to unpin it now. if it's // the only ref remaining, it means that it's already been unpinned, so // nothing to do here anyway. - if Arc::strong_count(&self.inner) == 2 { + if self.should_unpin && Arc::strong_count(&self.inner) == 2 { if let Ok(mut unpin_flags) = self.inner.unpin_flags.lock() { unpin_flags.insert(self.inner.hash); } @@ -520,11 +526,11 @@ mod test { || { [ Ok(ev_initialized(0)), - Ok(ev_finalized([1])), - Ok(ev_finalized([2])), - Ok(ev_finalized([3])), - Ok(ev_finalized([4])), - Ok(ev_finalized([5])), + Ok(ev_finalized([1], [])), + Ok(ev_finalized([2], [])), + Ok(ev_finalized([3], [])), + Ok(ev_finalized([4], [])), + Ok(ev_finalized([5], [])), Err(Error::Other("ended".to_owned())), ] }, @@ -569,8 +575,10 @@ mod test { || { [ Ok(ev_initialized(0)), - Ok(ev_finalized([1])), - Ok(ev_finalized([2])), + Ok(ev_new_block(0, 1)), + Ok(ev_new_block(1, 2)), + Ok(ev_finalized([1], [])), + Ok(ev_finalized([2], [])), Err(Error::Other("ended".to_owned())), ] }, @@ -579,8 +587,13 @@ mod test { let _r = follow_unpin.next().await.unwrap().unwrap(); let _i0 = follow_unpin.next().await.unwrap().unwrap(); - let f1 = follow_unpin.next().await.unwrap().unwrap(); + let n1 = follow_unpin.next().await.unwrap().unwrap(); + drop(n1); + let n2 = follow_unpin.next().await.unwrap().unwrap(); + drop(n2); + + let f1 = follow_unpin.next().await.unwrap().unwrap(); // We don't care about block 1 any more; drop it. unpinning happens when the max_life is exceeded. drop(f1); @@ -592,6 +605,51 @@ mod test { assert!(follow_unpin.is_pinned(&H256::from_low_u64_le(2))); } + #[tokio::test] + async fn pruned_blocks_should_get_unpinned() { + let (mut follow_unpin, unpin_rx) = test_unpin_stream_getter( + || { + [ + Ok(ev_initialized(0)), + Ok(ev_new_block(0, 1)), + Ok(ev_new_block(1, 2)), + Ok(ev_new_block(1, 3)), + Ok(ev_finalized([1], [3])), + Ok(ev_finalized([2], [])), + Err(Error::Other("ended".to_owned())), + ] + }, + 10, + ); + + let _r = follow_unpin.next().await.unwrap().unwrap(); + let _i0 = follow_unpin.next().await.unwrap().unwrap(); + + let n1 = follow_unpin.next().await.unwrap().unwrap(); + drop(n1); + let n2 = follow_unpin.next().await.unwrap().unwrap(); + drop(n2); + let n3 = follow_unpin.next().await.unwrap().unwrap(); + drop(n3); + + let f1 = follow_unpin.next().await.unwrap().unwrap(); + drop(f1); + let f2 = follow_unpin.next().await.unwrap().unwrap(); + drop(f2); + + let (hash, _) = unpin_rx + .try_recv() + .expect("unpin call should have happened"); + assert_eq!(hash, H256::from_low_u64_le(3)); + + // Confirm that 0 and 2 are still pinned and that 1 isnt. + assert!(follow_unpin.is_pinned(&H256::from_low_u64_le(0))); + assert!(follow_unpin.is_pinned(&H256::from_low_u64_le(1))); + assert!(follow_unpin.is_pinned(&H256::from_low_u64_le(2))); + // Block 3 was pruned, so should be unpinned by the second finalized event. + assert!(!follow_unpin.is_pinned(&H256::from_low_u64_le(3))); + } + #[tokio::test] async fn never_unpin_new_block_before_finalized() { // Ensure that if we drop a new block; the pinning is still active until the block is finalized. @@ -602,9 +660,9 @@ mod test { Ok(ev_new_block(0, 1)), Ok(ev_new_block(1, 2)), Ok(ev_best_block(1)), - Ok(ev_finalized([1])), - Ok(ev_finalized([2])), - Ok(ev_finalized([3])), + Ok(ev_finalized([1], [])), + Ok(ev_finalized([2], [])), + Ok(ev_finalized([3], [])), Err(Error::Other("ended".to_owned())), ] },