diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index c419251d01..225b5e8158 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -159,7 +159,7 @@ jobs: tests: name: "Test (Native)" runs-on: ubuntu-latest-16-cores - timeout-minutes: 45 + timeout-minutes: 30 steps: - name: Checkout sources uses: actions/checkout@v4 @@ -189,7 +189,7 @@ jobs: unstable_backend_tests: name: "Test (Unstable Backend)" runs-on: ubuntu-latest-16-cores - timeout-minutes: 45 + timeout-minutes: 30 steps: - name: Checkout sources uses: actions/checkout@v4 @@ -219,7 +219,7 @@ jobs: light_client_tests: name: "Test (Light Client)" runs-on: ubuntu-latest-16-cores - timeout-minutes: 45 + timeout-minutes: 30 steps: - name: Checkout sources uses: actions/checkout@v4 @@ -246,7 +246,7 @@ jobs: wasm_tests: name: Test (WASM) runs-on: ubuntu-latest - timeout-minutes: 45 + timeout-minutes: 30 env: # Set timeout for wasm tests to be much bigger than the default 20 secs. WASM_BINDGEN_TEST_TIMEOUT: 300 diff --git a/subxt/src/backend/unstable/follow_stream.rs b/subxt/src/backend/unstable/follow_stream.rs index dbd2616328..9474f7a302 100644 --- a/subxt/src/backend/unstable/follow_stream.rs +++ b/subxt/src/backend/unstable/follow_stream.rs @@ -249,9 +249,9 @@ pub(super) mod test_utils { } /// A new block event - pub fn ev_new_block(parent: u64, n: u64) -> FollowEvent { + pub fn ev_new_block(parent_n: u64, n: u64) -> FollowEvent { FollowEvent::NewBlock(NewBlock { - parent_block_hash: H256::from_low_u64_le(parent), + parent_block_hash: H256::from_low_u64_le(parent_n), block_hash: H256::from_low_u64_le(n), new_runtime: None, }) @@ -265,10 +265,16 @@ pub(super) mod test_utils { } /// A finalized event - pub fn ev_finalized(ns: impl IntoIterator) -> FollowEvent { + pub fn ev_finalized( + finalized_ns: impl IntoIterator, + pruned_ns: impl IntoIterator, + ) -> FollowEvent { FollowEvent::Finalized(Finalized { - finalized_block_hashes: ns.into_iter().map(H256::from_low_u64_le).collect(), - pruned_block_hashes: vec![], + finalized_block_hashes: finalized_ns + .into_iter() + .map(H256::from_low_u64_le) + .collect(), + pruned_block_hashes: pruned_ns.into_iter().map(H256::from_low_u64_le).collect(), }) } } diff --git a/subxt/src/backend/unstable/follow_stream_driver.rs b/subxt/src/backend/unstable/follow_stream_driver.rs index 028065f941..fdca29723a 100644 --- a/subxt/src/backend/unstable/follow_stream_driver.rs +++ b/subxt/src/backend/unstable/follow_stream_driver.rs @@ -424,7 +424,7 @@ mod test { Ok(ev_initialized(0)), Ok(ev_new_block(0, 1)), Ok(ev_best_block(1)), - Ok(ev_finalized([1])), + Ok(ev_finalized([1], [])), Err(Error::Other("ended".to_owned())), ] }, @@ -465,7 +465,7 @@ mod test { Ok(ev_initialized(0)), Ok(ev_new_block(0, 1)), Ok(ev_best_block(1)), - Ok(ev_finalized([1])), + Ok(ev_finalized([1], [])), Ok(ev_new_block(1, 2)), Ok(ev_new_block(2, 3)), Err(Error::Other("ended".to_owned())), @@ -517,7 +517,7 @@ mod test { Ok(ev_best_block(1)), Ok(ev_new_block(1, 2)), Ok(ev_new_block(2, 3)), - Ok(ev_finalized([1])), + Ok(ev_finalized([1], [])), Err(Error::Other("ended".to_owned())), ] }, diff --git a/subxt/src/backend/unstable/follow_stream_unpin.rs b/subxt/src/backend/unstable/follow_stream_unpin.rs index 7b54f642dc..3f055f9f4d 100644 --- a/subxt/src/backend/unstable/follow_stream_unpin.rs +++ b/subxt/src/backend/unstable/follow_stream_unpin.rs @@ -109,7 +109,10 @@ impl Stream for FollowStreamUnpin { FollowStreamMsg::Event(FollowEvent::Initialized(details)) => { // The first finalized block gets the starting block_num. let rel_block_num = this.rel_block_num; - let block_ref = this.pin_block_at(rel_block_num, details.finalized_block_hash); + // Pin this block, but note that it can be unpinned any time since it won't show up again (except + // as a parent block, which we are ignoring at the moment). + let block_ref = + this.pin_unpinnable_block_at(rel_block_num, details.finalized_block_hash); FollowStreamMsg::Event(FollowEvent::Initialized(Initialized { finalized_block_hash: block_ref, @@ -155,8 +158,11 @@ impl Stream for FollowStreamUnpin { // These blocks _should_ exist already and so will have a known block num, // but if they don't, we just increment the num from the last finalized block // we saw, which should be accurate. + // + // `pin_unpinnable_block_at` indicates that the block will not show up in future events + // (They will show up as a parent block, but we don't care about that right now). let rel_block_num = this.rel_block_num + idx + 1; - this.pin_block_at(rel_block_num, hash) + this.pin_unpinnable_block_at(rel_block_num, hash) }) .collect(); @@ -168,16 +174,19 @@ impl Stream for FollowStreamUnpin { .pruned_block_hashes .into_iter() .map(|hash| { - // We should know about these, too, and if not we set their age to last_finalized + 1 + // We should know about these, too, and if not we set their age to last_finalized + 1. + // + // `pin_unpinnable_block_at` indicates that the block will not show up in future events. let rel_block_num = this.rel_block_num + 1; - this.pin_block_at(rel_block_num, hash) + this.pin_unpinnable_block_at(rel_block_num, hash) }) .collect(); // At this point, we also check to see which blocks we should submit unpin events - // for. When we see a block hash as finalized, we know that it won't be reported again - // (except as a parent hash of a new block), so we can safely make an unpin call for it - // without worrying about the hash being returned again despite the block not being pinned. + // for. We will unpin: + // - Any block that's older than the max age. + // - Any block that has no references left (ie has been dropped) that _also_ has + // showed up in the pruned list in a finalized event (so it will never be in another event). this.unpin_blocks(cx.waker()); FollowStreamMsg::Event(FollowEvent::Finalized(Finalized { @@ -187,7 +196,9 @@ impl Stream for FollowStreamUnpin { } FollowStreamMsg::Event(FollowEvent::Stop) => { // clear out "old" things that are no longer applicable since - // the subscription has ended (a new one will be created under the hood). + // the subscription has ended (a new one will be created under the hood, at + // which point we'll get given a new subscription ID. + this.subscription_id = None; this.pinned.clear(); this.unpin_futs.clear(); this.unpin_flags.lock().unwrap().clear(); @@ -271,11 +282,31 @@ impl FollowStreamUnpin { /// be unpinned, we'll clear those flags, so that it won't be unpinned. If the unpin request has already /// been sent though, then the block will be unpinned. fn pin_block_at(&mut self, rel_block_num: usize, hash: Hash) -> BlockRef { + self.pin_block_at_setting_unpinnable_flag(rel_block_num, hash, false) + } + + /// Pin a block, or return the reference to an already-pinned block. + /// + /// This is the same as [`Self::pin_block_at`], except that it also marks the block as being unpinnable now, + /// which should be done for any block that will no longer be seen in future events. + fn pin_unpinnable_block_at(&mut self, rel_block_num: usize, hash: Hash) -> BlockRef { + self.pin_block_at_setting_unpinnable_flag(rel_block_num, hash, true) + } + + fn pin_block_at_setting_unpinnable_flag( + &mut self, + rel_block_num: usize, + hash: Hash, + can_be_unpinned: bool, + ) -> BlockRef { let entry = self .pinned .entry(hash) - // Only if there's already an entry do we need to clear any unpin flags set against it. - .and_modify(|_| { + // If there's already an entry, then clear any unpin_flags and update the + // can_be_unpinned status (this can become true but cannot become false again + // once true). + .and_modify(|entry| { + entry.can_be_unpinned = entry.can_be_unpinned || can_be_unpinned; self.unpin_flags.lock().unwrap().remove(&hash); }) // If there's not an entry already, make one and return it. @@ -287,6 +318,7 @@ impl FollowStreamUnpin { unpin_flags: self.unpin_flags.clone(), }), }, + can_be_unpinned, }); entry.block_ref.clone() @@ -294,11 +326,11 @@ impl FollowStreamUnpin { /// Unpin any blocks that are either too old, or have the unpin flag set and are old enough. fn unpin_blocks(&mut self, waker: &Waker) { - let unpin_flags = std::mem::take(&mut *self.unpin_flags.lock().unwrap()); + let mut unpin_flags = self.unpin_flags.lock().unwrap(); let rel_block_num = self.rel_block_num; - // If we asked to unpin and there was no subscription_id, then there's nothing to - // do here, and we've cleared the flags now above anyway. + // If we asked to unpin and there was no subscription_id, then there's nothing we can do, + // and nothing will need unpinning now anyway. let Some(sub_id) = &self.subscription_id else { return; }; @@ -306,13 +338,19 @@ impl FollowStreamUnpin { let mut blocks_to_unpin = vec![]; for (hash, details) in &self.pinned { if rel_block_num.saturating_sub(details.rel_block_num) >= self.max_block_life - || unpin_flags.contains(hash) + || (unpin_flags.contains(hash) && details.can_be_unpinned) { - // The block is too old, or it's been flagged to be unpinned (no refs to it left) + // The block is too old, or it's been flagged to be unpinned and won't be in a future + // backend event, so we can unpin it for real now. blocks_to_unpin.push(*hash); + // Clear it from our unpin flags if present so that we don't try to unpin it again. + unpin_flags.remove(hash); } } + // Release our lock on unpin_flags ASAP. + drop(unpin_flags); + // No need to call the waker etc if nothing to do: if blocks_to_unpin.is_empty() { return; @@ -343,6 +381,11 @@ struct PinnedDetails { /// Because we store one here until it's unpinned, the live count /// will only drop to 1 when no external refs are left. block_ref: BlockRef, + /// Has this block showed up in the list of pruned blocks, or has it + /// been finalized? In this case, it can now been pinned as it won't + /// show up again in future events (except as a "parent block" of some + /// new block, which we're currently ignoring). + can_be_unpinned: bool, } /// All blocks reported will be wrapped in this. @@ -433,6 +476,23 @@ pub(super) mod test_utils { (follow_unpin, unpin_rx) } + /// Assert that the unpinned blocks sent from the `UnpinRx` channel match the items given. + pub fn assert_from_unpin_rx( + unpin_rx: &UnpinRx, + items: impl IntoIterator, + ) { + let expected_hashes = HashSet::::from_iter(items); + for i in 0..expected_hashes.len() { + let Ok((hash, _)) = unpin_rx.try_recv() else { + panic!("Another unpin event is expected, but failed to pull item {i} from channel"); + }; + assert!( + expected_hashes.contains(&hash), + "Hash {hash:?} was unpinned, but is not expected to have been" + ); + } + } + /// An initialized event containing a BlockRef (useful for comparisons) pub fn ev_initialized_ref(n: u64) -> FollowEvent> { FollowEvent::Initialized(Initialized { @@ -474,7 +534,7 @@ mod test { use super::super::follow_stream::test_utils::{ ev_best_block, ev_finalized, ev_initialized, ev_new_block, }; - use super::test_utils::{ev_new_block_ref, test_unpin_stream_getter}; + use super::test_utils::{assert_from_unpin_rx, ev_new_block_ref, test_unpin_stream_getter}; use super::*; use crate::config::substrate::H256; @@ -508,17 +568,44 @@ mod test { ); } + #[tokio::test] + async fn unpins_initialized_block() { + let (mut follow_unpin, unpin_rx) = test_unpin_stream_getter( + || { + [ + Ok(ev_initialized(0)), + Ok(ev_finalized([1], [])), + Err(Error::Other("ended".to_owned())), + ] + }, + 3, + ); + + let _r = follow_unpin.next().await.unwrap().unwrap(); + + // Drop the initialized block: + let i0 = follow_unpin.next().await.unwrap().unwrap(); + drop(i0); + + // Let a finalization event occur. + let _f1 = follow_unpin.next().await.unwrap().unwrap(); + + // Now, initialized block should be unpinned. + assert_from_unpin_rx(&unpin_rx, [H256::from_low_u64_le(0)]); + assert!(!follow_unpin.is_pinned(&H256::from_low_u64_le(0))); + } + #[tokio::test] async fn unpins_old_blocks() { let (mut follow_unpin, unpin_rx) = test_unpin_stream_getter( || { [ Ok(ev_initialized(0)), - Ok(ev_finalized([1])), - Ok(ev_finalized([2])), - Ok(ev_finalized([3])), - Ok(ev_finalized([4])), - Ok(ev_finalized([5])), + Ok(ev_finalized([1], [])), + Ok(ev_finalized([2], [])), + Ok(ev_finalized([3], [])), + Ok(ev_finalized([4], [])), + Ok(ev_finalized([5], [])), Err(Error::Other("ended".to_owned())), ] }, @@ -535,36 +622,29 @@ mod test { let _f3 = follow_unpin.next().await.unwrap().unwrap(); // Max age is 3, so after block 3 finalized, block 0 becomes too old and is unpinned. - let (hash, _) = unpin_rx - .try_recv() - .expect("unpin call should have happened"); - assert_eq!(hash, H256::from_low_u64_le(0)); + assert_from_unpin_rx(&unpin_rx, [H256::from_low_u64_le(0)]); let _f4 = follow_unpin.next().await.unwrap().unwrap(); // Block 1 is now too old and is unpinned. - let (hash, _) = unpin_rx - .try_recv() - .expect("unpin call should have happened"); - assert_eq!(hash, H256::from_low_u64_le(1)); + assert_from_unpin_rx(&unpin_rx, [H256::from_low_u64_le(1)]); let _f5 = follow_unpin.next().await.unwrap().unwrap(); // Block 2 is now too old and is unpinned. - let (hash, _) = unpin_rx - .try_recv() - .expect("unpin call should have happened"); - assert_eq!(hash, H256::from_low_u64_le(2)); + assert_from_unpin_rx(&unpin_rx, [H256::from_low_u64_le(2)]); } #[tokio::test] - async fn unpins_dropped_blocks() { + async fn dropped_new_blocks_should_not_get_unpinned_until_finalization() { let (mut follow_unpin, unpin_rx) = test_unpin_stream_getter( || { [ Ok(ev_initialized(0)), - Ok(ev_finalized([1])), - Ok(ev_finalized([2])), + Ok(ev_new_block(0, 1)), + Ok(ev_new_block(1, 2)), + Ok(ev_finalized([1], [])), + Ok(ev_finalized([2], [])), Err(Error::Other("ended".to_owned())), ] }, @@ -573,62 +653,143 @@ mod test { let _r = follow_unpin.next().await.unwrap().unwrap(); let _i0 = follow_unpin.next().await.unwrap().unwrap(); - let f1 = follow_unpin.next().await.unwrap().unwrap(); - // We don't care about block 1 any more; drop it. unpins happen at finalized evs. + let n1 = follow_unpin.next().await.unwrap().unwrap(); + drop(n1); + let n2 = follow_unpin.next().await.unwrap().unwrap(); + drop(n2); + + // New blocks dropped but still pinned: + assert!(follow_unpin.is_pinned(&H256::from_low_u64_le(1))); + assert!(follow_unpin.is_pinned(&H256::from_low_u64_le(2))); + + let f1 = follow_unpin.next().await.unwrap().unwrap(); drop(f1); - let _f2 = follow_unpin.next().await.unwrap().unwrap(); - - // Check that we get the expected unpin event. - let (hash, _) = unpin_rx - .try_recv() - .expect("unpin call should have happened"); - assert_eq!(hash, H256::from_low_u64_le(1)); - - // Confirm that 0 and 2 are still pinned and that 1 isnt. - assert!(!follow_unpin.is_pinned(&H256::from_low_u64_le(1))); - assert!(follow_unpin.is_pinned(&H256::from_low_u64_le(0))); + // After block 1 finalized, both blocks are still pinned because: + // - block 1 was handed back in the finalized event, so will be unpinned next time. + // - block 2 wasn't mentioned in the finalized event, so should not have been unpinned yet. + assert!(follow_unpin.is_pinned(&H256::from_low_u64_le(1))); assert!(follow_unpin.is_pinned(&H256::from_low_u64_le(2))); + + let f2 = follow_unpin.next().await.unwrap().unwrap(); + drop(f2); + + // After block 2 finalized, block 1 can be unpinned finally, but block 2 needs to wait one more event. + assert!(!follow_unpin.is_pinned(&H256::from_low_u64_le(1))); + assert!(follow_unpin.is_pinned(&H256::from_low_u64_le(2))); + assert_from_unpin_rx(&unpin_rx, [H256::from_low_u64_le(1)]); } #[tokio::test] - async fn only_unpins_if_finalized_is_dropped() { - // If we drop the "new block" and "best block" BlockRefs, - // and then the block comes back as finalized (for instance), - // no unpin call should be made unless we also drop the finalized - // one. + async fn dropped_new_blocks_should_not_get_unpinned_until_pruned() { let (mut follow_unpin, unpin_rx) = test_unpin_stream_getter( || { [ Ok(ev_initialized(0)), Ok(ev_new_block(0, 1)), - Ok(ev_best_block(1)), - Ok(ev_finalized([1])), - Ok(ev_finalized([2])), + Ok(ev_new_block(1, 2)), + Ok(ev_new_block(1, 3)), + Ok(ev_finalized([1], [])), + Ok(ev_finalized([2], [3])), + Ok(ev_finalized([4], [])), Err(Error::Other("ended".to_owned())), ] }, - 100, + 10, ); let _r = follow_unpin.next().await.unwrap().unwrap(); let _i0 = follow_unpin.next().await.unwrap().unwrap(); + let n1 = follow_unpin.next().await.unwrap().unwrap(); drop(n1); + let n2 = follow_unpin.next().await.unwrap().unwrap(); + drop(n2); + let n3 = follow_unpin.next().await.unwrap().unwrap(); + drop(n3); + + let f1 = follow_unpin.next().await.unwrap().unwrap(); + drop(f1); + + // After block 1 is finalized, everything is still pinned because the finalization event + // itself returns 1, and 2/3 aren't finalized or pruned yet. + assert!(follow_unpin.is_pinned(&H256::from_low_u64_le(1))); + assert!(follow_unpin.is_pinned(&H256::from_low_u64_le(2))); + assert!(follow_unpin.is_pinned(&H256::from_low_u64_le(3))); + + let f2 = follow_unpin.next().await.unwrap().unwrap(); + drop(f2); + + // After the next finalization event, block 1 can finally be unpinned since it was Finalized + // last event _and_ is no longer handed back anywhere. 2 and 3 should still be pinned. + assert!(!follow_unpin.is_pinned(&H256::from_low_u64_le(1))); + assert!(follow_unpin.is_pinned(&H256::from_low_u64_le(2))); + assert!(follow_unpin.is_pinned(&H256::from_low_u64_le(3))); + assert_from_unpin_rx(&unpin_rx, [H256::from_low_u64_le(1)]); + + let f4 = follow_unpin.next().await.unwrap().unwrap(); + drop(f4); + + // After some other finalized event, we are now allowed to ditch the previously pruned and + // finalized blocks 2 and 3. + assert!(!follow_unpin.is_pinned(&H256::from_low_u64_le(2))); + assert!(!follow_unpin.is_pinned(&H256::from_low_u64_le(3))); + assert_from_unpin_rx( + &unpin_rx, + [H256::from_low_u64_le(2), H256::from_low_u64_le(3)], + ); + } + + #[tokio::test] + async fn never_unpin_new_block_before_finalized() { + // Ensure that if we drop a new block; the pinning is still active until the block is finalized. + let (mut follow_unpin, unpin_rx) = test_unpin_stream_getter( + || { + [ + Ok(ev_initialized(0)), + Ok(ev_new_block(0, 1)), + Ok(ev_new_block(1, 2)), + Ok(ev_best_block(1)), + Ok(ev_finalized([1], [])), + Ok(ev_finalized([2], [])), + Err(Error::Other("ended".to_owned())), + ] + }, + 10, + ); + + let _r = follow_unpin.next().await.unwrap().unwrap(); + + // drop initialised block 0 and new block 1 and new block 2. + let i0 = follow_unpin.next().await.unwrap().unwrap(); + drop(i0); + let n1 = follow_unpin.next().await.unwrap().unwrap(); + drop(n1); + let n2 = follow_unpin.next().await.unwrap().unwrap(); + drop(n2); let b1 = follow_unpin.next().await.unwrap().unwrap(); drop(b1); - let f1 = follow_unpin.next().await.unwrap().unwrap(); - // even though we dropped our block 1 in the new/best events, it won't be unpinned - // because it occurred again in finalized event. + // Nothing unpinned yet! unpin_rx.try_recv().expect_err("nothing unpinned yet"); + let f1 = follow_unpin.next().await.unwrap().unwrap(); drop(f1); - let _f2 = follow_unpin.next().await.unwrap().unwrap(); - // Since we dropped the finalized block 1, we'll now unpin it when next block finalized. - let (hash, _) = unpin_rx.try_recv().expect("unpin should have happened now"); - assert_eq!(hash, H256::from_low_u64_le(1)); + // After finalization, block 1 is now ready to be unpinned (it won't be seen again), + // but isn't actually unpinned yet (because it was just handed back in f1). Block 0 + // however has now been unpinned. + assert!(!follow_unpin.is_pinned(&H256::from_low_u64_le(0))); + assert_from_unpin_rx(&unpin_rx, [H256::from_low_u64_le(0)]); + unpin_rx.try_recv().expect_err("nothing unpinned yet"); + + let f2 = follow_unpin.next().await.unwrap().unwrap(); + drop(f2); + + // After f2, we can get rid of block 1 now, which was finalized last time. + assert!(!follow_unpin.is_pinned(&H256::from_low_u64_le(1))); + assert_from_unpin_rx(&unpin_rx, [H256::from_low_u64_le(1)]); + unpin_rx.try_recv().expect_err("nothing unpinned yet"); } } diff --git a/subxt/src/backend/unstable/mod.rs b/subxt/src/backend/unstable/mod.rs index 9067d7b6c7..a9fc8658a0 100644 --- a/subxt/src/backend/unstable/mod.rs +++ b/subxt/src/backend/unstable/mod.rs @@ -458,28 +458,13 @@ impl Backend for UnstableBackend { extrinsic: &[u8], ) -> Result>, Error> { // We care about new and finalized block hashes. - enum SeenBlock { - New(Ref), - Finalized(Vec), - } enum SeenBlockMarker { New, Finalized, } - // First, subscribe to all new and finalized block refs. - // - we subscribe to new refs so that when we see `BestChainBlockIncluded`, we - // can try to return a block ref for the best block. - // - we subscribe to finalized refs so that when we see `Finalized`, we can - // guarantee that when we return here, the finalized block we report has been - // reported from chainHead_follow already. - let mut seen_blocks_sub = self.follow_handle.subscribe().events().filter_map(|ev| { - std::future::ready(match ev { - FollowEvent::NewBlock(ev) => Some(SeenBlock::New(ev.block_hash)), - FollowEvent::Finalized(ev) => Some(SeenBlock::Finalized(ev.finalized_block_hashes)), - _ => None, - }) - }); + // First, subscribe to new blocks. + let mut seen_blocks_sub = self.follow_handle.subscribe().events(); // Then, submit the transaction. let mut tx_progress = self @@ -506,22 +491,39 @@ impl Backend for UnstableBackend { // Make a note of new or finalized blocks that have come in since we started the TX. if let Poll::Ready(Some(seen_block)) = seen_blocks_sub.poll_next_unpin(cx) { match seen_block { - SeenBlock::New(block_ref) => { + FollowEvent::Initialized(ev) => { + // Just in case this stream is really slow to start or something.. + seen_blocks.insert( + ev.finalized_block_hash.hash(), + (SeenBlockMarker::Finalized, ev.finalized_block_hash), + ); + } + FollowEvent::NewBlock(ev) => { // Optimization: once we have a `finalized_hash`, we only care about finalized // block refs now and can avoid bothering to save new blocks. if finalized_hash.is_none() { - seen_blocks - .insert(block_ref.hash(), (SeenBlockMarker::New, block_ref)); + seen_blocks.insert( + ev.block_hash.hash(), + (SeenBlockMarker::New, ev.block_hash), + ); } } - SeenBlock::Finalized(block_refs) => { - for block_ref in block_refs { + FollowEvent::Finalized(ev) => { + for block_ref in ev.finalized_block_hashes { seen_blocks.insert( block_ref.hash(), (SeenBlockMarker::Finalized, block_ref), ); } } + FollowEvent::Stop => { + // If we get this event, we'll lose all of our existing pinned blocks and have a gap + // in which we may lose the finaliuzed block that the TX is in. For now, just error if + // this happens, to prevent the case in which we never see a finalized block and wait + // forever. + return Poll::Ready(Some(Err(Error::Other("chainHead_follow emitted 'stop' event during transaction submission".into())))); + } + _ => {} } continue; } @@ -555,7 +557,6 @@ impl Backend for UnstableBackend { Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e))), Poll::Ready(Some(Ok(ev))) => ev, }; - // When we get one, map it to the correct format (or for finalized ev, wait for the pinned block): let ev = match ev { rpc_methods::TransactionStatus::Finalized { block } => {