Add logging to submit_transaction and unstable driver, and ensure unpin evs complete

This commit is contained in:
James Wilson
2024-01-17 17:07:40 +00:00
parent 553343548e
commit 6b83016a6d
6 changed files with 69 additions and 23 deletions
Generated
+1
View File
@@ -4367,6 +4367,7 @@ dependencies = [
"getrandom 0.2.12",
"hex",
"impl-serde",
"instant",
"jsonrpsee",
"parity-scale-codec",
"primitive-types",
+9 -1
View File
@@ -33,7 +33,14 @@ native = [
# Enable this for web/wasm builds.
# Exactly 1 of "web" and "native" is expected.
web = ["jsonrpsee?/async-wasm-client", "jsonrpsee?/client-web-transport", "getrandom/js", "subxt-lightclient?/web", "subxt-macro/web"]
web = [
"jsonrpsee?/async-wasm-client",
"jsonrpsee?/client-web-transport",
"getrandom/js",
"subxt-lightclient?/web",
"subxt-macro/web",
"instant/wasm-bindgen"
]
# Enable this to use jsonrpsee (allowing for example `OnlineClient::from_url`).
jsonrpsee = ["dep:jsonrpsee"]
@@ -70,6 +77,7 @@ tracing = { workspace = true }
frame-metadata = { workspace = true }
derivative = { workspace = true }
either = { workspace = true }
instant = { workspace = true }
# Provides some deserialization, types like U256/H256 and hashing impls like twox/blake256:
impl-serde = { workspace = true }
@@ -45,7 +45,7 @@ impl<Hash: BlockHash> Stream for FollowStreamDriver<Hash> {
type Item = Result<(), Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let item = match self.inner.poll_next_unpin(cx) {
match self.inner.poll_next_unpin(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(None) => {
// Mark ourselves as done so that everything can end.
@@ -53,11 +53,12 @@ impl<Hash: BlockHash> Stream for FollowStreamDriver<Hash> {
return Poll::Ready(None);
}
Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e))),
Poll::Ready(Some(Ok(item))) => item,
};
self.shared.push_item(item);
Poll::Ready(Some(Ok(())))
Poll::Ready(Some(Ok(item))) => {
// Push item to any subscribers.
self.shared.push_item(item);
Poll::Ready(Some(Ok(())))
}
}
}
}
@@ -74,10 +74,11 @@ impl<Hash: BlockHash> Stream for FollowStreamUnpin<Hash> {
let mut this = self.as_mut();
loop {
// Poll the unpin tasks while they are completing. if we get back None, then
// no tasks in the list, and if pending, we'll be woken when we can poll again.
if let Poll::Ready(Some(())) = this.unpin_futs.poll_next_unpin(cx) {
continue;
// Poll any queued unpin tasks.
let unpin_futs_are_pending = match this.unpin_futs.poll_next_unpin(cx) {
Poll::Ready(Some(())) => continue,
Poll::Ready(None) => false,
Poll::Pending => true,
};
// Poll the inner stream for the next event.
@@ -85,9 +86,14 @@ impl<Hash: BlockHash> Stream for FollowStreamUnpin<Hash> {
return Poll::Pending;
};
// No more progress to be made if inner stream done.
let Some(ev) = ev else {
return Poll::Ready(None);
// if the stream is done, but `unpin_futs` are still pending, then
// return pending here so that they are still driven to completion.
// Else, return `Ready(None)` to signal nothing left to do.
return match unpin_futs_are_pending {
true => Poll::Pending,
false => Poll::Ready(None),
};
};
// Error? just return it and do nothing further.
+36 -9
View File
@@ -480,24 +480,47 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for UnstableBackend<T> {
// with chainHead_follow.
let mut finalized_hash: Option<T::Hash> = None;
// Monitor usage to help track down an obscure issue.
let start_instant = instant::Instant::now();
let mut last_seen_follow_event_str = None;
let mut last_seen_follow_event_time = None;
let mut last_seen_tx_event_str = None;
let mut last_seen_tx_event_time = None;
let mut num_polls: usize = 0;
let mut num_loops: usize = 0;
// Now we can attempt to associate tx events with pinned blocks.
let tx_stream = futures::stream::poll_fn(move |cx| {
num_polls = num_polls.saturating_add(1);
loop {
num_loops = num_loops.saturating_add(1);
// Bail early if no more tx events; we don't want to keep polling for pinned blocks.
if done {
return Poll::Ready(None);
}
// Panic if we exceed 4 mins; something very likely went wrong. We'll remove this
// once we get to the bottom of a spurious error that causes this to hang.
if start_instant.elapsed().as_secs() > 240 {
panic!(
"Finalized block expected by now: \
start_time={start_instant:?}, \
num_polls={num_polls}, \
num_loops={num_loops}, \
last_follow_event_time={last_seen_follow_event_time:?}, \
last_follow_event={last_seen_follow_event_str:?}, \
last_tx_event_time={last_seen_tx_event_time:?}, \
last_tx_event={last_seen_tx_event_str:?}",
);
}
// Make a note of new or finalized blocks that have come in since we started the TX.
if let Poll::Ready(Some(seen_block)) = seen_blocks_sub.poll_next_unpin(cx) {
match seen_block {
FollowEvent::Initialized(ev) => {
// Just in case this stream is really slow to start or something..
seen_blocks.insert(
ev.finalized_block_hash.hash(),
(SeenBlockMarker::Finalized, ev.finalized_block_hash),
);
}
if let Poll::Ready(Some(follow_event)) = seen_blocks_sub.poll_next_unpin(cx) {
last_seen_follow_event_str = Some(format!("{follow_event:?}"));
last_seen_follow_event_time = Some(instant::Instant::now());
match follow_event {
FollowEvent::NewBlock(ev) => {
// Optimization: once we have a `finalized_hash`, we only care about finalized
// block refs now and can avoid bothering to save new blocks.
@@ -557,6 +580,10 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for UnstableBackend<T> {
Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e))),
Poll::Ready(Some(Ok(ev))) => ev,
};
last_seen_tx_event_str = Some(format!("{ev:?}"));
last_seen_tx_event_time = Some(instant::Instant::now());
// When we get one, map it to the correct format (or for finalized ev, wait for the pinned block):
let ev = match ev {
rpc_methods::TransactionStatus::Finalized { block } => {
@@ -219,7 +219,10 @@ async fn build_unstable_client<T: Config>(
use futures::StreamExt;
while let Some(val) = driver.next().await {
if let Err(e) = val {
eprintln!("Error driving unstable backend: {e}");
// This is a test; bail if something does wrong and try to
// ensure that the message makes it to some logs.
eprintln!("Error driving unstable backend in tests (will panic): {e}");
panic!("Error driving unstable backend in tests: {e}");
break;
}
}