From b4cc97efb55620e29ab56f1fdb4e1a7daabd768f Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 16 Jan 2024 13:19:35 +0200 Subject: [PATCH] backend: Introduce the `UnpinPolicy` Signed-off-by: Alexandru Vasile --- .../backend/unstable/follow_stream_unpin.rs | 56 +++++++++++++------ 1 file changed, 40 insertions(+), 16 deletions(-) diff --git a/subxt/src/backend/unstable/follow_stream_unpin.rs b/subxt/src/backend/unstable/follow_stream_unpin.rs index 985ef6ff82..11ebe0615f 100644 --- a/subxt/src/backend/unstable/follow_stream_unpin.rs +++ b/subxt/src/backend/unstable/follow_stream_unpin.rs @@ -109,8 +109,11 @@ impl Stream for FollowStreamUnpin { FollowStreamMsg::Event(FollowEvent::Initialized(details)) => { // The first finalized block gets the starting block_num. let rel_block_num = this.rel_block_num; - let block_ref = - this.pin_block_at(rel_block_num, details.finalized_block_hash, false); + let block_ref = this.pin_block_at( + rel_block_num, + details.finalized_block_hash, + UnpinPolicy::MaxAge, + ); FollowStreamMsg::Event(FollowEvent::Initialized(Initialized { finalized_block_hash: block_ref, @@ -127,10 +130,16 @@ impl Stream for FollowStreamUnpin { .map(|p| p.rel_block_num) .unwrap_or(this.rel_block_num); - let block_ref = - this.pin_block_at(parent_rel_block_num + 1, details.block_hash, false); - let parent_block_ref = - this.pin_block_at(parent_rel_block_num, details.parent_block_hash, false); + let block_ref = this.pin_block_at( + parent_rel_block_num + 1, + details.block_hash, + UnpinPolicy::MaxAge, + ); + let parent_block_ref = this.pin_block_at( + parent_rel_block_num, + details.parent_block_hash, + UnpinPolicy::MaxAge, + ); FollowStreamMsg::Event(FollowEvent::NewBlock(NewBlock { block_hash: block_ref, @@ -142,8 +151,11 @@ impl Stream for FollowStreamUnpin { // We expect this block to already exist, so it'll keep its existing block_num, // but worst case it'll just get the current finalized block_num + 1. let rel_block_num = this.rel_block_num + 1; - let block_ref = - this.pin_block_at(rel_block_num, details.best_block_hash, false); + let block_ref = this.pin_block_at( + rel_block_num, + details.best_block_hash, + UnpinPolicy::MaxAge, + ); FollowStreamMsg::Event(FollowEvent::BestBlockChanged(BestBlockChanged { best_block_hash: block_ref, @@ -159,7 +171,7 @@ impl Stream for FollowStreamUnpin { // but if they don't, we just increment the num from the last finalized block // we saw, which should be accurate. let rel_block_num = this.rel_block_num + idx + 1; - this.pin_block_at(rel_block_num, hash, false) + this.pin_block_at(rel_block_num, hash, UnpinPolicy::MaxAge) }) .collect(); @@ -172,8 +184,9 @@ impl Stream for FollowStreamUnpin { .into_iter() .map(|hash| { // We should know about these, too, and if not we set their age to last_finalized + 1 + // Set the unpinning policy to `OnDrop` to unpin the block when the last ref is dropped. let rel_block_num = this.rel_block_num + 1; - this.pin_block_at(rel_block_num, hash, true) + this.pin_block_at(rel_block_num, hash, UnpinPolicy::OnDrop) }) .collect(); @@ -277,14 +290,14 @@ impl FollowStreamUnpin { &mut self, rel_block_num: usize, hash: Hash, - should_unpin: bool, + unpin_policy: UnpinPolicy, ) -> BlockRef { let entry = self .pinned .entry(hash) // Only if there's already an entry do we need to clear any unpin flags set against it. .and_modify(|entry| { - entry.block_ref.should_unpin = should_unpin; + entry.block_ref.unpin_policy = unpin_policy; self.unpin_flags.lock().unwrap().remove(&hash); }) // If there's not an entry already, make one and return it. @@ -295,7 +308,7 @@ impl FollowStreamUnpin { hash, unpin_flags: self.unpin_flags.clone(), }), - should_unpin, + unpin_policy, }, }); @@ -355,11 +368,22 @@ struct PinnedDetails { block_ref: BlockRef, } +/// The policy to use for unpinning blocks. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum UnpinPolicy { + /// Unpin blocks when they are too old. + MaxAge, + /// Unpin blocks when they are dropped. + /// + /// Unpinning happens with the next finalized event. + OnDrop, +} + /// All blocks reported will be wrapped in this. #[derive(Debug, Clone)] pub struct BlockRef { inner: Arc>, - should_unpin: bool, + unpin_policy: UnpinPolicy, } #[derive(Debug)] @@ -378,7 +402,7 @@ impl BlockRef { hash, unpin_flags: Default::default(), }), - should_unpin: false, + unpin_policy: UnpinPolicy::MaxAge, } } @@ -406,7 +430,7 @@ impl Drop for BlockRef { // only "external" one left and we should ask to unpin it now. if it's // the only ref remaining, it means that it's already been unpinned, so // nothing to do here anyway. - if self.should_unpin && Arc::strong_count(&self.inner) == 2 { + if self.unpin_policy == UnpinPolicy::OnDrop && Arc::strong_count(&self.inner) == 2 { if let Ok(mut unpin_flags) = self.inner.unpin_flags.lock() { unpin_flags.insert(self.inner.hash); }