use transaction tracker in messages relay (#1581)

This commit is contained in:
Svyatoslav Nikolsky
2022-09-23 12:02:59 +03:00
committed by Bastian Köcher
parent 86be60ad40
commit e534e90193
8 changed files with 176 additions and 106 deletions
@@ -88,30 +88,3 @@ pub fn transaction_stall_timeout(
.map(|mortality_period| average_block_interval.saturating_mul(mortality_period + 1 + 1)) .map(|mortality_period| average_block_interval.saturating_mul(mortality_period + 1 + 1))
.unwrap_or(default_stall_timeout) .unwrap_or(default_stall_timeout)
} }
/// Returns stall timeout for relay loop that submit transactions to two chains.
///
/// Bidirectional relay may have two active transactions. Even if one of them has been spoiled, we
/// can't just restart the loop - the other transaction may still be alive and we'll be submitting
/// duplicate transaction, which may result in funds loss. So we'll be selecting maximal mortality
/// for choosing loop stall timeout.
pub fn bidirectional_transaction_stall_timeout(
left_mortality_period: Option<u32>,
right_mortality_period: Option<u32>,
left_average_block_interval: Duration,
right_average_block_interval: Duration,
default_stall_timeout: Duration,
) -> Duration {
std::cmp::max(
transaction_stall_timeout(
left_mortality_period,
left_average_block_interval,
default_stall_timeout,
),
transaction_stall_timeout(
right_mortality_period,
right_average_block_interval,
default_stall_timeout,
),
)
}
@@ -164,13 +164,6 @@ where
{ {
let source_client = params.source_client; let source_client = params.source_client;
let target_client = params.target_client; let target_client = params.target_client;
let stall_timeout = relay_substrate_client::bidirectional_transaction_stall_timeout(
params.source_transaction_params.mortality,
params.target_transaction_params.mortality,
P::SourceChain::AVERAGE_BLOCK_INTERVAL,
P::TargetChain::AVERAGE_BLOCK_INTERVAL,
STALL_TIMEOUT,
);
let relayer_id_at_source: AccountIdOf<P::SourceChain> = let relayer_id_at_source: AccountIdOf<P::SourceChain> =
params.source_transaction_params.signer.public().into(); params.source_transaction_params.signer.public().into();
@@ -202,8 +195,7 @@ where
Max messages in single transaction: {}\n\t\ Max messages in single transaction: {}\n\t\
Max messages size in single transaction: {}\n\t\ Max messages size in single transaction: {}\n\t\
Max messages weight in single transaction: {}\n\t\ Max messages weight in single transaction: {}\n\t\
Tx mortality: {:?} (~{}m)/{:?} (~{}m)\n\t\ Tx mortality: {:?} (~{}m)/{:?} (~{}m)",
Stall timeout: {:?}",
P::SourceChain::NAME, P::SourceChain::NAME,
P::TargetChain::NAME, P::TargetChain::NAME,
P::SourceChain::NAME, P::SourceChain::NAME,
@@ -223,7 +215,6 @@ where
P::TargetChain::AVERAGE_BLOCK_INTERVAL, P::TargetChain::AVERAGE_BLOCK_INTERVAL,
STALL_TIMEOUT, STALL_TIMEOUT,
).as_secs_f64() / 60.0f64, ).as_secs_f64() / 60.0f64,
stall_timeout,
); );
messages_relay::message_lane_loop::run( messages_relay::message_lane_loop::run(
@@ -232,7 +223,6 @@ where
source_tick: P::SourceChain::AVERAGE_BLOCK_INTERVAL, source_tick: P::SourceChain::AVERAGE_BLOCK_INTERVAL,
target_tick: P::TargetChain::AVERAGE_BLOCK_INTERVAL, target_tick: P::TargetChain::AVERAGE_BLOCK_INTERVAL,
reconnect_delay: relay_utils::relay_loop::RECONNECT_DELAY, reconnect_delay: relay_utils::relay_loop::RECONNECT_DELAY,
stall_timeout,
delivery_params: messages_relay::message_lane_loop::MessageDeliveryParams { delivery_params: messages_relay::message_lane_loop::MessageDeliveryParams {
max_unrewarded_relayer_entries_at_target: max_unrewarded_relayer_entries_at_target:
P::SourceChain::MAX_UNREWARDED_RELAYERS_IN_CONFIRMATION_TX, P::SourceChain::MAX_UNREWARDED_RELAYERS_IN_CONFIRMATION_TX,
@@ -51,7 +51,7 @@ use num_traits::{Bounded, Zero};
use relay_substrate_client::{ use relay_substrate_client::{
AccountIdOf, AccountKeyPairOf, BalanceOf, BlockNumberOf, Chain, ChainWithMessages, Client, AccountIdOf, AccountKeyPairOf, BalanceOf, BlockNumberOf, Chain, ChainWithMessages, Client,
Error as SubstrateError, HashOf, HeaderIdOf, IndexOf, SignParam, TransactionEra, Error as SubstrateError, HashOf, HeaderIdOf, IndexOf, SignParam, TransactionEra,
TransactionSignScheme, UnsignedTransaction, TransactionSignScheme, TransactionTracker, UnsignedTransaction,
}; };
use relay_utils::{relay_loop::Client as RelayClient, HeaderId}; use relay_utils::{relay_loop::Client as RelayClient, HeaderId};
use sp_core::{Bytes, Pair}; use sp_core::{Bytes, Pair};
@@ -144,6 +144,8 @@ where
From<<AccountKeyPairOf<P::SourceTransactionSignScheme> as Pair>::Public>, From<<AccountKeyPairOf<P::SourceTransactionSignScheme> as Pair>::Public>,
P::SourceTransactionSignScheme: TransactionSignScheme<Chain = P::SourceChain>, P::SourceTransactionSignScheme: TransactionSignScheme<Chain = P::SourceChain>,
{ {
type TransactionTracker = TransactionTracker<P::SourceChain>;
async fn state(&self) -> Result<SourceClientState<MessageLaneAdapter<P>>, SubstrateError> { async fn state(&self) -> Result<SourceClientState<MessageLaneAdapter<P>>, SubstrateError> {
// we can't continue to deliver confirmations if source node is out of sync, because // we can't continue to deliver confirmations if source node is out of sync, because
// it may have already received confirmations that we're going to deliver // it may have already received confirmations that we're going to deliver
@@ -338,13 +340,13 @@ where
&self, &self,
_generated_at_block: TargetHeaderIdOf<MessageLaneAdapter<P>>, _generated_at_block: TargetHeaderIdOf<MessageLaneAdapter<P>>,
proof: <MessageLaneAdapter<P> as MessageLane>::MessagesReceivingProof, proof: <MessageLaneAdapter<P> as MessageLane>::MessagesReceivingProof,
) -> Result<(), SubstrateError> { ) -> Result<Self::TransactionTracker, SubstrateError> {
let genesis_hash = *self.source_client.genesis_hash(); let genesis_hash = *self.source_client.genesis_hash();
let transaction_params = self.transaction_params.clone(); let transaction_params = self.transaction_params.clone();
let (spec_version, transaction_version) = let (spec_version, transaction_version) =
self.source_client.simple_runtime_version().await?; self.source_client.simple_runtime_version().await?;
self.source_client self.source_client
.submit_signed_extrinsic( .submit_and_watch_signed_extrinsic(
self.transaction_params.signer.public().into(), self.transaction_params.signer.public().into(),
SignParam::<P::SourceTransactionSignScheme> { SignParam::<P::SourceTransactionSignScheme> {
spec_version, spec_version,
@@ -362,8 +364,7 @@ where
) )
}, },
) )
.await?; .await
Ok(())
} }
async fn require_target_header_on_source(&self, id: TargetHeaderIdOf<MessageLaneAdapter<P>>) { async fn require_target_header_on_source(&self, id: TargetHeaderIdOf<MessageLaneAdapter<P>>) {
@@ -39,13 +39,13 @@ use codec::Encode;
use frame_support::weights::{Weight, WeightToFee}; use frame_support::weights::{Weight, WeightToFee};
use messages_relay::{ use messages_relay::{
message_lane::{MessageLane, SourceHeaderIdOf, TargetHeaderIdOf}, message_lane::{MessageLane, SourceHeaderIdOf, TargetHeaderIdOf},
message_lane_loop::{TargetClient, TargetClientState}, message_lane_loop::{NoncesSubmitArtifacts, TargetClient, TargetClientState},
}; };
use num_traits::{Bounded, Zero}; use num_traits::{Bounded, Zero};
use relay_substrate_client::{ use relay_substrate_client::{
AccountIdOf, AccountKeyPairOf, BalanceOf, BlockNumberOf, Chain, ChainWithMessages, Client, AccountIdOf, AccountKeyPairOf, BalanceOf, BlockNumberOf, Chain, ChainWithMessages, Client,
Error as SubstrateError, HashOf, HeaderIdOf, IndexOf, SignParam, TransactionEra, Error as SubstrateError, HashOf, HeaderIdOf, IndexOf, SignParam, TransactionEra,
TransactionSignScheme, UnsignedTransaction, WeightToFeeOf, TransactionSignScheme, TransactionTracker, UnsignedTransaction, WeightToFeeOf,
}; };
use relay_utils::{relay_loop::Client as RelayClient, HeaderId}; use relay_utils::{relay_loop::Client as RelayClient, HeaderId};
use sp_core::{Bytes, Pair}; use sp_core::{Bytes, Pair};
@@ -145,6 +145,8 @@ where
P::TargetTransactionSignScheme: TransactionSignScheme<Chain = P::TargetChain>, P::TargetTransactionSignScheme: TransactionSignScheme<Chain = P::TargetChain>,
BalanceOf<P::SourceChain>: TryFrom<BalanceOf<P::TargetChain>>, BalanceOf<P::SourceChain>: TryFrom<BalanceOf<P::TargetChain>>,
{ {
type TransactionTracker = TransactionTracker<P::TargetChain>;
async fn state(&self) -> Result<TargetClientState<MessageLaneAdapter<P>>, SubstrateError> { async fn state(&self) -> Result<TargetClientState<MessageLaneAdapter<P>>, SubstrateError> {
// we can't continue to deliver confirmations if source node is out of sync, because // we can't continue to deliver confirmations if source node is out of sync, because
// it may have already received confirmations that we're going to deliver // it may have already received confirmations that we're going to deliver
@@ -245,15 +247,16 @@ where
_generated_at_header: SourceHeaderIdOf<MessageLaneAdapter<P>>, _generated_at_header: SourceHeaderIdOf<MessageLaneAdapter<P>>,
nonces: RangeInclusive<MessageNonce>, nonces: RangeInclusive<MessageNonce>,
proof: <MessageLaneAdapter<P> as MessageLane>::MessagesProof, proof: <MessageLaneAdapter<P> as MessageLane>::MessagesProof,
) -> Result<RangeInclusive<MessageNonce>, SubstrateError> { ) -> Result<NoncesSubmitArtifacts<Self::TransactionTracker>, SubstrateError> {
let genesis_hash = *self.target_client.genesis_hash(); let genesis_hash = *self.target_client.genesis_hash();
let transaction_params = self.transaction_params.clone(); let transaction_params = self.transaction_params.clone();
let relayer_id_at_source = self.relayer_id_at_source.clone(); let relayer_id_at_source = self.relayer_id_at_source.clone();
let nonces_clone = nonces.clone(); let nonces_clone = nonces.clone();
let (spec_version, transaction_version) = let (spec_version, transaction_version) =
self.target_client.simple_runtime_version().await?; self.target_client.simple_runtime_version().await?;
self.target_client let tx_tracker = self
.submit_signed_extrinsic( .target_client
.submit_and_watch_signed_extrinsic(
self.transaction_params.signer.public().into(), self.transaction_params.signer.public().into(),
SignParam::<P::TargetTransactionSignScheme> { SignParam::<P::TargetTransactionSignScheme> {
spec_version, spec_version,
@@ -274,7 +277,7 @@ where
}, },
) )
.await?; .await?;
Ok(nonces) Ok(NoncesSubmitArtifacts { nonces, tx_tracker })
} }
async fn require_source_header_on_target(&self, id: SourceHeaderIdOf<MessageLaneAdapter<P>>) { async fn require_source_header_on_target(&self, id: SourceHeaderIdOf<MessageLaneAdapter<P>>) {
+121 -14
View File
@@ -33,7 +33,7 @@ use bp_messages::{LaneId, MessageNonce, UnrewardedRelayersState, Weight};
use bp_runtime::messages::DispatchFeePayment; use bp_runtime::messages::DispatchFeePayment;
use relay_utils::{ use relay_utils::{
interval, metrics::MetricsParams, process_future_result, relay_loop::Client as RelayClient, interval, metrics::MetricsParams, process_future_result, relay_loop::Client as RelayClient,
retry_backoff, FailedClient, retry_backoff, FailedClient, TransactionTracker,
}; };
use crate::{ use crate::{
@@ -55,8 +55,6 @@ pub struct Params<Strategy: RelayStrategy> {
pub target_tick: Duration, pub target_tick: Duration,
/// Delay between moments when connection error happens and our reconnect attempt. /// Delay between moments when connection error happens and our reconnect attempt.
pub reconnect_delay: Duration, pub reconnect_delay: Duration,
/// The loop will auto-restart if there has been no updates during this period.
pub stall_timeout: Duration,
/// Message delivery race parameters. /// Message delivery race parameters.
pub delivery_params: MessageDeliveryParams<Strategy>, pub delivery_params: MessageDeliveryParams<Strategy>,
} }
@@ -119,9 +117,20 @@ pub struct MessageProofParameters {
pub dispatch_weight: Weight, pub dispatch_weight: Weight,
} }
/// Artifacts of submitting nonces proof.
pub struct NoncesSubmitArtifacts<T> {
/// Submitted nonces range.
pub nonces: RangeInclusive<MessageNonce>,
/// Submitted transaction tracker.
pub tx_tracker: T,
}
/// Source client trait. /// Source client trait.
#[async_trait] #[async_trait]
pub trait SourceClient<P: MessageLane>: RelayClient { pub trait SourceClient<P: MessageLane>: RelayClient {
/// Transaction tracker to track submitted transactions.
type TransactionTracker: TransactionTracker;
/// Returns state of the client. /// Returns state of the client.
async fn state(&self) -> Result<SourceClientState<P>, Self::Error>; async fn state(&self) -> Result<SourceClientState<P>, Self::Error>;
@@ -160,7 +169,7 @@ pub trait SourceClient<P: MessageLane>: RelayClient {
&self, &self,
generated_at_block: TargetHeaderIdOf<P>, generated_at_block: TargetHeaderIdOf<P>,
proof: P::MessagesReceivingProof, proof: P::MessagesReceivingProof,
) -> Result<(), Self::Error>; ) -> Result<Self::TransactionTracker, Self::Error>;
/// We need given finalized target header on source to continue synchronization. /// We need given finalized target header on source to continue synchronization.
async fn require_target_header_on_source(&self, id: TargetHeaderIdOf<P>); async fn require_target_header_on_source(&self, id: TargetHeaderIdOf<P>);
@@ -172,6 +181,9 @@ pub trait SourceClient<P: MessageLane>: RelayClient {
/// Target client trait. /// Target client trait.
#[async_trait] #[async_trait]
pub trait TargetClient<P: MessageLane>: RelayClient { pub trait TargetClient<P: MessageLane>: RelayClient {
/// Transaction tracker to track submitted transactions.
type TransactionTracker: TransactionTracker;
/// Returns state of the client. /// Returns state of the client.
async fn state(&self) -> Result<TargetClientState<P>, Self::Error>; async fn state(&self) -> Result<TargetClientState<P>, Self::Error>;
@@ -205,7 +217,7 @@ pub trait TargetClient<P: MessageLane>: RelayClient {
generated_at_header: SourceHeaderIdOf<P>, generated_at_header: SourceHeaderIdOf<P>,
nonces: RangeInclusive<MessageNonce>, nonces: RangeInclusive<MessageNonce>,
proof: P::MessagesProof, proof: P::MessagesProof,
) -> Result<RangeInclusive<MessageNonce>, Self::Error>; ) -> Result<NoncesSubmitArtifacts<Self::TransactionTracker>, Self::Error>;
/// We need given finalized source header on target to continue synchronization. /// We need given finalized source header on target to continue synchronization.
async fn require_source_header_on_target(&self, id: SourceHeaderIdOf<P>); async fn require_source_header_on_target(&self, id: SourceHeaderIdOf<P>);
@@ -327,7 +339,6 @@ async fn run_until_connection_lost<
delivery_source_state_receiver, delivery_source_state_receiver,
target_client.clone(), target_client.clone(),
delivery_target_state_receiver, delivery_target_state_receiver,
params.stall_timeout,
metrics_msg.clone(), metrics_msg.clone(),
params.delivery_params, params.delivery_params,
) )
@@ -342,7 +353,6 @@ async fn run_until_connection_lost<
receiving_source_state_receiver, receiving_source_state_receiver,
target_client.clone(), target_client.clone(),
receiving_target_state_receiver, receiving_target_state_receiver,
params.stall_timeout,
metrics_msg.clone(), metrics_msg.clone(),
) )
.fuse(); .fuse();
@@ -465,7 +475,7 @@ pub(crate) mod tests {
use futures::stream::StreamExt; use futures::stream::StreamExt;
use parking_lot::Mutex; use parking_lot::Mutex;
use relay_utils::{HeaderId, MaybeConnectionError}; use relay_utils::{HeaderId, MaybeConnectionError, TrackedTransactionStatus};
use crate::relay_strategy::AltruisticStrategy; use crate::relay_strategy::AltruisticStrategy;
@@ -518,19 +528,37 @@ pub(crate) mod tests {
type TargetHeaderHash = TestTargetHeaderHash; type TargetHeaderHash = TestTargetHeaderHash;
} }
#[derive(Debug, Default, Clone)] #[derive(Clone, Debug)]
pub struct TestTransactionTracker(TrackedTransactionStatus);
impl Default for TestTransactionTracker {
fn default() -> TestTransactionTracker {
TestTransactionTracker(TrackedTransactionStatus::Finalized)
}
}
#[async_trait]
impl TransactionTracker for TestTransactionTracker {
async fn wait(self) -> TrackedTransactionStatus {
self.0
}
}
#[derive(Debug, Clone)]
pub struct TestClientData { pub struct TestClientData {
is_source_fails: bool, is_source_fails: bool,
is_source_reconnected: bool, is_source_reconnected: bool,
source_state: SourceClientState<TestMessageLane>, source_state: SourceClientState<TestMessageLane>,
source_latest_generated_nonce: MessageNonce, source_latest_generated_nonce: MessageNonce,
source_latest_confirmed_received_nonce: MessageNonce, source_latest_confirmed_received_nonce: MessageNonce,
source_tracked_transaction_status: TrackedTransactionStatus,
submitted_messages_receiving_proofs: Vec<TestMessagesReceivingProof>, submitted_messages_receiving_proofs: Vec<TestMessagesReceivingProof>,
is_target_fails: bool, is_target_fails: bool,
is_target_reconnected: bool, is_target_reconnected: bool,
target_state: SourceClientState<TestMessageLane>, target_state: SourceClientState<TestMessageLane>,
target_latest_received_nonce: MessageNonce, target_latest_received_nonce: MessageNonce,
target_latest_confirmed_received_nonce: MessageNonce, target_latest_confirmed_received_nonce: MessageNonce,
target_tracked_transaction_status: TrackedTransactionStatus,
submitted_messages_proofs: Vec<TestMessagesProof>, submitted_messages_proofs: Vec<TestMessagesProof>,
target_to_source_header_required: Option<TestTargetHeaderId>, target_to_source_header_required: Option<TestTargetHeaderId>,
target_to_source_header_requirements: Vec<TestTargetHeaderId>, target_to_source_header_requirements: Vec<TestTargetHeaderId>,
@@ -538,6 +566,31 @@ pub(crate) mod tests {
source_to_target_header_requirements: Vec<TestSourceHeaderId>, source_to_target_header_requirements: Vec<TestSourceHeaderId>,
} }
impl Default for TestClientData {
fn default() -> TestClientData {
TestClientData {
is_source_fails: false,
is_source_reconnected: false,
source_state: Default::default(),
source_latest_generated_nonce: 0,
source_latest_confirmed_received_nonce: 0,
source_tracked_transaction_status: TrackedTransactionStatus::Finalized,
submitted_messages_receiving_proofs: Vec::new(),
is_target_fails: false,
is_target_reconnected: false,
target_state: Default::default(),
target_latest_received_nonce: 0,
target_latest_confirmed_received_nonce: 0,
target_tracked_transaction_status: TrackedTransactionStatus::Finalized,
submitted_messages_proofs: Vec::new(),
target_to_source_header_required: None,
target_to_source_header_requirements: Vec::new(),
source_to_target_header_required: None,
source_to_target_header_requirements: Vec::new(),
}
}
}
#[derive(Clone)] #[derive(Clone)]
pub struct TestSourceClient { pub struct TestSourceClient {
data: Arc<Mutex<TestClientData>>, data: Arc<Mutex<TestClientData>>,
@@ -569,6 +622,8 @@ pub(crate) mod tests {
#[async_trait] #[async_trait]
impl SourceClient<TestMessageLane> for TestSourceClient { impl SourceClient<TestMessageLane> for TestSourceClient {
type TransactionTracker = TestTransactionTracker;
async fn state(&self) -> Result<SourceClientState<TestMessageLane>, TestError> { async fn state(&self) -> Result<SourceClientState<TestMessageLane>, TestError> {
let mut data = self.data.lock(); let mut data = self.data.lock();
(self.tick)(&mut data); (self.tick)(&mut data);
@@ -648,7 +703,7 @@ pub(crate) mod tests {
&self, &self,
_generated_at_block: TargetHeaderIdOf<TestMessageLane>, _generated_at_block: TargetHeaderIdOf<TestMessageLane>,
proof: TestMessagesReceivingProof, proof: TestMessagesReceivingProof,
) -> Result<(), TestError> { ) -> Result<Self::TransactionTracker, TestError> {
let mut data = self.data.lock(); let mut data = self.data.lock();
(self.tick)(&mut data); (self.tick)(&mut data);
data.source_state.best_self = data.source_state.best_self =
@@ -656,7 +711,7 @@ pub(crate) mod tests {
data.source_state.best_finalized_self = data.source_state.best_self; data.source_state.best_finalized_self = data.source_state.best_self;
data.submitted_messages_receiving_proofs.push(proof); data.submitted_messages_receiving_proofs.push(proof);
data.source_latest_confirmed_received_nonce = proof; data.source_latest_confirmed_received_nonce = proof;
Ok(()) Ok(TestTransactionTracker(data.source_tracked_transaction_status))
} }
async fn require_target_header_on_source(&self, id: TargetHeaderIdOf<TestMessageLane>) { async fn require_target_header_on_source(&self, id: TargetHeaderIdOf<TestMessageLane>) {
@@ -702,6 +757,8 @@ pub(crate) mod tests {
#[async_trait] #[async_trait]
impl TargetClient<TestMessageLane> for TestTargetClient { impl TargetClient<TestMessageLane> for TestTargetClient {
type TransactionTracker = TestTransactionTracker;
async fn state(&self) -> Result<TargetClientState<TestMessageLane>, TestError> { async fn state(&self) -> Result<TargetClientState<TestMessageLane>, TestError> {
let mut data = self.data.lock(); let mut data = self.data.lock();
(self.tick)(&mut data); (self.tick)(&mut data);
@@ -762,7 +819,7 @@ pub(crate) mod tests {
_generated_at_header: SourceHeaderIdOf<TestMessageLane>, _generated_at_header: SourceHeaderIdOf<TestMessageLane>,
nonces: RangeInclusive<MessageNonce>, nonces: RangeInclusive<MessageNonce>,
proof: TestMessagesProof, proof: TestMessagesProof,
) -> Result<RangeInclusive<MessageNonce>, TestError> { ) -> Result<NoncesSubmitArtifacts<Self::TransactionTracker>, TestError> {
let mut data = self.data.lock(); let mut data = self.data.lock();
(self.tick)(&mut data); (self.tick)(&mut data);
if data.is_target_fails { if data.is_target_fails {
@@ -777,7 +834,10 @@ pub(crate) mod tests {
target_latest_confirmed_received_nonce; target_latest_confirmed_received_nonce;
} }
data.submitted_messages_proofs.push(proof); data.submitted_messages_proofs.push(proof);
Ok(nonces) Ok(NoncesSubmitArtifacts {
nonces,
tx_tracker: TestTransactionTracker(data.target_tracked_transaction_status),
})
} }
async fn require_source_header_on_target(&self, id: SourceHeaderIdOf<TestMessageLane>) { async fn require_source_header_on_target(&self, id: SourceHeaderIdOf<TestMessageLane>) {
@@ -817,7 +877,6 @@ pub(crate) mod tests {
source_tick: Duration::from_millis(100), source_tick: Duration::from_millis(100),
target_tick: Duration::from_millis(100), target_tick: Duration::from_millis(100),
reconnect_delay: Duration::from_millis(0), reconnect_delay: Duration::from_millis(0),
stall_timeout: Duration::from_millis(60 * 1000),
delivery_params: MessageDeliveryParams { delivery_params: MessageDeliveryParams {
max_unrewarded_relayer_entries_at_target: 4, max_unrewarded_relayer_entries_at_target: 4,
max_unconfirmed_nonces_at_target: 4, max_unconfirmed_nonces_at_target: 4,
@@ -889,6 +948,54 @@ pub(crate) mod tests {
assert_eq!(result.submitted_messages_proofs, vec![(1..=1, None)],); assert_eq!(result.submitted_messages_proofs, vec![(1..=1, None)],);
} }
#[test]
fn message_lane_loop_is_able_to_recover_from_race_stall() {
// with this configuration, both source and target clients will lose their transactions =>
// reconnect will happen
let (source_exit_sender, exit_receiver) = unbounded();
let target_exit_sender = source_exit_sender.clone();
let result = run_loop_test(
TestClientData {
source_state: ClientState {
best_self: HeaderId(0, 0),
best_finalized_self: HeaderId(0, 0),
best_finalized_peer_at_best_self: HeaderId(0, 0),
actual_best_finalized_peer_at_best_self: HeaderId(0, 0),
},
source_latest_generated_nonce: 1,
source_tracked_transaction_status: TrackedTransactionStatus::Lost,
target_state: ClientState {
best_self: HeaderId(0, 0),
best_finalized_self: HeaderId(0, 0),
best_finalized_peer_at_best_self: HeaderId(0, 0),
actual_best_finalized_peer_at_best_self: HeaderId(0, 0),
},
target_latest_received_nonce: 0,
target_tracked_transaction_status: TrackedTransactionStatus::Lost,
..Default::default()
},
Arc::new(move |data: &mut TestClientData| {
if data.is_source_reconnected {
data.source_tracked_transaction_status = TrackedTransactionStatus::Finalized;
}
if data.is_source_reconnected && data.is_target_reconnected {
source_exit_sender.unbounded_send(()).unwrap();
}
}),
Arc::new(move |data: &mut TestClientData| {
if data.is_target_reconnected {
data.target_tracked_transaction_status = TrackedTransactionStatus::Finalized;
}
if data.is_source_reconnected && data.is_target_reconnected {
target_exit_sender.unbounded_send(()).unwrap();
}
}),
exit_receiver.into_future().map(|(_, _)| ()),
);
assert!(result.is_source_reconnected);
}
#[test] #[test]
fn message_lane_loop_works() { fn message_lane_loop_works() {
let (exit_sender, exit_receiver) = unbounded(); let (exit_sender, exit_receiver) = unbounded();
@@ -13,7 +13,7 @@
//! Message delivery race delivers proof-of-messages from "lane.source" to "lane.target". //! Message delivery race delivers proof-of-messages from "lane.source" to "lane.target".
use std::{collections::VecDeque, marker::PhantomData, ops::RangeInclusive, time::Duration}; use std::{collections::VecDeque, marker::PhantomData, ops::RangeInclusive};
use async_trait::async_trait; use async_trait::async_trait;
use futures::stream::FusedStream; use futures::stream::FusedStream;
@@ -24,7 +24,7 @@ use relay_utils::FailedClient;
use crate::{ use crate::{
message_lane::{MessageLane, SourceHeaderIdOf, TargetHeaderIdOf}, message_lane::{MessageLane, SourceHeaderIdOf, TargetHeaderIdOf},
message_lane_loop::{ message_lane_loop::{
MessageDeliveryParams, MessageDetailsMap, MessageProofParameters, MessageDeliveryParams, MessageDetailsMap, MessageProofParameters, NoncesSubmitArtifacts,
SourceClient as MessageLaneSourceClient, SourceClientState, SourceClient as MessageLaneSourceClient, SourceClientState,
TargetClient as MessageLaneTargetClient, TargetClientState, TargetClient as MessageLaneTargetClient, TargetClientState,
}, },
@@ -43,7 +43,6 @@ pub async fn run<P: MessageLane, Strategy: RelayStrategy>(
source_state_updates: impl FusedStream<Item = SourceClientState<P>>, source_state_updates: impl FusedStream<Item = SourceClientState<P>>,
target_client: impl MessageLaneTargetClient<P>, target_client: impl MessageLaneTargetClient<P>,
target_state_updates: impl FusedStream<Item = TargetClientState<P>>, target_state_updates: impl FusedStream<Item = TargetClientState<P>>,
stall_timeout: Duration,
metrics_msg: Option<MessageLaneLoopMetrics>, metrics_msg: Option<MessageLaneLoopMetrics>,
params: MessageDeliveryParams<Strategy>, params: MessageDeliveryParams<Strategy>,
) -> Result<(), FailedClient> { ) -> Result<(), FailedClient> {
@@ -60,7 +59,6 @@ pub async fn run<P: MessageLane, Strategy: RelayStrategy>(
_phantom: Default::default(), _phantom: Default::default(),
}, },
target_state_updates, target_state_updates,
stall_timeout,
MessageDeliveryStrategy::<P, Strategy, _, _> { MessageDeliveryStrategy::<P, Strategy, _, _> {
lane_source_client: source_client, lane_source_client: source_client,
lane_target_client: target_client, lane_target_client: target_client,
@@ -174,6 +172,7 @@ where
{ {
type Error = C::Error; type Error = C::Error;
type TargetNoncesData = DeliveryRaceTargetNoncesData; type TargetNoncesData = DeliveryRaceTargetNoncesData;
type TransactionTracker = C::TransactionTracker;
async fn require_source_header(&self, id: SourceHeaderIdOf<P>) { async fn require_source_header(&self, id: SourceHeaderIdOf<P>) {
self.client.require_source_header_on_target(id).await self.client.require_source_header_on_target(id).await
@@ -215,7 +214,7 @@ where
generated_at_block: SourceHeaderIdOf<P>, generated_at_block: SourceHeaderIdOf<P>,
nonces: RangeInclusive<MessageNonce>, nonces: RangeInclusive<MessageNonce>,
proof: P::MessagesProof, proof: P::MessagesProof,
) -> Result<RangeInclusive<MessageNonce>, Self::Error> { ) -> Result<NoncesSubmitArtifacts<Self::TransactionTracker>, Self::Error> {
self.client.submit_messages_proof(generated_at_block, nonces, proof).await self.client.submit_messages_proof(generated_at_block, nonces, proof).await
} }
} }
@@ -20,7 +20,7 @@
//! associated data - like messages, lane state, etc) to the target node by //! associated data - like messages, lane state, etc) to the target node by
//! generating and submitting proof. //! generating and submitting proof.
use crate::message_lane_loop::ClientState; use crate::message_lane_loop::{ClientState, NoncesSubmitArtifacts};
use async_trait::async_trait; use async_trait::async_trait;
use bp_messages::MessageNonce; use bp_messages::MessageNonce;
@@ -28,7 +28,10 @@ use futures::{
future::FutureExt, future::FutureExt,
stream::{FusedStream, StreamExt}, stream::{FusedStream, StreamExt},
}; };
use relay_utils::{process_future_result, retry_backoff, FailedClient, MaybeConnectionError}; use relay_utils::{
process_future_result, retry_backoff, FailedClient, MaybeConnectionError,
TrackedTransactionStatus, TransactionTracker,
};
use std::{ use std::{
fmt::Debug, fmt::Debug,
ops::RangeInclusive, ops::RangeInclusive,
@@ -124,6 +127,8 @@ pub trait TargetClient<P: MessageRace> {
type Error: std::fmt::Debug + MaybeConnectionError; type Error: std::fmt::Debug + MaybeConnectionError;
/// Type of the additional data from the target client, used by the race. /// Type of the additional data from the target client, used by the race.
type TargetNoncesData: std::fmt::Debug; type TargetNoncesData: std::fmt::Debug;
/// Transaction tracker to track submitted transactions.
type TransactionTracker: TransactionTracker;
/// Ask headers relay to relay finalized headers up to (and including) given header /// Ask headers relay to relay finalized headers up to (and including) given header
/// from race source to race target. /// from race source to race target.
@@ -141,7 +146,7 @@ pub trait TargetClient<P: MessageRace> {
generated_at_block: P::SourceHeaderId, generated_at_block: P::SourceHeaderId,
nonces: RangeInclusive<MessageNonce>, nonces: RangeInclusive<MessageNonce>,
proof: P::Proof, proof: P::Proof,
) -> Result<RangeInclusive<MessageNonce>, Self::Error>; ) -> Result<NoncesSubmitArtifacts<Self::TransactionTracker>, Self::Error>;
} }
/// Race strategy. /// Race strategy.
@@ -222,7 +227,6 @@ pub async fn run<P: MessageRace, SC: SourceClient<P>, TC: TargetClient<P>>(
race_source_updated: impl FusedStream<Item = SourceClientState<P>>, race_source_updated: impl FusedStream<Item = SourceClientState<P>>,
race_target: TC, race_target: TC,
race_target_updated: impl FusedStream<Item = TargetClientState<P>>, race_target_updated: impl FusedStream<Item = TargetClientState<P>>,
stall_timeout: Duration,
mut strategy: impl RaceStrategy< mut strategy: impl RaceStrategy<
P::SourceHeaderId, P::SourceHeaderId,
P::TargetHeaderId, P::TargetHeaderId,
@@ -234,7 +238,6 @@ pub async fn run<P: MessageRace, SC: SourceClient<P>, TC: TargetClient<P>>(
) -> Result<(), FailedClient> { ) -> Result<(), FailedClient> {
let mut progress_context = Instant::now(); let mut progress_context = Instant::now();
let mut race_state = RaceState::default(); let mut race_state = RaceState::default();
let mut stall_countdown = Instant::now();
let mut source_retry_backoff = retry_backoff(); let mut source_retry_backoff = retry_backoff();
let mut source_client_is_online = true; let mut source_client_is_online = true;
@@ -250,6 +253,7 @@ pub async fn run<P: MessageRace, SC: SourceClient<P>, TC: TargetClient<P>>(
let target_best_nonces = futures::future::Fuse::terminated(); let target_best_nonces = futures::future::Fuse::terminated();
let target_finalized_nonces = futures::future::Fuse::terminated(); let target_finalized_nonces = futures::future::Fuse::terminated();
let target_submit_proof = futures::future::Fuse::terminated(); let target_submit_proof = futures::future::Fuse::terminated();
let target_tx_tracker = futures::future::Fuse::terminated();
let target_go_offline_future = futures::future::Fuse::terminated(); let target_go_offline_future = futures::future::Fuse::terminated();
futures::pin_mut!( futures::pin_mut!(
@@ -261,6 +265,7 @@ pub async fn run<P: MessageRace, SC: SourceClient<P>, TC: TargetClient<P>>(
target_best_nonces, target_best_nonces,
target_finalized_nonces, target_finalized_nonces,
target_submit_proof, target_submit_proof,
target_tx_tracker,
target_go_offline_future, target_go_offline_future,
); );
@@ -343,11 +348,7 @@ pub async fn run<P: MessageRace, SC: SourceClient<P>, TC: TargetClient<P>>(
nonces, nonces,
); );
let prev_best_at_target = strategy.best_at_target();
strategy.best_target_nonces_updated(nonces, &mut race_state); strategy.best_target_nonces_updated(nonces, &mut race_state);
if strategy.best_at_target() != prev_best_at_target {
stall_countdown = Instant::now();
}
}, },
&mut target_go_offline_future, &mut target_go_offline_future,
async_std::task::sleep, async_std::task::sleep,
@@ -400,23 +401,37 @@ pub async fn run<P: MessageRace, SC: SourceClient<P>, TC: TargetClient<P>>(
target_client_is_online = process_future_result( target_client_is_online = process_future_result(
proof_submit_result, proof_submit_result,
&mut target_retry_backoff, &mut target_retry_backoff,
|nonces_range| { |artifacts: NoncesSubmitArtifacts<TC::TransactionTracker>| {
log::debug!( log::debug!(
target: "bridge", target: "bridge",
"Successfully submitted proof of nonces {:?} to {}", "Successfully submitted proof of nonces {:?} to {}",
nonces_range, artifacts.nonces,
P::target_name(), P::target_name(),
); );
race_state.nonces_to_submit = None; race_state.nonces_to_submit = None;
race_state.nonces_submitted = Some(nonces_range); race_state.nonces_submitted = Some(artifacts.nonces);
stall_countdown = Instant::now(); target_tx_tracker.set(artifacts.tx_tracker.wait().fuse());
}, },
&mut target_go_offline_future, &mut target_go_offline_future,
async_std::task::sleep, async_std::task::sleep,
|| format!("Error submitting proof {}", P::target_name()), || format!("Error submitting proof {}", P::target_name()),
).fail_if_connection_error(FailedClient::Target)?; ).fail_if_connection_error(FailedClient::Target)?;
}, },
target_transaction_status = target_tx_tracker => {
if target_transaction_status == TrackedTransactionStatus::Lost {
log::warn!(
target: "bridge",
"{} -> {} race has stalled. State: {:?}. Strategy: {:?}",
P::source_name(),
P::target_name(),
race_state,
strategy,
);
return Err(FailedClient::Both);
}
},
// when we're ready to retry request // when we're ready to retry request
_ = source_go_offline_future => { _ = source_go_offline_future => {
@@ -429,24 +444,6 @@ pub async fn run<P: MessageRace, SC: SourceClient<P>, TC: TargetClient<P>>(
progress_context = print_race_progress::<P, _>(progress_context, &strategy); progress_context = print_race_progress::<P, _>(progress_context, &strategy);
if stall_countdown.elapsed() > stall_timeout {
log::warn!(
target: "bridge",
"{} -> {} race has stalled. State: {:?}. Strategy: {:?}",
P::source_name(),
P::target_name(),
race_state,
strategy,
);
return Err(FailedClient::Both)
} else if race_state.nonces_to_submit.is_none() &&
race_state.nonces_submitted.is_none() &&
strategy.is_empty()
{
stall_countdown = Instant::now();
}
if source_client_is_online { if source_client_is_online {
source_client_is_online = false; source_client_is_online = false;
@@ -16,7 +16,7 @@
use crate::{ use crate::{
message_lane::{MessageLane, SourceHeaderIdOf, TargetHeaderIdOf}, message_lane::{MessageLane, SourceHeaderIdOf, TargetHeaderIdOf},
message_lane_loop::{ message_lane_loop::{
SourceClient as MessageLaneSourceClient, SourceClientState, NoncesSubmitArtifacts, SourceClient as MessageLaneSourceClient, SourceClientState,
TargetClient as MessageLaneTargetClient, TargetClientState, TargetClient as MessageLaneTargetClient, TargetClientState,
}, },
message_race_loop::{ message_race_loop::{
@@ -31,7 +31,7 @@ use async_trait::async_trait;
use bp_messages::MessageNonce; use bp_messages::MessageNonce;
use futures::stream::FusedStream; use futures::stream::FusedStream;
use relay_utils::FailedClient; use relay_utils::FailedClient;
use std::{marker::PhantomData, ops::RangeInclusive, time::Duration}; use std::{marker::PhantomData, ops::RangeInclusive};
/// Message receiving confirmations delivery strategy. /// Message receiving confirmations delivery strategy.
type ReceivingConfirmationsBasicStrategy<P> = BasicStrategy< type ReceivingConfirmationsBasicStrategy<P> = BasicStrategy<
@@ -49,7 +49,6 @@ pub async fn run<P: MessageLane>(
source_state_updates: impl FusedStream<Item = SourceClientState<P>>, source_state_updates: impl FusedStream<Item = SourceClientState<P>>,
target_client: impl MessageLaneTargetClient<P>, target_client: impl MessageLaneTargetClient<P>,
target_state_updates: impl FusedStream<Item = TargetClientState<P>>, target_state_updates: impl FusedStream<Item = TargetClientState<P>>,
stall_timeout: Duration,
metrics_msg: Option<MessageLaneLoopMetrics>, metrics_msg: Option<MessageLaneLoopMetrics>,
) -> Result<(), FailedClient> { ) -> Result<(), FailedClient> {
crate::message_race_loop::run( crate::message_race_loop::run(
@@ -65,7 +64,6 @@ pub async fn run<P: MessageLane>(
_phantom: Default::default(), _phantom: Default::default(),
}, },
source_state_updates, source_state_updates,
stall_timeout,
ReceivingConfirmationsBasicStrategy::<P>::new(), ReceivingConfirmationsBasicStrategy::<P>::new(),
) )
.await .await
@@ -157,6 +155,7 @@ where
{ {
type Error = C::Error; type Error = C::Error;
type TargetNoncesData = (); type TargetNoncesData = ();
type TransactionTracker = C::TransactionTracker;
async fn require_source_header(&self, id: TargetHeaderIdOf<P>) { async fn require_source_header(&self, id: TargetHeaderIdOf<P>) {
self.client.require_target_header_on_source(id).await self.client.require_target_header_on_source(id).await
@@ -182,9 +181,10 @@ where
generated_at_block: TargetHeaderIdOf<P>, generated_at_block: TargetHeaderIdOf<P>,
nonces: RangeInclusive<MessageNonce>, nonces: RangeInclusive<MessageNonce>,
proof: P::MessagesReceivingProof, proof: P::MessagesReceivingProof,
) -> Result<RangeInclusive<MessageNonce>, Self::Error> { ) -> Result<NoncesSubmitArtifacts<Self::TransactionTracker>, Self::Error> {
self.client.submit_messages_receiving_proof(generated_at_block, proof).await?; let tx_tracker =
Ok(nonces) self.client.submit_messages_receiving_proof(generated_at_block, proof).await?;
Ok(NoncesSubmitArtifacts { nonces, tx_tracker })
} }
} }