mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-13 16:21:06 +00:00
backend: Introduce the UnpinPolicy
Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
This commit is contained in:
@@ -109,8 +109,11 @@ impl<Hash: BlockHash> Stream for FollowStreamUnpin<Hash> {
|
|||||||
FollowStreamMsg::Event(FollowEvent::Initialized(details)) => {
|
FollowStreamMsg::Event(FollowEvent::Initialized(details)) => {
|
||||||
// The first finalized block gets the starting block_num.
|
// The first finalized block gets the starting block_num.
|
||||||
let rel_block_num = this.rel_block_num;
|
let rel_block_num = this.rel_block_num;
|
||||||
let block_ref =
|
let block_ref = this.pin_block_at(
|
||||||
this.pin_block_at(rel_block_num, details.finalized_block_hash, false);
|
rel_block_num,
|
||||||
|
details.finalized_block_hash,
|
||||||
|
UnpinPolicy::MaxAge,
|
||||||
|
);
|
||||||
|
|
||||||
FollowStreamMsg::Event(FollowEvent::Initialized(Initialized {
|
FollowStreamMsg::Event(FollowEvent::Initialized(Initialized {
|
||||||
finalized_block_hash: block_ref,
|
finalized_block_hash: block_ref,
|
||||||
@@ -127,10 +130,16 @@ impl<Hash: BlockHash> Stream for FollowStreamUnpin<Hash> {
|
|||||||
.map(|p| p.rel_block_num)
|
.map(|p| p.rel_block_num)
|
||||||
.unwrap_or(this.rel_block_num);
|
.unwrap_or(this.rel_block_num);
|
||||||
|
|
||||||
let block_ref =
|
let block_ref = this.pin_block_at(
|
||||||
this.pin_block_at(parent_rel_block_num + 1, details.block_hash, false);
|
parent_rel_block_num + 1,
|
||||||
let parent_block_ref =
|
details.block_hash,
|
||||||
this.pin_block_at(parent_rel_block_num, details.parent_block_hash, false);
|
UnpinPolicy::MaxAge,
|
||||||
|
);
|
||||||
|
let parent_block_ref = this.pin_block_at(
|
||||||
|
parent_rel_block_num,
|
||||||
|
details.parent_block_hash,
|
||||||
|
UnpinPolicy::MaxAge,
|
||||||
|
);
|
||||||
|
|
||||||
FollowStreamMsg::Event(FollowEvent::NewBlock(NewBlock {
|
FollowStreamMsg::Event(FollowEvent::NewBlock(NewBlock {
|
||||||
block_hash: block_ref,
|
block_hash: block_ref,
|
||||||
@@ -142,8 +151,11 @@ impl<Hash: BlockHash> Stream for FollowStreamUnpin<Hash> {
|
|||||||
// We expect this block to already exist, so it'll keep its existing block_num,
|
// We expect this block to already exist, so it'll keep its existing block_num,
|
||||||
// but worst case it'll just get the current finalized block_num + 1.
|
// but worst case it'll just get the current finalized block_num + 1.
|
||||||
let rel_block_num = this.rel_block_num + 1;
|
let rel_block_num = this.rel_block_num + 1;
|
||||||
let block_ref =
|
let block_ref = this.pin_block_at(
|
||||||
this.pin_block_at(rel_block_num, details.best_block_hash, false);
|
rel_block_num,
|
||||||
|
details.best_block_hash,
|
||||||
|
UnpinPolicy::MaxAge,
|
||||||
|
);
|
||||||
|
|
||||||
FollowStreamMsg::Event(FollowEvent::BestBlockChanged(BestBlockChanged {
|
FollowStreamMsg::Event(FollowEvent::BestBlockChanged(BestBlockChanged {
|
||||||
best_block_hash: block_ref,
|
best_block_hash: block_ref,
|
||||||
@@ -159,7 +171,7 @@ impl<Hash: BlockHash> Stream for FollowStreamUnpin<Hash> {
|
|||||||
// but if they don't, we just increment the num from the last finalized block
|
// but if they don't, we just increment the num from the last finalized block
|
||||||
// we saw, which should be accurate.
|
// we saw, which should be accurate.
|
||||||
let rel_block_num = this.rel_block_num + idx + 1;
|
let rel_block_num = this.rel_block_num + idx + 1;
|
||||||
this.pin_block_at(rel_block_num, hash, false)
|
this.pin_block_at(rel_block_num, hash, UnpinPolicy::MaxAge)
|
||||||
})
|
})
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
@@ -172,8 +184,9 @@ impl<Hash: BlockHash> Stream for FollowStreamUnpin<Hash> {
|
|||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|hash| {
|
.map(|hash| {
|
||||||
// We should know about these, too, and if not we set their age to last_finalized + 1
|
// We should know about these, too, and if not we set their age to last_finalized + 1
|
||||||
|
// Set the unpinning policy to `OnDrop` to unpin the block when the last ref is dropped.
|
||||||
let rel_block_num = this.rel_block_num + 1;
|
let rel_block_num = this.rel_block_num + 1;
|
||||||
this.pin_block_at(rel_block_num, hash, true)
|
this.pin_block_at(rel_block_num, hash, UnpinPolicy::OnDrop)
|
||||||
})
|
})
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
@@ -277,14 +290,14 @@ impl<Hash: BlockHash> FollowStreamUnpin<Hash> {
|
|||||||
&mut self,
|
&mut self,
|
||||||
rel_block_num: usize,
|
rel_block_num: usize,
|
||||||
hash: Hash,
|
hash: Hash,
|
||||||
should_unpin: bool,
|
unpin_policy: UnpinPolicy,
|
||||||
) -> BlockRef<Hash> {
|
) -> BlockRef<Hash> {
|
||||||
let entry = self
|
let entry = self
|
||||||
.pinned
|
.pinned
|
||||||
.entry(hash)
|
.entry(hash)
|
||||||
// Only if there's already an entry do we need to clear any unpin flags set against it.
|
// Only if there's already an entry do we need to clear any unpin flags set against it.
|
||||||
.and_modify(|entry| {
|
.and_modify(|entry| {
|
||||||
entry.block_ref.should_unpin = should_unpin;
|
entry.block_ref.unpin_policy = unpin_policy;
|
||||||
self.unpin_flags.lock().unwrap().remove(&hash);
|
self.unpin_flags.lock().unwrap().remove(&hash);
|
||||||
})
|
})
|
||||||
// If there's not an entry already, make one and return it.
|
// If there's not an entry already, make one and return it.
|
||||||
@@ -295,7 +308,7 @@ impl<Hash: BlockHash> FollowStreamUnpin<Hash> {
|
|||||||
hash,
|
hash,
|
||||||
unpin_flags: self.unpin_flags.clone(),
|
unpin_flags: self.unpin_flags.clone(),
|
||||||
}),
|
}),
|
||||||
should_unpin,
|
unpin_policy,
|
||||||
},
|
},
|
||||||
});
|
});
|
||||||
|
|
||||||
@@ -355,11 +368,22 @@ struct PinnedDetails<Hash: BlockHash> {
|
|||||||
block_ref: BlockRef<Hash>,
|
block_ref: BlockRef<Hash>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// The policy to use for unpinning blocks.
|
||||||
|
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||||
|
enum UnpinPolicy {
|
||||||
|
/// Unpin blocks when they are too old.
|
||||||
|
MaxAge,
|
||||||
|
/// Unpin blocks when they are dropped.
|
||||||
|
///
|
||||||
|
/// Unpinning happens with the next finalized event.
|
||||||
|
OnDrop,
|
||||||
|
}
|
||||||
|
|
||||||
/// All blocks reported will be wrapped in this.
|
/// All blocks reported will be wrapped in this.
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct BlockRef<Hash: BlockHash> {
|
pub struct BlockRef<Hash: BlockHash> {
|
||||||
inner: Arc<BlockRefInner<Hash>>,
|
inner: Arc<BlockRefInner<Hash>>,
|
||||||
should_unpin: bool,
|
unpin_policy: UnpinPolicy,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
@@ -378,7 +402,7 @@ impl<Hash: BlockHash> BlockRef<Hash> {
|
|||||||
hash,
|
hash,
|
||||||
unpin_flags: Default::default(),
|
unpin_flags: Default::default(),
|
||||||
}),
|
}),
|
||||||
should_unpin: false,
|
unpin_policy: UnpinPolicy::MaxAge,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -406,7 +430,7 @@ impl<Hash: BlockHash> Drop for BlockRef<Hash> {
|
|||||||
// only "external" one left and we should ask to unpin it now. if it's
|
// only "external" one left and we should ask to unpin it now. if it's
|
||||||
// the only ref remaining, it means that it's already been unpinned, so
|
// the only ref remaining, it means that it's already been unpinned, so
|
||||||
// nothing to do here anyway.
|
// nothing to do here anyway.
|
||||||
if self.should_unpin && Arc::strong_count(&self.inner) == 2 {
|
if self.unpin_policy == UnpinPolicy::OnDrop && Arc::strong_count(&self.inner) == 2 {
|
||||||
if let Ok(mut unpin_flags) = self.inner.unpin_flags.lock() {
|
if let Ok(mut unpin_flags) = self.inner.unpin_flags.lock() {
|
||||||
unpin_flags.insert(self.inner.hash);
|
unpin_flags.insert(self.inner.hash);
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user