// Copyright 2019-2021 Parity Technologies (UK) Ltd. // This file is part of Parity Bridges Common. // Parity Bridges Common is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // Parity Bridges Common is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // You should have received a copy of the GNU General Public License // along with Parity Bridges Common. If not, see . //! Helper for tracking transaction invalidation events. use crate::{Chain, Error, HashOf, HeaderIdOf, Subscription, TransactionStatusOf}; use async_trait::async_trait; use futures::{future::Either, Future, FutureExt, Stream, StreamExt}; use pezsp_runtime::traits::Header as _; use relay_utils::{HeaderId, TrackedTransactionStatus}; use std::time::Duration; /// Transaction tracker environment. #[async_trait] pub trait Environment: Send + Sync { /// Returns header id by its hash. async fn header_id_by_hash(&self, hash: HashOf) -> Result, Error>; } // TODO (https://github.com/pezkuwichain/pezkuwi-sdk/issues/232): remove `Environment` trait // after test client is implemented #[async_trait] impl> Environment for T { async fn header_id_by_hash(&self, hash: HashOf) -> Result, Error> { self.header_by_hash(hash).await.map(|h| HeaderId(*h.number(), hash)) } } /// Bizinikiwi transaction tracker implementation. /// /// Bizinikiwi node provides RPC API to submit and watch for transaction events. This way /// we may know when transaction is included into block, finalized or rejected. There are /// some edge cases, when we can't fully trust this mechanism - e.g. transaction may broadcasted /// and then dropped out of node transaction pool (some other cases are also possible - node /// restarts, connection lost, ...). Then we can't know for sure - what is currently happening /// with our transaction. Is the transaction really lost? Is it still alive on the chain network? /// /// We have several options to handle such cases: /// /// 1) hope that the transaction is still alive and wait for its mining until it is spoiled; /// /// 2) assume that the transaction is lost and resubmit another transaction instantly; /// /// 3) wait for some time (if transaction is mortal - then until block where it dies; if it is /// immortal - then for some time that we assume is long enough to mine it) and assume that it is /// lost. /// /// This struct implements third option as it seems to be the most optimal. pub struct TransactionTracker { environment: E, transaction_hash: HashOf, stall_timeout: Duration, subscription: Subscription>, } impl> TransactionTracker { /// Create transaction tracker. pub fn new( environment: E, stall_timeout: Duration, transaction_hash: HashOf, subscription: Subscription>, ) -> Self { Self { environment, stall_timeout, transaction_hash, subscription } } // TODO (https://github.com/pezkuwichain/pezkuwi-sdk/issues/232): remove me after // test client is implemented /// Converts self into tracker with different environment. pub fn switch_environment>( self, environment: NewE, ) -> TransactionTracker { TransactionTracker { environment, stall_timeout: self.stall_timeout, transaction_hash: self.transaction_hash, subscription: self.subscription, } } /// Wait for final transaction status and return it along with last known internal invalidation /// status. async fn do_wait( self, wait_for_stall_timeout: impl Future, wait_for_stall_timeout_rest: impl Future, ) -> (TrackedTransactionStatus>, Option>>) { // sometimes we want to wait for the rest of the stall timeout even if // `wait_for_invalidation` has been "select"ed first => it is shared let wait_for_invalidation = watch_transaction_status::<_, C, _>( self.environment, self.transaction_hash, self.subscription, ); futures::pin_mut!(wait_for_stall_timeout, wait_for_invalidation); match futures::future::select(wait_for_stall_timeout, wait_for_invalidation).await { Either::Left((_, _)) => { tracing::trace!( target: "bridge", node=%C::NAME, transaction=?self.transaction_hash, "Transaction is considered lost after timeout (no status response from the node)" ); (TrackedTransactionStatus::Lost, None) }, Either::Right((invalidation_status, _)) => match invalidation_status { InvalidationStatus::Finalized(at_block) => { (TrackedTransactionStatus::Finalized(at_block), Some(invalidation_status)) }, InvalidationStatus::Invalid => { (TrackedTransactionStatus::Lost, Some(invalidation_status)) }, InvalidationStatus::Lost => { // wait for the rest of stall timeout - this way we'll be sure that the // transaction is actually dead if it has been crafted properly wait_for_stall_timeout_rest.await; // if someone is still watching for our transaction, then we're reporting // an error here (which is treated as "transaction lost") tracing::trace!( target: "bridge", node=%C::NAME, transaction=?self.transaction_hash, "Transaction is considered lost after timeout" ); (TrackedTransactionStatus::Lost, Some(invalidation_status)) }, }, } } } #[async_trait] impl> relay_utils::TransactionTracker for TransactionTracker { type HeaderId = HeaderIdOf; async fn wait(self) -> TrackedTransactionStatus> { let wait_for_stall_timeout = async_std::task::sleep(self.stall_timeout).shared(); let wait_for_stall_timeout_rest = wait_for_stall_timeout.clone(); self.do_wait(wait_for_stall_timeout, wait_for_stall_timeout_rest).await.0 } } /// Transaction invalidation status. /// /// Note that in places where the `TransactionTracker` is used, the finalization event will be /// ignored - relay loops are detecting the mining/finalization using their own /// techniques. That's why we're using `InvalidationStatus` here. #[derive(Debug, PartialEq)] enum InvalidationStatus { /// Transaction has been included into block and finalized at given block. Finalized(BlockId), /// Transaction has been invalidated. Invalid, /// We have lost track of transaction status. Lost, } /// Watch for transaction status until transaction is finalized or we lose track of its status. async fn watch_transaction_status< E: Environment, C: Chain, S: Stream>, >( environment: E, transaction_hash: HashOf, subscription: S, ) -> InvalidationStatus> { futures::pin_mut!(subscription); loop { match subscription.next().await { Some(TransactionStatusOf::::Finalized((block_hash, _))) => { // the only "successful" outcome of this method is when the block with transaction // has been finalized tracing::trace!( target: "bridge", node=%C::NAME, transaction=?transaction_hash, block=?block_hash, "Transaction has been finalized" ); let header_id = match environment.header_id_by_hash(block_hash).await { Ok(header_id) => header_id, Err(e) => { tracing::error!( target: "bridge", error=?e, node=%C::NAME, transaction=?transaction_hash, block=?block_hash, "Failed to read header when watching for transaction", ); // that's the best option we have here return InvalidationStatus::Lost; }, }; return InvalidationStatus::Finalized(header_id); }, Some(TransactionStatusOf::::Invalid) => { // if node says that the transaction is invalid, there are still chances that // it is not actually invalid - e.g. if the block where transaction has been // revalidated is retracted and transaction (at some other node pool) becomes // valid again on other fork. But let's assume that the chances of this event // are almost zero - there's a lot of things that must happen for this to be the // case. tracing::trace!( target: "bridge", node=%C::NAME, transaction=?transaction_hash, "Transaction has been invalidated" ); return InvalidationStatus::Invalid; }, Some(TransactionStatusOf::::Future) | Some(TransactionStatusOf::::Ready) | Some(TransactionStatusOf::::Broadcast(_)) => { // nothing important (for us) has happened }, Some(TransactionStatusOf::::InBlock(block_hash)) => { // TODO: read matching system event (ExtrinsicSuccess or ExtrinsicFailed), log it // here and use it later (on finality) for reporting invalid transaction // https://github.com/pezkuwichain/pezkuwi-sdk/issues/79 tracing::trace!( target: "bridge", node=%C::NAME, transaction=?transaction_hash, block=?block_hash, "Transaction has been included" ); }, Some(TransactionStatusOf::::Retracted(block_hash)) => { tracing::trace!( target: "bridge", node=%C::NAME, transaction=?transaction_hash, block=?block_hash, "Transaction has been retracted" ); }, Some(TransactionStatusOf::::FinalityTimeout(block_hash)) => { // finality is lagging? let's wait a bit more and report a stall tracing::trace!( target: "bridge", node=%C::NAME, transaction=?transaction_hash, block=?block_hash, "Transaction has not been finalized for too long" ); return InvalidationStatus::Lost; }, Some(TransactionStatusOf::::Usurped(new_transaction_hash)) => { // this may be result of our transaction resubmitter work or some manual // intervention. In both cases - let's start stall timeout, because the meaning // of transaction may have changed tracing::trace!( target: "bridge", node=%C::NAME, transaction=?transaction_hash, new_transaction=?new_transaction_hash, "Transaction has been usurped" ); return InvalidationStatus::Lost; }, Some(TransactionStatusOf::::Dropped) => { // the transaction has been removed from the pool because of its limits. Let's wait // a bit and report a stall tracing::trace!( target: "bridge", node=%C::NAME, transaction=?transaction_hash, "Transaction has been dropped from the pool" ); return InvalidationStatus::Lost; }, None => { // the status of transaction is unknown to us (the subscription has been closed?). // Let's wait a bit and report a stall return InvalidationStatus::Lost; }, } } } #[cfg(test)] mod tests { use super::*; use crate::{test_chain::TestChain, StreamDescription}; use futures::{FutureExt, SinkExt}; use pezsc_transaction_pool_api::TransactionStatus; struct TestEnvironment(Result, Error>); #[async_trait] impl Environment for TestEnvironment { async fn header_id_by_hash( &self, _hash: HashOf, ) -> Result, Error> { self.0.as_ref().map_err(|_| Error::BridgePalletIsNotInitialized).cloned() } } async fn on_transaction_status( status: TransactionStatus, HashOf>, ) -> Option<( TrackedTransactionStatus>, InvalidationStatus>, )> { let (mut sender, receiver) = futures::channel::mpsc::channel(1); let tx_tracker = TransactionTracker::::new( TestEnvironment(Ok(HeaderId(0, Default::default()))), Duration::from_secs(0), Default::default(), Subscription::new_forwarded( StreamDescription::new("test".into(), "test".into()), receiver, ), ); // we can't do `.now_or_never()` on `do_wait()` call, because `Subscription` has its own // background thread, which may cause additional async task switches => let's leave some // relatively small timeout here let wait_for_stall_timeout = async_std::task::sleep(std::time::Duration::from_millis(100)); let wait_for_stall_timeout_rest = futures::future::ready(()); sender.send(Ok(status)).await.unwrap(); let (ts, is) = tx_tracker.do_wait(wait_for_stall_timeout, wait_for_stall_timeout_rest).await; is.map(|is| (ts, is)) } #[async_std::test] async fn returns_finalized_on_finalized() { assert_eq!( on_transaction_status(TransactionStatus::Finalized(Default::default())).await, Some(( TrackedTransactionStatus::Finalized(Default::default()), InvalidationStatus::Finalized(Default::default()) )), ); } #[async_std::test] async fn returns_lost_on_finalized_and_environment_error() { assert_eq!( watch_transaction_status::<_, TestChain, _>( TestEnvironment(Err(Error::BridgePalletIsNotInitialized)), Default::default(), futures::stream::iter([TransactionStatus::Finalized(Default::default())]) ) .now_or_never(), Some(InvalidationStatus::Lost), ); } #[async_std::test] async fn returns_invalid_on_invalid() { assert_eq!( on_transaction_status(TransactionStatus::Invalid).await, Some((TrackedTransactionStatus::Lost, InvalidationStatus::Invalid)), ); } #[async_std::test] async fn waits_on_future() { assert_eq!(on_transaction_status(TransactionStatus::Future).await, None,); } #[async_std::test] async fn waits_on_ready() { assert_eq!(on_transaction_status(TransactionStatus::Ready).await, None,); } #[async_std::test] async fn waits_on_broadcast() { assert_eq!( on_transaction_status(TransactionStatus::Broadcast(Default::default())).await, None, ); } #[async_std::test] async fn waits_on_in_block() { assert_eq!( on_transaction_status(TransactionStatus::InBlock(Default::default())).await, None, ); } #[async_std::test] async fn waits_on_retracted() { assert_eq!( on_transaction_status(TransactionStatus::Retracted(Default::default())).await, None, ); } #[async_std::test] async fn lost_on_finality_timeout() { assert_eq!( on_transaction_status(TransactionStatus::FinalityTimeout(Default::default())).await, Some((TrackedTransactionStatus::Lost, InvalidationStatus::Lost)), ); } #[async_std::test] async fn lost_on_usurped() { assert_eq!( on_transaction_status(TransactionStatus::Usurped(Default::default())).await, Some((TrackedTransactionStatus::Lost, InvalidationStatus::Lost)), ); } #[async_std::test] async fn lost_on_dropped() { assert_eq!( on_transaction_status(TransactionStatus::Dropped).await, Some((TrackedTransactionStatus::Lost, InvalidationStatus::Lost)), ); } #[async_std::test] async fn lost_on_subscription_error() { assert_eq!( watch_transaction_status::<_, TestChain, _>( TestEnvironment(Ok(HeaderId(0, Default::default()))), Default::default(), futures::stream::iter([]) ) .now_or_never(), Some(InvalidationStatus::Lost), ); } #[async_std::test] async fn lost_on_timeout_when_waiting_for_invalidation_status() { let (_sender, receiver) = futures::channel::mpsc::channel(1); let tx_tracker = TransactionTracker::::new( TestEnvironment(Ok(HeaderId(0, Default::default()))), Duration::from_secs(0), Default::default(), Subscription::new_forwarded( StreamDescription::new("test".into(), "test".into()), receiver, ), ); let wait_for_stall_timeout = futures::future::ready(()).shared(); let wait_for_stall_timeout_rest = wait_for_stall_timeout.clone(); let wait_result = tx_tracker .do_wait(wait_for_stall_timeout, wait_for_stall_timeout_rest) .now_or_never(); assert_eq!(wait_result, Some((TrackedTransactionStatus::Lost, None))); } }