mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-19 21:41:02 +00:00
backend: Add debug logs again
Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
This commit is contained in:
@@ -30,7 +30,7 @@ use crate::error::{Error, RpcError};
|
|||||||
use crate::Config;
|
use crate::Config;
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use follow_stream_driver::{FollowStreamDriver, FollowStreamDriverHandle};
|
use follow_stream_driver::{FollowStreamDriver, FollowStreamDriverHandle};
|
||||||
use futures::{Stream, StreamExt};
|
use futures::{SinkExt, Stream, StreamExt};
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::task::Poll;
|
use std::task::Poll;
|
||||||
@@ -446,19 +446,7 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for UnstableBackend<T> {
|
|||||||
Finalized,
|
Finalized,
|
||||||
}
|
}
|
||||||
|
|
||||||
// First, subscribe to all new and finalized block refs.
|
let mut seen_blocks_sub = self.follow_handle.subscribe().events();
|
||||||
// - we subscribe to new refs so that when we see `BestChainBlockIncluded`, we
|
|
||||||
// can try to return a block ref for the best block.
|
|
||||||
// - we subscribe to finalized refs so that when we see `Finalized`, we can
|
|
||||||
// guarantee that when we return here, the finalized block we report has been
|
|
||||||
// reported from chainHead_follow already.
|
|
||||||
let mut seen_blocks_sub = self.follow_handle.subscribe().events().filter_map(|ev| {
|
|
||||||
std::future::ready(match ev {
|
|
||||||
FollowEvent::NewBlock(ev) => Some(SeenBlock::New(ev.block_hash)),
|
|
||||||
FollowEvent::Finalized(ev) => Some(SeenBlock::Finalized(ev.finalized_block_hashes)),
|
|
||||||
_ => None,
|
|
||||||
})
|
|
||||||
});
|
|
||||||
|
|
||||||
// Then, submit the transaction.
|
// Then, submit the transaction.
|
||||||
let mut tx_progress = self
|
let mut tx_progress = self
|
||||||
@@ -474,9 +462,23 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for UnstableBackend<T> {
|
|||||||
// with chainHead_follow.
|
// with chainHead_follow.
|
||||||
let mut finalized_hash: Option<T::Hash> = None;
|
let mut finalized_hash: Option<T::Hash> = None;
|
||||||
|
|
||||||
|
let mut mem_events = Vec::with_capacity(128);
|
||||||
|
let mut mem_tx_events = Vec::with_capacity(128);
|
||||||
|
let mut iter_num = 0;
|
||||||
|
let now = std::time::Instant::now();
|
||||||
|
|
||||||
// Now we can attempt to associate tx events with pinned blocks.
|
// Now we can attempt to associate tx events with pinned blocks.
|
||||||
let tx_stream = futures::stream::poll_fn(move |cx| {
|
let tx_stream = futures::stream::poll_fn(move |cx| {
|
||||||
loop {
|
loop {
|
||||||
|
iter_num += 1;
|
||||||
|
|
||||||
|
if now.elapsed().as_secs() > 120 {
|
||||||
|
panic!(
|
||||||
|
"iter={:#?}\nmem_events={:#?}\nmem_tx_events={:#?}",
|
||||||
|
iter_num, mem_events, mem_tx_events
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
// Bail early if no more tx events; we don't want to keep polling for pinned blocks.
|
// Bail early if no more tx events; we don't want to keep polling for pinned blocks.
|
||||||
if done {
|
if done {
|
||||||
return Poll::Ready(None);
|
return Poll::Ready(None);
|
||||||
@@ -484,24 +486,25 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for UnstableBackend<T> {
|
|||||||
|
|
||||||
// Make a note of new or finalized blocks that have come in since we started the TX.
|
// Make a note of new or finalized blocks that have come in since we started the TX.
|
||||||
if let Poll::Ready(Some(seen_block)) = seen_blocks_sub.poll_next_unpin(cx) {
|
if let Poll::Ready(Some(seen_block)) = seen_blocks_sub.poll_next_unpin(cx) {
|
||||||
|
mem_events.push((now.elapsed(), seen_block.clone()));
|
||||||
|
|
||||||
match seen_block {
|
match seen_block {
|
||||||
SeenBlock::New(block_ref) => {
|
FollowEvent::NewBlock(block_ref) => {
|
||||||
// Optimization: once we have a `finalized_hash`, we only care about finalized
|
seen_blocks.insert(
|
||||||
// block refs now and can avoid bothering to save new blocks.
|
block_ref.block_hash.hash(),
|
||||||
if finalized_hash.is_none() {
|
(SeenBlockMarker::New, block_ref.block_hash),
|
||||||
seen_blocks
|
);
|
||||||
.insert(block_ref.hash(), (SeenBlockMarker::New, block_ref));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
SeenBlock::Finalized(block_refs) => {
|
FollowEvent::Finalized(block_refs) => {
|
||||||
for block_ref in block_refs {
|
for block_ref in block_refs.finalized_block_hashes {
|
||||||
seen_blocks.insert(
|
seen_blocks.insert(
|
||||||
block_ref.hash(),
|
block_ref.hash(),
|
||||||
(SeenBlockMarker::Finalized, block_ref),
|
(SeenBlockMarker::Finalized, block_ref),
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
_ => (),
|
||||||
|
};
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -535,6 +538,8 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for UnstableBackend<T> {
|
|||||||
Poll::Ready(Some(Ok(ev))) => ev,
|
Poll::Ready(Some(Ok(ev))) => ev,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
mem_tx_events.push((now.elapsed(), ev.clone()));
|
||||||
|
|
||||||
// When we get one, map it to the correct format (or for finalized ev, wait for the pinned block):
|
// When we get one, map it to the correct format (or for finalized ev, wait for the pinned block):
|
||||||
let ev = match ev {
|
let ev = match ev {
|
||||||
rpc_methods::TransactionStatus::Finalized { block } => {
|
rpc_methods::TransactionStatus::Finalized { block } => {
|
||||||
|
|||||||
Reference in New Issue
Block a user