diff --git a/Cargo.lock b/Cargo.lock index cf7d806a4a..71312952ec 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4367,6 +4367,7 @@ dependencies = [ "getrandom 0.2.12", "hex", "impl-serde", + "instant", "jsonrpsee", "parity-scale-codec", "primitive-types", diff --git a/subxt/Cargo.toml b/subxt/Cargo.toml index f0b0c04ce5..0686a8c897 100644 --- a/subxt/Cargo.toml +++ b/subxt/Cargo.toml @@ -33,7 +33,14 @@ native = [ # Enable this for web/wasm builds. # Exactly 1 of "web" and "native" is expected. -web = ["jsonrpsee?/async-wasm-client", "jsonrpsee?/client-web-transport", "getrandom/js", "subxt-lightclient?/web", "subxt-macro/web"] +web = [ + "jsonrpsee?/async-wasm-client", + "jsonrpsee?/client-web-transport", + "getrandom/js", + "subxt-lightclient?/web", + "subxt-macro/web", + "instant/wasm-bindgen" +] # Enable this to use jsonrpsee (allowing for example `OnlineClient::from_url`). jsonrpsee = ["dep:jsonrpsee"] @@ -70,6 +77,7 @@ tracing = { workspace = true } frame-metadata = { workspace = true } derivative = { workspace = true } either = { workspace = true } +instant = { workspace = true } # Provides some deserialization, types like U256/H256 and hashing impls like twox/blake256: impl-serde = { workspace = true } diff --git a/subxt/src/backend/unstable/follow_stream_driver.rs b/subxt/src/backend/unstable/follow_stream_driver.rs index fdca29723a..ec3e5c49f7 100644 --- a/subxt/src/backend/unstable/follow_stream_driver.rs +++ b/subxt/src/backend/unstable/follow_stream_driver.rs @@ -45,7 +45,7 @@ impl Stream for FollowStreamDriver { type Item = Result<(), Error>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let item = match self.inner.poll_next_unpin(cx) { + match self.inner.poll_next_unpin(cx) { Poll::Pending => return Poll::Pending, Poll::Ready(None) => { // Mark ourselves as done so that everything can end. @@ -53,11 +53,12 @@ impl Stream for FollowStreamDriver { return Poll::Ready(None); } Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e))), - Poll::Ready(Some(Ok(item))) => item, - }; - - self.shared.push_item(item); - Poll::Ready(Some(Ok(()))) + Poll::Ready(Some(Ok(item))) => { + // Push item to any subscribers. + self.shared.push_item(item); + Poll::Ready(Some(Ok(()))) + } + } } } diff --git a/subxt/src/backend/unstable/follow_stream_unpin.rs b/subxt/src/backend/unstable/follow_stream_unpin.rs index 3f055f9f4d..00214cb2fe 100644 --- a/subxt/src/backend/unstable/follow_stream_unpin.rs +++ b/subxt/src/backend/unstable/follow_stream_unpin.rs @@ -74,10 +74,11 @@ impl Stream for FollowStreamUnpin { let mut this = self.as_mut(); loop { - // Poll the unpin tasks while they are completing. if we get back None, then - // no tasks in the list, and if pending, we'll be woken when we can poll again. - if let Poll::Ready(Some(())) = this.unpin_futs.poll_next_unpin(cx) { - continue; + // Poll any queued unpin tasks. + let unpin_futs_are_pending = match this.unpin_futs.poll_next_unpin(cx) { + Poll::Ready(Some(())) => continue, + Poll::Ready(None) => false, + Poll::Pending => true, }; // Poll the inner stream for the next event. @@ -85,9 +86,14 @@ impl Stream for FollowStreamUnpin { return Poll::Pending; }; - // No more progress to be made if inner stream done. let Some(ev) = ev else { - return Poll::Ready(None); + // if the stream is done, but `unpin_futs` are still pending, then + // return pending here so that they are still driven to completion. + // Else, return `Ready(None)` to signal nothing left to do. + return match unpin_futs_are_pending { + true => Poll::Pending, + false => Poll::Ready(None), + }; }; // Error? just return it and do nothing further. diff --git a/subxt/src/backend/unstable/mod.rs b/subxt/src/backend/unstable/mod.rs index a9fc8658a0..4983eb12e4 100644 --- a/subxt/src/backend/unstable/mod.rs +++ b/subxt/src/backend/unstable/mod.rs @@ -480,24 +480,47 @@ impl Backend for UnstableBackend { // with chainHead_follow. let mut finalized_hash: Option = None; + // Monitor usage to help track down an obscure issue. + let start_instant = instant::Instant::now(); + let mut last_seen_follow_event_str = None; + let mut last_seen_follow_event_time = None; + let mut last_seen_tx_event_str = None; + let mut last_seen_tx_event_time = None; + let mut num_polls: usize = 0; + let mut num_loops: usize = 0; + // Now we can attempt to associate tx events with pinned blocks. let tx_stream = futures::stream::poll_fn(move |cx| { + num_polls = num_polls.saturating_add(1); + loop { + num_loops = num_loops.saturating_add(1); + // Bail early if no more tx events; we don't want to keep polling for pinned blocks. if done { return Poll::Ready(None); } + // Panic if we exceed 4 mins; something very likely went wrong. We'll remove this + // once we get to the bottom of a spurious error that causes this to hang. + if start_instant.elapsed().as_secs() > 240 { + panic!( + "Finalized block expected by now: \ + start_time={start_instant:?}, \ + num_polls={num_polls}, \ + num_loops={num_loops}, \ + last_follow_event_time={last_seen_follow_event_time:?}, \ + last_follow_event={last_seen_follow_event_str:?}, \ + last_tx_event_time={last_seen_tx_event_time:?}, \ + last_tx_event={last_seen_tx_event_str:?}", + ); + } + // Make a note of new or finalized blocks that have come in since we started the TX. - if let Poll::Ready(Some(seen_block)) = seen_blocks_sub.poll_next_unpin(cx) { - match seen_block { - FollowEvent::Initialized(ev) => { - // Just in case this stream is really slow to start or something.. - seen_blocks.insert( - ev.finalized_block_hash.hash(), - (SeenBlockMarker::Finalized, ev.finalized_block_hash), - ); - } + if let Poll::Ready(Some(follow_event)) = seen_blocks_sub.poll_next_unpin(cx) { + last_seen_follow_event_str = Some(format!("{follow_event:?}")); + last_seen_follow_event_time = Some(instant::Instant::now()); + match follow_event { FollowEvent::NewBlock(ev) => { // Optimization: once we have a `finalized_hash`, we only care about finalized // block refs now and can avoid bothering to save new blocks. @@ -557,6 +580,10 @@ impl Backend for UnstableBackend { Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e))), Poll::Ready(Some(Ok(ev))) => ev, }; + + last_seen_tx_event_str = Some(format!("{ev:?}")); + last_seen_tx_event_time = Some(instant::Instant::now()); + // When we get one, map it to the correct format (or for finalized ev, wait for the pinned block): let ev = match ev { rpc_methods::TransactionStatus::Finalized { block } => { diff --git a/testing/integration-tests/src/utils/node_proc.rs b/testing/integration-tests/src/utils/node_proc.rs index 0bcb72cc1e..9118beab03 100644 --- a/testing/integration-tests/src/utils/node_proc.rs +++ b/testing/integration-tests/src/utils/node_proc.rs @@ -219,7 +219,10 @@ async fn build_unstable_client( use futures::StreamExt; while let Some(val) = driver.next().await { if let Err(e) = val { - eprintln!("Error driving unstable backend: {e}"); + // This is a test; bail if something does wrong and try to + // ensure that the message makes it to some logs. + eprintln!("Error driving unstable backend in tests (will panic): {e}"); + panic!("Error driving unstable backend in tests: {e}"); break; } }