From b34ce7a58049502eb0a5d3ed62bb6aa4e1549917 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 12 Jan 2024 18:48:25 +0200 Subject: [PATCH] Revert "backend: Add debug logs again" This reverts commit 2fbd054ceba0a10754abd5ea223ef6b81b3a579f. --- subxt/src/backend/unstable/mod.rs | 53 ++++++++++++++----------------- 1 file changed, 24 insertions(+), 29 deletions(-) diff --git a/subxt/src/backend/unstable/mod.rs b/subxt/src/backend/unstable/mod.rs index 77f501a51a..fb1437352d 100644 --- a/subxt/src/backend/unstable/mod.rs +++ b/subxt/src/backend/unstable/mod.rs @@ -30,7 +30,7 @@ use crate::error::{Error, RpcError}; use crate::Config; use async_trait::async_trait; use follow_stream_driver::{FollowStreamDriver, FollowStreamDriverHandle}; -use futures::{SinkExt, Stream, StreamExt}; +use futures::{Stream, StreamExt}; use std::collections::HashMap; use std::sync::Arc; use std::task::Poll; @@ -446,7 +446,19 @@ impl Backend for UnstableBackend { Finalized, } - let mut seen_blocks_sub = self.follow_handle.subscribe().events(); + // First, subscribe to all new and finalized block refs. + // - we subscribe to new refs so that when we see `BestChainBlockIncluded`, we + // can try to return a block ref for the best block. + // - we subscribe to finalized refs so that when we see `Finalized`, we can + // guarantee that when we return here, the finalized block we report has been + // reported from chainHead_follow already. + let mut seen_blocks_sub = self.follow_handle.subscribe().events().filter_map(|ev| { + std::future::ready(match ev { + FollowEvent::NewBlock(ev) => Some(SeenBlock::New(ev.block_hash)), + FollowEvent::Finalized(ev) => Some(SeenBlock::Finalized(ev.finalized_block_hashes)), + _ => None, + }) + }); // Then, submit the transaction. let mut tx_progress = self @@ -462,23 +474,9 @@ impl Backend for UnstableBackend { // with chainHead_follow. let mut finalized_hash: Option = None; - let mut mem_events = Vec::with_capacity(128); - let mut mem_tx_events = Vec::with_capacity(128); - let mut iter_num = 0; - let now = std::time::Instant::now(); - // Now we can attempt to associate tx events with pinned blocks. let tx_stream = futures::stream::poll_fn(move |cx| { loop { - iter_num += 1; - - if now.elapsed().as_secs() > 120 { - panic!( - "iter={:#?}\nmem_events={:#?}\nmem_tx_events={:#?}", - iter_num, mem_events, mem_tx_events - ); - } - // Bail early if no more tx events; we don't want to keep polling for pinned blocks. if done { return Poll::Ready(None); @@ -486,25 +484,24 @@ impl Backend for UnstableBackend { // Make a note of new or finalized blocks that have come in since we started the TX. if let Poll::Ready(Some(seen_block)) = seen_blocks_sub.poll_next_unpin(cx) { - mem_events.push((now.elapsed(), seen_block.clone())); - match seen_block { - FollowEvent::NewBlock(block_ref) => { - seen_blocks.insert( - block_ref.block_hash.hash(), - (SeenBlockMarker::New, block_ref.block_hash), - ); + SeenBlock::New(block_ref) => { + // Optimization: once we have a `finalized_hash`, we only care about finalized + // block refs now and can avoid bothering to save new blocks. + if finalized_hash.is_none() { + seen_blocks + .insert(block_ref.hash(), (SeenBlockMarker::New, block_ref)); + } } - FollowEvent::Finalized(block_refs) => { - for block_ref in block_refs.finalized_block_hashes { + SeenBlock::Finalized(block_refs) => { + for block_ref in block_refs { seen_blocks.insert( block_ref.hash(), (SeenBlockMarker::Finalized, block_ref), ); } } - _ => (), - }; + } continue; } @@ -538,8 +535,6 @@ impl Backend for UnstableBackend { Poll::Ready(Some(Ok(ev))) => ev, }; - mem_tx_events.push((now.elapsed(), ev.clone())); - // When we get one, map it to the correct format (or for finalized ev, wait for the pinned block): let ev = match ev { rpc_methods::TransactionStatus::Finalized { block } => {