broken fractured commits, only a few errors remaining

This commit is contained in:
Tadeo hepperle
2024-01-23 13:55:12 +01:00
parent 2c40b14230
commit 70e5118d42
47 changed files with 225 additions and 136 deletions
+46 -26
View File
@@ -27,7 +27,6 @@ use crate::backend::{
};
use crate::config::BlockHash;
use crate::error::{Error, RpcError};
use crate::prelude::*;
use crate::Config;
use async_trait::async_trait;
use follow_stream_driver::{FollowStreamDriver, FollowStreamDriverHandle};
@@ -43,7 +42,7 @@ pub use rpc_methods::UnstableRpcMethods;
/// Configure and build an [`UnstableBackend`].
pub struct UnstableBackendBuilder<T> {
max_block_life: usize,
_marker: PhantomData<T>,
_marker: std::marker::PhantomData<T>,
}
impl<T: Config> Default for UnstableBackendBuilder<T> {
@@ -57,7 +56,7 @@ impl<T: Config> UnstableBackendBuilder<T> {
pub fn new() -> Self {
Self {
max_block_life: usize::MAX,
_marker: PhantomData,
_marker: std::marker::PhantomData,
}
}
@@ -481,24 +480,43 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for UnstableBackend<T> {
// with chainHead_follow.
let mut finalized_hash: Option<T::Hash> = None;
// Record the start time so that we can time out if things appear to take too long.
let start_instant = instant::Instant::now();
// A quick helper to return a generic error.
let err_other = |s: &str| Some(Err(Error::Other(s.into())));
// Now we can attempt to associate tx events with pinned blocks.
let tx_stream = futures::stream::poll_fn(move |cx| {
loop {
// Bail early if no more tx events; we don't want to keep polling for pinned blocks.
// Bail early if we're finished; nothing else to do.
if done {
return Poll::Ready(None);
}
// Make a note of new or finalized blocks that have come in since we started the TX.
if let Poll::Ready(Some(seen_block)) = seen_blocks_sub.poll_next_unpin(cx) {
match seen_block {
FollowEvent::Initialized(ev) => {
// Just in case this stream is really slow to start or something..
seen_blocks.insert(
ev.finalized_block_hash.hash(),
(SeenBlockMarker::Finalized, ev.finalized_block_hash),
);
}
// Bail if we exceed 4 mins; something very likely went wrong.
if start_instant.elapsed().as_secs() > 240 {
return Poll::Ready(err_other(
"Timeout waiting for the transaction to be finalized",
));
}
// Poll for a follow event, and error if the stream has unexpectedly ended.
let follow_ev_poll = match seen_blocks_sub.poll_next_unpin(cx) {
Poll::Ready(None) => {
return Poll::Ready(err_other("chainHead_follow stream ended unexpectedly"))
}
Poll::Ready(Some(follow_ev)) => Poll::Ready(follow_ev),
Poll::Pending => Poll::Pending,
};
let follow_ev_is_pending = follow_ev_poll.is_pending();
// If there was a follow event, then handle it and loop around to see if there are more.
// We want to buffer follow events until we hit Pending, so that we are as up-to-date as possible
// for when we see a BestBlockChanged event, so that we have the best change of already having
// seen the block that it mentions and returning a proper pinned block.
if let Poll::Ready(follow_ev) = follow_ev_poll {
match follow_ev {
FollowEvent::NewBlock(ev) => {
// Optimization: once we have a `finalized_hash`, we only care about finalized
// block refs now and can avoid bothering to save new blocks.
@@ -522,7 +540,7 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for UnstableBackend<T> {
// in which we may lose the finaliuzed block that the TX is in. For now, just error if
// this happens, to prevent the case in which we never see a finalized block and wait
// forever.
return Poll::Ready(Some(Err(Error::Other("chainHead_follow emitted 'stop' event during transaction submission".into()))));
return Poll::Ready(err_other("chainHead_follow emitted 'stop' event during transaction submission"));
}
_ => {}
}
@@ -541,25 +559,27 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for UnstableBackend<T> {
};
return Poll::Ready(Some(Ok(ev)));
} else {
// Keep waiting for more finalized blocks until we find it (get rid of any other block refs
// now, since none of them were what we were looking for anyway).
// Not found it! If follow ev is pending, then return pending here and wait for
// a new one to come in, else loop around and see if we get another one immediately.
seen_blocks.clear();
continue;
if follow_ev_is_pending {
return Poll::Pending;
} else {
continue;
}
}
}
// Otherwise, we are still watching for tx events:
let ev = match tx_progress.poll_next_unpin(cx) {
// If we don't have a finalized block yet, we keep polling for tx progress events.
let tx_progress_ev = match tx_progress.poll_next_unpin(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(None) => {
done = true;
return Poll::Ready(None);
}
Poll::Ready(None) => return Poll::Ready(err_other("No more transaction progress events, but we haven't seen a Finalized one yet")),
Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e))),
Poll::Ready(Some(Ok(ev))) => ev,
};
// When we get one, map it to the correct format (or for finalized ev, wait for the pinned block):
let ev = match ev {
let tx_progress_ev = match tx_progress_ev {
rpc_methods::TransactionStatus::Finalized { block } => {
// We'll wait until we have seen this hash, to try to guarantee
// that when we return this event, the corresponding block is
@@ -597,7 +617,7 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for UnstableBackend<T> {
}
rpc_methods::TransactionStatus::Validated => TransactionStatus::Validated,
};
return Poll::Ready(Some(Ok(ev)));
return Poll::Ready(Some(Ok(tx_progress_ev)));
}
});