TransactionInvalidationTracker (#1544)

* TransactionInvalidationTracker

* TransacitonInvalidationTracker -> TransactionTracker

* change sign_transaction method

* clippy and spelling

* removed comment

* more transactiontracker tests

* stalls_when_transaction_tracker_returns_error

* remove test code

* remove "impl TransactionTracker for ()"

* enum TrackedTransactionStatus

* test TransactionTracker in on_transaction_status

* do_wait
This commit is contained in:
Svyatoslav Nikolsky
2022-09-14 17:30:57 +03:00
committed by Bastian Köcher
parent 58fe2749d2
commit 70d6e91f20
8 changed files with 514 additions and 145 deletions
+44 -35
View File
@@ -29,7 +29,7 @@ use futures::{select, Future, FutureExt, Stream, StreamExt};
use num_traits::{One, Saturating};
use relay_utils::{
metrics::MetricsParams, relay_loop::Client as RelayClient, retry_backoff, FailedClient,
HeaderId, MaybeConnectionError,
HeaderId, MaybeConnectionError, TrackedTransactionStatus, TransactionTracker,
};
use std::{
pin::Pin,
@@ -86,6 +86,9 @@ pub trait SourceClient<P: FinalitySyncPipeline>: RelayClient {
/// Target client used in finality synchronization loop.
#[async_trait]
pub trait TargetClient<P: FinalitySyncPipeline>: RelayClient {
/// Transaction tracker to track submitted transactions.
type TransactionTracker: TransactionTracker;
/// Get best finalized source block number.
async fn best_finalized_source_block_id(
&self,
@@ -96,7 +99,7 @@ pub trait TargetClient<P: FinalitySyncPipeline>: RelayClient {
&self,
header: P::Header,
proof: P::FinalityProof,
) -> Result<(), Self::Error>;
) -> Result<Self::TransactionTracker, Self::Error>;
}
/// Return prefix that will be used by default to expose Prometheus metrics of the finality proofs
@@ -153,8 +156,6 @@ pub(crate) enum Error<P: FinalitySyncPipeline, SourceError, TargetError> {
Target(TargetError),
/// Finality proof for mandatory header is missing from the source node.
MissingMandatoryFinalityProof(P::Number),
/// The synchronization has stalled.
Stalled,
}
impl<P, SourceError, TargetError> Error<P, SourceError, TargetError>
@@ -167,7 +168,6 @@ where
match *self {
Error::Source(ref error) if error.is_connection_error() => Err(FailedClient::Source),
Error::Target(ref error) if error.is_connection_error() => Err(FailedClient::Target),
Error::Stalled => Err(FailedClient::Both),
_ => Ok(()),
}
}
@@ -175,9 +175,9 @@ where
/// Information about transaction that we have submitted.
#[derive(Debug, Clone)]
pub(crate) struct Transaction<Number> {
/// Time when we have submitted this transaction.
pub time: Instant,
pub(crate) struct Transaction<Tracker, Number> {
/// Submitted transaction tracker.
pub tracker: Tracker,
/// The number of the header we have submitted.
pub submitted_header_number: Number,
}
@@ -206,11 +206,12 @@ pub(crate) struct FinalityLoopState<'a, P: FinalitySyncPipeline, FinalityProofsS
&'a mut RestartableFinalityProofsStream<FinalityProofsStream>,
/// Recent finality proofs that we have read from the stream.
pub(crate) recent_finality_proofs: &'a mut FinalityProofs<P>,
/// Last transaction that we have submitted to the target node.
pub(crate) last_transaction: Option<Transaction<P::Number>>,
/// Number of the last header, submitted to the target node.
pub(crate) submitted_header_number: Option<P::Number>,
}
async fn run_until_connection_lost<P: FinalitySyncPipeline>(
/// Run finality relay loop until connection to one of nodes is lost.
pub(crate) async fn run_until_connection_lost<P: FinalitySyncPipeline>(
source_client: impl SourceClient<P>,
target_client: impl TargetClient<P>,
sync_params: FinalitySyncParams,
@@ -230,8 +231,9 @@ async fn run_until_connection_lost<P: FinalitySyncPipeline>(
})
};
let last_transaction_tracker = futures::future::Fuse::terminated();
let exit_signal = exit_signal.fuse();
futures::pin_mut!(exit_signal);
futures::pin_mut!(last_transaction_tracker, exit_signal);
let mut finality_proofs_stream = RestartableFinalityProofsStream {
needs_restart: false,
@@ -241,7 +243,7 @@ async fn run_until_connection_lost<P: FinalitySyncPipeline>(
let mut progress = (Instant::now(), None);
let mut retry_backoff = retry_backoff();
let mut last_transaction = None;
let mut last_submitted_header_number = None;
loop {
// run loop iteration
@@ -252,7 +254,7 @@ async fn run_until_connection_lost<P: FinalitySyncPipeline>(
progress: &mut progress,
finality_proofs_stream: &mut finality_proofs_stream,
recent_finality_proofs: &mut recent_finality_proofs,
last_transaction: last_transaction.clone(),
submitted_header_number: last_submitted_header_number,
},
&sync_params,
&metrics_sync,
@@ -261,8 +263,14 @@ async fn run_until_connection_lost<P: FinalitySyncPipeline>(
// deal with errors
let next_tick = match iteration_result {
Ok(updated_last_transaction) => {
last_transaction = updated_last_transaction;
Ok(Some(updated_last_transaction)) => {
last_transaction_tracker.set(updated_last_transaction.tracker.wait().fuse());
last_submitted_header_number =
Some(updated_last_transaction.submitted_header_number);
retry_backoff.reset();
sync_params.tick
},
Ok(None) => {
retry_backoff.reset();
sync_params.tick
},
@@ -281,6 +289,18 @@ async fn run_until_connection_lost<P: FinalitySyncPipeline>(
// wait till exit signal, or new source block
select! {
transaction_status = last_transaction_tracker => {
if transaction_status == TrackedTransactionStatus::Lost {
log::error!(
target: "bridge",
"Finality synchronization from {} to {} has stalled. Going to restart",
P::SOURCE_NAME,
P::TARGET_NAME,
);
return Err(FailedClient::Both);
}
},
_ = async_std::task::sleep(next_tick).fuse() => {},
_ = exit_signal => return Ok(()),
}
@@ -293,7 +313,7 @@ pub(crate) async fn run_loop_iteration<P, SC, TC>(
state: FinalityLoopState<'_, P, SC::FinalityProofsStream>,
sync_params: &FinalitySyncParams,
metrics_sync: &Option<SyncLoopMetrics>,
) -> Result<Option<Transaction<P::Number>>, Error<P, SC::Error, TC::Error>>
) -> Result<Option<Transaction<TC::TransactionTracker, P::Number>>, Error<P, SC::Error, TC::Error>>
where
P: FinalitySyncPipeline,
SC: SourceClient<P>,
@@ -333,20 +353,11 @@ where
// if we have already submitted header, then we just need to wait for it
// if we're waiting too much, then we believe our transaction has been lost and restart sync
if let Some(last_transaction) = state.last_transaction {
if best_number_at_target >= last_transaction.submitted_header_number {
if let Some(submitted_header_number) = state.submitted_header_number {
if best_number_at_target >= submitted_header_number {
// transaction has been mined && we can continue
} else if last_transaction.time.elapsed() > sync_params.stall_timeout {
log::error!(
target: "bridge",
"Finality synchronization from {} to {} has stalled. Going to restart",
P::SOURCE_NAME,
P::TARGET_NAME,
);
return Err(Error::Stalled)
} else {
return Ok(Some(last_transaction))
return Ok(None)
}
}
@@ -363,22 +374,20 @@ where
.await?
{
Some((header, justification)) => {
let new_transaction =
Transaction { time: Instant::now(), submitted_header_number: header.number() };
let submitted_header_number = header.number();
log::debug!(
target: "bridge",
"Going to submit finality proof of {} header #{:?} to {}",
P::SOURCE_NAME,
new_transaction.submitted_header_number,
submitted_header_number,
P::TARGET_NAME,
);
target_client
let tracker = target_client
.submit_finality_proof(header, justification)
.await
.map_err(Error::Target)?;
Ok(Some(new_transaction))
Ok(Some(Transaction { tracker, submitted_header_number }))
},
None => Ok(None),
}