Allow submitting transactions ignoring follow events (#1962)

This commit is contained in:
James Wilson
2025-03-14 16:08:38 +00:00
committed by GitHub
parent da3ea0b528
commit 06e658cd4b
+236 -146
View File
@@ -48,6 +48,7 @@ pub use subxt_rpcs::methods::chain_head::ChainHeadRpcMethods;
pub struct ChainHeadBackendBuilder<T> {
max_block_life: usize,
transaction_timeout_secs: usize,
submit_transactions_ignoring_follow_events: bool,
_marker: std::marker::PhantomData<T>,
}
@@ -63,6 +64,7 @@ impl<T: Config> ChainHeadBackendBuilder<T> {
Self {
max_block_life: usize::MAX,
transaction_timeout_secs: 240,
submit_transactions_ignoring_follow_events: false,
_marker: std::marker::PhantomData,
}
}
@@ -84,11 +86,35 @@ impl<T: Config> ChainHeadBackendBuilder<T> {
/// give up waiting.
///
/// Provide a value here to denote how long, in seconds, to wait before giving up. Defaults to 240 seconds.
///
/// If [`Self::submit_transactions_ignoring_follow_events()`] is called, this timeout is ignored.
pub fn transaction_timeout(mut self, timeout_secs: usize) -> Self {
self.transaction_timeout_secs = timeout_secs;
self
}
/// When a transaction is submitted, we normally synchronize the events that we get back with events from
/// our background `chainHead_follow` subscription, to ensure that any blocks hashes that we see can be
/// immediately queried (for example to get events or state at that block), and are kept around unless they
/// are no longer needed.
///
/// The main downside of this synchronization is that there may be a delay in being handed back a
/// [`TransactionStatus::InFinalizedBlock`] event while we wait to see the same block hash emitted from
/// our background `chainHead_follow` subscription in order to ensure it's available for querying.
///
/// Calling this method turns off this synchronization, speeding up the response and removing any reliance
/// on the `chainHead_follow` subscription continuing to run without stopping throughout submitting a transaction.
///
/// # Warning
///
/// This can lead to errors when calling APIs like `wait_for_finalized_success`, which will try to retrieve events
/// at the finalized block, because there will be a race and the finalized block may not be available for querying
/// yet.
pub fn submit_transactions_ignoring_follow_events(mut self) -> Self {
self.submit_transactions_ignoring_follow_events = true;
self
}
/// A low-level API to build the backend and driver which requires polling the driver for the backend
/// to make progress.
///
@@ -117,6 +143,8 @@ impl<T: Config> ChainHeadBackendBuilder<T> {
methods: rpc_methods,
follow_handle: follow_stream_driver.handle(),
transaction_timeout_secs: self.transaction_timeout_secs,
submit_transactions_ignoring_follow_events: self
.submit_transactions_ignoring_follow_events,
};
let driver = ChainHeadBackendDriver {
driver: follow_stream_driver,
@@ -187,6 +215,8 @@ pub struct ChainHeadBackend<T: Config> {
follow_handle: FollowStreamDriverHandle<T::Hash>,
// How long to wait until giving up on transactions:
transaction_timeout_secs: usize,
// Don't synchronise blocks with chainHead_follow when submitting txs:
submit_transactions_ignoring_follow_events: bool,
}
impl<T: Config> ChainHeadBackend<T> {
@@ -558,170 +588,230 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for ChainHeadBackend<T> {
&self,
extrinsic: &[u8],
) -> Result<StreamOfResults<TransactionStatus<T::Hash>>, Error> {
let transaction_timeout_secs = self.transaction_timeout_secs as u64;
// Submit a transaction. This makes no attempt to sync with follow events,
async fn submit_transaction_ignoring_follow_events<T: Config>(
extrinsic: &[u8],
methods: &ChainHeadRpcMethods<T>,
) -> Result<StreamOfResults<TransactionStatus<T::Hash>>, Error> {
let tx_progress = methods
.transactionwatch_v1_submit_and_watch(extrinsic)
.await?
.map(|ev| {
ev.map(|tx_status| {
use subxt_rpcs::methods::chain_head::TransactionStatus as RpcTransactionStatus;
match tx_status {
RpcTransactionStatus::Validated => TransactionStatus::Validated,
RpcTransactionStatus::Broadcasted => TransactionStatus::Broadcasted,
RpcTransactionStatus::BestChainBlockIncluded { block: None } => {
TransactionStatus::NoLongerInBestBlock
},
RpcTransactionStatus::BestChainBlockIncluded { block: Some(block) } => {
TransactionStatus::InBestBlock { hash: BlockRef::from_hash(block.hash) }
},
RpcTransactionStatus::Finalized { block } => {
TransactionStatus::InFinalizedBlock { hash: BlockRef::from_hash(block.hash) }
},
RpcTransactionStatus::Error { error } => {
TransactionStatus::Error { message: error }
},
RpcTransactionStatus::Invalid { error } => {
TransactionStatus::Invalid { message: error }
},
RpcTransactionStatus::Dropped { error } => {
TransactionStatus::Dropped { message: error }
},
}
}).map_err(Into::into)
});
// We care about new and finalized block hashes.
enum SeenBlockMarker {
New,
Finalized,
Ok(StreamOf(Box::pin(tx_progress)))
}
// First, subscribe to new blocks.
let mut seen_blocks_sub = self.follow_handle.subscribe().events();
// Submit a transaction. This synchronizes with chainHead_follow events to ensure
// that block hashes returned are ready to be queried.
async fn submit_transaction_tracking_follow_events<T: Config>(
extrinsic: &[u8],
transaction_timeout_secs: u64,
methods: &ChainHeadRpcMethods<T>,
follow_handle: &FollowStreamDriverHandle<T::Hash>,
) -> Result<StreamOfResults<TransactionStatus<T::Hash>>, Error> {
// We care about new and finalized block hashes.
enum SeenBlockMarker {
New,
Finalized,
}
// Then, submit the transaction.
let mut tx_progress = self
.methods
.transactionwatch_v1_submit_and_watch(extrinsic)
.await?;
// First, subscribe to new blocks.
let mut seen_blocks_sub = follow_handle.subscribe().events();
let mut seen_blocks = HashMap::new();
let mut done = false;
// Then, submit the transaction.
let mut tx_progress = methods
.transactionwatch_v1_submit_and_watch(extrinsic)
.await?;
// If we see the finalized event, we start waiting until we find a finalized block that
// matches, so we can guarantee to return a pinned block hash and be properly in sync
// with chainHead_follow.
let mut finalized_hash: Option<T::Hash> = None;
let mut seen_blocks = HashMap::new();
let mut done = false;
// Record the start time so that we can time out if things appear to take too long.
let start_instant = web_time::Instant::now();
// If we see the finalized event, we start waiting until we find a finalized block that
// matches, so we can guarantee to return a pinned block hash and be properly in sync
// with chainHead_follow.
let mut finalized_hash: Option<T::Hash> = None;
// A quick helper to return a generic error.
let err_other = |s: &str| Some(Err(Error::Other(s.into())));
// Record the start time so that we can time out if things appear to take too long.
let start_instant = web_time::Instant::now();
// Now we can attempt to associate tx events with pinned blocks.
let tx_stream = futures::stream::poll_fn(move |cx| {
loop {
// Bail early if we're finished; nothing else to do.
if done {
return Poll::Ready(None);
}
// A quick helper to return a generic error.
let err_other = |s: &str| Some(Err(Error::Other(s.into())));
// Bail if we exceed 4 mins; something very likely went wrong.
if start_instant.elapsed().as_secs() > transaction_timeout_secs {
return Poll::Ready(err_other(
"Timeout waiting for the transaction to be finalized",
));
}
// Poll for a follow event, and error if the stream has unexpectedly ended.
let follow_ev_poll = match seen_blocks_sub.poll_next_unpin(cx) {
Poll::Ready(None) => {
return Poll::Ready(err_other("chainHead_follow stream ended unexpectedly"))
// Now we can attempt to associate tx events with pinned blocks.
let tx_stream = futures::stream::poll_fn(move |cx| {
loop {
// Bail early if we're finished; nothing else to do.
if done {
return Poll::Ready(None);
}
Poll::Ready(Some(follow_ev)) => Poll::Ready(follow_ev),
Poll::Pending => Poll::Pending,
};
let follow_ev_is_pending = follow_ev_poll.is_pending();
// If there was a follow event, then handle it and loop around to see if there are more.
// We want to buffer follow events until we hit Pending, so that we are as up-to-date as possible
// for when we see a BestBlockChanged event, so that we have the best change of already having
// seen the block that it mentions and returning a proper pinned block.
if let Poll::Ready(follow_ev) = follow_ev_poll {
match follow_ev {
FollowEvent::NewBlock(ev) => {
// Optimization: once we have a `finalized_hash`, we only care about finalized
// block refs now and can avoid bothering to save new blocks.
if finalized_hash.is_none() {
seen_blocks.insert(
ev.block_hash.hash(),
(SeenBlockMarker::New, ev.block_hash),
);
// Bail if we exceed 4 mins; something very likely went wrong.
if start_instant.elapsed().as_secs() > transaction_timeout_secs {
return Poll::Ready(err_other(
"Timeout waiting for the transaction to be finalized",
));
}
// Poll for a follow event, and error if the stream has unexpectedly ended.
let follow_ev_poll = match seen_blocks_sub.poll_next_unpin(cx) {
Poll::Ready(None) => {
return Poll::Ready(err_other(
"chainHead_follow stream ended unexpectedly",
))
}
Poll::Ready(Some(follow_ev)) => Poll::Ready(follow_ev),
Poll::Pending => Poll::Pending,
};
let follow_ev_is_pending = follow_ev_poll.is_pending();
// If there was a follow event, then handle it and loop around to see if there are more.
// We want to buffer follow events until we hit Pending, so that we are as up-to-date as possible
// for when we see a BestBlockChanged event, so that we have the best change of already having
// seen the block that it mentions and returning a proper pinned block.
if let Poll::Ready(follow_ev) = follow_ev_poll {
match follow_ev {
FollowEvent::NewBlock(ev) => {
// Optimization: once we have a `finalized_hash`, we only care about finalized
// block refs now and can avoid bothering to save new blocks.
if finalized_hash.is_none() {
seen_blocks.insert(
ev.block_hash.hash(),
(SeenBlockMarker::New, ev.block_hash),
);
}
}
}
FollowEvent::Finalized(ev) => {
for block_ref in ev.finalized_block_hashes {
seen_blocks.insert(
block_ref.hash(),
(SeenBlockMarker::Finalized, block_ref),
);
FollowEvent::Finalized(ev) => {
for block_ref in ev.finalized_block_hashes {
seen_blocks.insert(
block_ref.hash(),
(SeenBlockMarker::Finalized, block_ref),
);
}
}
FollowEvent::Stop => {
// If we get this event, we'll lose all of our existing pinned blocks and have a gap
// in which we may lose the finalized block that the TX is in. For now, just error if
// this happens, to prevent the case in which we never see a finalized block and wait
// forever.
return Poll::Ready(err_other("chainHead_follow emitted 'stop' event during transaction submission"));
}
_ => {}
}
FollowEvent::Stop => {
// If we get this event, we'll lose all of our existing pinned blocks and have a gap
// in which we may lose the finalized block that the TX is in. For now, just error if
// this happens, to prevent the case in which we never see a finalized block and wait
// forever.
return Poll::Ready(err_other("chainHead_follow emitted 'stop' event during transaction submission"));
}
_ => {}
}
continue;
}
// If we have a finalized hash, we are done looking for tx events and we are just waiting
// for a pinned block with a matching hash (which must appear eventually given it's finalized).
if let Some(hash) = &finalized_hash {
if let Some((SeenBlockMarker::Finalized, block_ref)) = seen_blocks.remove(hash)
{
// Found it! Hand back the event with a pinned block. We're done.
done = true;
let ev = TransactionStatus::InFinalizedBlock {
hash: block_ref.into(),
};
return Poll::Ready(Some(Ok(ev)));
} else {
// Not found it! If follow ev is pending, then return pending here and wait for
// a new one to come in, else loop around and see if we get another one immediately.
seen_blocks.clear();
if follow_ev_is_pending {
return Poll::Pending;
} else {
continue;
}
}
}
// If we don't have a finalized block yet, we keep polling for tx progress events.
let tx_progress_ev = match tx_progress.poll_next_unpin(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(None) => return Poll::Ready(err_other("No more transaction progress events, but we haven't seen a Finalized one yet")),
Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e.into()))),
Poll::Ready(Some(Ok(ev))) => ev,
};
// When we get one, map it to the correct format (or for finalized ev, wait for the pinned block):
use subxt_rpcs::methods::chain_head::TransactionStatus as RpcTransactionStatus;
let tx_progress_ev = match tx_progress_ev {
RpcTransactionStatus::Finalized { block } => {
// We'll wait until we have seen this hash, to try to guarantee
// that when we return this event, the corresponding block is
// pinned and accessible.
finalized_hash = Some(block.hash);
continue;
}
RpcTransactionStatus::BestChainBlockIncluded { block: Some(block) } => {
// Look up a pinned block ref if we can, else return a non-pinned
// block that likely isn't accessible. We have no guarantee that a best
// block on the node a tx was sent to will ever be known about on the
// chainHead_follow subscription.
let block_ref = match seen_blocks.get(&block.hash) {
Some((_, block_ref)) => block_ref.clone().into(),
None => BlockRef::from_hash(block.hash),
};
TransactionStatus::InBestBlock { hash: block_ref }
}
RpcTransactionStatus::BestChainBlockIncluded { block: None } => {
TransactionStatus::NoLongerInBestBlock
}
RpcTransactionStatus::Broadcasted => TransactionStatus::Broadcasted,
RpcTransactionStatus::Dropped { error, .. } => {
TransactionStatus::Dropped { message: error }
}
RpcTransactionStatus::Error { error } => {
TransactionStatus::Error { message: error }
}
RpcTransactionStatus::Invalid { error } => {
TransactionStatus::Invalid { message: error }
}
RpcTransactionStatus::Validated => TransactionStatus::Validated,
};
return Poll::Ready(Some(Ok(tx_progress_ev)));
}
});
Ok(StreamOf(Box::pin(tx_stream)))
// If we have a finalized hash, we are done looking for tx events and we are just waiting
// for a pinned block with a matching hash (which must appear eventually given it's finalized).
if let Some(hash) = &finalized_hash {
if let Some((SeenBlockMarker::Finalized, block_ref)) =
seen_blocks.remove(hash)
{
// Found it! Hand back the event with a pinned block. We're done.
done = true;
let ev = TransactionStatus::InFinalizedBlock {
hash: block_ref.into(),
};
return Poll::Ready(Some(Ok(ev)));
} else {
// Not found it! If follow ev is pending, then return pending here and wait for
// a new one to come in, else loop around and see if we get another one immediately.
seen_blocks.clear();
if follow_ev_is_pending {
return Poll::Pending;
} else {
continue;
}
}
}
// If we don't have a finalized block yet, we keep polling for tx progress events.
let tx_progress_ev = match tx_progress.poll_next_unpin(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(None) => return Poll::Ready(err_other("No more transaction progress events, but we haven't seen a Finalized one yet")),
Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e.into()))),
Poll::Ready(Some(Ok(ev))) => ev,
};
// When we get one, map it to the correct format (or for finalized ev, wait for the pinned block):
use subxt_rpcs::methods::chain_head::TransactionStatus as RpcTransactionStatus;
let tx_progress_ev = match tx_progress_ev {
RpcTransactionStatus::Finalized { block } => {
// We'll wait until we have seen this hash, to try to guarantee
// that when we return this event, the corresponding block is
// pinned and accessible.
finalized_hash = Some(block.hash);
continue;
}
RpcTransactionStatus::BestChainBlockIncluded { block: Some(block) } => {
// Look up a pinned block ref if we can, else return a non-pinned
// block that likely isn't accessible. We have no guarantee that a best
// block on the node a tx was sent to will ever be known about on the
// chainHead_follow subscription.
let block_ref = match seen_blocks.get(&block.hash) {
Some((_, block_ref)) => block_ref.clone().into(),
None => BlockRef::from_hash(block.hash),
};
TransactionStatus::InBestBlock { hash: block_ref }
}
RpcTransactionStatus::BestChainBlockIncluded { block: None } => {
TransactionStatus::NoLongerInBestBlock
}
RpcTransactionStatus::Broadcasted => TransactionStatus::Broadcasted,
RpcTransactionStatus::Dropped { error, .. } => {
TransactionStatus::Dropped { message: error }
}
RpcTransactionStatus::Error { error } => {
TransactionStatus::Error { message: error }
}
RpcTransactionStatus::Invalid { error } => {
TransactionStatus::Invalid { message: error }
}
RpcTransactionStatus::Validated => TransactionStatus::Validated,
};
return Poll::Ready(Some(Ok(tx_progress_ev)));
}
});
Ok(StreamOf(Box::pin(tx_stream)))
}
if self.submit_transactions_ignoring_follow_events {
submit_transaction_ignoring_follow_events(extrinsic, &self.methods).await
} else {
submit_transaction_tracking_follow_events::<T>(
extrinsic,
self.transaction_timeout_secs as u64,
&self.methods,
&self.follow_handle,
)
.await
}
}
async fn call(