479010094e
- Remove missing cli crate from workspace members - Fix TOML array syntax errors in pvf and benchmarking-cli Cargo.toml - Fix Rust import ordering with cargo fmt - Fix feature propagation with zepter (try-runtime, runtime-benchmarks, std)
817 lines
28 KiB
Rust
817 lines
28 KiB
Rust
// Copyright 2019-2021 Parity Technologies (UK) Ltd.
|
|
// This file is part of Parity Bridges Common.
|
|
|
|
// Parity Bridges Common is free software: you can redistribute it and/or modify
|
|
// it under the terms of the GNU General Public License as published by
|
|
// the Free Software Foundation, either version 3 of the License, or
|
|
// (at your option) any later version.
|
|
|
|
// Parity Bridges Common is distributed in the hope that it will be useful,
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
// GNU General Public License for more details.
|
|
|
|
//! Loop that is serving single race within message lane. This could be
|
|
//! message delivery race, receiving confirmations race or processing
|
|
//! confirmations race.
|
|
//!
|
|
//! The idea of the race is simple - we have `nonce`-s on source and target
|
|
//! nodes. We're trying to prove that the source node has this nonce (and
|
|
//! associated data - like messages, lane state, etc) to the target node by
|
|
//! generating and submitting proof.
|
|
|
|
use crate::message_lane_loop::{BatchTransaction, ClientState, NoncesSubmitArtifacts};
|
|
|
|
use async_trait::async_trait;
|
|
use futures::{
|
|
future::{FutureExt, TryFutureExt},
|
|
stream::{FusedStream, StreamExt},
|
|
};
|
|
use pezbp_messages::MessageNonce;
|
|
use relay_utils::{
|
|
process_future_result, retry_backoff, FailedClient, MaybeConnectionError,
|
|
TrackedTransactionStatus, TransactionTracker,
|
|
};
|
|
use std::{
|
|
fmt::Debug,
|
|
ops::RangeInclusive,
|
|
time::{Duration, Instant},
|
|
};
|
|
|
|
/// One of races within lane.
|
|
pub trait MessageRace {
|
|
/// Header id of the race source.
|
|
type SourceHeaderId: Debug + Clone + PartialEq + Send + Sync;
|
|
/// Header id of the race source.
|
|
type TargetHeaderId: Debug + Clone + PartialEq + Send + Sync;
|
|
|
|
/// Message nonce used in the race.
|
|
type MessageNonce: Debug + Clone;
|
|
/// Proof that is generated and delivered in this race.
|
|
type Proof: Debug + Clone + Send + Sync;
|
|
|
|
/// Name of the race source.
|
|
fn source_name() -> String;
|
|
/// Name of the race target.
|
|
fn target_name() -> String;
|
|
}
|
|
|
|
/// State of race source client.
|
|
type SourceClientState<P> =
|
|
ClientState<<P as MessageRace>::SourceHeaderId, <P as MessageRace>::TargetHeaderId>;
|
|
|
|
/// State of race target client.
|
|
type TargetClientState<P> =
|
|
ClientState<<P as MessageRace>::TargetHeaderId, <P as MessageRace>::SourceHeaderId>;
|
|
|
|
/// Inclusive nonces range.
|
|
pub trait NoncesRange: Debug + Sized {
|
|
/// Get begin of the range.
|
|
fn begin(&self) -> MessageNonce;
|
|
/// Get end of the range.
|
|
fn end(&self) -> MessageNonce;
|
|
/// Returns new range with current range nonces that are greater than the passed `nonce`.
|
|
/// If there are no such nonces, `None` is returned.
|
|
fn greater_than(self, nonce: MessageNonce) -> Option<Self>;
|
|
}
|
|
|
|
/// Nonces on the race source client.
|
|
#[derive(Debug, Clone)]
|
|
pub struct SourceClientNonces<NoncesRange> {
|
|
/// New nonces range known to the client. `New` here means all nonces generated after
|
|
/// `prev_latest_nonce` passed to the `SourceClient::nonces` method.
|
|
pub new_nonces: NoncesRange,
|
|
/// The latest nonce that is confirmed to the bridged client. This nonce only makes
|
|
/// sense in some races. In other races it is `None`.
|
|
pub confirmed_nonce: Option<MessageNonce>,
|
|
}
|
|
|
|
/// Nonces on the race target client.
|
|
#[derive(Debug, Clone)]
|
|
pub struct TargetClientNonces<TargetNoncesData> {
|
|
/// The latest nonce that is known to the target client.
|
|
pub latest_nonce: MessageNonce,
|
|
/// Additional data from target node that may be used by the race.
|
|
pub nonces_data: TargetNoncesData,
|
|
}
|
|
|
|
/// One of message lane clients, which is source client for the race.
|
|
#[async_trait]
|
|
pub trait SourceClient<P: MessageRace> {
|
|
/// Type of error these clients returns.
|
|
type Error: std::fmt::Debug + MaybeConnectionError;
|
|
/// Type of nonces range returned by the source client.
|
|
type NoncesRange: NoncesRange;
|
|
/// Additional proof parameters required to generate proof.
|
|
type ProofParameters;
|
|
|
|
/// Return nonces that are known to the source client.
|
|
async fn nonces(
|
|
&self,
|
|
at_block: P::SourceHeaderId,
|
|
prev_latest_nonce: MessageNonce,
|
|
) -> Result<(P::SourceHeaderId, SourceClientNonces<Self::NoncesRange>), Self::Error>;
|
|
/// Generate proof for delivering to the target client.
|
|
async fn generate_proof(
|
|
&self,
|
|
at_block: P::SourceHeaderId,
|
|
nonces: RangeInclusive<MessageNonce>,
|
|
proof_parameters: Self::ProofParameters,
|
|
) -> Result<(P::SourceHeaderId, RangeInclusive<MessageNonce>, P::Proof), Self::Error>;
|
|
}
|
|
|
|
/// One of message lane clients, which is target client for the race.
|
|
#[async_trait]
|
|
pub trait TargetClient<P: MessageRace> {
|
|
/// Type of error these clients returns.
|
|
type Error: std::fmt::Debug + MaybeConnectionError;
|
|
/// Type of the additional data from the target client, used by the race.
|
|
type TargetNoncesData: std::fmt::Debug;
|
|
/// Type of batch transaction that submits finality and proof to the target node.
|
|
type BatchTransaction: BatchTransaction<P::SourceHeaderId> + Clone;
|
|
/// Transaction tracker to track submitted transactions.
|
|
type TransactionTracker: TransactionTracker<HeaderId = P::TargetHeaderId>;
|
|
|
|
/// Ask headers relay to relay finalized headers up to (and including) given header
|
|
/// from race source to race target.
|
|
///
|
|
/// The client may return `Some(_)`, which means that nothing has happened yet and
|
|
/// the caller must generate and append proof to the batch transaction
|
|
/// to actually send it (along with required header) to the node.
|
|
///
|
|
/// If function has returned `None`, it means that the caller now must wait for the
|
|
/// appearance of the required header `id` at the target client.
|
|
async fn require_source_header(
|
|
&self,
|
|
id: P::SourceHeaderId,
|
|
) -> Result<Option<Self::BatchTransaction>, Self::Error>;
|
|
|
|
/// Return nonces that are known to the target client.
|
|
async fn nonces(
|
|
&self,
|
|
at_block: P::TargetHeaderId,
|
|
update_metrics: bool,
|
|
) -> Result<(P::TargetHeaderId, TargetClientNonces<Self::TargetNoncesData>), Self::Error>;
|
|
/// Submit proof to the target client.
|
|
async fn submit_proof(
|
|
&self,
|
|
maybe_batch_tx: Option<Self::BatchTransaction>,
|
|
generated_at_block: P::SourceHeaderId,
|
|
nonces: RangeInclusive<MessageNonce>,
|
|
proof: P::Proof,
|
|
) -> Result<NoncesSubmitArtifacts<Self::TransactionTracker>, Self::Error>;
|
|
}
|
|
|
|
/// Race strategy.
|
|
#[async_trait]
|
|
pub trait RaceStrategy<SourceHeaderId, TargetHeaderId, Proof>: Debug {
|
|
/// Type of nonces range expected from the source client.
|
|
type SourceNoncesRange: NoncesRange;
|
|
/// Additional proof parameters required to generate proof.
|
|
type ProofParameters;
|
|
/// Additional data expected from the target client.
|
|
type TargetNoncesData;
|
|
|
|
/// Return id of source header that is required to be on target to continue synchronization.
|
|
async fn required_source_header_at_target<RS: RaceState<SourceHeaderId, TargetHeaderId>>(
|
|
&self,
|
|
race_state: RS,
|
|
) -> Option<SourceHeaderId>;
|
|
/// Return the best nonce at source node.
|
|
///
|
|
/// `Some` is returned only if we are sure that the value is greater or equal
|
|
/// than the result of `best_at_target`.
|
|
fn best_at_source(&self) -> Option<MessageNonce>;
|
|
/// Return the best nonce at target node.
|
|
///
|
|
/// May return `None` if value is yet unknown.
|
|
fn best_at_target(&self) -> Option<MessageNonce>;
|
|
|
|
/// Called when nonces are updated at source node of the race.
|
|
fn source_nonces_updated(
|
|
&mut self,
|
|
at_block: SourceHeaderId,
|
|
nonces: SourceClientNonces<Self::SourceNoncesRange>,
|
|
);
|
|
/// Called when we want to wait until next `best_target_nonces_updated` before selecting
|
|
/// any nonces for delivery.
|
|
fn reset_best_target_nonces(&mut self);
|
|
/// Called when best nonces are updated at target node of the race.
|
|
fn best_target_nonces_updated<RS: RaceState<SourceHeaderId, TargetHeaderId>>(
|
|
&mut self,
|
|
nonces: TargetClientNonces<Self::TargetNoncesData>,
|
|
race_state: &mut RS,
|
|
);
|
|
/// Called when finalized nonces are updated at target node of the race.
|
|
fn finalized_target_nonces_updated<RS: RaceState<SourceHeaderId, TargetHeaderId>>(
|
|
&mut self,
|
|
nonces: TargetClientNonces<Self::TargetNoncesData>,
|
|
race_state: &mut RS,
|
|
);
|
|
/// Should return `Some(nonces)` if we need to deliver proof of `nonces` (and associated
|
|
/// data) from source to target node.
|
|
/// Additionally, parameters required to generate proof are returned.
|
|
async fn select_nonces_to_deliver<RS: RaceState<SourceHeaderId, TargetHeaderId>>(
|
|
&self,
|
|
race_state: RS,
|
|
) -> Option<(RangeInclusive<MessageNonce>, Self::ProofParameters)>;
|
|
}
|
|
|
|
/// State of the race.
|
|
pub trait RaceState<SourceHeaderId, TargetHeaderId>: Clone + Send + Sync {
|
|
/// Set best finalized source header id at the best block on the target
|
|
/// client (at the `best_finalized_source_header_id_at_best_target`).
|
|
fn set_best_finalized_source_header_id_at_best_target(&mut self, id: SourceHeaderId);
|
|
|
|
/// Best finalized source header id at the best block on the target
|
|
/// client (at the `best_finalized_source_header_id_at_best_target`).
|
|
fn best_finalized_source_header_id_at_best_target(&self) -> Option<SourceHeaderId>;
|
|
|
|
/// Returns `true` if we have selected nonces to submit to the target node.
|
|
fn nonces_to_submit(&self) -> Option<RangeInclusive<MessageNonce>>;
|
|
/// Reset our nonces selection.
|
|
fn reset_nonces_to_submit(&mut self);
|
|
|
|
/// Returns `true` if we have submitted some nonces to the target node and are
|
|
/// waiting for them to appear there.
|
|
fn nonces_submitted(&self) -> Option<RangeInclusive<MessageNonce>>;
|
|
/// Reset our nonces submission.
|
|
fn reset_nonces_submitted(&mut self);
|
|
}
|
|
|
|
/// State of the race and prepared batch transaction (if available).
|
|
#[derive(Debug, Clone)]
|
|
pub(crate) struct RaceStateImpl<SourceHeaderId, TargetHeaderId, Proof, BatchTx> {
|
|
/// Best finalized source header id at the source client.
|
|
pub best_finalized_source_header_id_at_source: Option<SourceHeaderId>,
|
|
/// Best finalized source header id at the best block on the target
|
|
/// client (at the `best_finalized_source_header_id_at_best_target`).
|
|
pub best_finalized_source_header_id_at_best_target: Option<SourceHeaderId>,
|
|
/// The best header id at the target client.
|
|
pub best_target_header_id: Option<TargetHeaderId>,
|
|
/// Best finalized header id at the target client.
|
|
pub best_finalized_target_header_id: Option<TargetHeaderId>,
|
|
/// Range of nonces that we have selected to submit.
|
|
pub nonces_to_submit: Option<(SourceHeaderId, RangeInclusive<MessageNonce>, Proof)>,
|
|
/// Batch transaction ready to include and deliver selected `nonces_to_submit` from the
|
|
/// `state`.
|
|
pub nonces_to_submit_batch: Option<BatchTx>,
|
|
/// Range of nonces that is currently submitted.
|
|
pub nonces_submitted: Option<RangeInclusive<MessageNonce>>,
|
|
}
|
|
|
|
impl<SourceHeaderId, TargetHeaderId, Proof, BatchTx> Default
|
|
for RaceStateImpl<SourceHeaderId, TargetHeaderId, Proof, BatchTx>
|
|
{
|
|
fn default() -> Self {
|
|
RaceStateImpl {
|
|
best_finalized_source_header_id_at_source: None,
|
|
best_finalized_source_header_id_at_best_target: None,
|
|
best_target_header_id: None,
|
|
best_finalized_target_header_id: None,
|
|
nonces_to_submit: None,
|
|
nonces_to_submit_batch: None,
|
|
nonces_submitted: None,
|
|
}
|
|
}
|
|
}
|
|
|
|
impl<SourceHeaderId, TargetHeaderId, Proof, BatchTx> RaceState<SourceHeaderId, TargetHeaderId>
|
|
for RaceStateImpl<SourceHeaderId, TargetHeaderId, Proof, BatchTx>
|
|
where
|
|
SourceHeaderId: Clone + Send + Sync,
|
|
TargetHeaderId: Clone + Send + Sync,
|
|
Proof: Clone + Send + Sync,
|
|
BatchTx: Clone + Send + Sync,
|
|
{
|
|
fn set_best_finalized_source_header_id_at_best_target(&mut self, id: SourceHeaderId) {
|
|
self.best_finalized_source_header_id_at_best_target = Some(id);
|
|
}
|
|
|
|
fn best_finalized_source_header_id_at_best_target(&self) -> Option<SourceHeaderId> {
|
|
self.best_finalized_source_header_id_at_best_target.clone()
|
|
}
|
|
|
|
fn nonces_to_submit(&self) -> Option<RangeInclusive<MessageNonce>> {
|
|
self.nonces_to_submit.clone().map(|(_, nonces, _)| nonces)
|
|
}
|
|
|
|
fn reset_nonces_to_submit(&mut self) {
|
|
self.nonces_to_submit = None;
|
|
self.nonces_to_submit_batch = None;
|
|
}
|
|
|
|
fn nonces_submitted(&self) -> Option<RangeInclusive<MessageNonce>> {
|
|
self.nonces_submitted.clone()
|
|
}
|
|
|
|
fn reset_nonces_submitted(&mut self) {
|
|
self.nonces_submitted = None;
|
|
}
|
|
}
|
|
|
|
/// Run race loop until connection with target or source node is lost.
|
|
pub async fn run<P: MessageRace, SC: SourceClient<P>, TC: TargetClient<P>>(
|
|
race_source: SC,
|
|
race_source_updated: impl FusedStream<Item = SourceClientState<P>>,
|
|
race_target: TC,
|
|
race_target_updated: impl FusedStream<Item = TargetClientState<P>>,
|
|
mut strategy: impl RaceStrategy<
|
|
P::SourceHeaderId,
|
|
P::TargetHeaderId,
|
|
P::Proof,
|
|
SourceNoncesRange = SC::NoncesRange,
|
|
ProofParameters = SC::ProofParameters,
|
|
TargetNoncesData = TC::TargetNoncesData,
|
|
>,
|
|
) -> Result<(), FailedClient> {
|
|
let mut progress_context = Instant::now();
|
|
let mut race_state = RaceStateImpl::default();
|
|
|
|
let mut source_retry_backoff = retry_backoff();
|
|
let mut source_client_is_online = true;
|
|
let mut source_nonces_required = false;
|
|
let mut source_required_header = None;
|
|
let source_nonces = futures::future::Fuse::terminated();
|
|
let source_generate_proof = futures::future::Fuse::terminated();
|
|
let source_go_offline_future = futures::future::Fuse::terminated();
|
|
|
|
let mut target_retry_backoff = retry_backoff();
|
|
let mut target_client_is_online = true;
|
|
let mut target_best_nonces_required = false;
|
|
let mut target_finalized_nonces_required = false;
|
|
let mut target_batch_transaction = None;
|
|
let target_require_source_header = futures::future::Fuse::terminated();
|
|
let target_best_nonces = futures::future::Fuse::terminated();
|
|
let target_finalized_nonces = futures::future::Fuse::terminated();
|
|
let target_submit_proof = futures::future::Fuse::terminated();
|
|
let target_tx_tracker = futures::future::Fuse::terminated();
|
|
let target_go_offline_future = futures::future::Fuse::terminated();
|
|
|
|
futures::pin_mut!(
|
|
race_source_updated,
|
|
source_nonces,
|
|
source_generate_proof,
|
|
source_go_offline_future,
|
|
race_target_updated,
|
|
target_require_source_header,
|
|
target_best_nonces,
|
|
target_finalized_nonces,
|
|
target_submit_proof,
|
|
target_tx_tracker,
|
|
target_go_offline_future,
|
|
);
|
|
|
|
loop {
|
|
futures::select! {
|
|
// when headers ids are updated
|
|
source_state = race_source_updated.next() => {
|
|
if let Some(source_state) = source_state {
|
|
let is_source_state_updated = race_state.best_finalized_source_header_id_at_source.as_ref()
|
|
!= Some(&source_state.best_finalized_self);
|
|
if is_source_state_updated {
|
|
source_nonces_required = true;
|
|
race_state.best_finalized_source_header_id_at_source
|
|
= Some(source_state.best_finalized_self);
|
|
}
|
|
}
|
|
},
|
|
target_state = race_target_updated.next() => {
|
|
if let Some(target_state) = target_state {
|
|
let is_target_best_state_updated = race_state.best_target_header_id.as_ref()
|
|
!= Some(&target_state.best_self);
|
|
|
|
if is_target_best_state_updated {
|
|
target_best_nonces_required = true;
|
|
race_state.best_target_header_id = Some(target_state.best_self);
|
|
race_state.best_finalized_source_header_id_at_best_target
|
|
= target_state.best_finalized_peer_at_best_self;
|
|
}
|
|
|
|
let is_target_finalized_state_updated = race_state.best_finalized_target_header_id.as_ref()
|
|
!= Some(&target_state.best_finalized_self);
|
|
if is_target_finalized_state_updated {
|
|
target_finalized_nonces_required = true;
|
|
race_state.best_finalized_target_header_id = Some(target_state.best_finalized_self);
|
|
}
|
|
}
|
|
},
|
|
|
|
// when nonces are updated
|
|
nonces = source_nonces => {
|
|
source_nonces_required = false;
|
|
|
|
source_client_is_online = process_future_result(
|
|
nonces,
|
|
&mut source_retry_backoff,
|
|
|(at_block, nonces)| {
|
|
tracing::debug!(
|
|
target: "bridge",
|
|
source=%P::source_name(),
|
|
?nonces,
|
|
"Received nonces"
|
|
);
|
|
|
|
strategy.source_nonces_updated(at_block, nonces);
|
|
},
|
|
&mut source_go_offline_future,
|
|
async_std::task::sleep,
|
|
|| format!("Error retrieving nonces from {}", P::source_name()),
|
|
).fail_if_connection_error(FailedClient::Source)?;
|
|
|
|
// ask for more headers if we have nonces to deliver and required headers are missing
|
|
source_required_header = strategy
|
|
.required_source_header_at_target(race_state.clone())
|
|
.await;
|
|
},
|
|
nonces = target_best_nonces => {
|
|
target_best_nonces_required = false;
|
|
|
|
target_client_is_online = process_future_result(
|
|
nonces,
|
|
&mut target_retry_backoff,
|
|
|(_, nonces)| {
|
|
tracing::debug!(
|
|
target: "bridge",
|
|
target=%P::target_name(),
|
|
?nonces,
|
|
"Received best nonces"
|
|
);
|
|
|
|
strategy.best_target_nonces_updated(nonces, &mut race_state);
|
|
},
|
|
&mut target_go_offline_future,
|
|
async_std::task::sleep,
|
|
|| format!("Error retrieving best nonces from {}", P::target_name()),
|
|
).fail_if_connection_error(FailedClient::Target)?;
|
|
},
|
|
nonces = target_finalized_nonces => {
|
|
target_finalized_nonces_required = false;
|
|
|
|
target_client_is_online = process_future_result(
|
|
nonces,
|
|
&mut target_retry_backoff,
|
|
|(_, nonces)| {
|
|
tracing::debug!(
|
|
target: "bridge",
|
|
target=%P::target_name(),
|
|
?nonces,
|
|
"Received finalized nonces"
|
|
);
|
|
|
|
strategy.finalized_target_nonces_updated(nonces, &mut race_state);
|
|
},
|
|
&mut target_go_offline_future,
|
|
async_std::task::sleep,
|
|
|| format!("Error retrieving finalized nonces from {}", P::target_name()),
|
|
).fail_if_connection_error(FailedClient::Target)?;
|
|
},
|
|
|
|
// proof generation and submission
|
|
maybe_batch_transaction = target_require_source_header => {
|
|
source_required_header = None;
|
|
|
|
target_client_is_online = process_future_result(
|
|
maybe_batch_transaction,
|
|
&mut target_retry_backoff,
|
|
|maybe_batch_transaction: Option<TC::BatchTransaction>| {
|
|
tracing::debug!(
|
|
target: "bridge",
|
|
target=%P::target_name(),
|
|
source=%P::source_name(),
|
|
batch_tx=%maybe_batch_transaction
|
|
.as_ref()
|
|
.map(|bt| format!("yes ({:?})", bt.required_header_id()))
|
|
.unwrap_or_else(|| "no".into()),
|
|
"Target client has been asked for more headers."
|
|
);
|
|
|
|
target_batch_transaction = maybe_batch_transaction;
|
|
},
|
|
&mut target_go_offline_future,
|
|
async_std::task::sleep,
|
|
|| format!("Error asking for source headers at {}", P::target_name()),
|
|
).fail_if_connection_error(FailedClient::Target)?;
|
|
},
|
|
proof = source_generate_proof => {
|
|
source_client_is_online = process_future_result(
|
|
proof,
|
|
&mut source_retry_backoff,
|
|
|(at_block, nonces_range, proof, batch_transaction)| {
|
|
tracing::debug!(
|
|
target: "bridge",
|
|
source=%P::source_name(),
|
|
?nonces_range,
|
|
"Received proof"
|
|
);
|
|
|
|
race_state.nonces_to_submit = Some((at_block, nonces_range, proof));
|
|
race_state.nonces_to_submit_batch = batch_transaction;
|
|
},
|
|
&mut source_go_offline_future,
|
|
async_std::task::sleep,
|
|
|| format!("Error generating proof at {}", P::source_name()),
|
|
).fail_if_error(FailedClient::Source).map(|_| true)?;
|
|
},
|
|
proof_submit_result = target_submit_proof => {
|
|
target_client_is_online = process_future_result(
|
|
proof_submit_result,
|
|
&mut target_retry_backoff,
|
|
|artifacts: NoncesSubmitArtifacts<TC::TransactionTracker>| {
|
|
tracing::debug!(
|
|
target: "bridge",
|
|
target=%P::target_name(),
|
|
nonces=?artifacts.nonces,
|
|
"Successfully submitted proof"
|
|
);
|
|
|
|
race_state.nonces_submitted = Some(artifacts.nonces);
|
|
target_tx_tracker.set(artifacts.tx_tracker.wait().fuse());
|
|
},
|
|
&mut target_go_offline_future,
|
|
async_std::task::sleep,
|
|
|| format!("Error submitting proof {}", P::target_name()),
|
|
).fail_if_connection_error(FailedClient::Target)?;
|
|
|
|
// in any case - we don't need to retry submitting the same nonces again until
|
|
// we read nonces from the target client
|
|
race_state.reset_nonces_to_submit();
|
|
// if we have failed to submit transaction AND that is not the connection issue,
|
|
// then we need to read best target nonces before selecting nonces again
|
|
if !target_client_is_online {
|
|
strategy.reset_best_target_nonces();
|
|
}
|
|
},
|
|
target_transaction_status = target_tx_tracker => {
|
|
match (target_transaction_status, race_state.nonces_submitted.as_ref()) {
|
|
(TrackedTransactionStatus::Finalized(at_block), Some(nonces_submitted)) => {
|
|
// our transaction has been mined, but was it successful or not? let's check the best
|
|
// nonce at the target node.
|
|
let _ = race_target.nonces(at_block, false)
|
|
.await
|
|
.map_err(|e| format!("failed to read nonces from target node: {e:?}"))
|
|
.and_then(|(_, nonces_at_target)| {
|
|
if nonces_at_target.latest_nonce < *nonces_submitted.end() {
|
|
Err(format!(
|
|
"best nonce at target after tx is {:?} and we've submitted {:?}",
|
|
nonces_at_target.latest_nonce,
|
|
nonces_submitted.end(),
|
|
))
|
|
} else {
|
|
Ok(())
|
|
}
|
|
})
|
|
.map_err(|e| {
|
|
tracing::error!(
|
|
target: "bridge",
|
|
error=%e,
|
|
source=%P::source_name(),
|
|
target=%P::target_name(),
|
|
"Source -> target race transaction failed"
|
|
);
|
|
|
|
race_state.reset_nonces_submitted();
|
|
});
|
|
},
|
|
(TrackedTransactionStatus::Lost, _) => {
|
|
tracing::warn!(
|
|
target: "bridge",
|
|
source=%P::source_name(),
|
|
target=%P::target_name(),
|
|
state=?race_state,
|
|
?strategy,
|
|
"Source -> target race transaction has been lost."
|
|
);
|
|
|
|
race_state.reset_nonces_submitted();
|
|
},
|
|
_ => (),
|
|
}
|
|
},
|
|
|
|
// when we're ready to retry request
|
|
_ = source_go_offline_future => {
|
|
source_client_is_online = true;
|
|
},
|
|
_ = target_go_offline_future => {
|
|
target_client_is_online = true;
|
|
},
|
|
}
|
|
|
|
progress_context = print_race_progress::<P, _>(progress_context, &strategy);
|
|
|
|
if source_client_is_online {
|
|
source_client_is_online = false;
|
|
|
|
// if we've started to submit batch transaction, let's prioritize it
|
|
//
|
|
// we're using `take` here, because we don't need batch transaction (i.e. some
|
|
// underlying finality proof) anymore for our future calls - we were unable to
|
|
// use it for our current state, so why would we need to keep an obsolete proof
|
|
// for the future?
|
|
let target_batch_transaction = target_batch_transaction.take();
|
|
let expected_race_state =
|
|
if let Some(ref target_batch_transaction) = target_batch_transaction {
|
|
// when selecting nonces for the batch transaction, we assume that the required
|
|
// source header is already at the target chain
|
|
let required_source_header_at_target =
|
|
target_batch_transaction.required_header_id();
|
|
let mut expected_race_state = race_state.clone();
|
|
expected_race_state.best_finalized_source_header_id_at_best_target =
|
|
Some(required_source_header_at_target);
|
|
expected_race_state
|
|
} else {
|
|
race_state.clone()
|
|
};
|
|
|
|
let nonces_to_deliver = select_nonces_to_deliver(expected_race_state, &strategy).await;
|
|
let best_at_source = strategy.best_at_source();
|
|
|
|
if let Some((at_block, nonces_range, proof_parameters)) = nonces_to_deliver {
|
|
tracing::debug!(
|
|
target: "bridge",
|
|
source=P::source_name(),
|
|
?nonces_range,
|
|
?at_block,
|
|
"Asking to prove"
|
|
);
|
|
|
|
source_generate_proof.set(
|
|
race_source
|
|
.generate_proof(at_block, nonces_range, proof_parameters)
|
|
.and_then(|(at_source_block, nonces, proof)| async {
|
|
Ok((at_source_block, nonces, proof, target_batch_transaction))
|
|
})
|
|
.fuse(),
|
|
);
|
|
} else if let (true, Some(best_at_source)) = (source_nonces_required, best_at_source) {
|
|
tracing::debug!(target: "bridge", source=%P::source_name(), "Asking about message nonces");
|
|
let at_block = race_state
|
|
.best_finalized_source_header_id_at_source
|
|
.as_ref()
|
|
.expect(
|
|
"source_nonces_required is only true when \
|
|
best_finalized_source_header_id_at_source is Some; qed",
|
|
)
|
|
.clone();
|
|
source_nonces.set(race_source.nonces(at_block, best_at_source).fuse());
|
|
} else {
|
|
source_client_is_online = true;
|
|
}
|
|
}
|
|
|
|
if target_client_is_online {
|
|
target_client_is_online = false;
|
|
|
|
if let Some((at_block, nonces_range, proof)) = race_state.nonces_to_submit.as_ref() {
|
|
tracing::debug!(
|
|
target: "bridge",
|
|
target=%P::target_name(),
|
|
?nonces_range,
|
|
"Going to submit proof of messages in range to node{}",
|
|
race_state.nonces_to_submit_batch.as_ref().map(|tx| format!(
|
|
". This transaction is batched with sending the proof for header {:?}.",
|
|
tx.required_header_id())
|
|
).unwrap_or_default(),
|
|
);
|
|
|
|
target_submit_proof.set(
|
|
race_target
|
|
.submit_proof(
|
|
race_state.nonces_to_submit_batch.clone(),
|
|
at_block.clone(),
|
|
nonces_range.clone(),
|
|
proof.clone(),
|
|
)
|
|
.fuse(),
|
|
);
|
|
} else if let Some(source_required_header) = source_required_header.clone() {
|
|
tracing::debug!(
|
|
target: "bridge",
|
|
source=%P::source_name(),
|
|
target=%P::target_name(),
|
|
?source_required_header,
|
|
"Going to require header"
|
|
);
|
|
target_require_source_header
|
|
.set(race_target.require_source_header(source_required_header).fuse());
|
|
} else if target_best_nonces_required {
|
|
tracing::debug!(target: "bridge", target=%P::target_name(), "Asking about best message nonces");
|
|
let at_block = race_state
|
|
.best_target_header_id
|
|
.as_ref()
|
|
.expect("target_best_nonces_required is only true when best_target_header_id is Some; qed")
|
|
.clone();
|
|
target_best_nonces.set(race_target.nonces(at_block, false).fuse());
|
|
} else if target_finalized_nonces_required {
|
|
tracing::debug!(target: "bridge", target=%P::target_name(), "Asking about finalized message nonces");
|
|
let at_block = race_state
|
|
.best_finalized_target_header_id
|
|
.as_ref()
|
|
.expect(
|
|
"target_finalized_nonces_required is only true when \
|
|
best_finalized_target_header_id is Some; qed",
|
|
)
|
|
.clone();
|
|
target_finalized_nonces.set(race_target.nonces(at_block, true).fuse());
|
|
} else {
|
|
target_client_is_online = true;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Print race progress.
|
|
fn print_race_progress<P, S>(prev_time: Instant, strategy: &S) -> Instant
|
|
where
|
|
P: MessageRace,
|
|
S: RaceStrategy<P::SourceHeaderId, P::TargetHeaderId, P::Proof>,
|
|
{
|
|
let now_time = Instant::now();
|
|
|
|
let need_update = now_time.saturating_duration_since(prev_time) > Duration::from_secs(10);
|
|
if !need_update {
|
|
return prev_time;
|
|
}
|
|
|
|
let now_best_nonce_at_source = strategy.best_at_source();
|
|
let now_best_nonce_at_target = strategy.best_at_target();
|
|
tracing::info!(
|
|
target: "bridge",
|
|
source=%P::source_name(),
|
|
target=%P::target_name(),
|
|
?now_best_nonce_at_target,
|
|
?now_best_nonce_at_source,
|
|
"Synced nonces in source -> target race"
|
|
|
|
);
|
|
now_time
|
|
}
|
|
|
|
async fn select_nonces_to_deliver<SourceHeaderId, TargetHeaderId, Proof, Strategy>(
|
|
race_state: impl RaceState<SourceHeaderId, TargetHeaderId>,
|
|
strategy: &Strategy,
|
|
) -> Option<(SourceHeaderId, RangeInclusive<MessageNonce>, Strategy::ProofParameters)>
|
|
where
|
|
SourceHeaderId: Clone,
|
|
Strategy: RaceStrategy<SourceHeaderId, TargetHeaderId, Proof>,
|
|
{
|
|
let best_finalized_source_header_id_at_best_target =
|
|
race_state.best_finalized_source_header_id_at_best_target()?;
|
|
strategy
|
|
.select_nonces_to_deliver(race_state)
|
|
.await
|
|
.map(|(nonces_range, proof_parameters)| {
|
|
(best_finalized_source_header_id_at_best_target, nonces_range, proof_parameters)
|
|
})
|
|
}
|
|
|
|
#[cfg(test)]
|
|
mod tests {
|
|
use super::*;
|
|
use crate::message_race_strategy::BasicStrategy;
|
|
use relay_utils::HeaderId;
|
|
|
|
#[async_std::test]
|
|
async fn proof_is_generated_at_best_block_known_to_target_node() {
|
|
const GENERATED_AT: u64 = 6;
|
|
const BEST_AT_SOURCE: u64 = 10;
|
|
const BEST_AT_TARGET: u64 = 8;
|
|
|
|
// target node only knows about source' BEST_AT_TARGET block
|
|
// source node has BEST_AT_SOURCE > BEST_AT_TARGET block
|
|
let mut race_state = RaceStateImpl::<_, _, (), ()> {
|
|
best_finalized_source_header_id_at_source: Some(HeaderId(
|
|
BEST_AT_SOURCE,
|
|
BEST_AT_SOURCE,
|
|
)),
|
|
best_finalized_source_header_id_at_best_target: Some(HeaderId(
|
|
BEST_AT_TARGET,
|
|
BEST_AT_TARGET,
|
|
)),
|
|
best_target_header_id: Some(HeaderId(0, 0)),
|
|
best_finalized_target_header_id: Some(HeaderId(0, 0)),
|
|
nonces_to_submit: None,
|
|
nonces_to_submit_batch: None,
|
|
nonces_submitted: None,
|
|
};
|
|
|
|
// we have some nonces to deliver and they're generated at GENERATED_AT < BEST_AT_SOURCE
|
|
let mut strategy = BasicStrategy::<_, _, _, _, _, ()>::new();
|
|
strategy.source_nonces_updated(
|
|
HeaderId(GENERATED_AT, GENERATED_AT),
|
|
SourceClientNonces { new_nonces: 0..=10, confirmed_nonce: None },
|
|
);
|
|
strategy.best_target_nonces_updated(
|
|
TargetClientNonces { latest_nonce: 5u64, nonces_data: () },
|
|
&mut race_state,
|
|
);
|
|
|
|
// the proof will be generated on source, but using BEST_AT_TARGET block
|
|
assert_eq!(
|
|
select_nonces_to_deliver(race_state, &strategy).await,
|
|
Some((HeaderId(BEST_AT_TARGET, BEST_AT_TARGET), 6..=10, (),))
|
|
);
|
|
}
|
|
}
|