mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-04-29 10:17:57 +00:00
@@ -30,7 +30,7 @@ use crate::error::{Error, RpcError};
|
||||
use crate::Config;
|
||||
use async_trait::async_trait;
|
||||
use follow_stream_driver::{FollowStreamDriver, FollowStreamDriverHandle};
|
||||
use futures::{SinkExt, Stream, StreamExt};
|
||||
use futures::{Stream, StreamExt};
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use std::task::Poll;
|
||||
@@ -446,7 +446,19 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for UnstableBackend<T> {
|
||||
Finalized,
|
||||
}
|
||||
|
||||
let mut seen_blocks_sub = self.follow_handle.subscribe().events();
|
||||
// First, subscribe to all new and finalized block refs.
|
||||
// - we subscribe to new refs so that when we see `BestChainBlockIncluded`, we
|
||||
// can try to return a block ref for the best block.
|
||||
// - we subscribe to finalized refs so that when we see `Finalized`, we can
|
||||
// guarantee that when we return here, the finalized block we report has been
|
||||
// reported from chainHead_follow already.
|
||||
let mut seen_blocks_sub = self.follow_handle.subscribe().events().filter_map(|ev| {
|
||||
std::future::ready(match ev {
|
||||
FollowEvent::NewBlock(ev) => Some(SeenBlock::New(ev.block_hash)),
|
||||
FollowEvent::Finalized(ev) => Some(SeenBlock::Finalized(ev.finalized_block_hashes)),
|
||||
_ => None,
|
||||
})
|
||||
});
|
||||
|
||||
// Then, submit the transaction.
|
||||
let mut tx_progress = self
|
||||
@@ -462,23 +474,9 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for UnstableBackend<T> {
|
||||
// with chainHead_follow.
|
||||
let mut finalized_hash: Option<T::Hash> = None;
|
||||
|
||||
let mut mem_events = Vec::with_capacity(128);
|
||||
let mut mem_tx_events = Vec::with_capacity(128);
|
||||
let mut iter_num = 0;
|
||||
let now = std::time::Instant::now();
|
||||
|
||||
// Now we can attempt to associate tx events with pinned blocks.
|
||||
let tx_stream = futures::stream::poll_fn(move |cx| {
|
||||
loop {
|
||||
iter_num += 1;
|
||||
|
||||
if now.elapsed().as_secs() > 120 {
|
||||
panic!(
|
||||
"iter={:#?}\nmem_events={:#?}\nmem_tx_events={:#?}",
|
||||
iter_num, mem_events, mem_tx_events
|
||||
);
|
||||
}
|
||||
|
||||
// Bail early if no more tx events; we don't want to keep polling for pinned blocks.
|
||||
if done {
|
||||
return Poll::Ready(None);
|
||||
@@ -486,25 +484,24 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for UnstableBackend<T> {
|
||||
|
||||
// Make a note of new or finalized blocks that have come in since we started the TX.
|
||||
if let Poll::Ready(Some(seen_block)) = seen_blocks_sub.poll_next_unpin(cx) {
|
||||
mem_events.push((now.elapsed(), seen_block.clone()));
|
||||
|
||||
match seen_block {
|
||||
FollowEvent::NewBlock(block_ref) => {
|
||||
seen_blocks.insert(
|
||||
block_ref.block_hash.hash(),
|
||||
(SeenBlockMarker::New, block_ref.block_hash),
|
||||
);
|
||||
SeenBlock::New(block_ref) => {
|
||||
// Optimization: once we have a `finalized_hash`, we only care about finalized
|
||||
// block refs now and can avoid bothering to save new blocks.
|
||||
if finalized_hash.is_none() {
|
||||
seen_blocks
|
||||
.insert(block_ref.hash(), (SeenBlockMarker::New, block_ref));
|
||||
}
|
||||
}
|
||||
FollowEvent::Finalized(block_refs) => {
|
||||
for block_ref in block_refs.finalized_block_hashes {
|
||||
SeenBlock::Finalized(block_refs) => {
|
||||
for block_ref in block_refs {
|
||||
seen_blocks.insert(
|
||||
block_ref.hash(),
|
||||
(SeenBlockMarker::Finalized, block_ref),
|
||||
);
|
||||
}
|
||||
}
|
||||
_ => (),
|
||||
};
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
@@ -538,8 +535,6 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for UnstableBackend<T> {
|
||||
Poll::Ready(Some(Ok(ev))) => ev,
|
||||
};
|
||||
|
||||
mem_tx_events.push((now.elapsed(), ev.clone()));
|
||||
|
||||
// When we get one, map it to the correct format (or for finalized ev, wait for the pinned block):
|
||||
let ev = match ev {
|
||||
rpc_methods::TransactionStatus::Finalized { block } => {
|
||||
|
||||
Reference in New Issue
Block a user