tx: Adjust transactions to use the new API

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
This commit is contained in:
Alexandru Vasile
2022-12-14 17:42:52 +00:00
parent 0e893cfe1a
commit e6271dd94e
4 changed files with 62 additions and 55 deletions
+1 -1
View File
@@ -130,7 +130,7 @@ async fn handle_transfer_events() -> Result<(), Box<dyn std::error::Error>> {
use subxt::tx::TxStatus::*;
// Made it into a block, but not finalized.
if let InBlock(details) = ev {
if let InBlock(Some(details)) = ev {
println!(
"Transaction {:?} made it into block {:?}",
details.extrinsic_hash(),
+4 -3
View File
@@ -46,6 +46,7 @@ use super::{
ChainHeadResult,
FollowEvent,
RuntimeEvent,
TransactionEvent,
},
RpcClient,
RpcClientT,
@@ -984,15 +985,15 @@ impl<T: Config> Rpc<T> {
pub async fn watch_extrinsic<X: Encode>(
&self,
extrinsic: X,
) -> Result<Subscription<SubstrateTxStatus<T::Hash, T::Hash>>, Error> {
) -> Result<Subscription<TransactionEvent<T::Hash>>, Error> {
let bytes: Bytes = extrinsic.encode().into();
let params = rpc_params![bytes];
let subscription = self
.client
.subscribe(
"author_submitAndWatchExtrinsic",
"transaction_unstable_submitAndWatch",
params,
"author_unwatchExtrinsic",
"transaction_unstable_unwatch",
)
.await?;
Ok(subscription)
+1 -4
View File
@@ -380,10 +380,7 @@ impl<Hash> From<TransactionEventIR<Hash>> for TransactionEvent<Hash> {
/// Serialize and deserialize helper as string.
mod as_string {
use super::*;
use serde::{
Deserializer,
Serializer,
};
use serde::Deserializer;
pub fn deserialize<'de, D: Deserializer<'de>>(
deserializer: D,
+56 -47
View File
@@ -16,8 +16,12 @@ use crate::{
},
events::EventsClient,
rpc::{
subscription_events::{
TransactionDropped as TransactionDroppedEvent,
TransactionError as TransactionErrorEvent,
TransactionEvent,
},
Subscription,
SubstrateTxStatus,
},
Config,
};
@@ -34,7 +38,7 @@ pub use sp_runtime::traits::SignedExtension;
#[derive(Derivative)]
#[derivative(Debug(bound = "C: std::fmt::Debug"))]
pub struct TxProgress<T: Config, C> {
sub: Option<Subscription<SubstrateTxStatus<T::Hash, T::Hash>>>,
sub: Option<Subscription<TransactionEvent<T::Hash>>>,
ext_hash: T::Hash,
client: C,
}
@@ -47,7 +51,7 @@ impl<T: Config, C> Unpin for TxProgress<T, C> {}
impl<T: Config, C> TxProgress<T, C> {
/// Instantiate a new [`TxProgress`] from a custom subscription.
pub fn new(
sub: Subscription<SubstrateTxStatus<T::Hash, T::Hash>>,
sub: Subscription<TransactionEvent<T::Hash>>,
client: C,
ext_hash: T::Hash,
) -> Self {
@@ -91,9 +95,9 @@ where
while let Some(status) = self.next_item().await {
match status? {
// Finalized or otherwise in a block! Return.
TxStatus::InBlock(s) | TxStatus::Finalized(s) => return Ok(s),
TxStatus::InBlock(Some(s)) | TxStatus::Finalized(s) => return Ok(s),
// Error scenarios; return the error.
TxStatus::FinalityTimeout(_) => {
TxStatus::Error(_) => {
return Err(TransactionError::FinalitySubscriptionTimeout.into())
}
// Ignore anything else and wait for next status event:
@@ -119,7 +123,7 @@ where
// Finalized! Return.
TxStatus::Finalized(s) => return Ok(s),
// Error scenarios; return the error.
TxStatus::FinalityTimeout(_) => {
TxStatus::Error(_) => {
return Err(TransactionError::FinalitySubscriptionTimeout.into())
}
// Ignore and wait for next status event:
@@ -162,20 +166,22 @@ impl<T: Config, C: OnlineClientT<T>> Stream for TxProgress<T, C> {
sub.poll_next_unpin(cx).map_ok(|status| {
match status {
SubstrateTxStatus::Future => TxStatus::Future,
SubstrateTxStatus::Ready => TxStatus::Ready,
SubstrateTxStatus::Broadcast(peers) => TxStatus::Broadcast(peers),
SubstrateTxStatus::InBlock(hash) => {
TxStatus::InBlock(TxInBlock::new(
hash,
self.ext_hash,
self.client.clone(),
))
TransactionEvent::Validated => TxStatus::Validated,
TransactionEvent::Broadcasted(broadcasted) => {
TxStatus::Broadcast(broadcasted.num_peers)
}
SubstrateTxStatus::Retracted(hash) => TxStatus::Retracted(hash),
SubstrateTxStatus::Usurped(hash) => TxStatus::Usurped(hash),
SubstrateTxStatus::Dropped => TxStatus::Dropped,
SubstrateTxStatus::Invalid => TxStatus::Invalid,
TransactionEvent::BestChainBlockIncluded(block) => {
TxStatus::InBlock(block.map(|block| {
TxInBlock::new(
block.hash,
block.index,
self.ext_hash,
self.client.clone(),
)
}))
}
TransactionEvent::Invalid(error) => TxStatus::Invalid(error),
TransactionEvent::Dropped(dropped) => TxStatus::Dropped(dropped),
// Only the following statuses are actually considered "final" (see the substrate
// docs on `TxStatus`). Basically, either the transaction makes it into a
// block, or we eventually give up on waiting for it to make it into a block.
@@ -185,14 +191,15 @@ impl<T: Config, C: OnlineClientT<T>> Stream for TxProgress<T, C> {
// nonce might still be valid on some fork on another node which ends up being finalized.
// Equally, a transaction `Dropped` from one node may still be in the transaction pool,
// and make it into a block, on another node. Likewise with `Usurped`.
SubstrateTxStatus::FinalityTimeout(hash) => {
TransactionEvent::Error(error) => {
self.sub = None;
TxStatus::FinalityTimeout(hash)
TxStatus::Error(error)
}
SubstrateTxStatus::Finalized(hash) => {
TransactionEvent::Finalized(block) => {
self.sub = None;
TxStatus::Finalized(TxInBlock::new(
hash,
block.hash,
block.index,
self.ext_hash,
self.client.clone(),
))
@@ -251,30 +258,25 @@ impl<T: Config, C: OnlineClientT<T>> Stream for TxProgress<T, C> {
#[derive(Derivative)]
#[derivative(Debug(bound = "C: std::fmt::Debug"))]
pub enum TxStatus<T: Config, C> {
/// The transaction is part of the "future" queue.
Future,
/// The transaction is part of the "ready" queue.
Ready,
/// The transaction has been broadcast to the given peers.
Broadcast(Vec<String>),
/// The transaction has been included in a block with given hash.
InBlock(TxInBlock<T, C>),
/// The block this transaction was included in has been retracted,
/// probably because it did not make it onto the blocks which were
/// finalized.
Retracted(T::Hash),
/// A block containing the transaction did not reach finality within 512
/// blocks, and so the subscription has ended.
FinalityTimeout(T::Hash),
/// The transaction was validated by the runtime.
Validated,
/// The transaction has been broadcast to a number of peers.
Broadcast(usize),
/// The transaction has been included in a best block with given hash and index.
///
/// # Note
///
/// This may contain `None` if the block is no longer a best
/// block of the chain.
InBlock(Option<TxInBlock<T, C>>),
/// The transaction has been finalized by a finality-gadget, e.g GRANDPA.
Finalized(TxInBlock<T, C>),
/// The transaction has been replaced in the pool by another transaction
/// that provides the same tags. (e.g. same (sender, nonce)).
Usurped(T::Hash),
/// The transaction has been dropped from the pool because of the limit.
Dropped,
/// The transaction is no longer valid in the current state.
Invalid,
/// The transaction could not be processed due to an error.
Error(TransactionErrorEvent),
/// The transaction is marked as invalid.
Invalid(TransactionErrorEvent),
/// The client was not capable of keeping track of this transaction.
Dropped(TransactionDroppedEvent),
}
impl<T: Config, C> TxStatus<T, C> {
@@ -291,7 +293,7 @@ impl<T: Config, C> TxStatus<T, C> {
/// [`None`] if the enum variant is not [`TxStatus::InBlock`].
pub fn as_in_block(&self) -> Option<&TxInBlock<T, C>> {
match self {
Self::InBlock(val) => Some(val),
Self::InBlock(val) => val.as_ref(),
_ => None,
}
}
@@ -302,14 +304,21 @@ impl<T: Config, C> TxStatus<T, C> {
#[derivative(Debug(bound = "C: std::fmt::Debug"))]
pub struct TxInBlock<T: Config, C> {
block_hash: T::Hash,
index: usize,
ext_hash: T::Hash,
client: C,
}
impl<T: Config, C: OnlineClientT<T>> TxInBlock<T, C> {
pub(crate) fn new(block_hash: T::Hash, ext_hash: T::Hash, client: C) -> Self {
pub(crate) fn new(
block_hash: T::Hash,
index: usize,
ext_hash: T::Hash,
client: C,
) -> Self {
Self {
block_hash,
index,
ext_hash,
client,
}