fix: don't unpin blocks that may show up again (#1368)

* backend(fix): Early unpin for pruned blocks with no active `BlockRef`

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* backend/tests: Check unpinning only after max_life time

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* backend/tests: Remove unpinning when droped tests

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* backend/tests: Ensure new blocks are not unpinned

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* backend: Add only pruned blocks to unpin hashset and add a test

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* backend: Introduce the `UnpinPolicy`

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Update subxt/src/backend/unstable/follow_stream_unpin.rs

* backend: Fix clippy

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Only unpin blocks that have been pruned by the backend

* Fix comments

* Mark initialized and finalized blocks as 'can_be_unpinned' too and tests

* fmt

* tweak a couple more comments

* tidy tests and fix undeterministic check

* Fix wrong names in comments

* tweak another test to focus it a bit more

* clippy

* wee rename of args in a couple of test helpers

* add some logs and simplify submit_transaction in the same way Alex did

* ditch logging again and keep to the experiment pr

* cargo fmt

* reduce CI timeouts to 30mins

* Handle Init and Stop events in submit_transaction too, just in case

---------

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
Co-authored-by: Alexandru Vasile <alexandru.vasile@parity.io>
Co-authored-by: Alexandru Vasile <60601340+lexnv@users.noreply.github.com>
This commit is contained in:
James Wilson
2024-01-17 09:35:02 +00:00
committed by GitHub
parent df85e46a62
commit 553343548e
5 changed files with 268 additions and 100 deletions
+4 -4
View File
@@ -159,7 +159,7 @@ jobs:
tests:
name: "Test (Native)"
runs-on: ubuntu-latest-16-cores
timeout-minutes: 45
timeout-minutes: 30
steps:
- name: Checkout sources
uses: actions/checkout@v4
@@ -189,7 +189,7 @@ jobs:
unstable_backend_tests:
name: "Test (Unstable Backend)"
runs-on: ubuntu-latest-16-cores
timeout-minutes: 45
timeout-minutes: 30
steps:
- name: Checkout sources
uses: actions/checkout@v4
@@ -219,7 +219,7 @@ jobs:
light_client_tests:
name: "Test (Light Client)"
runs-on: ubuntu-latest-16-cores
timeout-minutes: 45
timeout-minutes: 30
steps:
- name: Checkout sources
uses: actions/checkout@v4
@@ -246,7 +246,7 @@ jobs:
wasm_tests:
name: Test (WASM)
runs-on: ubuntu-latest
timeout-minutes: 45
timeout-minutes: 30
env:
# Set timeout for wasm tests to be much bigger than the default 20 secs.
WASM_BINDGEN_TEST_TIMEOUT: 300
+11 -5
View File
@@ -249,9 +249,9 @@ pub(super) mod test_utils {
}
/// A new block event
pub fn ev_new_block(parent: u64, n: u64) -> FollowEvent<H256> {
pub fn ev_new_block(parent_n: u64, n: u64) -> FollowEvent<H256> {
FollowEvent::NewBlock(NewBlock {
parent_block_hash: H256::from_low_u64_le(parent),
parent_block_hash: H256::from_low_u64_le(parent_n),
block_hash: H256::from_low_u64_le(n),
new_runtime: None,
})
@@ -265,10 +265,16 @@ pub(super) mod test_utils {
}
/// A finalized event
pub fn ev_finalized(ns: impl IntoIterator<Item = u64>) -> FollowEvent<H256> {
pub fn ev_finalized(
finalized_ns: impl IntoIterator<Item = u64>,
pruned_ns: impl IntoIterator<Item = u64>,
) -> FollowEvent<H256> {
FollowEvent::Finalized(Finalized {
finalized_block_hashes: ns.into_iter().map(H256::from_low_u64_le).collect(),
pruned_block_hashes: vec![],
finalized_block_hashes: finalized_ns
.into_iter()
.map(H256::from_low_u64_le)
.collect(),
pruned_block_hashes: pruned_ns.into_iter().map(H256::from_low_u64_le).collect(),
})
}
}
@@ -424,7 +424,7 @@ mod test {
Ok(ev_initialized(0)),
Ok(ev_new_block(0, 1)),
Ok(ev_best_block(1)),
Ok(ev_finalized([1])),
Ok(ev_finalized([1], [])),
Err(Error::Other("ended".to_owned())),
]
},
@@ -465,7 +465,7 @@ mod test {
Ok(ev_initialized(0)),
Ok(ev_new_block(0, 1)),
Ok(ev_best_block(1)),
Ok(ev_finalized([1])),
Ok(ev_finalized([1], [])),
Ok(ev_new_block(1, 2)),
Ok(ev_new_block(2, 3)),
Err(Error::Other("ended".to_owned())),
@@ -517,7 +517,7 @@ mod test {
Ok(ev_best_block(1)),
Ok(ev_new_block(1, 2)),
Ok(ev_new_block(2, 3)),
Ok(ev_finalized([1])),
Ok(ev_finalized([1], [])),
Err(Error::Other("ended".to_owned())),
]
},
+226 -65
View File
@@ -109,7 +109,10 @@ impl<Hash: BlockHash> Stream for FollowStreamUnpin<Hash> {
FollowStreamMsg::Event(FollowEvent::Initialized(details)) => {
// The first finalized block gets the starting block_num.
let rel_block_num = this.rel_block_num;
let block_ref = this.pin_block_at(rel_block_num, details.finalized_block_hash);
// Pin this block, but note that it can be unpinned any time since it won't show up again (except
// as a parent block, which we are ignoring at the moment).
let block_ref =
this.pin_unpinnable_block_at(rel_block_num, details.finalized_block_hash);
FollowStreamMsg::Event(FollowEvent::Initialized(Initialized {
finalized_block_hash: block_ref,
@@ -155,8 +158,11 @@ impl<Hash: BlockHash> Stream for FollowStreamUnpin<Hash> {
// These blocks _should_ exist already and so will have a known block num,
// but if they don't, we just increment the num from the last finalized block
// we saw, which should be accurate.
//
// `pin_unpinnable_block_at` indicates that the block will not show up in future events
// (They will show up as a parent block, but we don't care about that right now).
let rel_block_num = this.rel_block_num + idx + 1;
this.pin_block_at(rel_block_num, hash)
this.pin_unpinnable_block_at(rel_block_num, hash)
})
.collect();
@@ -168,16 +174,19 @@ impl<Hash: BlockHash> Stream for FollowStreamUnpin<Hash> {
.pruned_block_hashes
.into_iter()
.map(|hash| {
// We should know about these, too, and if not we set their age to last_finalized + 1
// We should know about these, too, and if not we set their age to last_finalized + 1.
//
// `pin_unpinnable_block_at` indicates that the block will not show up in future events.
let rel_block_num = this.rel_block_num + 1;
this.pin_block_at(rel_block_num, hash)
this.pin_unpinnable_block_at(rel_block_num, hash)
})
.collect();
// At this point, we also check to see which blocks we should submit unpin events
// for. When we see a block hash as finalized, we know that it won't be reported again
// (except as a parent hash of a new block), so we can safely make an unpin call for it
// without worrying about the hash being returned again despite the block not being pinned.
// for. We will unpin:
// - Any block that's older than the max age.
// - Any block that has no references left (ie has been dropped) that _also_ has
// showed up in the pruned list in a finalized event (so it will never be in another event).
this.unpin_blocks(cx.waker());
FollowStreamMsg::Event(FollowEvent::Finalized(Finalized {
@@ -187,7 +196,9 @@ impl<Hash: BlockHash> Stream for FollowStreamUnpin<Hash> {
}
FollowStreamMsg::Event(FollowEvent::Stop) => {
// clear out "old" things that are no longer applicable since
// the subscription has ended (a new one will be created under the hood).
// the subscription has ended (a new one will be created under the hood, at
// which point we'll get given a new subscription ID.
this.subscription_id = None;
this.pinned.clear();
this.unpin_futs.clear();
this.unpin_flags.lock().unwrap().clear();
@@ -271,11 +282,31 @@ impl<Hash: BlockHash> FollowStreamUnpin<Hash> {
/// be unpinned, we'll clear those flags, so that it won't be unpinned. If the unpin request has already
/// been sent though, then the block will be unpinned.
fn pin_block_at(&mut self, rel_block_num: usize, hash: Hash) -> BlockRef<Hash> {
self.pin_block_at_setting_unpinnable_flag(rel_block_num, hash, false)
}
/// Pin a block, or return the reference to an already-pinned block.
///
/// This is the same as [`Self::pin_block_at`], except that it also marks the block as being unpinnable now,
/// which should be done for any block that will no longer be seen in future events.
fn pin_unpinnable_block_at(&mut self, rel_block_num: usize, hash: Hash) -> BlockRef<Hash> {
self.pin_block_at_setting_unpinnable_flag(rel_block_num, hash, true)
}
fn pin_block_at_setting_unpinnable_flag(
&mut self,
rel_block_num: usize,
hash: Hash,
can_be_unpinned: bool,
) -> BlockRef<Hash> {
let entry = self
.pinned
.entry(hash)
// Only if there's already an entry do we need to clear any unpin flags set against it.
.and_modify(|_| {
// If there's already an entry, then clear any unpin_flags and update the
// can_be_unpinned status (this can become true but cannot become false again
// once true).
.and_modify(|entry| {
entry.can_be_unpinned = entry.can_be_unpinned || can_be_unpinned;
self.unpin_flags.lock().unwrap().remove(&hash);
})
// If there's not an entry already, make one and return it.
@@ -287,6 +318,7 @@ impl<Hash: BlockHash> FollowStreamUnpin<Hash> {
unpin_flags: self.unpin_flags.clone(),
}),
},
can_be_unpinned,
});
entry.block_ref.clone()
@@ -294,11 +326,11 @@ impl<Hash: BlockHash> FollowStreamUnpin<Hash> {
/// Unpin any blocks that are either too old, or have the unpin flag set and are old enough.
fn unpin_blocks(&mut self, waker: &Waker) {
let unpin_flags = std::mem::take(&mut *self.unpin_flags.lock().unwrap());
let mut unpin_flags = self.unpin_flags.lock().unwrap();
let rel_block_num = self.rel_block_num;
// If we asked to unpin and there was no subscription_id, then there's nothing to
// do here, and we've cleared the flags now above anyway.
// If we asked to unpin and there was no subscription_id, then there's nothing we can do,
// and nothing will need unpinning now anyway.
let Some(sub_id) = &self.subscription_id else {
return;
};
@@ -306,13 +338,19 @@ impl<Hash: BlockHash> FollowStreamUnpin<Hash> {
let mut blocks_to_unpin = vec![];
for (hash, details) in &self.pinned {
if rel_block_num.saturating_sub(details.rel_block_num) >= self.max_block_life
|| unpin_flags.contains(hash)
|| (unpin_flags.contains(hash) && details.can_be_unpinned)
{
// The block is too old, or it's been flagged to be unpinned (no refs to it left)
// The block is too old, or it's been flagged to be unpinned and won't be in a future
// backend event, so we can unpin it for real now.
blocks_to_unpin.push(*hash);
// Clear it from our unpin flags if present so that we don't try to unpin it again.
unpin_flags.remove(hash);
}
}
// Release our lock on unpin_flags ASAP.
drop(unpin_flags);
// No need to call the waker etc if nothing to do:
if blocks_to_unpin.is_empty() {
return;
@@ -343,6 +381,11 @@ struct PinnedDetails<Hash: BlockHash> {
/// Because we store one here until it's unpinned, the live count
/// will only drop to 1 when no external refs are left.
block_ref: BlockRef<Hash>,
/// Has this block showed up in the list of pruned blocks, or has it
/// been finalized? In this case, it can now been pinned as it won't
/// show up again in future events (except as a "parent block" of some
/// new block, which we're currently ignoring).
can_be_unpinned: bool,
}
/// All blocks reported will be wrapped in this.
@@ -433,6 +476,23 @@ pub(super) mod test_utils {
(follow_unpin, unpin_rx)
}
/// Assert that the unpinned blocks sent from the `UnpinRx` channel match the items given.
pub fn assert_from_unpin_rx<Hash: BlockHash + 'static>(
unpin_rx: &UnpinRx<Hash>,
items: impl IntoIterator<Item = Hash>,
) {
let expected_hashes = HashSet::<Hash>::from_iter(items);
for i in 0..expected_hashes.len() {
let Ok((hash, _)) = unpin_rx.try_recv() else {
panic!("Another unpin event is expected, but failed to pull item {i} from channel");
};
assert!(
expected_hashes.contains(&hash),
"Hash {hash:?} was unpinned, but is not expected to have been"
);
}
}
/// An initialized event containing a BlockRef (useful for comparisons)
pub fn ev_initialized_ref(n: u64) -> FollowEvent<BlockRef<H256>> {
FollowEvent::Initialized(Initialized {
@@ -474,7 +534,7 @@ mod test {
use super::super::follow_stream::test_utils::{
ev_best_block, ev_finalized, ev_initialized, ev_new_block,
};
use super::test_utils::{ev_new_block_ref, test_unpin_stream_getter};
use super::test_utils::{assert_from_unpin_rx, ev_new_block_ref, test_unpin_stream_getter};
use super::*;
use crate::config::substrate::H256;
@@ -508,17 +568,44 @@ mod test {
);
}
#[tokio::test]
async fn unpins_initialized_block() {
let (mut follow_unpin, unpin_rx) = test_unpin_stream_getter(
|| {
[
Ok(ev_initialized(0)),
Ok(ev_finalized([1], [])),
Err(Error::Other("ended".to_owned())),
]
},
3,
);
let _r = follow_unpin.next().await.unwrap().unwrap();
// Drop the initialized block:
let i0 = follow_unpin.next().await.unwrap().unwrap();
drop(i0);
// Let a finalization event occur.
let _f1 = follow_unpin.next().await.unwrap().unwrap();
// Now, initialized block should be unpinned.
assert_from_unpin_rx(&unpin_rx, [H256::from_low_u64_le(0)]);
assert!(!follow_unpin.is_pinned(&H256::from_low_u64_le(0)));
}
#[tokio::test]
async fn unpins_old_blocks() {
let (mut follow_unpin, unpin_rx) = test_unpin_stream_getter(
|| {
[
Ok(ev_initialized(0)),
Ok(ev_finalized([1])),
Ok(ev_finalized([2])),
Ok(ev_finalized([3])),
Ok(ev_finalized([4])),
Ok(ev_finalized([5])),
Ok(ev_finalized([1], [])),
Ok(ev_finalized([2], [])),
Ok(ev_finalized([3], [])),
Ok(ev_finalized([4], [])),
Ok(ev_finalized([5], [])),
Err(Error::Other("ended".to_owned())),
]
},
@@ -535,36 +622,29 @@ mod test {
let _f3 = follow_unpin.next().await.unwrap().unwrap();
// Max age is 3, so after block 3 finalized, block 0 becomes too old and is unpinned.
let (hash, _) = unpin_rx
.try_recv()
.expect("unpin call should have happened");
assert_eq!(hash, H256::from_low_u64_le(0));
assert_from_unpin_rx(&unpin_rx, [H256::from_low_u64_le(0)]);
let _f4 = follow_unpin.next().await.unwrap().unwrap();
// Block 1 is now too old and is unpinned.
let (hash, _) = unpin_rx
.try_recv()
.expect("unpin call should have happened");
assert_eq!(hash, H256::from_low_u64_le(1));
assert_from_unpin_rx(&unpin_rx, [H256::from_low_u64_le(1)]);
let _f5 = follow_unpin.next().await.unwrap().unwrap();
// Block 2 is now too old and is unpinned.
let (hash, _) = unpin_rx
.try_recv()
.expect("unpin call should have happened");
assert_eq!(hash, H256::from_low_u64_le(2));
assert_from_unpin_rx(&unpin_rx, [H256::from_low_u64_le(2)]);
}
#[tokio::test]
async fn unpins_dropped_blocks() {
async fn dropped_new_blocks_should_not_get_unpinned_until_finalization() {
let (mut follow_unpin, unpin_rx) = test_unpin_stream_getter(
|| {
[
Ok(ev_initialized(0)),
Ok(ev_finalized([1])),
Ok(ev_finalized([2])),
Ok(ev_new_block(0, 1)),
Ok(ev_new_block(1, 2)),
Ok(ev_finalized([1], [])),
Ok(ev_finalized([2], [])),
Err(Error::Other("ended".to_owned())),
]
},
@@ -573,62 +653,143 @@ mod test {
let _r = follow_unpin.next().await.unwrap().unwrap();
let _i0 = follow_unpin.next().await.unwrap().unwrap();
let f1 = follow_unpin.next().await.unwrap().unwrap();
// We don't care about block 1 any more; drop it. unpins happen at finalized evs.
let n1 = follow_unpin.next().await.unwrap().unwrap();
drop(n1);
let n2 = follow_unpin.next().await.unwrap().unwrap();
drop(n2);
// New blocks dropped but still pinned:
assert!(follow_unpin.is_pinned(&H256::from_low_u64_le(1)));
assert!(follow_unpin.is_pinned(&H256::from_low_u64_le(2)));
let f1 = follow_unpin.next().await.unwrap().unwrap();
drop(f1);
let _f2 = follow_unpin.next().await.unwrap().unwrap();
// Check that we get the expected unpin event.
let (hash, _) = unpin_rx
.try_recv()
.expect("unpin call should have happened");
assert_eq!(hash, H256::from_low_u64_le(1));
// Confirm that 0 and 2 are still pinned and that 1 isnt.
assert!(!follow_unpin.is_pinned(&H256::from_low_u64_le(1)));
assert!(follow_unpin.is_pinned(&H256::from_low_u64_le(0)));
// After block 1 finalized, both blocks are still pinned because:
// - block 1 was handed back in the finalized event, so will be unpinned next time.
// - block 2 wasn't mentioned in the finalized event, so should not have been unpinned yet.
assert!(follow_unpin.is_pinned(&H256::from_low_u64_le(1)));
assert!(follow_unpin.is_pinned(&H256::from_low_u64_le(2)));
let f2 = follow_unpin.next().await.unwrap().unwrap();
drop(f2);
// After block 2 finalized, block 1 can be unpinned finally, but block 2 needs to wait one more event.
assert!(!follow_unpin.is_pinned(&H256::from_low_u64_le(1)));
assert!(follow_unpin.is_pinned(&H256::from_low_u64_le(2)));
assert_from_unpin_rx(&unpin_rx, [H256::from_low_u64_le(1)]);
}
#[tokio::test]
async fn only_unpins_if_finalized_is_dropped() {
// If we drop the "new block" and "best block" BlockRefs,
// and then the block comes back as finalized (for instance),
// no unpin call should be made unless we also drop the finalized
// one.
async fn dropped_new_blocks_should_not_get_unpinned_until_pruned() {
let (mut follow_unpin, unpin_rx) = test_unpin_stream_getter(
|| {
[
Ok(ev_initialized(0)),
Ok(ev_new_block(0, 1)),
Ok(ev_best_block(1)),
Ok(ev_finalized([1])),
Ok(ev_finalized([2])),
Ok(ev_new_block(1, 2)),
Ok(ev_new_block(1, 3)),
Ok(ev_finalized([1], [])),
Ok(ev_finalized([2], [3])),
Ok(ev_finalized([4], [])),
Err(Error::Other("ended".to_owned())),
]
},
100,
10,
);
let _r = follow_unpin.next().await.unwrap().unwrap();
let _i0 = follow_unpin.next().await.unwrap().unwrap();
let n1 = follow_unpin.next().await.unwrap().unwrap();
drop(n1);
let n2 = follow_unpin.next().await.unwrap().unwrap();
drop(n2);
let n3 = follow_unpin.next().await.unwrap().unwrap();
drop(n3);
let f1 = follow_unpin.next().await.unwrap().unwrap();
drop(f1);
// After block 1 is finalized, everything is still pinned because the finalization event
// itself returns 1, and 2/3 aren't finalized or pruned yet.
assert!(follow_unpin.is_pinned(&H256::from_low_u64_le(1)));
assert!(follow_unpin.is_pinned(&H256::from_low_u64_le(2)));
assert!(follow_unpin.is_pinned(&H256::from_low_u64_le(3)));
let f2 = follow_unpin.next().await.unwrap().unwrap();
drop(f2);
// After the next finalization event, block 1 can finally be unpinned since it was Finalized
// last event _and_ is no longer handed back anywhere. 2 and 3 should still be pinned.
assert!(!follow_unpin.is_pinned(&H256::from_low_u64_le(1)));
assert!(follow_unpin.is_pinned(&H256::from_low_u64_le(2)));
assert!(follow_unpin.is_pinned(&H256::from_low_u64_le(3)));
assert_from_unpin_rx(&unpin_rx, [H256::from_low_u64_le(1)]);
let f4 = follow_unpin.next().await.unwrap().unwrap();
drop(f4);
// After some other finalized event, we are now allowed to ditch the previously pruned and
// finalized blocks 2 and 3.
assert!(!follow_unpin.is_pinned(&H256::from_low_u64_le(2)));
assert!(!follow_unpin.is_pinned(&H256::from_low_u64_le(3)));
assert_from_unpin_rx(
&unpin_rx,
[H256::from_low_u64_le(2), H256::from_low_u64_le(3)],
);
}
#[tokio::test]
async fn never_unpin_new_block_before_finalized() {
// Ensure that if we drop a new block; the pinning is still active until the block is finalized.
let (mut follow_unpin, unpin_rx) = test_unpin_stream_getter(
|| {
[
Ok(ev_initialized(0)),
Ok(ev_new_block(0, 1)),
Ok(ev_new_block(1, 2)),
Ok(ev_best_block(1)),
Ok(ev_finalized([1], [])),
Ok(ev_finalized([2], [])),
Err(Error::Other("ended".to_owned())),
]
},
10,
);
let _r = follow_unpin.next().await.unwrap().unwrap();
// drop initialised block 0 and new block 1 and new block 2.
let i0 = follow_unpin.next().await.unwrap().unwrap();
drop(i0);
let n1 = follow_unpin.next().await.unwrap().unwrap();
drop(n1);
let n2 = follow_unpin.next().await.unwrap().unwrap();
drop(n2);
let b1 = follow_unpin.next().await.unwrap().unwrap();
drop(b1);
let f1 = follow_unpin.next().await.unwrap().unwrap();
// even though we dropped our block 1 in the new/best events, it won't be unpinned
// because it occurred again in finalized event.
// Nothing unpinned yet!
unpin_rx.try_recv().expect_err("nothing unpinned yet");
let f1 = follow_unpin.next().await.unwrap().unwrap();
drop(f1);
let _f2 = follow_unpin.next().await.unwrap().unwrap();
// Since we dropped the finalized block 1, we'll now unpin it when next block finalized.
let (hash, _) = unpin_rx.try_recv().expect("unpin should have happened now");
assert_eq!(hash, H256::from_low_u64_le(1));
// After finalization, block 1 is now ready to be unpinned (it won't be seen again),
// but isn't actually unpinned yet (because it was just handed back in f1). Block 0
// however has now been unpinned.
assert!(!follow_unpin.is_pinned(&H256::from_low_u64_le(0)));
assert_from_unpin_rx(&unpin_rx, [H256::from_low_u64_le(0)]);
unpin_rx.try_recv().expect_err("nothing unpinned yet");
let f2 = follow_unpin.next().await.unwrap().unwrap();
drop(f2);
// After f2, we can get rid of block 1 now, which was finalized last time.
assert!(!follow_unpin.is_pinned(&H256::from_low_u64_le(1)));
assert_from_unpin_rx(&unpin_rx, [H256::from_low_u64_le(1)]);
unpin_rx.try_recv().expect_err("nothing unpinned yet");
}
}
+24 -23
View File
@@ -458,28 +458,13 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for UnstableBackend<T> {
extrinsic: &[u8],
) -> Result<StreamOfResults<TransactionStatus<T::Hash>>, Error> {
// We care about new and finalized block hashes.
enum SeenBlock<Ref> {
New(Ref),
Finalized(Vec<Ref>),
}
enum SeenBlockMarker {
New,
Finalized,
}
// First, subscribe to all new and finalized block refs.
// - we subscribe to new refs so that when we see `BestChainBlockIncluded`, we
// can try to return a block ref for the best block.
// - we subscribe to finalized refs so that when we see `Finalized`, we can
// guarantee that when we return here, the finalized block we report has been
// reported from chainHead_follow already.
let mut seen_blocks_sub = self.follow_handle.subscribe().events().filter_map(|ev| {
std::future::ready(match ev {
FollowEvent::NewBlock(ev) => Some(SeenBlock::New(ev.block_hash)),
FollowEvent::Finalized(ev) => Some(SeenBlock::Finalized(ev.finalized_block_hashes)),
_ => None,
})
});
// First, subscribe to new blocks.
let mut seen_blocks_sub = self.follow_handle.subscribe().events();
// Then, submit the transaction.
let mut tx_progress = self
@@ -506,22 +491,39 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for UnstableBackend<T> {
// Make a note of new or finalized blocks that have come in since we started the TX.
if let Poll::Ready(Some(seen_block)) = seen_blocks_sub.poll_next_unpin(cx) {
match seen_block {
SeenBlock::New(block_ref) => {
FollowEvent::Initialized(ev) => {
// Just in case this stream is really slow to start or something..
seen_blocks.insert(
ev.finalized_block_hash.hash(),
(SeenBlockMarker::Finalized, ev.finalized_block_hash),
);
}
FollowEvent::NewBlock(ev) => {
// Optimization: once we have a `finalized_hash`, we only care about finalized
// block refs now and can avoid bothering to save new blocks.
if finalized_hash.is_none() {
seen_blocks
.insert(block_ref.hash(), (SeenBlockMarker::New, block_ref));
seen_blocks.insert(
ev.block_hash.hash(),
(SeenBlockMarker::New, ev.block_hash),
);
}
}
SeenBlock::Finalized(block_refs) => {
for block_ref in block_refs {
FollowEvent::Finalized(ev) => {
for block_ref in ev.finalized_block_hashes {
seen_blocks.insert(
block_ref.hash(),
(SeenBlockMarker::Finalized, block_ref),
);
}
}
FollowEvent::Stop => {
// If we get this event, we'll lose all of our existing pinned blocks and have a gap
// in which we may lose the finaliuzed block that the TX is in. For now, just error if
// this happens, to prevent the case in which we never see a finalized block and wait
// forever.
return Poll::Ready(Some(Err(Error::Other("chainHead_follow emitted 'stop' event during transaction submission".into()))));
}
_ => {}
}
continue;
}
@@ -555,7 +557,6 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for UnstableBackend<T> {
Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e))),
Poll::Ready(Some(Ok(ev))) => ev,
};
// When we get one, map it to the correct format (or for finalized ev, wait for the pinned block):
let ev = match ev {
rpc_methods::TransactionStatus::Finalized { block } => {