diff --git a/examples/examples/submit_and_watch.rs b/examples/examples/submit_and_watch.rs index fba15707e7..1781eb2b29 100644 --- a/examples/examples/submit_and_watch.rs +++ b/examples/examples/submit_and_watch.rs @@ -130,7 +130,7 @@ async fn handle_transfer_events() -> Result<(), Box> { use subxt::tx::TxStatus::*; // Made it into a block, but not finalized. - if let InBlock(details) = ev { + if let InBlock(Some(details)) = ev { println!( "Transaction {:?} made it into block {:?}", details.extrinsic_hash(), diff --git a/subxt/src/rpc/rpc.rs b/subxt/src/rpc/rpc.rs index f0dcd6aa40..a481c7bfb1 100644 --- a/subxt/src/rpc/rpc.rs +++ b/subxt/src/rpc/rpc.rs @@ -46,6 +46,7 @@ use super::{ ChainHeadResult, FollowEvent, RuntimeEvent, + TransactionEvent, }, RpcClient, RpcClientT, @@ -984,15 +985,15 @@ impl Rpc { pub async fn watch_extrinsic( &self, extrinsic: X, - ) -> Result>, Error> { + ) -> Result>, Error> { let bytes: Bytes = extrinsic.encode().into(); let params = rpc_params![bytes]; let subscription = self .client .subscribe( - "author_submitAndWatchExtrinsic", + "transaction_unstable_submitAndWatch", params, - "author_unwatchExtrinsic", + "transaction_unstable_unwatch", ) .await?; Ok(subscription) diff --git a/subxt/src/rpc/subscription_events.rs b/subxt/src/rpc/subscription_events.rs index 3a721121a9..b81d58a796 100644 --- a/subxt/src/rpc/subscription_events.rs +++ b/subxt/src/rpc/subscription_events.rs @@ -380,10 +380,7 @@ impl From> for TransactionEvent { /// Serialize and deserialize helper as string. mod as_string { use super::*; - use serde::{ - Deserializer, - Serializer, - }; + use serde::Deserializer; pub fn deserialize<'de, D: Deserializer<'de>>( deserializer: D, diff --git a/subxt/src/tx/tx_progress.rs b/subxt/src/tx/tx_progress.rs index 0db6a53d80..a1d3e44ca4 100644 --- a/subxt/src/tx/tx_progress.rs +++ b/subxt/src/tx/tx_progress.rs @@ -16,8 +16,12 @@ use crate::{ }, events::EventsClient, rpc::{ + subscription_events::{ + TransactionDropped as TransactionDroppedEvent, + TransactionError as TransactionErrorEvent, + TransactionEvent, + }, Subscription, - SubstrateTxStatus, }, Config, }; @@ -34,7 +38,7 @@ pub use sp_runtime::traits::SignedExtension; #[derive(Derivative)] #[derivative(Debug(bound = "C: std::fmt::Debug"))] pub struct TxProgress { - sub: Option>>, + sub: Option>>, ext_hash: T::Hash, client: C, } @@ -47,7 +51,7 @@ impl Unpin for TxProgress {} impl TxProgress { /// Instantiate a new [`TxProgress`] from a custom subscription. pub fn new( - sub: Subscription>, + sub: Subscription>, client: C, ext_hash: T::Hash, ) -> Self { @@ -91,9 +95,9 @@ where while let Some(status) = self.next_item().await { match status? { // Finalized or otherwise in a block! Return. - TxStatus::InBlock(s) | TxStatus::Finalized(s) => return Ok(s), + TxStatus::InBlock(Some(s)) | TxStatus::Finalized(s) => return Ok(s), // Error scenarios; return the error. - TxStatus::FinalityTimeout(_) => { + TxStatus::Error(_) => { return Err(TransactionError::FinalitySubscriptionTimeout.into()) } // Ignore anything else and wait for next status event: @@ -119,7 +123,7 @@ where // Finalized! Return. TxStatus::Finalized(s) => return Ok(s), // Error scenarios; return the error. - TxStatus::FinalityTimeout(_) => { + TxStatus::Error(_) => { return Err(TransactionError::FinalitySubscriptionTimeout.into()) } // Ignore and wait for next status event: @@ -162,20 +166,22 @@ impl> Stream for TxProgress { sub.poll_next_unpin(cx).map_ok(|status| { match status { - SubstrateTxStatus::Future => TxStatus::Future, - SubstrateTxStatus::Ready => TxStatus::Ready, - SubstrateTxStatus::Broadcast(peers) => TxStatus::Broadcast(peers), - SubstrateTxStatus::InBlock(hash) => { - TxStatus::InBlock(TxInBlock::new( - hash, - self.ext_hash, - self.client.clone(), - )) + TransactionEvent::Validated => TxStatus::Validated, + TransactionEvent::Broadcasted(broadcasted) => { + TxStatus::Broadcast(broadcasted.num_peers) } - SubstrateTxStatus::Retracted(hash) => TxStatus::Retracted(hash), - SubstrateTxStatus::Usurped(hash) => TxStatus::Usurped(hash), - SubstrateTxStatus::Dropped => TxStatus::Dropped, - SubstrateTxStatus::Invalid => TxStatus::Invalid, + TransactionEvent::BestChainBlockIncluded(block) => { + TxStatus::InBlock(block.map(|block| { + TxInBlock::new( + block.hash, + block.index, + self.ext_hash, + self.client.clone(), + ) + })) + } + TransactionEvent::Invalid(error) => TxStatus::Invalid(error), + TransactionEvent::Dropped(dropped) => TxStatus::Dropped(dropped), // Only the following statuses are actually considered "final" (see the substrate // docs on `TxStatus`). Basically, either the transaction makes it into a // block, or we eventually give up on waiting for it to make it into a block. @@ -185,14 +191,15 @@ impl> Stream for TxProgress { // nonce might still be valid on some fork on another node which ends up being finalized. // Equally, a transaction `Dropped` from one node may still be in the transaction pool, // and make it into a block, on another node. Likewise with `Usurped`. - SubstrateTxStatus::FinalityTimeout(hash) => { + TransactionEvent::Error(error) => { self.sub = None; - TxStatus::FinalityTimeout(hash) + TxStatus::Error(error) } - SubstrateTxStatus::Finalized(hash) => { + TransactionEvent::Finalized(block) => { self.sub = None; TxStatus::Finalized(TxInBlock::new( - hash, + block.hash, + block.index, self.ext_hash, self.client.clone(), )) @@ -251,30 +258,25 @@ impl> Stream for TxProgress { #[derive(Derivative)] #[derivative(Debug(bound = "C: std::fmt::Debug"))] pub enum TxStatus { - /// The transaction is part of the "future" queue. - Future, - /// The transaction is part of the "ready" queue. - Ready, - /// The transaction has been broadcast to the given peers. - Broadcast(Vec), - /// The transaction has been included in a block with given hash. - InBlock(TxInBlock), - /// The block this transaction was included in has been retracted, - /// probably because it did not make it onto the blocks which were - /// finalized. - Retracted(T::Hash), - /// A block containing the transaction did not reach finality within 512 - /// blocks, and so the subscription has ended. - FinalityTimeout(T::Hash), + /// The transaction was validated by the runtime. + Validated, + /// The transaction has been broadcast to a number of peers. + Broadcast(usize), + /// The transaction has been included in a best block with given hash and index. + /// + /// # Note + /// + /// This may contain `None` if the block is no longer a best + /// block of the chain. + InBlock(Option>), /// The transaction has been finalized by a finality-gadget, e.g GRANDPA. Finalized(TxInBlock), - /// The transaction has been replaced in the pool by another transaction - /// that provides the same tags. (e.g. same (sender, nonce)). - Usurped(T::Hash), - /// The transaction has been dropped from the pool because of the limit. - Dropped, - /// The transaction is no longer valid in the current state. - Invalid, + /// The transaction could not be processed due to an error. + Error(TransactionErrorEvent), + /// The transaction is marked as invalid. + Invalid(TransactionErrorEvent), + /// The client was not capable of keeping track of this transaction. + Dropped(TransactionDroppedEvent), } impl TxStatus { @@ -291,7 +293,7 @@ impl TxStatus { /// [`None`] if the enum variant is not [`TxStatus::InBlock`]. pub fn as_in_block(&self) -> Option<&TxInBlock> { match self { - Self::InBlock(val) => Some(val), + Self::InBlock(val) => val.as_ref(), _ => None, } } @@ -302,14 +304,21 @@ impl TxStatus { #[derivative(Debug(bound = "C: std::fmt::Debug"))] pub struct TxInBlock { block_hash: T::Hash, + index: usize, ext_hash: T::Hash, client: C, } impl> TxInBlock { - pub(crate) fn new(block_hash: T::Hash, ext_hash: T::Hash, client: C) -> Self { + pub(crate) fn new( + block_hash: T::Hash, + index: usize, + ext_hash: T::Hash, + client: C, + ) -> Self { Self { block_hash, + index, ext_hash, client, }