From d9169e2404e6891593508f49e9f45ddd7153e8d9 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Wed, 10 Jan 2024 11:48:44 +0200 Subject: [PATCH] backend: Detect chainHead stream termination and stop polling Signed-off-by: Alexandru Vasile --- subxt/src/backend/unstable/mod.rs | 134 +++++++++++++++++++++--------- 1 file changed, 96 insertions(+), 38 deletions(-) diff --git a/subxt/src/backend/unstable/mod.rs b/subxt/src/backend/unstable/mod.rs index 4c721510e8..f2fcd22346 100644 --- a/subxt/src/backend/unstable/mod.rs +++ b/subxt/src/backend/unstable/mod.rs @@ -553,6 +553,8 @@ impl Backend for UnstableBackend { let mut mem_log = vec![]; let mut loop_times = 0; + let mut should_poll_blocks = true; + // Now we can attempt to associate tx events with pinned blocks. let tx_stream = futures::stream::poll_fn(move |cx| { loop { @@ -575,52 +577,108 @@ impl Backend for UnstableBackend { } // Make a note of new or finalized blocks that have come in since we started the TX. - if let Poll::Ready(Some(seen_block)) = seen_blocks_sub.poll_next_unpin(cx) { - match seen_block { - SeenBlock::New((block_ref, parent)) => { - // Optimization: once we have a `finalized_hash`, we only care about finalized - // block refs now and can avoid bothering to save new blocks. - // if finalized_hash.is_none() { - seen_blocks.insert( - block_ref.hash(), - ( - SeenBlockMarker::New, - block_ref, - parent, - Some(now.elapsed()), - None, - ), - ); - // } - } - SeenBlock::Finalized(block_refs) => { - for block_ref in block_refs { - // if !seen_blocks.contains_key(&block_ref.hash()) { - // panic!("Finalized before new Finalized {:#?}\n initBlock {:#?} \nMEMLOG{:#?}\n SeenBlocks{:#?} \n Other{:#?}", block_ref.hash(), unsafe { &FIN_BLOCK }, mem_log, seen_blocks, seen_other); - // } + if should_poll_blocks { + match seen_blocks_sub.poll_next_unpin(cx) { + Poll::Ready(Some(seen_block)) => { + match seen_block { + SeenBlock::New((block_ref, parent)) => { + // Optimization: once we have a `finalized_hash`, we only care about finalized + // block refs now and can avoid bothering to save new blocks. + // if finalized_hash.is_none() { + seen_blocks.insert( + block_ref.hash(), + ( + SeenBlockMarker::New, + block_ref, + parent, + Some(now.elapsed()), + None, + ), + ); + // } + } + SeenBlock::Finalized(block_refs) => { + for block_ref in block_refs { + // if !seen_blocks.contains_key(&block_ref.hash()) { + // panic!("Finalized before new Finalized {:#?}\n initBlock {:#?} \nMEMLOG{:#?}\n SeenBlocks{:#?} \n Other{:#?}", block_ref.hash(), unsafe { &FIN_BLOCK }, mem_log, seen_blocks, seen_other); + // } - let entry = seen_blocks.entry(block_ref.hash()).or_insert(( - SeenBlockMarker::Finalized, - block_ref.clone(), - block_ref, - None, - Some(now.elapsed()), - )); - entry.0 = SeenBlockMarker::Finalized; - entry.4 = Some(now.elapsed()); + let entry = + seen_blocks.entry(block_ref.hash()).or_insert(( + SeenBlockMarker::Finalized, + block_ref.clone(), + block_ref, + None, + Some(now.elapsed()), + )); + entry.0 = SeenBlockMarker::Finalized; + entry.4 = Some(now.elapsed()); - // .get_mut(&block_ref.hash()) - // .expect("finalized block seen before new block") - // .0 = SeenBlockMarker::Finalized; + // .get_mut(&block_ref.hash()) + // .expect("finalized block seen before new block") + // .0 = SeenBlockMarker::Finalized; + } + } + SeenBlock::Other(other) => { + seen_other.push((now.elapsed(), other)); + } } + continue; } - SeenBlock::Other(other) => { - seen_other.push((now.elapsed(), other)); + Poll::Ready(None) => { + should_poll_blocks = false; + println!(" seen_blocks_sub ended!"); } + Poll::Pending => (), } - continue; } + // if let Poll::Ready(Some(seen_block)) = seen_blocks_sub.poll_next_unpin(cx) { + // match seen_block { + // SeenBlock::New((block_ref, parent)) => { + // // Optimization: once we have a `finalized_hash`, we only care about finalized + // // block refs now and can avoid bothering to save new blocks. + // // if finalized_hash.is_none() { + // seen_blocks.insert( + // block_ref.hash(), + // ( + // SeenBlockMarker::New, + // block_ref, + // parent, + // Some(now.elapsed()), + // None, + // ), + // ); + // // } + // } + // SeenBlock::Finalized(block_refs) => { + // for block_ref in block_refs { + // // if !seen_blocks.contains_key(&block_ref.hash()) { + // // panic!("Finalized before new Finalized {:#?}\n initBlock {:#?} \nMEMLOG{:#?}\n SeenBlocks{:#?} \n Other{:#?}", block_ref.hash(), unsafe { &FIN_BLOCK }, mem_log, seen_blocks, seen_other); + // // } + + // let entry = seen_blocks.entry(block_ref.hash()).or_insert(( + // SeenBlockMarker::Finalized, + // block_ref.clone(), + // block_ref, + // None, + // Some(now.elapsed()), + // )); + // entry.0 = SeenBlockMarker::Finalized; + // entry.4 = Some(now.elapsed()); + + // // .get_mut(&block_ref.hash()) + // // .expect("finalized block seen before new block") + // // .0 = SeenBlockMarker::Finalized; + // } + // } + // SeenBlock::Other(other) => { + // seen_other.push((now.elapsed(), other)); + // } + // } + // continue; + // } + // If we have a finalized hash, we are done looking for tx events and we are just waiting // for a pinned block with a matching hash (which must appear eventually given it's finalized). if let Some(hash) = &finalized_hash {