diff --git a/subxt/src/tx/tx_progress.rs b/subxt/src/tx/tx_progress.rs index b949de2be4..80638fdd78 100644 --- a/subxt/src/tx/tx_progress.rs +++ b/subxt/src/tx/tx_progress.rs @@ -10,6 +10,7 @@ use crate::utils::strip_compact_prefix; use crate::{ backend::{BlockRef, StreamOfResults, TransactionStatus as BackendTxStatus}, client::OnlineClientT, + config::Header, error::{DispatchError, Error, RpcError, TransactionError}, events::EventsClient, Config, @@ -82,6 +83,20 @@ where /// that this is true. In those cases the stream is closed however, so you currently have no way to find /// out if they finally made it into a block or not. pub async fn wait_for_finalized(mut self) -> Result, Error> { + // Wait for the tx to be reported in a block by the transaction status. + let tx_in_block = self.wait_for_tx_finalized().await?; + + // Tx reported as finalized must be synchronized with the chain head: + // - tx finalized number must be lower than the chain head finalized number. + // - tx finalized hash must be equal to the chain head finalized hash. + self.wait_for_chain_head_finalized(tx_in_block).await + } + + /// Wait for the transaction to be reported as finalized. + /// + /// This relies entirely by the transaction status, that could be out of sync with + /// the chain head. + async fn wait_for_tx_finalized(&mut self) -> Result, Error> { while let Some(status) = self.next().await { match status? { // Finalized! Return. @@ -101,6 +116,80 @@ where Err(RpcError::SubscriptionDropped.into()) } + /// Wait for the block hash of the transaction to be reported as finalized + /// by the chain head. + async fn wait_for_chain_head_finalized( + &mut self, + tx_in_block: TxInBlock, + ) -> Result, Error> { + // Correlate the tx status with the head of the chain. + let head = self.client.backend().latest_finalized_block_ref().await?; + + // This optimizes the most common case, where the tx status is in sync with the head of the chain. + if tx_in_block.block_hash() == head.hash() { + return Ok(tx_in_block); + } + + // Must determine if the hash is older or newer than the chain head. + let Some(tx_num) = self + .client + .backend() + .block_header(tx_in_block.block_hash()) + .await? + .map(|header: ::Header| header.number()) + else { + return Err(TransactionError::Error( + "Transaction block header not found. This is a bug, please open an issue".into(), + ) + .into()); + }; + + let Some(head_num) = self + .client + .backend() + .block_header(head.hash()) + .await? + .map(|header| header.number()) + else { + return Err(TransactionError::Error( + "Chain finalized block header not found. This is a bug, please open an issue" + .into(), + ) + .into()); + }; + + let tx_num: u64 = tx_num.into(); + let head_num: u64 = head_num.into(); + // Rely on substrate to report a valid TX status included in a prior finalized block. + if tx_num < head_num { + return Ok(tx_in_block); + } + + // Wait for the chain head to report the tx block hash as finalized, or to report + // a newer finalized block hash (in terms of block number). + let mut finalized_stream = self + .client + .backend() + .stream_finalized_block_headers() + .await?; + + while let Some(finalized) = finalized_stream.next().await { + let (finalized, _block_ref) = finalized?; + + // By the time we open this subscription stream, tx finalized number may (very unlikely) lag behind. + let finalized_num = finalized.number().into(); + if tx_num < finalized_num { + return Ok(tx_in_block); + } + + if finalized.hash() == tx_in_block.block_hash() { + return Ok(tx_in_block); + } + } + + Ok(tx_in_block) + } + /// Wait for the transaction to be finalized, and for the transaction events to indicate /// that the transaction was successful. Returns the events associated with the transaction, /// as well as a couple of other details (block hash and extrinsic hash).