wait_for_finalized behavior if the tx dropped, usurped or invalid (#897)

* consider dropped, invalid and usurped as finalized

* test structure

* unit tests

* move tests to tx_progress file

* integrate pr review comments

* integrate pr review comments (including revert)
This commit is contained in:
Tadeo Hepperle
2023-04-06 13:23:56 +02:00
committed by GitHub
parent 8a7c17289a
commit a69b3e45e7
3 changed files with 190 additions and 31 deletions
+10 -1
View File
@@ -119,11 +119,20 @@ pub enum TransactionError {
/// The finality subscription expired (after ~512 blocks we give up if the /// The finality subscription expired (after ~512 blocks we give up if the
/// block hasn't yet been finalized). /// block hasn't yet been finalized).
#[error("The finality subscription expired")] #[error("The finality subscription expired")]
FinalitySubscriptionTimeout, FinalityTimeout,
/// The block hash that the transaction was added to could not be found. /// The block hash that the transaction was added to could not be found.
/// This is probably because the block was retracted before being finalized. /// This is probably because the block was retracted before being finalized.
#[error("The block containing the transaction can no longer be found (perhaps it was on a non-finalized fork?)")] #[error("The block containing the transaction can no longer be found (perhaps it was on a non-finalized fork?)")]
BlockNotFound, BlockNotFound,
/// The transaction was deemed invalid in the current chain state.
#[error("The transaction is no longer valid")]
Invalid,
/// The transaction was replaced by a transaction with the same (sender, nonce) pair but with higher priority
#[error("The transaction was replaced by a transaction with the same (sender, nonce) pair but with higher priority.")]
Usurped,
/// The transaction was dropped because of some limit
#[error("The transaction was dropped from the pool because of a limit.")]
Dropped,
} }
/// Something went wrong trying to encode a storage address. /// Something went wrong trying to encode a storage address.
+2 -1
View File
@@ -166,7 +166,8 @@ impl<Res> std::fmt::Debug for Subscription<Res> {
} }
impl<Res> Subscription<Res> { impl<Res> Subscription<Res> {
fn new(inner: RpcSubscription) -> Self { /// Creates a new [`Subscription`].
pub fn new(inner: RpcSubscription) -> Self {
Self { Self {
inner, inner,
_marker: std::marker::PhantomData, _marker: std::marker::PhantomData,
+178 -29
View File
@@ -69,10 +69,10 @@ where
/// **Note:** consumes `self`. If you'd like to perform multiple actions as the state of the /// **Note:** consumes `self`. If you'd like to perform multiple actions as the state of the
/// transaction progresses, use [`TxProgress::next_item()`] instead. /// transaction progresses, use [`TxProgress::next_item()`] instead.
/// ///
/// **Note:** transaction statuses like `Invalid` and `Usurped` are ignored, because while they /// **Note:** transaction statuses like `Invalid`/`Usurped`/`Dropped` indicate with some
/// may well indicate with some probability that the transaction will not make it into a block, /// probability that the transaction will not make it into a block but there is no guarantee
/// there is no guarantee that this is true. Thus, we prefer to "play it safe" here. Use the lower /// that this is true. In those cases the stream is closed however, so you currently have no way to find
/// level [`TxProgress::next_item()`] API if you'd like to handle these statuses yourself. /// out if they finally made it into a block or not.
pub async fn wait_for_in_block(mut self) -> Result<TxInBlock<T, C>, Error> { pub async fn wait_for_in_block(mut self) -> Result<TxInBlock<T, C>, Error> {
while let Some(status) = self.next_item().await { while let Some(status) = self.next_item().await {
match status? { match status? {
@@ -80,8 +80,11 @@ where
TxStatus::InBlock(s) | TxStatus::Finalized(s) => return Ok(s), TxStatus::InBlock(s) | TxStatus::Finalized(s) => return Ok(s),
// Error scenarios; return the error. // Error scenarios; return the error.
TxStatus::FinalityTimeout(_) => { TxStatus::FinalityTimeout(_) => {
return Err(TransactionError::FinalitySubscriptionTimeout.into()) return Err(TransactionError::FinalityTimeout.into())
} }
TxStatus::Invalid => return Err(TransactionError::Invalid.into()),
TxStatus::Usurped(_) => return Err(TransactionError::Usurped.into()),
TxStatus::Dropped => return Err(TransactionError::Dropped.into()),
// Ignore anything else and wait for next status event: // Ignore anything else and wait for next status event:
_ => continue, _ => continue,
} }
@@ -95,10 +98,10 @@ where
/// **Note:** consumes `self`. If you'd like to perform multiple actions as the state of the /// **Note:** consumes `self`. If you'd like to perform multiple actions as the state of the
/// transaction progresses, use [`TxProgress::next_item()`] instead. /// transaction progresses, use [`TxProgress::next_item()`] instead.
/// ///
/// **Note:** transaction statuses like `Invalid` and `Usurped` are ignored, because while they /// **Note:** transaction statuses like `Invalid`/`Usurped`/`Dropped` indicate with some
/// may well indicate with some probability that the transaction will not make it into a block, /// probability that the transaction will not make it into a block but there is no guarantee
/// there is no guarantee that this is true. Thus, we prefer to "play it safe" here. Use the lower /// that this is true. In those cases the stream is closed however, so you currently have no way to find
/// level [`TxProgress::next_item()`] API if you'd like to handle these statuses yourself. /// out if they finally made it into a block or not.
pub async fn wait_for_finalized(mut self) -> Result<TxInBlock<T, C>, Error> { pub async fn wait_for_finalized(mut self) -> Result<TxInBlock<T, C>, Error> {
while let Some(status) = self.next_item().await { while let Some(status) = self.next_item().await {
match status? { match status? {
@@ -106,8 +109,11 @@ where
TxStatus::Finalized(s) => return Ok(s), TxStatus::Finalized(s) => return Ok(s),
// Error scenarios; return the error. // Error scenarios; return the error.
TxStatus::FinalityTimeout(_) => { TxStatus::FinalityTimeout(_) => {
return Err(TransactionError::FinalitySubscriptionTimeout.into()) return Err(TransactionError::FinalityTimeout.into())
} }
TxStatus::Invalid => return Err(TransactionError::Invalid.into()),
TxStatus::Usurped(_) => return Err(TransactionError::Usurped.into()),
TxStatus::Dropped => return Err(TransactionError::Dropped.into()),
// Ignore and wait for next status event: // Ignore and wait for next status event:
_ => continue, _ => continue,
} }
@@ -122,10 +128,10 @@ where
/// **Note:** consumes self. If you'd like to perform multiple actions as progress is made, /// **Note:** consumes self. If you'd like to perform multiple actions as progress is made,
/// use [`TxProgress::next_item()`] instead. /// use [`TxProgress::next_item()`] instead.
/// ///
/// **Note:** transaction statuses like `Invalid` and `Usurped` are ignored, because while they /// **Note:** transaction statuses like `Invalid`/`Usurped`/`Dropped` indicate with some
/// may well indicate with some probability that the transaction will not make it into a block, /// probability that the transaction will not make it into a block but there is no guarantee
/// there is no guarantee that this is true. Thus, we prefer to "play it safe" here. Use the lower /// that this is true. In those cases the stream is closed however, so you currently have no way to find
/// level [`TxProgress::next_item()`] API if you'd like to handle these statuses yourself. /// out if they finally made it into a block or not.
pub async fn wait_for_finalized_success( pub async fn wait_for_finalized_success(
self, self,
) -> Result<crate::blocks::ExtrinsicEvents<T>, Error> { ) -> Result<crate::blocks::ExtrinsicEvents<T>, Error> {
@@ -134,7 +140,7 @@ where
} }
} }
impl<T: Config, C: OnlineClientT<T>> Stream for TxProgress<T, C> { impl<T: Config, C: Clone> Stream for TxProgress<T, C> {
type Item = Result<TxStatus<T, C>, Error>; type Item = Result<TxStatus<T, C>, Error>;
fn poll_next( fn poll_next(
@@ -155,13 +161,19 @@ impl<T: Config, C: OnlineClientT<T>> Stream for TxProgress<T, C> {
TxStatus::InBlock(TxInBlock::new(hash, self.ext_hash, self.client.clone())) TxStatus::InBlock(TxInBlock::new(hash, self.ext_hash, self.client.clone()))
} }
SubstrateTxStatus::Retracted(hash) => TxStatus::Retracted(hash), SubstrateTxStatus::Retracted(hash) => TxStatus::Retracted(hash),
SubstrateTxStatus::Usurped(hash) => TxStatus::Usurped(hash), // Only the following statuses are considered "final", in a sense that they end the stream (see the substrate
SubstrateTxStatus::Dropped => TxStatus::Dropped, // docs on `TxStatus`):
SubstrateTxStatus::Invalid => TxStatus::Invalid, //
// Only the following statuses are actually considered "final" (see the substrate // - Usurped
// docs on `TxStatus`). Basically, either the transaction makes it into a // - Finalized
// block, or we eventually give up on waiting for it to make it into a block. // - FinalityTimeout
// Even `Dropped`/`Invalid`/`Usurped` transactions might make it into a block eventually. // - Invalid
// - Dropped
//
// Even though `Dropped`/`Invalid`/`Usurped` transactions might make it into a block eventually,
// the server considers them final and closes the connection, when they are encountered.
// In those cases the stream is closed however, so you currently have no way to find
// out if they finally made it into a block or not.
// //
// As an example, a transaction that is `Invalid` on one node due to having the wrong // As an example, a transaction that is `Invalid` on one node due to having the wrong
// nonce might still be valid on some fork on another node which ends up being finalized. // nonce might still be valid on some fork on another node which ends up being finalized.
@@ -175,6 +187,18 @@ impl<T: Config, C: OnlineClientT<T>> Stream for TxProgress<T, C> {
self.sub = None; self.sub = None;
TxStatus::Finalized(TxInBlock::new(hash, self.ext_hash, self.client.clone())) TxStatus::Finalized(TxInBlock::new(hash, self.ext_hash, self.client.clone()))
} }
SubstrateTxStatus::Usurped(hash) => {
self.sub = None;
TxStatus::Usurped(hash)
}
SubstrateTxStatus::Dropped => {
self.sub = None;
TxStatus::Dropped
}
SubstrateTxStatus::Invalid => {
self.sub = None;
TxStatus::Invalid
}
} }
}) })
} }
@@ -209,6 +233,12 @@ impl<T: Config, C: OnlineClientT<T>> Stream for TxProgress<T, C> {
/// there might be cases where transactions alternate between `Future` and `Ready` /// there might be cases where transactions alternate between `Future` and `Ready`
/// pool, and are `Broadcast` in the meantime. /// pool, and are `Broadcast` in the meantime.
/// ///
/// You are free to unsubscribe from notifications at any point.
/// The first one will be emitted when the block in which the transaction was included gets
/// finalized. The `FinalityTimeout` event will be emitted when the block did not reach finality
/// within 512 blocks. This either indicates that finality is not available for your chain,
/// or that finality gadget is lagging behind.
///
/// Note that there are conditions that may cause transactions to reappear in the pool: /// Note that there are conditions that may cause transactions to reappear in the pool:
/// ///
/// 1. Due to possible forks, the transaction that ends up being included /// 1. Due to possible forks, the transaction that ends up being included
@@ -220,12 +250,17 @@ impl<T: Config, C: OnlineClientT<T>> Stream for TxProgress<T, C> {
/// pool about such cases). /// pool about such cases).
/// 4. `Retracted` transactions might be included in a future block. /// 4. `Retracted` transactions might be included in a future block.
/// ///
/// The stream is considered finished only when either the `Finalized` or `FinalityTimeout` /// Even though these cases can happen, the server-side of the stream is closed, if one of the following is encountered:
/// event is triggered. You are however free to unsubscribe from notifications at any point. /// - Usurped
/// The first one will be emitted when the block in which the transaction was included gets /// - Finalized
/// finalized. The `FinalityTimeout` event will be emitted when the block did not reach finality /// - FinalityTimeout
/// within 512 blocks. This either indicates that finality is not available for your chain, /// - Invalid
/// or that finality gadget is lagging behind. /// - Dropped
///
/// In any of these cases the client side TxProgress stream is also closed.
/// In those cases the stream is closed however, so you currently have no way to find
/// out if they finally made it into a block or not.
#[derive(Derivative)] #[derive(Derivative)]
#[derivative(Debug(bound = "C: std::fmt::Debug"))] #[derivative(Debug(bound = "C: std::fmt::Debug"))]
pub enum TxStatus<T: Config, C> { pub enum TxStatus<T: Config, C> {
@@ -284,7 +319,7 @@ pub struct TxInBlock<T: Config, C> {
client: C, client: C,
} }
impl<T: Config, C: OnlineClientT<T>> TxInBlock<T, C> { impl<T: Config, C> TxInBlock<T, C> {
pub(crate) fn new(block_hash: T::Hash, ext_hash: T::Hash, client: C) -> Self { pub(crate) fn new(block_hash: T::Hash, ext_hash: T::Hash, client: C) -> Self {
Self { Self {
block_hash, block_hash,
@@ -302,7 +337,9 @@ impl<T: Config, C: OnlineClientT<T>> TxInBlock<T, C> {
pub fn extrinsic_hash(&self) -> T::Hash { pub fn extrinsic_hash(&self) -> T::Hash {
self.ext_hash self.ext_hash
} }
}
impl<T: Config, C: OnlineClientT<T>> TxInBlock<T, C> {
/// Fetch the events associated with this transaction. If the transaction /// Fetch the events associated with this transaction. If the transaction
/// was successful (ie no `ExtrinsicFailed`) events were found, then we return /// was successful (ie no `ExtrinsicFailed`) events were found, then we return
/// the events associated with it. If the transaction was not successful, or /// the events associated with it. If the transaction was not successful, or
@@ -370,3 +407,115 @@ impl<T: Config, C: OnlineClientT<T>> TxInBlock<T, C> {
)) ))
} }
} }
#[cfg(test)]
mod test {
use std::pin::Pin;
use futures::Stream;
use crate::{
client::{OfflineClientT, OnlineClientT},
config::{
extrinsic_params::BaseExtrinsicParams,
polkadot::{PlainTip, PolkadotConfig},
WithExtrinsicParams,
},
error::RpcError,
rpc::{types::SubstrateTxStatus, RpcSubscription, Subscription},
tx::TxProgress,
Config, Error, SubstrateConfig,
};
use serde_json::value::RawValue;
#[derive(Clone, Debug)]
struct MockClient;
impl OfflineClientT<PolkadotConfig> for MockClient {
fn metadata(&self) -> crate::Metadata {
panic!("just a mock impl to satisfy trait bounds")
}
fn genesis_hash(&self) -> <PolkadotConfig as crate::Config>::Hash {
panic!("just a mock impl to satisfy trait bounds")
}
fn runtime_version(&self) -> crate::rpc::types::RuntimeVersion {
panic!("just a mock impl to satisfy trait bounds")
}
}
type MockTxProgress = TxProgress<PolkadotConfig, MockClient>;
type MockHash = <WithExtrinsicParams<
SubstrateConfig,
BaseExtrinsicParams<SubstrateConfig, PlainTip>,
> as Config>::Hash;
type MockSubstrateTxStatus = SubstrateTxStatus<MockHash, MockHash>;
impl OnlineClientT<PolkadotConfig> for MockClient {
fn rpc(&self) -> &crate::rpc::Rpc<PolkadotConfig> {
panic!("just a mock impl to satisfy trait bounds")
}
}
#[tokio::test]
async fn wait_for_finalized_returns_err_when_usurped() {
let tx_progress = mock_tx_progress(vec![
SubstrateTxStatus::Ready,
SubstrateTxStatus::Usurped(Default::default()),
]);
let finalized_result = tx_progress.wait_for_finalized().await;
assert!(matches!(
finalized_result,
Err(Error::Transaction(crate::error::TransactionError::Usurped))
));
}
#[tokio::test]
async fn wait_for_finalized_returns_err_when_dropped() {
let tx_progress =
mock_tx_progress(vec![SubstrateTxStatus::Ready, SubstrateTxStatus::Dropped]);
let finalized_result = tx_progress.wait_for_finalized().await;
assert!(matches!(
finalized_result,
Err(Error::Transaction(crate::error::TransactionError::Dropped))
));
}
#[tokio::test]
async fn wait_for_finalized_returns_err_when_invalid() {
let tx_progress =
mock_tx_progress(vec![SubstrateTxStatus::Ready, SubstrateTxStatus::Invalid]);
let finalized_result = tx_progress.wait_for_finalized().await;
assert!(matches!(
finalized_result,
Err(Error::Transaction(crate::error::TransactionError::Invalid))
));
}
fn mock_tx_progress(statuses: Vec<MockSubstrateTxStatus>) -> MockTxProgress {
let sub = create_substrate_tx_status_subscription(statuses);
TxProgress::new(sub, MockClient, Default::default())
}
fn create_substrate_tx_status_subscription(
elements: Vec<MockSubstrateTxStatus>,
) -> Subscription<MockSubstrateTxStatus> {
let rpc_substription_stream: Pin<
Box<dyn Stream<Item = Result<Box<RawValue>, RpcError>> + Send + 'static>,
> = Box::pin(futures::stream::iter(elements.into_iter().map(|e| {
let s = serde_json::to_string(&e).unwrap();
let r: Box<RawValue> = RawValue::from_string(s).unwrap();
Ok(r)
})));
let rpc_subscription: RpcSubscription = RpcSubscription {
stream: rpc_substription_stream,
id: None,
};
let sub: Subscription<MockSubstrateTxStatus> = Subscription::new(rpc_subscription);
sub
}
}