Read extrinsic dispatch result for mined transaction (#1582)

* read extrinsic dispatch result for mined transaction

* commit for the history

* Revert "commit for the history"

This reverts commit 99341b04750639db296172cc1432bd70e458ef4b.

* Revert "read extrinsic dispatch result for mined transaction"

This reverts commit 662b776cbf992be9f1637e52f023b782e8c441d1.

* check for successfult transaction in finality relay

* check for successful transaction in parachains relay

* TrackedTransactionStatus ->TrackedTransactionStatus<HeaderId>

* check for successful transaction in messages relay

* fix compilation

* message_lane_loop_is_able_to_recover_from_unsuccessful_transaction

* fixed too-complex-type clippy error

* aaand compilation
This commit is contained in:
Svyatoslav Nikolsky
2022-10-03 09:25:48 +03:00
committed by Bastian Köcher
parent 1abd5cb60d
commit 43afa02372
12 changed files with 460 additions and 112 deletions
@@ -467,7 +467,8 @@ impl<C: Chain> Client<C> {
prepare_extrinsic: impl FnOnce(HeaderIdOf<C>, C::Index) -> Result<UnsignedTransaction<C>>
+ Send
+ 'static,
) -> Result<TransactionTracker<C>> {
) -> Result<TransactionTracker<C, Self>> {
let self_clone = self.clone();
let _guard = self.submit_signed_extrinsic_lock.lock().await;
let transaction_nonce = self.next_account_index(extrinsic_signer).await?;
let best_header = self.best_header().await?;
@@ -494,6 +495,7 @@ impl<C: Chain> Client<C> {
})?;
log::trace!(target: "bridge", "Sent transaction to {} node: {:?}", C::NAME, tx_hash);
let tracker = TransactionTracker::new(
self_clone,
stall_timeout,
tx_hash,
Subscription(Mutex::new(receiver)),
@@ -16,13 +16,28 @@
//! Helper for tracking transaction invalidation events.
use crate::{Chain, HashOf, Subscription, TransactionStatusOf};
use crate::{Chain, Client, Error, HashOf, HeaderIdOf, Subscription, TransactionStatusOf};
use async_trait::async_trait;
use futures::{future::Either, Future, FutureExt, Stream, StreamExt};
use relay_utils::TrackedTransactionStatus;
use relay_utils::{HeaderId, TrackedTransactionStatus};
use sp_runtime::traits::Header as _;
use std::time::Duration;
/// Transaction tracker environment.
#[async_trait]
pub trait Environment<C: Chain>: Send + Sync {
/// Returns header id by its hash.
async fn header_id_by_hash(&self, hash: HashOf<C>) -> Result<HeaderIdOf<C>, Error>;
}
#[async_trait]
impl<C: Chain> Environment<C> for Client<C> {
async fn header_id_by_hash(&self, hash: HashOf<C>) -> Result<HeaderIdOf<C>, Error> {
self.header_by_hash(hash).await.map(|h| HeaderId(*h.number(), hash))
}
}
/// Substrate transaction tracker implementation.
///
/// Substrate node provides RPC API to submit and watch for transaction events. This way
@@ -43,20 +58,22 @@ use std::time::Duration;
/// it is lost.
///
/// This struct implements third option as it seems to be the most optimal.
pub struct TransactionTracker<C: Chain> {
pub struct TransactionTracker<C: Chain, E> {
environment: E,
transaction_hash: HashOf<C>,
stall_timeout: Duration,
subscription: Subscription<TransactionStatusOf<C>>,
}
impl<C: Chain> TransactionTracker<C> {
impl<C: Chain, E: Environment<C>> TransactionTracker<C, E> {
/// Create transaction tracker.
pub fn new(
environment: E,
stall_timeout: Duration,
transaction_hash: HashOf<C>,
subscription: Subscription<TransactionStatusOf<C>>,
) -> Self {
Self { stall_timeout, transaction_hash, subscription }
Self { environment, stall_timeout, transaction_hash, subscription }
}
/// Wait for final transaction status and return it along with last known internal invalidation
@@ -65,10 +82,11 @@ impl<C: Chain> TransactionTracker<C> {
self,
wait_for_stall_timeout: impl Future<Output = ()>,
wait_for_stall_timeout_rest: impl Future<Output = ()>,
) -> (TrackedTransactionStatus, Option<InvalidationStatus>) {
) -> (TrackedTransactionStatus<HeaderIdOf<C>>, Option<InvalidationStatus<HeaderIdOf<C>>>) {
// sometimes we want to wait for the rest of the stall timeout even if
// `wait_for_invalidation` has been "select"ed first => it is shared
let wait_for_invalidation = watch_transaction_status::<C, _>(
let wait_for_invalidation = watch_transaction_status::<_, C, _>(
self.environment,
self.transaction_hash,
self.subscription.into_stream(),
);
@@ -86,8 +104,8 @@ impl<C: Chain> TransactionTracker<C> {
(TrackedTransactionStatus::Lost, None)
},
Either::Right((invalidation_status, _)) => match invalidation_status {
InvalidationStatus::Finalized =>
(TrackedTransactionStatus::Finalized, Some(invalidation_status)),
InvalidationStatus::Finalized(at_block) =>
(TrackedTransactionStatus::Finalized(at_block), Some(invalidation_status)),
InvalidationStatus::Invalid =>
(TrackedTransactionStatus::Lost, Some(invalidation_status)),
InvalidationStatus::Lost => {
@@ -111,8 +129,10 @@ impl<C: Chain> TransactionTracker<C> {
}
#[async_trait]
impl<C: Chain> relay_utils::TransactionTracker for TransactionTracker<C> {
async fn wait(self) -> TrackedTransactionStatus {
impl<C: Chain, E: Environment<C>> relay_utils::TransactionTracker for TransactionTracker<C, E> {
type HeaderId = HeaderIdOf<C>;
async fn wait(self) -> TrackedTransactionStatus<HeaderIdOf<C>> {
let wait_for_stall_timeout = async_std::task::sleep(self.stall_timeout).shared();
let wait_for_stall_timeout_rest = wait_for_stall_timeout.clone();
self.do_wait(wait_for_stall_timeout, wait_for_stall_timeout_rest).await.0
@@ -125,9 +145,9 @@ impl<C: Chain> relay_utils::TransactionTracker for TransactionTracker<C> {
/// ignored - relay loops are detecting the mining/finalization using their own
/// techniques. That's why we're using `InvalidationStatus` here.
#[derive(Debug, PartialEq)]
enum InvalidationStatus {
/// Transaction has been included into block and finalized.
Finalized,
enum InvalidationStatus<BlockId> {
/// Transaction has been included into block and finalized at given block.
Finalized(BlockId),
/// Transaction has been invalidated.
Invalid,
/// We have lost track of transaction status.
@@ -135,10 +155,15 @@ enum InvalidationStatus {
}
/// Watch for transaction status until transaction is finalized or we lose track of its status.
async fn watch_transaction_status<C: Chain, S: Stream<Item = TransactionStatusOf<C>>>(
async fn watch_transaction_status<
E: Environment<C>,
C: Chain,
S: Stream<Item = TransactionStatusOf<C>>,
>(
environment: E,
transaction_hash: HashOf<C>,
subscription: S,
) -> InvalidationStatus {
) -> InvalidationStatus<HeaderIdOf<C>> {
futures::pin_mut!(subscription);
loop {
@@ -153,7 +178,23 @@ async fn watch_transaction_status<C: Chain, S: Stream<Item = TransactionStatusOf
transaction_hash,
block_hash,
);
return InvalidationStatus::Finalized
let header_id = match environment.header_id_by_hash(block_hash).await {
Ok(header_id) => header_id,
Err(e) => {
log::error!(
target: "bridge",
"Failed to read header {:?} when watching for {} transaction {:?}: {:?}",
block_hash,
C::NAME,
transaction_hash,
e,
);
// that's the best option we have here
return InvalidationStatus::Lost
},
};
return InvalidationStatus::Finalized(header_id)
},
Some(TransactionStatusOf::<C>::Invalid) => {
// if node says that the transaction is invalid, there are still chances that
@@ -247,11 +288,27 @@ mod tests {
use futures::{FutureExt, SinkExt};
use sc_transaction_pool_api::TransactionStatus;
struct TestEnvironment(Result<HeaderIdOf<TestChain>, Error>);
#[async_trait]
impl Environment<TestChain> for TestEnvironment {
async fn header_id_by_hash(
&self,
_hash: HashOf<TestChain>,
) -> Result<HeaderIdOf<TestChain>, Error> {
self.0.as_ref().map_err(|_| Error::UninitializedBridgePallet).cloned()
}
}
async fn on_transaction_status(
status: TransactionStatus<HashOf<TestChain>, HashOf<TestChain>>,
) -> Option<(TrackedTransactionStatus, InvalidationStatus)> {
) -> Option<(
TrackedTransactionStatus<HeaderIdOf<TestChain>>,
InvalidationStatus<HeaderIdOf<TestChain>>,
)> {
let (mut sender, receiver) = futures::channel::mpsc::channel(1);
let tx_tracker = TransactionTracker::<TestChain>::new(
let tx_tracker = TransactionTracker::<TestChain, TestEnvironment>::new(
TestEnvironment(Ok(HeaderId(0, Default::default()))),
Duration::from_secs(0),
Default::default(),
Subscription(async_std::sync::Mutex::new(receiver)),
@@ -270,7 +327,23 @@ mod tests {
async fn returns_finalized_on_finalized() {
assert_eq!(
on_transaction_status(TransactionStatus::Finalized(Default::default())).await,
Some((TrackedTransactionStatus::Finalized, InvalidationStatus::Finalized)),
Some((
TrackedTransactionStatus::Finalized(Default::default()),
InvalidationStatus::Finalized(Default::default())
)),
);
}
#[async_std::test]
async fn returns_lost_on_finalized_and_environment_error() {
assert_eq!(
watch_transaction_status::<_, TestChain, _>(
TestEnvironment(Err(Error::UninitializedBridgePallet)),
Default::default(),
futures::stream::iter([TransactionStatus::Finalized(Default::default())])
)
.now_or_never(),
Some(InvalidationStatus::Lost),
);
}
@@ -343,8 +416,12 @@ mod tests {
#[async_std::test]
async fn lost_on_subscription_error() {
assert_eq!(
watch_transaction_status::<TestChain, _>(Default::default(), futures::stream::iter([]))
.now_or_never(),
watch_transaction_status::<_, TestChain, _>(
TestEnvironment(Ok(HeaderId(0, Default::default()))),
Default::default(),
futures::stream::iter([])
)
.now_or_never(),
Some(InvalidationStatus::Lost),
);
}
@@ -352,7 +429,8 @@ mod tests {
#[async_std::test]
async fn lost_on_timeout_when_waiting_for_invalidation_status() {
let (_sender, receiver) = futures::channel::mpsc::channel(1);
let tx_tracker = TransactionTracker::<TestChain>::new(
let tx_tracker = TransactionTracker::<TestChain, TestEnvironment>::new(
TestEnvironment(Ok(HeaderId(0, Default::default()))),
Duration::from_secs(0),
Default::default(),
Subscription(async_std::sync::Mutex::new(receiver)),
+48 -8
View File
@@ -290,15 +290,55 @@ pub(crate) async fn run_until_connection_lost<P: FinalitySyncPipeline>(
// wait till exit signal, or new source block
select! {
transaction_status = last_transaction_tracker => {
if transaction_status == TrackedTransactionStatus::Lost {
log::error!(
target: "bridge",
"Finality synchronization from {} to {} has stalled. Going to restart",
P::SOURCE_NAME,
P::TARGET_NAME,
);
match transaction_status {
TrackedTransactionStatus::Finalized(_) => {
// transaction has been finalized, but it may have been finalized in the "failed" state. So
// let's check if the block number has been actually updated. If it is not, then we are stalled.
//
// please also note that we're restarting the loop if we have failed to read required data
// from the target client - that's the best we can do here to avoid actual stall.
target_client
.best_finalized_source_block_id()
.await
.map_err(|e| format!("failed to read best block from target node: {:?}", e))
.and_then(|best_id_at_target| {
let last_submitted_header_number = last_submitted_header_number
.expect("always Some when last_transaction_tracker is set;\
last_transaction_tracker is set;\
qed");
if last_submitted_header_number > best_id_at_target.0 {
Err(format!(
"best block at target after tx is {:?} and we've submitted {:?}",
best_id_at_target,
last_submitted_header_number,
))
} else {
Ok(())
}
})
.map_err(|e| {
log::error!(
target: "bridge",
"Failed Finality synchronization from {} to {} has stalled. Transaction failed: {}. \
Going to restart",
P::SOURCE_NAME,
P::TARGET_NAME,
e,
);
return Err(FailedClient::Both);
FailedClient::Both
})?;
},
TrackedTransactionStatus::Lost => {
log::error!(
target: "bridge",
"Finality synchronization from {} to {} has stalled. Going to restart",
P::SOURCE_NAME,
P::TARGET_NAME,
);
return Err(FailedClient::Both);
},
}
},
_ = async_std::task::sleep(next_tick).fuse() => {},
@@ -48,17 +48,19 @@ type TestNumber = u64;
type TestHash = u64;
#[derive(Clone, Debug)]
struct TestTransactionTracker(TrackedTransactionStatus);
struct TestTransactionTracker(TrackedTransactionStatus<HeaderId<TestHash, TestNumber>>);
impl Default for TestTransactionTracker {
fn default() -> TestTransactionTracker {
TestTransactionTracker(TrackedTransactionStatus::Finalized)
TestTransactionTracker(TrackedTransactionStatus::Finalized(Default::default()))
}
}
#[async_trait]
impl TransactionTracker for TestTransactionTracker {
async fn wait(self) -> TrackedTransactionStatus {
type HeaderId = HeaderId<TestHash, TestNumber>;
async fn wait(self) -> TrackedTransactionStatus<HeaderId<TestHash, TestNumber>> {
self.0
}
}
@@ -224,7 +226,9 @@ fn prepare_test_clients(
target_best_block_id: HeaderId(5, 5),
target_headers: vec![],
target_transaction_tracker: TestTransactionTracker(TrackedTransactionStatus::Finalized),
target_transaction_tracker: TestTransactionTracker(TrackedTransactionStatus::Finalized(
Default::default(),
)),
}));
(
TestSourceClient {
@@ -581,3 +585,13 @@ fn stalls_when_transaction_tracker_returns_error() {
assert_eq!(result, Err(FailedClient::Both));
}
#[test]
fn stalls_when_transaction_tracker_returns_finalized_but_transaction_fails() {
let (_, result) = run_sync_loop(|data| {
data.target_best_block_id = HeaderId(5, 5);
data.target_best_block_id.0 == 16
});
assert_eq!(result, Err(FailedClient::Both));
}
@@ -89,7 +89,7 @@ where
AccountIdOf<P::TargetChain>: From<<AccountKeyPairOf<P::TransactionSignScheme> as Pair>::Public>,
P::TransactionSignScheme: TransactionSignScheme<Chain = P::TargetChain>,
{
type TransactionTracker = TransactionTracker<P::TargetChain>;
type TransactionTracker = TransactionTracker<P::TargetChain, Client<P::TargetChain>>;
async fn best_finalized_source_block_id(&self) -> Result<HeaderIdOf<P::SourceChain>, Error> {
// we can't continue to relay finality if target node is out of sync, because
@@ -144,7 +144,7 @@ where
From<<AccountKeyPairOf<P::SourceTransactionSignScheme> as Pair>::Public>,
P::SourceTransactionSignScheme: TransactionSignScheme<Chain = P::SourceChain>,
{
type TransactionTracker = TransactionTracker<P::SourceChain>;
type TransactionTracker = TransactionTracker<P::SourceChain, Client<P::SourceChain>>;
async fn state(&self) -> Result<SourceClientState<MessageLaneAdapter<P>>, SubstrateError> {
// we can't continue to deliver confirmations if source node is out of sync, because
@@ -145,7 +145,7 @@ where
P::TargetTransactionSignScheme: TransactionSignScheme<Chain = P::TargetChain>,
BalanceOf<P::SourceChain>: TryFrom<BalanceOf<P::TargetChain>>,
{
type TransactionTracker = TransactionTracker<P::TargetChain>;
type TransactionTracker = TransactionTracker<P::TargetChain, Client<P::TargetChain>>;
async fn state(&self) -> Result<TargetClientState<MessageLaneAdapter<P>>, SubstrateError> {
// we can't continue to deliver confirmations if source node is out of sync, because
@@ -86,7 +86,7 @@ where
P::TransactionSignScheme: TransactionSignScheme<Chain = P::TargetChain>,
AccountIdOf<P::TargetChain>: From<<AccountKeyPairOf<P::TransactionSignScheme> as Pair>::Public>,
{
type TransactionTracker = TransactionTracker<P::TargetChain>;
type TransactionTracker = TransactionTracker<P::TargetChain, Client<P::TargetChain>>;
async fn best_block(&self) -> Result<HeaderIdOf<P::TargetChain>, Self::Error> {
let best_header = self.client.best_header().await?;
+132 -13
View File
@@ -129,7 +129,7 @@ pub struct NoncesSubmitArtifacts<T> {
#[async_trait]
pub trait SourceClient<P: MessageLane>: RelayClient {
/// Transaction tracker to track submitted transactions.
type TransactionTracker: TransactionTracker;
type TransactionTracker: TransactionTracker<HeaderId = SourceHeaderIdOf<P>>;
/// Returns state of the client.
async fn state(&self) -> Result<SourceClientState<P>, Self::Error>;
@@ -182,7 +182,7 @@ pub trait SourceClient<P: MessageLane>: RelayClient {
#[async_trait]
pub trait TargetClient<P: MessageLane>: RelayClient {
/// Transaction tracker to track submitted transactions.
type TransactionTracker: TransactionTracker;
type TransactionTracker: TransactionTracker<HeaderId = TargetHeaderIdOf<P>>;
/// Returns state of the client.
async fn state(&self) -> Result<TargetClientState<P>, Self::Error>;
@@ -529,17 +529,19 @@ pub(crate) mod tests {
}
#[derive(Clone, Debug)]
pub struct TestTransactionTracker(TrackedTransactionStatus);
pub struct TestTransactionTracker(TrackedTransactionStatus<TestTargetHeaderId>);
impl Default for TestTransactionTracker {
fn default() -> TestTransactionTracker {
TestTransactionTracker(TrackedTransactionStatus::Finalized)
TestTransactionTracker(TrackedTransactionStatus::Finalized(Default::default()))
}
}
#[async_trait]
impl TransactionTracker for TestTransactionTracker {
async fn wait(self) -> TrackedTransactionStatus {
type HeaderId = TestTargetHeaderId;
async fn wait(self) -> TrackedTransactionStatus<TestTargetHeaderId> {
self.0
}
}
@@ -551,14 +553,14 @@ pub(crate) mod tests {
source_state: SourceClientState<TestMessageLane>,
source_latest_generated_nonce: MessageNonce,
source_latest_confirmed_received_nonce: MessageNonce,
source_tracked_transaction_status: TrackedTransactionStatus,
source_tracked_transaction_status: TrackedTransactionStatus<TestTargetHeaderId>,
submitted_messages_receiving_proofs: Vec<TestMessagesReceivingProof>,
is_target_fails: bool,
is_target_reconnected: bool,
target_state: SourceClientState<TestMessageLane>,
target_latest_received_nonce: MessageNonce,
target_latest_confirmed_received_nonce: MessageNonce,
target_tracked_transaction_status: TrackedTransactionStatus,
target_tracked_transaction_status: TrackedTransactionStatus<TestTargetHeaderId>,
submitted_messages_proofs: Vec<TestMessagesProof>,
target_to_source_header_required: Option<TestTargetHeaderId>,
target_to_source_header_requirements: Vec<TestTargetHeaderId>,
@@ -574,14 +576,20 @@ pub(crate) mod tests {
source_state: Default::default(),
source_latest_generated_nonce: 0,
source_latest_confirmed_received_nonce: 0,
source_tracked_transaction_status: TrackedTransactionStatus::Finalized,
source_tracked_transaction_status: TrackedTransactionStatus::Finalized(HeaderId(
0,
Default::default(),
)),
submitted_messages_receiving_proofs: Vec::new(),
is_target_fails: false,
is_target_reconnected: false,
target_state: Default::default(),
target_latest_received_nonce: 0,
target_latest_confirmed_received_nonce: 0,
target_tracked_transaction_status: TrackedTransactionStatus::Finalized,
target_tracked_transaction_status: TrackedTransactionStatus::Finalized(HeaderId(
0,
Default::default(),
)),
submitted_messages_proofs: Vec::new(),
target_to_source_header_required: None,
target_to_source_header_requirements: Vec::new(),
@@ -595,6 +603,7 @@ pub(crate) mod tests {
pub struct TestSourceClient {
data: Arc<Mutex<TestClientData>>,
tick: Arc<dyn Fn(&mut TestClientData) + Send + Sync>,
post_tick: Arc<dyn Fn(&mut TestClientData) + Send + Sync>,
}
impl Default for TestSourceClient {
@@ -602,6 +611,7 @@ pub(crate) mod tests {
TestSourceClient {
data: Arc::new(Mutex::new(TestClientData::default())),
tick: Arc::new(|_| {}),
post_tick: Arc::new(|_| {}),
}
}
}
@@ -615,6 +625,7 @@ pub(crate) mod tests {
let mut data = self.data.lock();
(self.tick)(&mut data);
data.is_source_reconnected = true;
(self.post_tick)(&mut data);
}
Ok(())
}
@@ -630,6 +641,7 @@ pub(crate) mod tests {
if data.is_source_fails {
return Err(TestError)
}
(self.post_tick)(&mut data);
Ok(data.source_state.clone())
}
@@ -642,6 +654,7 @@ pub(crate) mod tests {
if data.is_source_fails {
return Err(TestError)
}
(self.post_tick)(&mut data);
Ok((id, data.source_latest_generated_nonce))
}
@@ -651,6 +664,7 @@ pub(crate) mod tests {
) -> Result<(SourceHeaderIdOf<TestMessageLane>, MessageNonce), TestError> {
let mut data = self.data.lock();
(self.tick)(&mut data);
(self.post_tick)(&mut data);
Ok((id, data.source_latest_confirmed_received_nonce))
}
@@ -685,6 +699,7 @@ pub(crate) mod tests {
> {
let mut data = self.data.lock();
(self.tick)(&mut data);
(self.post_tick)(&mut data);
Ok((
id,
nonces.clone(),
@@ -711,6 +726,7 @@ pub(crate) mod tests {
data.source_state.best_finalized_self = data.source_state.best_self;
data.submitted_messages_receiving_proofs.push(proof);
data.source_latest_confirmed_received_nonce = proof;
(self.post_tick)(&mut data);
Ok(TestTransactionTracker(data.source_tracked_transaction_status))
}
@@ -719,6 +735,7 @@ pub(crate) mod tests {
data.target_to_source_header_required = Some(id);
data.target_to_source_header_requirements.push(id);
(self.tick)(&mut data);
(self.post_tick)(&mut data);
}
async fn estimate_confirmation_transaction(&self) -> TestSourceChainBalance {
@@ -730,6 +747,7 @@ pub(crate) mod tests {
pub struct TestTargetClient {
data: Arc<Mutex<TestClientData>>,
tick: Arc<dyn Fn(&mut TestClientData) + Send + Sync>,
post_tick: Arc<dyn Fn(&mut TestClientData) + Send + Sync>,
}
impl Default for TestTargetClient {
@@ -737,6 +755,7 @@ pub(crate) mod tests {
TestTargetClient {
data: Arc::new(Mutex::new(TestClientData::default())),
tick: Arc::new(|_| {}),
post_tick: Arc::new(|_| {}),
}
}
}
@@ -750,6 +769,7 @@ pub(crate) mod tests {
let mut data = self.data.lock();
(self.tick)(&mut data);
data.is_target_reconnected = true;
(self.post_tick)(&mut data);
}
Ok(())
}
@@ -765,6 +785,7 @@ pub(crate) mod tests {
if data.is_target_fails {
return Err(TestError)
}
(self.post_tick)(&mut data);
Ok(data.target_state.clone())
}
@@ -777,6 +798,7 @@ pub(crate) mod tests {
if data.is_target_fails {
return Err(TestError)
}
(self.post_tick)(&mut data);
Ok((id, data.target_latest_received_nonce))
}
@@ -804,6 +826,7 @@ pub(crate) mod tests {
if data.is_target_fails {
return Err(TestError)
}
(self.post_tick)(&mut data);
Ok((id, data.target_latest_confirmed_received_nonce))
}
@@ -834,6 +857,7 @@ pub(crate) mod tests {
target_latest_confirmed_received_nonce;
}
data.submitted_messages_proofs.push(proof);
(self.post_tick)(&mut data);
Ok(NoncesSubmitArtifacts {
nonces,
tx_tracker: TestTransactionTracker(data.target_tracked_transaction_status),
@@ -845,6 +869,7 @@ pub(crate) mod tests {
data.source_to_target_header_required = Some(id);
data.source_to_target_header_requirements.push(id);
(self.tick)(&mut data);
(self.post_tick)(&mut data);
}
async fn estimate_delivery_transaction_in_source_tokens(
@@ -863,14 +888,24 @@ pub(crate) mod tests {
fn run_loop_test(
data: TestClientData,
source_tick: Arc<dyn Fn(&mut TestClientData) + Send + Sync>,
source_post_tick: Arc<dyn Fn(&mut TestClientData) + Send + Sync>,
target_tick: Arc<dyn Fn(&mut TestClientData) + Send + Sync>,
target_post_tick: Arc<dyn Fn(&mut TestClientData) + Send + Sync>,
exit_signal: impl Future<Output = ()> + 'static + Send,
) -> TestClientData {
async_std::task::block_on(async {
let data = Arc::new(Mutex::new(data));
let source_client = TestSourceClient { data: data.clone(), tick: source_tick };
let target_client = TestTargetClient { data: data.clone(), tick: target_tick };
let source_client = TestSourceClient {
data: data.clone(),
tick: source_tick,
post_tick: source_post_tick,
};
let target_client = TestTargetClient {
data: data.clone(),
tick: target_tick,
post_tick: target_post_tick,
};
let _ = run(
Params {
lane: [0, 0, 0, 0],
@@ -928,6 +963,7 @@ pub(crate) mod tests {
data.is_target_fails = true;
}
}),
Arc::new(|_| {}),
Arc::new(move |data: &mut TestClientData| {
if data.is_target_reconnected {
data.is_target_fails = false;
@@ -942,6 +978,7 @@ pub(crate) mod tests {
exit_sender.unbounded_send(()).unwrap();
}
}),
Arc::new(|_| {}),
exit_receiver.into_future().map(|(_, _)| ()),
);
@@ -976,24 +1013,104 @@ pub(crate) mod tests {
},
Arc::new(move |data: &mut TestClientData| {
if data.is_source_reconnected {
data.source_tracked_transaction_status = TrackedTransactionStatus::Finalized;
data.source_tracked_transaction_status =
TrackedTransactionStatus::Finalized(Default::default());
}
if data.is_source_reconnected && data.is_target_reconnected {
source_exit_sender.unbounded_send(()).unwrap();
}
}),
Arc::new(|_| {}),
Arc::new(move |data: &mut TestClientData| {
if data.is_target_reconnected {
data.target_tracked_transaction_status = TrackedTransactionStatus::Finalized;
data.target_tracked_transaction_status =
TrackedTransactionStatus::Finalized(Default::default());
}
if data.is_source_reconnected && data.is_target_reconnected {
target_exit_sender.unbounded_send(()).unwrap();
}
}),
Arc::new(|_| {}),
exit_receiver.into_future().map(|(_, _)| ()),
);
assert!(result.is_source_reconnected);
}
#[test]
fn message_lane_loop_is_able_to_recover_from_unsuccessful_transaction() {
// with this configuration, both source and target clients will mine their transactions, but
// their corresponding nonce won't be udpated => reconnect will happen
let (exit_sender, exit_receiver) = unbounded();
let result = run_loop_test(
TestClientData {
source_state: ClientState {
best_self: HeaderId(0, 0),
best_finalized_self: HeaderId(0, 0),
best_finalized_peer_at_best_self: HeaderId(0, 0),
actual_best_finalized_peer_at_best_self: HeaderId(0, 0),
},
source_latest_generated_nonce: 1,
target_state: ClientState {
best_self: HeaderId(0, 0),
best_finalized_self: HeaderId(0, 0),
best_finalized_peer_at_best_self: HeaderId(0, 0),
actual_best_finalized_peer_at_best_self: HeaderId(0, 0),
},
target_latest_received_nonce: 0,
..Default::default()
},
Arc::new(move |data: &mut TestClientData| {
// blocks are produced on every tick
data.source_state.best_self =
HeaderId(data.source_state.best_self.0 + 1, data.source_state.best_self.1 + 1);
data.source_state.best_finalized_self = data.source_state.best_self;
// syncing target headers -> source chain
if let Some(last_requirement) = data.target_to_source_header_requirements.last() {
if *last_requirement != data.source_state.best_finalized_peer_at_best_self {
data.source_state.best_finalized_peer_at_best_self = *last_requirement;
}
}
}),
Arc::new(move |data: &mut TestClientData| {
// if it is the first time we're submitting delivery proof, let's revert changes
// to source status => then the delivery confirmation transaction is "finalized",
// but the state is not altered
if data.submitted_messages_receiving_proofs.len() == 1 {
data.source_latest_confirmed_received_nonce = 0;
}
}),
Arc::new(move |data: &mut TestClientData| {
// blocks are produced on every tick
data.target_state.best_self =
HeaderId(data.target_state.best_self.0 + 1, data.target_state.best_self.1 + 1);
data.target_state.best_finalized_self = data.target_state.best_self;
// syncing source headers -> target chain
if let Some(last_requirement) = data.source_to_target_header_requirements.last() {
if *last_requirement != data.target_state.best_finalized_peer_at_best_self {
data.target_state.best_finalized_peer_at_best_self = *last_requirement;
}
}
// if source has received all messages receiving confirmations => stop
if data.source_latest_confirmed_received_nonce == 1 {
exit_sender.unbounded_send(()).unwrap();
}
}),
Arc::new(move |data: &mut TestClientData| {
// if it is the first time we're submitting messages proof, let's revert changes
// to target status => then the messages delivery transaction is "finalized", but
// the state is not altered
if data.submitted_messages_proofs.len() == 1 {
data.target_latest_received_nonce = 0;
data.target_latest_confirmed_received_nonce = 0;
}
}),
exit_receiver.into_future().map(|(_, _)| ()),
);
assert!(result.is_source_reconnected);
assert_eq!(result.submitted_messages_proofs.len(), 2);
assert_eq!(result.submitted_messages_receiving_proofs.len(), 2);
}
#[test]
@@ -1037,6 +1154,7 @@ pub(crate) mod tests {
}
}
}),
Arc::new(|_| {}),
Arc::new(move |data: &mut TestClientData| {
// blocks are produced on every tick
data.target_state.best_self =
@@ -1061,6 +1179,7 @@ pub(crate) mod tests {
exit_sender.unbounded_send(()).unwrap();
}
}),
Arc::new(|_| {}),
exit_receiver.into_future().map(|(_, _)| ()),
);
@@ -128,7 +128,7 @@ pub trait TargetClient<P: MessageRace> {
/// Type of the additional data from the target client, used by the race.
type TargetNoncesData: std::fmt::Debug;
/// Transaction tracker to track submitted transactions.
type TransactionTracker: TransactionTracker;
type TransactionTracker: TransactionTracker<HeaderId = P::TargetHeaderId>;
/// Ask headers relay to relay finalized headers up to (and including) given header
/// from race source to race target.
@@ -419,17 +419,49 @@ pub async fn run<P: MessageRace, SC: SourceClient<P>, TC: TargetClient<P>>(
).fail_if_error(FailedClient::Target).map(|_| true)?;
},
target_transaction_status = target_tx_tracker => {
if target_transaction_status == TrackedTransactionStatus::Lost {
log::warn!(
target: "bridge",
"{} -> {} race has stalled. State: {:?}. Strategy: {:?}",
P::source_name(),
P::target_name(),
race_state,
strategy,
);
match (target_transaction_status, race_state.nonces_submitted.as_ref()) {
(TrackedTransactionStatus::Finalized(at_block), Some(nonces_submitted)) => {
// our transaction has been mined, but was it successful or not? let's check the best
// nonce at the target node.
race_target.nonces(at_block, false)
.await
.map_err(|e| format!("failed to read nonces from target node: {:?}", e))
.and_then(|(_, nonces_at_target)| {
if nonces_at_target.latest_nonce < *nonces_submitted.end() {
Err(format!(
"best nonce at target after tx is {:?} and we've submitted {:?}",
nonces_at_target.latest_nonce,
nonces_submitted.end(),
))
} else {
Ok(())
}
})
.map_err(|e| {
log::error!(
target: "bridge",
"{} -> {} race has stalled. Transaction failed: {}. Going to restart",
P::source_name(),
P::target_name(),
e,
);
return Err(FailedClient::Both);
FailedClient::Both
})?;
},
(TrackedTransactionStatus::Lost, _) => {
log::warn!(
target: "bridge",
"{} -> {} race has stalled. State: {:?}. Strategy: {:?}",
P::source_name(),
P::target_name(),
race_state,
strategy,
);
return Err(FailedClient::Both);
},
_ => (),
}
},
+104 -44
View File
@@ -124,7 +124,7 @@ pub trait SourceClient<P: ParachainsPipeline>: RelayClient {
#[async_trait]
pub trait TargetClient<P: ParachainsPipeline>: RelayClient {
/// Transaction tracker to track submitted transactions.
type TransactionTracker: TransactionTracker;
type TransactionTracker: TransactionTracker<HeaderId = HeaderIdOf<P::TargetChain>>;
/// Get best block id.
async fn best_block(&self) -> Result<HeaderIdOf<P::TargetChain>, Self::Error>;
@@ -260,13 +260,13 @@ where
// check if our transaction has been mined
if let Some(tracker) = submitted_heads_tracker.take() {
match tracker.update(&heads_at_target).await {
match tracker.update(&best_target_block, &heads_at_target).await {
SubmittedHeadsStatus::Waiting(tracker) => {
// no news about our transaction and we shall keep waiting
submitted_heads_tracker = Some(tracker);
continue
},
SubmittedHeadsStatus::Final(TrackedTransactionStatus::Finalized) => {
SubmittedHeadsStatus::Final(TrackedTransactionStatus::Finalized(_)) => {
// all heads have been updated, we don't need this tracker anymore
},
SubmittedHeadsStatus::Final(TrackedTransactionStatus::Lost) => {
@@ -529,9 +529,24 @@ enum SubmittedHeadsStatus<P: ParachainsPipeline> {
/// Heads are not yet updated.
Waiting(SubmittedHeadsTracker<P>),
/// Heads transaction has either been finalized or lost (i.e. received its "final" status).
Final(TrackedTransactionStatus),
Final(TrackedTransactionStatus<HeaderIdOf<P::TargetChain>>),
}
/// Type of the transaction tracker that the `SubmittedHeadsTracker` is using.
///
/// It needs to be shared because of `poll` macro and our consuming `update` method.
type SharedTransactionTracker<P> = Shared<
Pin<
Box<
dyn Future<
Output = TrackedTransactionStatus<
HeaderIdOf<<P as ParachainsPipeline>::TargetChain>,
>,
> + Send,
>,
>,
>;
/// Submitted parachain heads transaction.
struct SubmittedHeadsTracker<P: ParachainsPipeline> {
/// Ids of parachains which heads were updated in the tracked transaction.
@@ -541,7 +556,7 @@ struct SubmittedHeadsTracker<P: ParachainsPipeline> {
/// Future that waits for submitted transaction finality or loss.
///
/// It needs to be shared because of `poll` macro and our consuming `update` method.
transaction_tracker: Shared<Pin<Box<dyn Future<Output = TrackedTransactionStatus> + Send>>>,
transaction_tracker: SharedTransactionTracker<P>,
}
impl<P: ParachainsPipeline> SubmittedHeadsTracker<P>
@@ -552,7 +567,7 @@ where
pub fn new(
awaiting_update: impl IntoIterator<Item = ParaId>,
relay_block_number: BlockNumberOf<P::SourceChain>,
transaction_tracker: impl TransactionTracker + 'static,
transaction_tracker: impl TransactionTracker<HeaderId = HeaderIdOf<P::TargetChain>> + 'static,
) -> Self {
SubmittedHeadsTracker {
awaiting_update: awaiting_update.into_iter().collect(),
@@ -564,6 +579,7 @@ where
/// Returns `None` if all submitted parachain heads have been updated.
pub async fn update(
mut self,
at_target_block: &HeaderIdOf<P::TargetChain>,
heads_at_target: &BTreeMap<ParaId, Option<BestParaHeadHash>>,
) -> SubmittedHeadsStatus<P> {
// remove all pending heads that were synced
@@ -590,14 +606,23 @@ where
// if we have synced all required heads, we are done
if self.awaiting_update.is_empty() {
return SubmittedHeadsStatus::Final(TrackedTransactionStatus::Finalized)
return SubmittedHeadsStatus::Final(TrackedTransactionStatus::Finalized(
*at_target_block,
))
}
// if underlying transaction tracker has reported that the transaction is lost, we may
// then restart our sync
let transaction_tracker = self.transaction_tracker.clone();
if let Poll::Ready(TrackedTransactionStatus::Lost) = poll!(transaction_tracker) {
return SubmittedHeadsStatus::Final(TrackedTransactionStatus::Lost)
match poll!(transaction_tracker) {
Poll::Ready(TrackedTransactionStatus::Lost) =>
return SubmittedHeadsStatus::Final(TrackedTransactionStatus::Lost),
Poll::Ready(TrackedTransactionStatus::Finalized(_)) => {
// so we are here and our transaction is mined+finalized, but some of heads were not
// updated => we're considering our loop as stalled
return SubmittedHeadsStatus::Final(TrackedTransactionStatus::Lost)
},
_ => (),
}
SubmittedHeadsStatus::Waiting(self)
@@ -644,12 +669,17 @@ mod tests {
}
#[derive(Clone, Debug)]
struct TestTransactionTracker(TrackedTransactionStatus);
struct TestTransactionTracker(Option<TrackedTransactionStatus<HeaderIdOf<TestChain>>>);
#[async_trait]
impl TransactionTracker for TestTransactionTracker {
async fn wait(self) -> TrackedTransactionStatus {
self.0
type HeaderId = HeaderIdOf<TestChain>;
async fn wait(self) -> TrackedTransactionStatus<HeaderIdOf<TestChain>> {
match self.0 {
Some(status) => status,
None => futures::future::pending().await,
}
}
}
@@ -785,7 +815,9 @@ mod tests {
if let Some(mut exit_signal_sender) = data.exit_signal_sender.take() {
exit_signal_sender.send(()).await.unwrap();
}
Ok(TestTransactionTracker(TrackedTransactionStatus::Finalized))
Ok(TestTransactionTracker(Some(
TrackedTransactionStatus::Finalized(Default::default()),
)))
}
}
@@ -938,10 +970,31 @@ mod tests {
SubmittedHeadsTracker::new(
vec![ParaId(PARA_ID), ParaId(PARA_1_ID)],
SOURCE_BLOCK_NUMBER,
TestTransactionTracker(TrackedTransactionStatus::Finalized),
TestTransactionTracker(None),
)
}
fn all_expected_tracker_heads() -> BTreeMap<ParaId, Option<BestParaHeadHash>> {
vec![
(
ParaId(PARA_ID),
Some(BestParaHeadHash {
at_relay_block_number: SOURCE_BLOCK_NUMBER,
head_hash: PARA_0_HASH,
}),
),
(
ParaId(PARA_1_ID),
Some(BestParaHeadHash {
at_relay_block_number: SOURCE_BLOCK_NUMBER,
head_hash: PARA_0_HASH,
}),
),
]
.into_iter()
.collect()
}
impl From<SubmittedHeadsStatus<TestParachainsPipeline>> for Option<BTreeSet<ParaId>> {
fn from(status: SubmittedHeadsStatus<TestParachainsPipeline>) -> Option<BTreeSet<ParaId>> {
match status {
@@ -955,7 +1008,10 @@ mod tests {
async fn tx_tracker_update_when_nothing_is_updated() {
assert_eq!(
Some(test_tx_tracker().awaiting_update),
test_tx_tracker().update(&vec![].into_iter().collect()).await.into(),
test_tx_tracker()
.update(&HeaderId(0, Default::default()), &vec![].into_iter().collect())
.await
.into(),
);
}
@@ -965,6 +1021,7 @@ mod tests {
Some(test_tx_tracker().awaiting_update),
test_tx_tracker()
.update(
&HeaderId(0, Default::default()),
&vec![(
ParaId(PARA_ID),
Some(BestParaHeadHash {
@@ -986,6 +1043,7 @@ mod tests {
Some(vec![ParaId(PARA_1_ID)].into_iter().collect::<BTreeSet<_>>()),
test_tx_tracker()
.update(
&HeaderId(0, Default::default()),
&vec![(
ParaId(PARA_ID),
Some(BestParaHeadHash {
@@ -1006,50 +1064,52 @@ mod tests {
assert_eq!(
Option::<BTreeSet<_>>::None,
test_tx_tracker()
.update(
&vec![
(
ParaId(PARA_ID),
Some(BestParaHeadHash {
at_relay_block_number: SOURCE_BLOCK_NUMBER,
head_hash: PARA_0_HASH,
})
),
(
ParaId(PARA_1_ID),
Some(BestParaHeadHash {
at_relay_block_number: SOURCE_BLOCK_NUMBER,
head_hash: PARA_0_HASH,
})
),
]
.into_iter()
.collect()
)
.update(&HeaderId(0, Default::default()), &all_expected_tracker_heads())
.await
.into(),
);
}
#[async_std::test]
async fn tx_tracker_update_when_tx_is_stalled() {
async fn tx_tracker_update_when_tx_is_lost() {
let mut tx_tracker = test_tx_tracker();
tx_tracker.transaction_tracker =
futures::future::ready(TrackedTransactionStatus::Lost).boxed().shared();
assert_eq!(
Option::<BTreeSet<_>>::None,
tx_tracker.update(&vec![].into_iter().collect()).await.into(),
);
assert!(matches!(
tx_tracker
.update(&HeaderId(0, Default::default()), &vec![].into_iter().collect())
.await,
SubmittedHeadsStatus::Final(TrackedTransactionStatus::Lost),
));
}
#[async_std::test]
async fn tx_tracker_update_when_tx_is_finalized() {
async fn tx_tracker_update_when_tx_is_finalized_but_heads_are_not_updated() {
let mut tx_tracker = test_tx_tracker();
tx_tracker.transaction_tracker =
futures::future::ready(TrackedTransactionStatus::Finalized).boxed().shared();
futures::future::ready(TrackedTransactionStatus::Finalized(Default::default()))
.boxed()
.shared();
assert!(matches!(
tx_tracker.update(&vec![].into_iter().collect()).await,
SubmittedHeadsStatus::Waiting(_),
tx_tracker
.update(&HeaderId(0, Default::default()), &vec![].into_iter().collect())
.await,
SubmittedHeadsStatus::Final(TrackedTransactionStatus::Lost),
));
}
#[async_std::test]
async fn tx_tracker_update_when_tx_is_finalized_and_heads_are_updated() {
let mut tx_tracker = test_tx_tracker();
tx_tracker.transaction_tracker =
futures::future::ready(TrackedTransactionStatus::Finalized(Default::default()))
.boxed()
.shared();
assert!(matches!(
tx_tracker
.update(&HeaderId(0, Default::default()), &all_expected_tracker_heads())
.await,
SubmittedHeadsStatus::Final(TrackedTransactionStatus::Finalized(_)),
));
}
+7 -4
View File
@@ -122,18 +122,21 @@ pub trait MaybeConnectionError {
/// Final status of the tracked transaction.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum TrackedTransactionStatus {
pub enum TrackedTransactionStatus<BlockId> {
/// Transaction has been lost.
Lost,
/// Transaction has been mined and finalized.
Finalized,
/// Transaction has been mined and finalized at given block.
Finalized(BlockId),
}
/// Transaction tracker.
#[async_trait]
pub trait TransactionTracker: Send {
/// Header id, used by the chain.
type HeaderId: Clone + Send;
/// Wait until transaction is either finalized or invalidated/lost.
async fn wait(self) -> TrackedTransactionStatus;
async fn wait(self) -> TrackedTransactionStatus<Self::HeaderId>;
}
/// Stringified error that may be either connection-related or not.