mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-05-31 02:51:01 +00:00
Relay DummyOrdered messages (#318)
* DummyOrdered relay: initial commit * 1 ML file -> 3 files * extracted generic message race loop * uncommented race error procesing * lane loop tests * cargo fmt * moved HeaderId to utils.rs * restart lane loop on stall * message delivery strategy tests * removed obsolete code * clippy * Update relays/ethereum/src/message_lane_loop.rs Co-authored-by: Tomasz Drwięga <tomusdrw@users.noreply.github.com> * Update relays/ethereum/src/message_lane_loop.rs Co-authored-by: Tomasz Drwięga <tomusdrw@users.noreply.github.com> * added more races docs * Update relays/ethereum/src/message_race_delivery.rs Co-authored-by: Tomasz Drwięga <tomusdrw@users.noreply.github.com> * update docs * Update relays/ethereum/src/message_race_loop.rs Co-authored-by: Tomasz Drwięga <tomusdrw@users.noreply.github.com> * deal with TODOs * removed docs * docs * cargo fmt --all * Update relays/ethereum/src/message_race_loop.rs Co-authored-by: Hernando Castano <HCastano@users.noreply.github.com> Co-authored-by: Tomasz Drwięga <tomusdrw@users.noreply.github.com> Co-authored-by: Hernando Castano <HCastano@users.noreply.github.com>
This commit is contained in:
committed by
Bastian Köcher
parent
463a01716c
commit
bed44dec13
@@ -21,8 +21,8 @@ use crate::ethereum_types::{
|
|||||||
use crate::rpc::{Ethereum, EthereumRpc};
|
use crate::rpc::{Ethereum, EthereumRpc};
|
||||||
use crate::rpc_errors::{EthereumNodeError, RpcError};
|
use crate::rpc_errors::{EthereumNodeError, RpcError};
|
||||||
use crate::substrate_types::{GrandpaJustification, Hash as SubstrateHash, QueuedSubstrateHeader, SubstrateHeaderId};
|
use crate::substrate_types::{GrandpaJustification, Hash as SubstrateHash, QueuedSubstrateHeader, SubstrateHeaderId};
|
||||||
use crate::sync_types::{HeaderId, SubmittedHeaders};
|
use crate::sync_types::SubmittedHeaders;
|
||||||
use crate::utils::MaybeConnectionError;
|
use crate::utils::{HeaderId, MaybeConnectionError};
|
||||||
|
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use codec::{Decode, Encode};
|
use codec::{Decode, Encode};
|
||||||
|
|||||||
@@ -21,7 +21,7 @@ use crate::instances::BridgeInstance;
|
|||||||
use crate::rpc::SubstrateRpc;
|
use crate::rpc::SubstrateRpc;
|
||||||
use crate::substrate_client::{SubstrateConnectionParams, SubstrateRpcClient};
|
use crate::substrate_client::{SubstrateConnectionParams, SubstrateRpcClient};
|
||||||
use crate::substrate_types::{Hash as SubstrateHash, Header as SubstrateHeader, SubstrateHeaderId};
|
use crate::substrate_types::{Hash as SubstrateHash, Header as SubstrateHeader, SubstrateHeaderId};
|
||||||
use crate::sync_types::HeaderId;
|
use crate::utils::HeaderId;
|
||||||
|
|
||||||
use codec::{Decode, Encode};
|
use codec::{Decode, Encode};
|
||||||
use num_traits::Zero;
|
use num_traits::Zero;
|
||||||
|
|||||||
@@ -34,7 +34,7 @@ use crate::substrate_client::{
|
|||||||
SubmitEthereumExchangeTransactionProof, SubstrateConnectionParams, SubstrateRpcClient, SubstrateSigningParams,
|
SubmitEthereumExchangeTransactionProof, SubstrateConnectionParams, SubstrateRpcClient, SubstrateSigningParams,
|
||||||
};
|
};
|
||||||
use crate::substrate_types::into_substrate_ethereum_receipt;
|
use crate::substrate_types::into_substrate_ethereum_receipt;
|
||||||
use crate::sync_types::HeaderId;
|
use crate::utils::HeaderId;
|
||||||
|
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use bp_currency_exchange::MaybeLockFundsTransaction;
|
use bp_currency_exchange::MaybeLockFundsTransaction;
|
||||||
|
|||||||
@@ -15,7 +15,8 @@
|
|||||||
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
|
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
use crate::substrate_types::{into_substrate_ethereum_header, into_substrate_ethereum_receipts};
|
use crate::substrate_types::{into_substrate_ethereum_header, into_substrate_ethereum_receipts};
|
||||||
use crate::sync_types::{HeaderId, HeadersSyncPipeline, QueuedHeader, SourceHeader};
|
use crate::sync_types::{HeadersSyncPipeline, QueuedHeader, SourceHeader};
|
||||||
|
use crate::utils::HeaderId;
|
||||||
use codec::Encode;
|
use codec::Encode;
|
||||||
|
|
||||||
pub use web3::types::{Address, Bytes, CallRequest, H256, U128, U256, U64};
|
pub use web3::types::{Address, Bytes, CallRequest, H256, U128, U256, U64};
|
||||||
|
|||||||
@@ -54,7 +54,7 @@ pub trait SourceBlock {
|
|||||||
type Transaction: SourceTransaction;
|
type Transaction: SourceTransaction;
|
||||||
|
|
||||||
/// Return hash of the block.
|
/// Return hash of the block.
|
||||||
fn id(&self) -> crate::sync_types::HeaderId<Self::Hash, Self::Number>;
|
fn id(&self) -> crate::utils::HeaderId<Self::Hash, Self::Number>;
|
||||||
/// Return block transactions iterator.
|
/// Return block transactions iterator.
|
||||||
fn transactions(&self) -> Vec<Self::Transaction>;
|
fn transactions(&self) -> Vec<Self::Transaction>;
|
||||||
}
|
}
|
||||||
@@ -81,7 +81,7 @@ pub type TransactionOf<P> = <<P as TransactionProofPipeline>::Block as SourceBlo
|
|||||||
pub type TransactionHashOf<P> = <TransactionOf<P> as SourceTransaction>::Hash;
|
pub type TransactionHashOf<P> = <TransactionOf<P> as SourceTransaction>::Hash;
|
||||||
|
|
||||||
/// Header id.
|
/// Header id.
|
||||||
pub type HeaderId<P> = crate::sync_types::HeaderId<BlockHashOf<P>, BlockNumberOf<P>>;
|
pub type HeaderId<P> = crate::utils::HeaderId<BlockHashOf<P>, BlockNumberOf<P>>;
|
||||||
|
|
||||||
/// Source client API.
|
/// Source client API.
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
@@ -443,7 +443,7 @@ async fn wait_header_finalized<P: TransactionProofPipeline>(
|
|||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
pub(crate) mod tests {
|
pub(crate) mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
use crate::sync_types::HeaderId;
|
use crate::utils::HeaderId;
|
||||||
|
|
||||||
use parking_lot::Mutex;
|
use parking_lot::Mutex;
|
||||||
use std::{
|
use std::{
|
||||||
|
|||||||
@@ -14,7 +14,9 @@
|
|||||||
// You should have received a copy of the GNU General Public License
|
// You should have received a copy of the GNU General Public License
|
||||||
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
|
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
use crate::sync_types::{HeaderId, HeaderIdOf, HeaderStatus, HeadersSyncPipeline, QueuedHeader, SourceHeader};
|
use crate::sync_types::{HeaderIdOf, HeaderStatus, HeadersSyncPipeline, QueuedHeader, SourceHeader};
|
||||||
|
use crate::utils::HeaderId;
|
||||||
|
|
||||||
use linked_hash_map::LinkedHashMap;
|
use linked_hash_map::LinkedHashMap;
|
||||||
use num_traits::{One, Zero};
|
use num_traits::{One, Zero};
|
||||||
use std::{
|
use std::{
|
||||||
@@ -777,7 +779,7 @@ fn queued_incomplete_header<Id: Clone + Eq + std::hash::Hash, T>(
|
|||||||
pub(crate) mod tests {
|
pub(crate) mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
use crate::ethereum_types::{EthereumHeaderId, EthereumHeadersSyncPipeline, Header, H256};
|
use crate::ethereum_types::{EthereumHeaderId, EthereumHeadersSyncPipeline, Header, H256};
|
||||||
use crate::sync_types::{HeaderId, QueuedHeader};
|
use crate::sync_types::QueuedHeader;
|
||||||
|
|
||||||
pub(crate) fn header(number: u64) -> QueuedHeader<EthereumHeadersSyncPipeline> {
|
pub(crate) fn header(number: u64) -> QueuedHeader<EthereumHeadersSyncPipeline> {
|
||||||
QueuedHeader::new(Header {
|
QueuedHeader::new(Header {
|
||||||
|
|||||||
@@ -27,6 +27,10 @@ mod exchange_loop;
|
|||||||
mod exchange_loop_metrics;
|
mod exchange_loop_metrics;
|
||||||
mod headers;
|
mod headers;
|
||||||
mod instances;
|
mod instances;
|
||||||
|
mod message_lane;
|
||||||
|
mod message_lane_loop;
|
||||||
|
mod message_race_delivery;
|
||||||
|
mod message_race_loop;
|
||||||
mod metrics;
|
mod metrics;
|
||||||
mod rpc;
|
mod rpc;
|
||||||
mod rpc_errors;
|
mod rpc_errors;
|
||||||
|
|||||||
@@ -0,0 +1,55 @@
|
|||||||
|
// Copyright 2019-2020 Parity Technologies (UK) Ltd.
|
||||||
|
// This file is part of Parity Bridges Common.
|
||||||
|
|
||||||
|
// Parity Bridges Common is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
|
||||||
|
// Parity Bridges Common is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU General Public License for more details.
|
||||||
|
|
||||||
|
// You should have received a copy of the GNU General Public License
|
||||||
|
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
//! One-way message lane types. Within single one-way lane we have three 'races' where we try to:
|
||||||
|
//!
|
||||||
|
//! 1) relay new messages from source to target node;
|
||||||
|
//! 2) relay proof-of-receiving from target to source node;
|
||||||
|
//! 3) relay proof-of-processing from target no source node.
|
||||||
|
|
||||||
|
use crate::utils::HeaderId;
|
||||||
|
|
||||||
|
use std::fmt::Debug;
|
||||||
|
|
||||||
|
/// One-way message lane.
|
||||||
|
pub trait MessageLane {
|
||||||
|
/// Name of the messages source.
|
||||||
|
const SOURCE_NAME: &'static str;
|
||||||
|
/// Name of the messages target.
|
||||||
|
const TARGET_NAME: &'static str;
|
||||||
|
|
||||||
|
/// Message nonce type.
|
||||||
|
type MessageNonce: Clone + Copy + Debug + Default + From<u32> + Ord + std::ops::Add<Output = Self::MessageNonce>;
|
||||||
|
|
||||||
|
/// Messages proof.
|
||||||
|
type MessagesProof: Clone;
|
||||||
|
|
||||||
|
/// Number of the source header.
|
||||||
|
type SourceHeaderNumber: Clone + Debug + Default + Ord + PartialEq;
|
||||||
|
/// Hash of the source header.
|
||||||
|
type SourceHeaderHash: Clone + Debug + Default + PartialEq;
|
||||||
|
|
||||||
|
/// Number of the target header.
|
||||||
|
type TargetHeaderNumber: Clone + Debug + Default + Ord + PartialEq;
|
||||||
|
/// Hash of the target header.
|
||||||
|
type TargetHeaderHash: Clone + Debug + Default + PartialEq;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Source header id within given one-way message lane.
|
||||||
|
pub type SourceHeaderIdOf<P> = HeaderId<<P as MessageLane>::SourceHeaderHash, <P as MessageLane>::SourceHeaderNumber>;
|
||||||
|
|
||||||
|
/// Target header id within given one-way message lane.
|
||||||
|
pub type TargetHeaderIdOf<P> = HeaderId<<P as MessageLane>::TargetHeaderHash, <P as MessageLane>::TargetHeaderNumber>;
|
||||||
@@ -0,0 +1,591 @@
|
|||||||
|
// Copyright 2019-2020 Parity Technologies (UK) Ltd.
|
||||||
|
// This file is part of Parity Bridges Common.
|
||||||
|
|
||||||
|
// Parity Bridges Common is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
|
||||||
|
// Parity Bridges Common is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU General Public License for more details.
|
||||||
|
|
||||||
|
// You should have received a copy of the GNU General Public License
|
||||||
|
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
//! Message delivery loop. Designed to work with message-lane pallet.
|
||||||
|
//!
|
||||||
|
//! Single relay instance delivers messages of single lane in single direction.
|
||||||
|
//! To serve two-way lane, you would need two instances of relay.
|
||||||
|
//! To serve N two-way lanes, you would need N*2 instances of relay.
|
||||||
|
//!
|
||||||
|
//! Please keep in mind that the best header in this file is actually best
|
||||||
|
//! finalized header. I.e. when talking about headers in lane context, we
|
||||||
|
//! only care about finalized headers.
|
||||||
|
|
||||||
|
// Until there'll be actual message-lane in the runtime.
|
||||||
|
#![allow(dead_code)]
|
||||||
|
|
||||||
|
use crate::message_lane::{MessageLane, SourceHeaderIdOf, TargetHeaderIdOf};
|
||||||
|
use crate::message_race_delivery::run as run_message_delivery_race;
|
||||||
|
use crate::utils::{interval, process_future_result, retry_backoff, FailedClient, MaybeConnectionError};
|
||||||
|
|
||||||
|
use async_trait::async_trait;
|
||||||
|
use futures::{channel::mpsc::unbounded, future::FutureExt, stream::StreamExt};
|
||||||
|
use std::{fmt::Debug, future::Future, ops::RangeInclusive, time::Duration};
|
||||||
|
|
||||||
|
/// Source client trait.
|
||||||
|
#[async_trait(?Send)]
|
||||||
|
pub trait SourceClient<P: MessageLane>: Clone {
|
||||||
|
/// Type of error this clients returns.
|
||||||
|
type Error: std::fmt::Debug + MaybeConnectionError;
|
||||||
|
|
||||||
|
/// Try to reconnect to source node.
|
||||||
|
fn reconnect(self) -> Self;
|
||||||
|
|
||||||
|
/// Returns state of the client.
|
||||||
|
async fn state(&self) -> Result<SourceClientState<P>, Self::Error>;
|
||||||
|
|
||||||
|
/// Get nonce of instance of latest generated message.
|
||||||
|
async fn latest_generated_nonce(
|
||||||
|
&self,
|
||||||
|
id: SourceHeaderIdOf<P>,
|
||||||
|
) -> Result<(SourceHeaderIdOf<P>, P::MessageNonce), Self::Error>;
|
||||||
|
|
||||||
|
/// Prove messages in inclusive range [begin; end].
|
||||||
|
async fn prove_messages(
|
||||||
|
&self,
|
||||||
|
id: SourceHeaderIdOf<P>,
|
||||||
|
nonces: RangeInclusive<P::MessageNonce>,
|
||||||
|
) -> Result<(SourceHeaderIdOf<P>, RangeInclusive<P::MessageNonce>, P::MessagesProof), Self::Error>;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Target client trait.
|
||||||
|
#[async_trait(?Send)]
|
||||||
|
pub trait TargetClient<P: MessageLane>: Clone {
|
||||||
|
/// Type of error this clients returns.
|
||||||
|
type Error: std::fmt::Debug + MaybeConnectionError;
|
||||||
|
|
||||||
|
/// Try to reconnect to source node.
|
||||||
|
fn reconnect(self) -> Self;
|
||||||
|
|
||||||
|
/// Returns state of the client.
|
||||||
|
async fn state(&self) -> Result<TargetClientState<P>, Self::Error>;
|
||||||
|
|
||||||
|
/// Get nonce of latest message, which receival has been confirmed.
|
||||||
|
async fn latest_received_nonce(
|
||||||
|
&self,
|
||||||
|
id: TargetHeaderIdOf<P>,
|
||||||
|
) -> Result<(TargetHeaderIdOf<P>, P::MessageNonce), Self::Error>;
|
||||||
|
|
||||||
|
/// Submit messages proof.
|
||||||
|
async fn submit_messages_proof(
|
||||||
|
&self,
|
||||||
|
generated_at_header: SourceHeaderIdOf<P>,
|
||||||
|
nonces: RangeInclusive<P::MessageNonce>,
|
||||||
|
proof: P::MessagesProof,
|
||||||
|
) -> Result<RangeInclusive<P::MessageNonce>, Self::Error>;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// State of the client.
|
||||||
|
#[derive(Clone, Debug, Default, PartialEq)]
|
||||||
|
pub struct ClientState<SelfHeaderId, PeerHeaderId> {
|
||||||
|
/// Best header id of this chain.
|
||||||
|
pub best_self: SelfHeaderId,
|
||||||
|
/// Best header id of the peer chain.
|
||||||
|
pub best_peer: PeerHeaderId,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// State of source client in one-way message lane.
|
||||||
|
pub type SourceClientState<P> = ClientState<SourceHeaderIdOf<P>, TargetHeaderIdOf<P>>;
|
||||||
|
|
||||||
|
/// State of target client in one-way message lane.
|
||||||
|
pub type TargetClientState<P> = ClientState<TargetHeaderIdOf<P>, SourceHeaderIdOf<P>>;
|
||||||
|
|
||||||
|
/// Both clients state.
|
||||||
|
#[derive(Debug, Default)]
|
||||||
|
pub struct ClientsState<P: MessageLane> {
|
||||||
|
/// Source client state.
|
||||||
|
pub source: Option<SourceClientState<P>>,
|
||||||
|
/// Target client state.
|
||||||
|
pub target: Option<TargetClientState<P>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Run message lane service loop.
|
||||||
|
pub fn run<P: MessageLane>(
|
||||||
|
mut source_client: impl SourceClient<P>,
|
||||||
|
source_tick: Duration,
|
||||||
|
mut target_client: impl TargetClient<P>,
|
||||||
|
target_tick: Duration,
|
||||||
|
reconnect_delay: Duration,
|
||||||
|
stall_timeout: Duration,
|
||||||
|
exit_signal: impl Future<Output = ()>,
|
||||||
|
) {
|
||||||
|
let mut local_pool = futures::executor::LocalPool::new();
|
||||||
|
let exit_signal = exit_signal.shared();
|
||||||
|
|
||||||
|
local_pool.run_until(async move {
|
||||||
|
loop {
|
||||||
|
let result = run_until_connection_lost(
|
||||||
|
source_client.clone(),
|
||||||
|
source_tick,
|
||||||
|
target_client.clone(),
|
||||||
|
target_tick,
|
||||||
|
stall_timeout,
|
||||||
|
exit_signal.clone(),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
match result {
|
||||||
|
Ok(()) => break,
|
||||||
|
Err(failed_client) => {
|
||||||
|
async_std::task::sleep(reconnect_delay).await;
|
||||||
|
if failed_client == FailedClient::Both || failed_client == FailedClient::Source {
|
||||||
|
source_client = source_client.reconnect();
|
||||||
|
}
|
||||||
|
if failed_client == FailedClient::Both || failed_client == FailedClient::Target {
|
||||||
|
target_client = target_client.reconnect();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
log::debug!(
|
||||||
|
target: "bridge",
|
||||||
|
"Restarting lane {} -> {}",
|
||||||
|
P::SOURCE_NAME,
|
||||||
|
P::TARGET_NAME,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Run one-way message delivery loop until connection with target or source node is lost, or exit signal is received.
|
||||||
|
async fn run_until_connection_lost<P: MessageLane, SC: SourceClient<P>, TC: TargetClient<P>>(
|
||||||
|
source_client: SC,
|
||||||
|
source_tick: Duration,
|
||||||
|
target_client: TC,
|
||||||
|
target_tick: Duration,
|
||||||
|
stall_timeout: Duration,
|
||||||
|
exit_signal: impl Future<Output = ()>,
|
||||||
|
) -> Result<(), FailedClient> {
|
||||||
|
let mut source_retry_backoff = retry_backoff();
|
||||||
|
let mut source_client_is_online = false;
|
||||||
|
let mut source_state_required = true;
|
||||||
|
let source_state = source_client.state().fuse();
|
||||||
|
let source_go_offline_future = futures::future::Fuse::terminated();
|
||||||
|
let source_tick_stream = interval(source_tick).fuse();
|
||||||
|
|
||||||
|
let mut target_retry_backoff = retry_backoff();
|
||||||
|
let mut target_client_is_online = false;
|
||||||
|
let mut target_state_required = true;
|
||||||
|
let target_state = target_client.state().fuse();
|
||||||
|
let target_go_offline_future = futures::future::Fuse::terminated();
|
||||||
|
let target_tick_stream = interval(target_tick).fuse();
|
||||||
|
|
||||||
|
let (
|
||||||
|
(delivery_source_state_sender, delivery_source_state_receiver),
|
||||||
|
(delivery_target_state_sender, delivery_target_state_receiver),
|
||||||
|
) = (unbounded(), unbounded());
|
||||||
|
let delivery_race_loop = run_message_delivery_race(
|
||||||
|
source_client.clone(),
|
||||||
|
delivery_source_state_receiver,
|
||||||
|
target_client.clone(),
|
||||||
|
delivery_target_state_receiver,
|
||||||
|
stall_timeout,
|
||||||
|
)
|
||||||
|
.fuse();
|
||||||
|
|
||||||
|
let exit_signal = exit_signal.fuse();
|
||||||
|
|
||||||
|
futures::pin_mut!(
|
||||||
|
source_state,
|
||||||
|
source_go_offline_future,
|
||||||
|
source_tick_stream,
|
||||||
|
target_state,
|
||||||
|
target_go_offline_future,
|
||||||
|
target_tick_stream,
|
||||||
|
delivery_race_loop,
|
||||||
|
exit_signal
|
||||||
|
);
|
||||||
|
|
||||||
|
loop {
|
||||||
|
futures::select! {
|
||||||
|
new_source_state = source_state => {
|
||||||
|
source_state_required = false;
|
||||||
|
|
||||||
|
source_client_is_online = process_future_result(
|
||||||
|
new_source_state,
|
||||||
|
&mut source_retry_backoff,
|
||||||
|
|new_source_state| {
|
||||||
|
log::debug!(
|
||||||
|
target: "bridge",
|
||||||
|
"Received state from {} node: {:?}",
|
||||||
|
P::SOURCE_NAME,
|
||||||
|
new_source_state,
|
||||||
|
);
|
||||||
|
let _ = delivery_source_state_sender.unbounded_send(new_source_state);
|
||||||
|
},
|
||||||
|
&mut source_go_offline_future,
|
||||||
|
|delay| async_std::task::sleep(delay),
|
||||||
|
|| format!("Error retrieving state from {} node", P::SOURCE_NAME),
|
||||||
|
).fail_if_connection_error(FailedClient::Source)?;
|
||||||
|
},
|
||||||
|
_ = source_go_offline_future => {
|
||||||
|
source_client_is_online = true;
|
||||||
|
},
|
||||||
|
_ = source_tick_stream.next() => {
|
||||||
|
source_state_required = true;
|
||||||
|
},
|
||||||
|
new_target_state = target_state => {
|
||||||
|
target_state_required = false;
|
||||||
|
|
||||||
|
target_client_is_online = process_future_result(
|
||||||
|
new_target_state,
|
||||||
|
&mut target_retry_backoff,
|
||||||
|
|new_target_state| {
|
||||||
|
log::debug!(
|
||||||
|
target: "bridge",
|
||||||
|
"Received state from {} node: {:?}",
|
||||||
|
P::TARGET_NAME,
|
||||||
|
new_target_state,
|
||||||
|
);
|
||||||
|
let _ = delivery_target_state_sender.unbounded_send(new_target_state);
|
||||||
|
},
|
||||||
|
&mut target_go_offline_future,
|
||||||
|
|delay| async_std::task::sleep(delay),
|
||||||
|
|| format!("Error retrieving state from {} node", P::TARGET_NAME),
|
||||||
|
).fail_if_connection_error(FailedClient::Target)?;
|
||||||
|
},
|
||||||
|
_ = target_go_offline_future => {
|
||||||
|
target_client_is_online = true;
|
||||||
|
},
|
||||||
|
_ = target_tick_stream.next() => {
|
||||||
|
target_state_required = true;
|
||||||
|
},
|
||||||
|
|
||||||
|
delivery_error = delivery_race_loop => {
|
||||||
|
match delivery_error {
|
||||||
|
Ok(_) => unreachable!("only ends with error; qed"),
|
||||||
|
Err(err) => return Err(err),
|
||||||
|
}
|
||||||
|
},
|
||||||
|
|
||||||
|
() = exit_signal => {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if source_client_is_online && source_state_required {
|
||||||
|
log::debug!(target: "bridge", "Asking {} node about its state", P::SOURCE_NAME);
|
||||||
|
source_state.set(source_client.state().fuse());
|
||||||
|
source_client_is_online = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if target_client_is_online && target_state_required {
|
||||||
|
log::debug!(target: "bridge", "Asking {} node about its state", P::TARGET_NAME);
|
||||||
|
target_state.set(target_client.state().fuse());
|
||||||
|
target_client_is_online = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
pub(crate) mod tests {
|
||||||
|
use super::*;
|
||||||
|
use crate::utils::HeaderId;
|
||||||
|
use futures::stream::StreamExt;
|
||||||
|
use parking_lot::Mutex;
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
pub fn header_id(number: TestSourceHeaderNumber) -> HeaderId<TestSourceHeaderNumber, TestSourceHeaderHash> {
|
||||||
|
HeaderId(number, number)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub type TestMessageNonce = u64;
|
||||||
|
pub type TestMessagesProof = RangeInclusive<TestMessageNonce>;
|
||||||
|
|
||||||
|
pub type TestSourceHeaderNumber = u64;
|
||||||
|
pub type TestSourceHeaderHash = u64;
|
||||||
|
|
||||||
|
pub type TestTargetHeaderNumber = u64;
|
||||||
|
pub type TestTargetHeaderHash = u64;
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub enum TestError {
|
||||||
|
Logic,
|
||||||
|
Connection,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl MaybeConnectionError for TestError {
|
||||||
|
fn is_connection_error(&self) -> bool {
|
||||||
|
match *self {
|
||||||
|
TestError::Logic => false,
|
||||||
|
TestError::Connection => true,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct TestMessageLane;
|
||||||
|
|
||||||
|
impl MessageLane for TestMessageLane {
|
||||||
|
const SOURCE_NAME: &'static str = "TestSource";
|
||||||
|
const TARGET_NAME: &'static str = "TestTarget";
|
||||||
|
|
||||||
|
type MessageNonce = TestMessageNonce;
|
||||||
|
type MessagesProof = TestMessagesProof;
|
||||||
|
|
||||||
|
type SourceHeaderNumber = TestSourceHeaderNumber;
|
||||||
|
type SourceHeaderHash = TestSourceHeaderHash;
|
||||||
|
|
||||||
|
type TargetHeaderNumber = TestTargetHeaderNumber;
|
||||||
|
type TargetHeaderHash = TestTargetHeaderHash;
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Default, Clone)]
|
||||||
|
pub struct TestClientData {
|
||||||
|
is_source_fails: bool,
|
||||||
|
is_source_reconnected: bool,
|
||||||
|
source_state: SourceClientState<TestMessageLane>,
|
||||||
|
source_latest_generated_nonce: TestMessageNonce,
|
||||||
|
is_target_fails: bool,
|
||||||
|
is_target_reconnected: bool,
|
||||||
|
target_state: SourceClientState<TestMessageLane>,
|
||||||
|
target_latest_received_nonce: TestMessageNonce,
|
||||||
|
submitted_messages_proofs: Vec<TestMessagesProof>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct TestSourceClient {
|
||||||
|
data: Arc<Mutex<TestClientData>>,
|
||||||
|
tick: Arc<dyn Fn(&mut TestClientData)>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait(?Send)]
|
||||||
|
impl SourceClient<TestMessageLane> for TestSourceClient {
|
||||||
|
type Error = TestError;
|
||||||
|
|
||||||
|
fn reconnect(self) -> Self {
|
||||||
|
{
|
||||||
|
let mut data = self.data.lock();
|
||||||
|
(self.tick)(&mut *data);
|
||||||
|
data.is_source_reconnected = true;
|
||||||
|
}
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn state(&self) -> Result<SourceClientState<TestMessageLane>, Self::Error> {
|
||||||
|
let mut data = self.data.lock();
|
||||||
|
(self.tick)(&mut *data);
|
||||||
|
if data.is_source_fails {
|
||||||
|
return Err(TestError::Connection);
|
||||||
|
}
|
||||||
|
Ok(data.source_state.clone())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get nonce of instance of latest generated message.
|
||||||
|
async fn latest_generated_nonce(
|
||||||
|
&self,
|
||||||
|
id: SourceHeaderIdOf<TestMessageLane>,
|
||||||
|
) -> Result<(SourceHeaderIdOf<TestMessageLane>, TestMessageNonce), Self::Error> {
|
||||||
|
let mut data = self.data.lock();
|
||||||
|
(self.tick)(&mut *data);
|
||||||
|
if data.is_source_fails {
|
||||||
|
return Err(TestError::Connection);
|
||||||
|
}
|
||||||
|
Ok((id, data.source_latest_generated_nonce))
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn prove_messages(
|
||||||
|
&self,
|
||||||
|
id: SourceHeaderIdOf<TestMessageLane>,
|
||||||
|
nonces: RangeInclusive<TestMessageNonce>,
|
||||||
|
) -> Result<
|
||||||
|
(
|
||||||
|
SourceHeaderIdOf<TestMessageLane>,
|
||||||
|
RangeInclusive<TestMessageNonce>,
|
||||||
|
TestMessagesProof,
|
||||||
|
),
|
||||||
|
Self::Error,
|
||||||
|
> {
|
||||||
|
Ok((id, nonces.clone(), nonces))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct TestTargetClient {
|
||||||
|
data: Arc<Mutex<TestClientData>>,
|
||||||
|
tick: Arc<dyn Fn(&mut TestClientData)>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait(?Send)]
|
||||||
|
impl TargetClient<TestMessageLane> for TestTargetClient {
|
||||||
|
type Error = TestError;
|
||||||
|
|
||||||
|
fn reconnect(self) -> Self {
|
||||||
|
{
|
||||||
|
let mut data = self.data.lock();
|
||||||
|
(self.tick)(&mut *data);
|
||||||
|
data.is_target_reconnected = true;
|
||||||
|
}
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn state(&self) -> Result<TargetClientState<TestMessageLane>, Self::Error> {
|
||||||
|
let mut data = self.data.lock();
|
||||||
|
(self.tick)(&mut *data);
|
||||||
|
if data.is_target_fails {
|
||||||
|
return Err(TestError::Connection);
|
||||||
|
}
|
||||||
|
Ok(data.target_state.clone())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn latest_received_nonce(
|
||||||
|
&self,
|
||||||
|
id: TargetHeaderIdOf<TestMessageLane>,
|
||||||
|
) -> Result<(TargetHeaderIdOf<TestMessageLane>, TestMessageNonce), Self::Error> {
|
||||||
|
let mut data = self.data.lock();
|
||||||
|
(self.tick)(&mut *data);
|
||||||
|
if data.is_target_fails {
|
||||||
|
return Err(TestError::Connection);
|
||||||
|
}
|
||||||
|
Ok((id, data.target_latest_received_nonce))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Submit messages proof.
|
||||||
|
async fn submit_messages_proof(
|
||||||
|
&self,
|
||||||
|
_generated_at_header: SourceHeaderIdOf<TestMessageLane>,
|
||||||
|
nonces: RangeInclusive<TestMessageNonce>,
|
||||||
|
proof: TestMessagesProof,
|
||||||
|
) -> Result<RangeInclusive<TestMessageNonce>, Self::Error> {
|
||||||
|
let mut data = self.data.lock();
|
||||||
|
(self.tick)(&mut *data);
|
||||||
|
if data.is_target_fails {
|
||||||
|
return Err(TestError::Connection);
|
||||||
|
}
|
||||||
|
data.target_state.best_self =
|
||||||
|
HeaderId(data.target_state.best_self.0 + 1, data.target_state.best_self.1 + 1);
|
||||||
|
data.target_latest_received_nonce = *proof.end();
|
||||||
|
data.submitted_messages_proofs.push(proof);
|
||||||
|
Ok(nonces)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn run_loop_test(
|
||||||
|
data: TestClientData,
|
||||||
|
source_tick: Arc<dyn Fn(&mut TestClientData)>,
|
||||||
|
target_tick: Arc<dyn Fn(&mut TestClientData)>,
|
||||||
|
exit_signal: impl Future<Output = ()>,
|
||||||
|
) -> TestClientData {
|
||||||
|
async_std::task::block_on(async {
|
||||||
|
let data = Arc::new(Mutex::new(data));
|
||||||
|
|
||||||
|
let source_client = TestSourceClient {
|
||||||
|
data: data.clone(),
|
||||||
|
tick: source_tick,
|
||||||
|
};
|
||||||
|
let target_client = TestTargetClient {
|
||||||
|
data: data.clone(),
|
||||||
|
tick: target_tick,
|
||||||
|
};
|
||||||
|
run(
|
||||||
|
source_client,
|
||||||
|
Duration::from_millis(100),
|
||||||
|
target_client,
|
||||||
|
Duration::from_millis(100),
|
||||||
|
Duration::from_millis(0),
|
||||||
|
Duration::from_secs(60),
|
||||||
|
exit_signal,
|
||||||
|
);
|
||||||
|
|
||||||
|
let result = data.lock().clone();
|
||||||
|
result
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn message_lane_loop_is_able_to_recover_from_connection_errors() {
|
||||||
|
// with this configuration, source client will return Err, making source client
|
||||||
|
// reconnect. Then the target client will fail with Err + reconnect. Then we finally
|
||||||
|
// able to deliver messages.
|
||||||
|
let (exit_sender, exit_receiver) = unbounded();
|
||||||
|
let result = run_loop_test(
|
||||||
|
TestClientData {
|
||||||
|
is_source_fails: true,
|
||||||
|
source_state: ClientState {
|
||||||
|
best_self: HeaderId(0, 0),
|
||||||
|
best_peer: HeaderId(0, 0),
|
||||||
|
},
|
||||||
|
source_latest_generated_nonce: 1,
|
||||||
|
target_state: ClientState {
|
||||||
|
best_self: HeaderId(0, 0),
|
||||||
|
best_peer: HeaderId(0, 0),
|
||||||
|
},
|
||||||
|
target_latest_received_nonce: 0,
|
||||||
|
..Default::default()
|
||||||
|
},
|
||||||
|
Arc::new(|data: &mut TestClientData| {
|
||||||
|
if data.is_source_reconnected {
|
||||||
|
data.is_source_fails = false;
|
||||||
|
data.is_target_fails = true;
|
||||||
|
}
|
||||||
|
}),
|
||||||
|
Arc::new(move |data: &mut TestClientData| {
|
||||||
|
if data.is_target_reconnected {
|
||||||
|
data.is_target_fails = false;
|
||||||
|
}
|
||||||
|
if data.target_state.best_peer.0 < 10 {
|
||||||
|
data.target_state.best_peer =
|
||||||
|
HeaderId(data.target_state.best_peer.0 + 1, data.target_state.best_peer.0 + 1);
|
||||||
|
}
|
||||||
|
if !data.submitted_messages_proofs.is_empty() {
|
||||||
|
exit_sender.unbounded_send(()).unwrap();
|
||||||
|
}
|
||||||
|
}),
|
||||||
|
exit_receiver.into_future().map(|(_, _)| ()),
|
||||||
|
);
|
||||||
|
|
||||||
|
assert_eq!(result.submitted_messages_proofs, vec![1..=1],);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn message_lane_loop_works() {
|
||||||
|
// with this configuration, target client must first sync headers [1; 10] and
|
||||||
|
// then submit proof-of-messages [0; 10] at once
|
||||||
|
let (exit_sender, exit_receiver) = unbounded();
|
||||||
|
let result = run_loop_test(
|
||||||
|
TestClientData {
|
||||||
|
source_state: ClientState {
|
||||||
|
best_self: HeaderId(10, 10),
|
||||||
|
best_peer: HeaderId(0, 0),
|
||||||
|
},
|
||||||
|
source_latest_generated_nonce: 10,
|
||||||
|
target_state: ClientState {
|
||||||
|
best_self: HeaderId(0, 0),
|
||||||
|
best_peer: HeaderId(0, 0),
|
||||||
|
},
|
||||||
|
target_latest_received_nonce: 0,
|
||||||
|
..Default::default()
|
||||||
|
},
|
||||||
|
Arc::new(|_: &mut TestClientData| {}),
|
||||||
|
Arc::new(move |data: &mut TestClientData| {
|
||||||
|
if data.target_state.best_peer.0 < 10 {
|
||||||
|
data.target_state.best_peer =
|
||||||
|
HeaderId(data.target_state.best_peer.0 + 1, data.target_state.best_peer.0 + 1);
|
||||||
|
}
|
||||||
|
if data
|
||||||
|
.submitted_messages_proofs
|
||||||
|
.last()
|
||||||
|
.map(|last| *last.end() == 10)
|
||||||
|
.unwrap_or(false)
|
||||||
|
{
|
||||||
|
exit_sender.unbounded_send(()).unwrap();
|
||||||
|
}
|
||||||
|
}),
|
||||||
|
exit_receiver.into_future().map(|(_, _)| ()),
|
||||||
|
);
|
||||||
|
|
||||||
|
assert_eq!(result.submitted_messages_proofs, vec![1..=4, 5..=8, 9..=10],);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,385 @@
|
|||||||
|
// Copyright 2019-2020 Parity Technologies (UK) Ltd.
|
||||||
|
// This file is part of Parity Bridges Common.
|
||||||
|
|
||||||
|
// Parity Bridges Common is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
|
||||||
|
// Parity Bridges Common is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU General Public License for more details.
|
||||||
|
|
||||||
|
//! Message delivery race delivers proof-of-messages from lane.source to lane.target.
|
||||||
|
|
||||||
|
use crate::message_lane::{MessageLane, SourceHeaderIdOf, TargetHeaderIdOf};
|
||||||
|
use crate::message_lane_loop::{
|
||||||
|
SourceClient as MessageLaneSourceClient, SourceClientState, TargetClient as MessageLaneTargetClient,
|
||||||
|
TargetClientState,
|
||||||
|
};
|
||||||
|
use crate::message_race_loop::{MessageRace, RaceState, RaceStrategy, SourceClient, TargetClient};
|
||||||
|
use crate::utils::FailedClient;
|
||||||
|
|
||||||
|
use async_trait::async_trait;
|
||||||
|
use futures::stream::FusedStream;
|
||||||
|
use std::{collections::VecDeque, marker::PhantomData, ops::RangeInclusive, time::Duration};
|
||||||
|
|
||||||
|
/// Maximal number of messages to relay in single transaction.
|
||||||
|
const MAX_MESSAGES_TO_RELAY_IN_SINGLE_TX: u32 = 4;
|
||||||
|
|
||||||
|
/// Run message delivery race.
|
||||||
|
pub async fn run<P: MessageLane>(
|
||||||
|
source_client: impl MessageLaneSourceClient<P>,
|
||||||
|
source_state_updates: impl FusedStream<Item = SourceClientState<P>>,
|
||||||
|
target_client: impl MessageLaneTargetClient<P>,
|
||||||
|
target_state_updates: impl FusedStream<Item = TargetClientState<P>>,
|
||||||
|
stall_timeout: Duration,
|
||||||
|
) -> Result<(), FailedClient> {
|
||||||
|
crate::message_race_loop::run(
|
||||||
|
MessageDeliveryRaceSource {
|
||||||
|
client: source_client,
|
||||||
|
_phantom: Default::default(),
|
||||||
|
},
|
||||||
|
source_state_updates,
|
||||||
|
MessageDeliveryRaceTarget {
|
||||||
|
client: target_client,
|
||||||
|
_phantom: Default::default(),
|
||||||
|
},
|
||||||
|
target_state_updates,
|
||||||
|
stall_timeout,
|
||||||
|
MessageDeliveryStrategy::<P>::default(),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Message delivery race.
|
||||||
|
struct MessageDeliveryRace<P>(std::marker::PhantomData<P>);
|
||||||
|
|
||||||
|
impl<P: MessageLane> MessageRace for MessageDeliveryRace<P> {
|
||||||
|
type SourceHeaderId = SourceHeaderIdOf<P>;
|
||||||
|
type TargetHeaderId = TargetHeaderIdOf<P>;
|
||||||
|
|
||||||
|
type MessageNonce = P::MessageNonce;
|
||||||
|
type Proof = P::MessagesProof;
|
||||||
|
|
||||||
|
fn source_name() -> String {
|
||||||
|
format!("{}::MessagesDelivery", P::SOURCE_NAME)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn target_name() -> String {
|
||||||
|
format!("{}::MessagesDelivery", P::TARGET_NAME)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Message delivery race source, which is a source of the lane.
|
||||||
|
struct MessageDeliveryRaceSource<P: MessageLane, C> {
|
||||||
|
client: C,
|
||||||
|
_phantom: PhantomData<P>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait(?Send)]
|
||||||
|
impl<P, C> SourceClient<MessageDeliveryRace<P>> for MessageDeliveryRaceSource<P, C>
|
||||||
|
where
|
||||||
|
P: MessageLane,
|
||||||
|
C: MessageLaneSourceClient<P>,
|
||||||
|
{
|
||||||
|
type Error = C::Error;
|
||||||
|
|
||||||
|
async fn latest_nonce(
|
||||||
|
&self,
|
||||||
|
at_block: SourceHeaderIdOf<P>,
|
||||||
|
) -> Result<(SourceHeaderIdOf<P>, P::MessageNonce), Self::Error> {
|
||||||
|
self.client.latest_generated_nonce(at_block).await
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn generate_proof(
|
||||||
|
&self,
|
||||||
|
at_block: SourceHeaderIdOf<P>,
|
||||||
|
nonces: RangeInclusive<P::MessageNonce>,
|
||||||
|
) -> Result<(SourceHeaderIdOf<P>, RangeInclusive<P::MessageNonce>, P::MessagesProof), Self::Error> {
|
||||||
|
self.client.prove_messages(at_block, nonces).await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Message delivery race target, which is a target of the lane.
|
||||||
|
struct MessageDeliveryRaceTarget<P: MessageLane, C> {
|
||||||
|
client: C,
|
||||||
|
_phantom: PhantomData<P>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait(?Send)]
|
||||||
|
impl<P, C> TargetClient<MessageDeliveryRace<P>> for MessageDeliveryRaceTarget<P, C>
|
||||||
|
where
|
||||||
|
P: MessageLane,
|
||||||
|
C: MessageLaneTargetClient<P>,
|
||||||
|
{
|
||||||
|
type Error = C::Error;
|
||||||
|
|
||||||
|
async fn latest_nonce(
|
||||||
|
&self,
|
||||||
|
at_block: TargetHeaderIdOf<P>,
|
||||||
|
) -> Result<(TargetHeaderIdOf<P>, P::MessageNonce), Self::Error> {
|
||||||
|
self.client.latest_received_nonce(at_block).await
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn submit_proof(
|
||||||
|
&self,
|
||||||
|
generated_at_block: SourceHeaderIdOf<P>,
|
||||||
|
nonces: RangeInclusive<P::MessageNonce>,
|
||||||
|
proof: P::MessagesProof,
|
||||||
|
) -> Result<RangeInclusive<P::MessageNonce>, Self::Error> {
|
||||||
|
self.client
|
||||||
|
.submit_messages_proof(generated_at_block, nonces, proof)
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Message delivery strategy.
|
||||||
|
struct MessageDeliveryStrategy<P: MessageLane> {
|
||||||
|
/// All queued nonces.
|
||||||
|
source_queue: VecDeque<(SourceHeaderIdOf<P>, P::MessageNonce)>,
|
||||||
|
/// Best nonce known to target node.
|
||||||
|
target_nonce: P::MessageNonce,
|
||||||
|
/// Unused generic types dump.
|
||||||
|
_phantom: PhantomData<P>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<P: MessageLane> Default for MessageDeliveryStrategy<P> {
|
||||||
|
fn default() -> Self {
|
||||||
|
MessageDeliveryStrategy {
|
||||||
|
source_queue: VecDeque::new(),
|
||||||
|
target_nonce: Default::default(),
|
||||||
|
_phantom: Default::default(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<P: MessageLane> RaceStrategy<SourceHeaderIdOf<P>, TargetHeaderIdOf<P>, P::MessageNonce, P::MessagesProof>
|
||||||
|
for MessageDeliveryStrategy<P>
|
||||||
|
{
|
||||||
|
fn is_empty(&self) -> bool {
|
||||||
|
self.source_queue.is_empty()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn source_nonce_updated(&mut self, at_block: SourceHeaderIdOf<P>, nonce: P::MessageNonce) {
|
||||||
|
if nonce <= self.target_nonce {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
match self.source_queue.back() {
|
||||||
|
Some((_, prev_nonce)) if *prev_nonce < nonce => (),
|
||||||
|
Some(_) => return,
|
||||||
|
None => (),
|
||||||
|
}
|
||||||
|
|
||||||
|
self.source_queue.push_back((at_block, nonce))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn target_nonce_updated(
|
||||||
|
&mut self,
|
||||||
|
nonce: P::MessageNonce,
|
||||||
|
race_state: &mut RaceState<SourceHeaderIdOf<P>, TargetHeaderIdOf<P>, P::MessageNonce, P::MessagesProof>,
|
||||||
|
) {
|
||||||
|
if nonce < self.target_nonce {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
while let Some(true) = self
|
||||||
|
.source_queue
|
||||||
|
.front()
|
||||||
|
.map(|(_, source_nonce)| *source_nonce <= nonce)
|
||||||
|
{
|
||||||
|
self.source_queue.pop_front();
|
||||||
|
}
|
||||||
|
|
||||||
|
let need_to_select_new_nonces = race_state
|
||||||
|
.nonces_to_submit
|
||||||
|
.as_ref()
|
||||||
|
.map(|(_, nonces, _)| *nonces.end() <= nonce)
|
||||||
|
.unwrap_or(false);
|
||||||
|
if need_to_select_new_nonces {
|
||||||
|
race_state.nonces_to_submit = None;
|
||||||
|
}
|
||||||
|
|
||||||
|
let need_new_nonces_to_submit = race_state
|
||||||
|
.nonces_submitted
|
||||||
|
.as_ref()
|
||||||
|
.map(|nonces| *nonces.end() <= nonce)
|
||||||
|
.unwrap_or(false);
|
||||||
|
if need_new_nonces_to_submit {
|
||||||
|
race_state.nonces_submitted = None;
|
||||||
|
}
|
||||||
|
|
||||||
|
self.target_nonce = nonce;
|
||||||
|
}
|
||||||
|
|
||||||
|
fn select_nonces_to_deliver(
|
||||||
|
&mut self,
|
||||||
|
race_state: &RaceState<SourceHeaderIdOf<P>, TargetHeaderIdOf<P>, P::MessageNonce, P::MessagesProof>,
|
||||||
|
) -> Option<RangeInclusive<P::MessageNonce>> {
|
||||||
|
// if we have already selected nonces that we want to submit, do nothing
|
||||||
|
if race_state.nonces_to_submit.is_some() {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
|
||||||
|
// if we already submitted some nonces, do nothing
|
||||||
|
if race_state.nonces_submitted.is_some() {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 1) we want to deliver all nonces, starting from `target_nonce + 1`
|
||||||
|
// 2) we want to deliver at most `MAX_MESSAGES_TO_RELAY_IN_SINGLE_TX` nonces in this batch
|
||||||
|
// 3) we can't deliver new nonce until header, that has emitted this nonce, is finalized
|
||||||
|
// by target client
|
||||||
|
let nonces_begin = self.target_nonce + 1.into();
|
||||||
|
let best_header_at_target = &race_state.target_state.as_ref()?.best_peer;
|
||||||
|
let mut nonces_end = None;
|
||||||
|
for i in 0..MAX_MESSAGES_TO_RELAY_IN_SINGLE_TX {
|
||||||
|
let nonce = nonces_begin + i.into();
|
||||||
|
|
||||||
|
// if queue is empty, we don't need to prove anything
|
||||||
|
let (first_queued_at, first_queued_nonce) = match self.source_queue.front() {
|
||||||
|
Some((first_queued_at, first_queued_nonce)) => (first_queued_at.clone(), *first_queued_nonce),
|
||||||
|
None => break,
|
||||||
|
};
|
||||||
|
|
||||||
|
// if header that has queued the message is not yet finalized at bridged chain,
|
||||||
|
// we can't prove anything
|
||||||
|
if first_queued_at.0 > best_header_at_target.0 {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
// ok, we may deliver this nonce
|
||||||
|
nonces_end = Some(nonce);
|
||||||
|
|
||||||
|
// probably remove it from the queue?
|
||||||
|
if nonce == first_queued_nonce {
|
||||||
|
self.source_queue.pop_front();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
nonces_end.map(|nonces_end| RangeInclusive::new(nonces_begin, nonces_end))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
use crate::message_lane_loop::{
|
||||||
|
tests::{header_id, TestMessageLane, TestMessageNonce, TestMessagesProof},
|
||||||
|
ClientState,
|
||||||
|
};
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn strategy_is_empty_works() {
|
||||||
|
let mut strategy = MessageDeliveryStrategy::<TestMessageLane>::default();
|
||||||
|
assert_eq!(strategy.is_empty(), true);
|
||||||
|
strategy.source_nonce_updated(header_id(1), 1);
|
||||||
|
assert_eq!(strategy.is_empty(), false);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn source_nonce_is_never_lower_than_known_target_nonce() {
|
||||||
|
let mut strategy = MessageDeliveryStrategy::<TestMessageLane>::default();
|
||||||
|
strategy.target_nonce_updated(10, &mut Default::default());
|
||||||
|
strategy.source_nonce_updated(header_id(1), 5);
|
||||||
|
assert_eq!(strategy.source_queue, vec![]);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn source_nonce_is_never_lower_than_latest_known_source_nonce() {
|
||||||
|
let mut strategy = MessageDeliveryStrategy::<TestMessageLane>::default();
|
||||||
|
strategy.source_nonce_updated(header_id(1), 5);
|
||||||
|
strategy.source_nonce_updated(header_id(2), 3);
|
||||||
|
strategy.source_nonce_updated(header_id(2), 5);
|
||||||
|
assert_eq!(strategy.source_queue, vec![(header_id(1), 5)]);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn target_nonce_is_never_lower_than_latest_known_target_nonce() {
|
||||||
|
let mut strategy = MessageDeliveryStrategy::<TestMessageLane>::default();
|
||||||
|
strategy.target_nonce_updated(10, &mut Default::default());
|
||||||
|
strategy.target_nonce_updated(5, &mut Default::default());
|
||||||
|
assert_eq!(strategy.target_nonce, 10);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn updated_target_nonce_removes_queued_entries() {
|
||||||
|
let mut strategy = MessageDeliveryStrategy::<TestMessageLane>::default();
|
||||||
|
strategy.source_nonce_updated(header_id(1), 5);
|
||||||
|
strategy.source_nonce_updated(header_id(2), 10);
|
||||||
|
strategy.source_nonce_updated(header_id(3), 15);
|
||||||
|
strategy.source_nonce_updated(header_id(4), 20);
|
||||||
|
strategy.target_nonce_updated(15, &mut Default::default());
|
||||||
|
assert_eq!(strategy.source_queue, vec![(header_id(4), 20)]);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn selected_nonces_are_dropped_on_target_nonce_update() {
|
||||||
|
let mut state = RaceState::default();
|
||||||
|
let mut strategy = MessageDeliveryStrategy::<TestMessageLane>::default();
|
||||||
|
state.nonces_to_submit = Some((header_id(1), 5..=10, 5..=10));
|
||||||
|
strategy.target_nonce_updated(7, &mut state);
|
||||||
|
assert!(state.nonces_to_submit.is_some());
|
||||||
|
strategy.target_nonce_updated(10, &mut state);
|
||||||
|
assert!(state.nonces_to_submit.is_none());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn submitted_nonces_are_dropped_on_target_nonce_update() {
|
||||||
|
let mut state = RaceState::default();
|
||||||
|
let mut strategy = MessageDeliveryStrategy::<TestMessageLane>::default();
|
||||||
|
state.nonces_submitted = Some(5..=10);
|
||||||
|
strategy.target_nonce_updated(7, &mut state);
|
||||||
|
assert!(state.nonces_submitted.is_some());
|
||||||
|
strategy.target_nonce_updated(10, &mut state);
|
||||||
|
assert!(state.nonces_submitted.is_none());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn nothing_is_selected_if_something_is_already_selected() {
|
||||||
|
let mut state = RaceState::default();
|
||||||
|
let mut strategy = MessageDeliveryStrategy::<TestMessageLane>::default();
|
||||||
|
state.nonces_to_submit = Some((header_id(1), 1..=10, 1..=10));
|
||||||
|
strategy.source_nonce_updated(header_id(1), 10);
|
||||||
|
assert_eq!(strategy.select_nonces_to_deliver(&state), None);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn nothing_is_selected_if_something_is_already_submitted() {
|
||||||
|
let mut state = RaceState::default();
|
||||||
|
let mut strategy = MessageDeliveryStrategy::<TestMessageLane>::default();
|
||||||
|
state.nonces_submitted = Some(1..=10);
|
||||||
|
strategy.source_nonce_updated(header_id(1), 10);
|
||||||
|
assert_eq!(strategy.select_nonces_to_deliver(&state), None);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn select_nonces_to_deliver_works() {
|
||||||
|
let mut state = RaceState::<_, _, TestMessageNonce, TestMessagesProof>::default();
|
||||||
|
let mut strategy = MessageDeliveryStrategy::<TestMessageLane>::default();
|
||||||
|
strategy.source_nonce_updated(header_id(1), 1);
|
||||||
|
strategy.source_nonce_updated(header_id(2), 2);
|
||||||
|
strategy.source_nonce_updated(header_id(3), 6);
|
||||||
|
strategy.source_nonce_updated(header_id(5), 8);
|
||||||
|
|
||||||
|
state.target_state = Some(ClientState {
|
||||||
|
best_self: header_id(0),
|
||||||
|
best_peer: header_id(4),
|
||||||
|
});
|
||||||
|
assert_eq!(strategy.select_nonces_to_deliver(&state), Some(1..=4));
|
||||||
|
strategy.target_nonce_updated(4, &mut state);
|
||||||
|
assert_eq!(strategy.select_nonces_to_deliver(&state), Some(5..=6));
|
||||||
|
strategy.target_nonce_updated(6, &mut state);
|
||||||
|
assert_eq!(strategy.select_nonces_to_deliver(&state), None);
|
||||||
|
|
||||||
|
state.target_state = Some(ClientState {
|
||||||
|
best_self: header_id(0),
|
||||||
|
best_peer: header_id(5),
|
||||||
|
});
|
||||||
|
assert_eq!(strategy.select_nonces_to_deliver(&state), Some(7..=8));
|
||||||
|
strategy.target_nonce_updated(8, &mut state);
|
||||||
|
assert_eq!(strategy.select_nonces_to_deliver(&state), None);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,359 @@
|
|||||||
|
// Copyright 2019-2020 Parity Technologies (UK) Ltd.
|
||||||
|
// This file is part of Parity Bridges Common.
|
||||||
|
|
||||||
|
// Parity Bridges Common is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
|
||||||
|
// Parity Bridges Common is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU General Public License for more details.
|
||||||
|
|
||||||
|
//! Loop that is serving single race within message lane. This could be
|
||||||
|
//! message delivery race, receiving confirmations race or processing
|
||||||
|
//! confirmations race.
|
||||||
|
//!
|
||||||
|
//! The idea of the race is simple - we have `nonce`-s on source and target
|
||||||
|
//! nodes. We're trying to prove that the source node has this nonce (and
|
||||||
|
//! associated data - like messages, lane state, etc) to the target node by
|
||||||
|
//! generating and submitting proof.
|
||||||
|
|
||||||
|
// Until there'll be actual message-lane in the runtime.
|
||||||
|
#![allow(dead_code)]
|
||||||
|
|
||||||
|
use crate::message_lane_loop::ClientState;
|
||||||
|
use crate::utils::{process_future_result, retry_backoff, FailedClient, MaybeConnectionError};
|
||||||
|
|
||||||
|
use async_trait::async_trait;
|
||||||
|
use futures::{
|
||||||
|
future::FutureExt,
|
||||||
|
stream::{FusedStream, StreamExt},
|
||||||
|
};
|
||||||
|
use std::{
|
||||||
|
fmt::Debug,
|
||||||
|
ops::RangeInclusive,
|
||||||
|
time::{Duration, Instant},
|
||||||
|
};
|
||||||
|
|
||||||
|
/// One of races within lane.
|
||||||
|
pub trait MessageRace {
|
||||||
|
/// Header id of the race source.
|
||||||
|
type SourceHeaderId: Debug + Clone + PartialEq;
|
||||||
|
/// Header id of the race source.
|
||||||
|
type TargetHeaderId: Debug + Clone + PartialEq;
|
||||||
|
|
||||||
|
/// Message nonce used in the race.
|
||||||
|
type MessageNonce: Debug + Clone;
|
||||||
|
/// Proof that is generated and delivered in this race.
|
||||||
|
type Proof: Clone;
|
||||||
|
|
||||||
|
/// Name of the race source.
|
||||||
|
fn source_name() -> String;
|
||||||
|
/// Name of the race target.
|
||||||
|
fn target_name() -> String;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// State of race source client.
|
||||||
|
type SourceClientState<P> = ClientState<<P as MessageRace>::SourceHeaderId, <P as MessageRace>::TargetHeaderId>;
|
||||||
|
|
||||||
|
/// State of race target client.
|
||||||
|
type TargetClientState<P> = ClientState<<P as MessageRace>::TargetHeaderId, <P as MessageRace>::SourceHeaderId>;
|
||||||
|
|
||||||
|
/// One of message lane clients, which is source client for the race.
|
||||||
|
#[async_trait(?Send)]
|
||||||
|
pub trait SourceClient<P: MessageRace> {
|
||||||
|
/// Type of error this clients returns.
|
||||||
|
type Error: std::fmt::Debug + MaybeConnectionError;
|
||||||
|
|
||||||
|
/// Return latest nonce that is known to the source client.
|
||||||
|
async fn latest_nonce(
|
||||||
|
&self,
|
||||||
|
at_block: P::SourceHeaderId,
|
||||||
|
) -> Result<(P::SourceHeaderId, P::MessageNonce), Self::Error>;
|
||||||
|
/// Generate proof for delivering to the target client.
|
||||||
|
async fn generate_proof(
|
||||||
|
&self,
|
||||||
|
at_block: P::SourceHeaderId,
|
||||||
|
nonces: RangeInclusive<P::MessageNonce>,
|
||||||
|
) -> Result<(P::SourceHeaderId, RangeInclusive<P::MessageNonce>, P::Proof), Self::Error>;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// One of message lane clients, which is target client for the race.
|
||||||
|
#[async_trait(?Send)]
|
||||||
|
pub trait TargetClient<P: MessageRace> {
|
||||||
|
/// Type of error this clients returns.
|
||||||
|
type Error: std::fmt::Debug + MaybeConnectionError;
|
||||||
|
|
||||||
|
/// Return latest nonce that is known to the target client.
|
||||||
|
async fn latest_nonce(
|
||||||
|
&self,
|
||||||
|
at_block: P::TargetHeaderId,
|
||||||
|
) -> Result<(P::TargetHeaderId, P::MessageNonce), Self::Error>;
|
||||||
|
/// Submit proof to the target client.
|
||||||
|
async fn submit_proof(
|
||||||
|
&self,
|
||||||
|
generated_at_block: P::SourceHeaderId,
|
||||||
|
nonces: RangeInclusive<P::MessageNonce>,
|
||||||
|
proof: P::Proof,
|
||||||
|
) -> Result<RangeInclusive<P::MessageNonce>, Self::Error>;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Race strategy.
|
||||||
|
pub trait RaceStrategy<SourceHeaderId, TargetHeaderId, MessageNonce, Proof> {
|
||||||
|
/// Should return true if nothing has to be synced.
|
||||||
|
fn is_empty(&self) -> bool;
|
||||||
|
/// Called when latest nonce is updated at source node of the race.
|
||||||
|
fn source_nonce_updated(&mut self, at_block: SourceHeaderId, nonce: MessageNonce);
|
||||||
|
/// Called when latest nonce is updated at target node of the race.
|
||||||
|
fn target_nonce_updated(
|
||||||
|
&mut self,
|
||||||
|
nonce: MessageNonce,
|
||||||
|
race_state: &mut RaceState<SourceHeaderId, TargetHeaderId, MessageNonce, Proof>,
|
||||||
|
);
|
||||||
|
/// Should return `Some(nonces)` if we need to deliver proof of `nonces` (and associated
|
||||||
|
/// data) from source to target node.
|
||||||
|
fn select_nonces_to_deliver(
|
||||||
|
&mut self,
|
||||||
|
race_state: &RaceState<SourceHeaderId, TargetHeaderId, MessageNonce, Proof>,
|
||||||
|
) -> Option<RangeInclusive<MessageNonce>>;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// State of the race.
|
||||||
|
pub struct RaceState<SourceHeaderId, TargetHeaderId, MessageNonce, Proof> {
|
||||||
|
/// Source state, if known.
|
||||||
|
pub source_state: Option<ClientState<SourceHeaderId, TargetHeaderId>>,
|
||||||
|
/// Target state, if known.
|
||||||
|
pub target_state: Option<ClientState<TargetHeaderId, SourceHeaderId>>,
|
||||||
|
/// Range of nonces that we have selected to submit.
|
||||||
|
pub nonces_to_submit: Option<(SourceHeaderId, RangeInclusive<MessageNonce>, Proof)>,
|
||||||
|
/// Range of nonces that is currently submitted.
|
||||||
|
pub nonces_submitted: Option<RangeInclusive<MessageNonce>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Run race loop until connection with target or source node is lost.
|
||||||
|
pub async fn run<P: MessageRace>(
|
||||||
|
race_source: impl SourceClient<P>,
|
||||||
|
race_source_updated: impl FusedStream<Item = SourceClientState<P>>,
|
||||||
|
race_target: impl TargetClient<P>,
|
||||||
|
race_target_updated: impl FusedStream<Item = TargetClientState<P>>,
|
||||||
|
stall_timeout: Duration,
|
||||||
|
mut strategy: impl RaceStrategy<P::SourceHeaderId, P::TargetHeaderId, P::MessageNonce, P::Proof>,
|
||||||
|
) -> Result<(), FailedClient> {
|
||||||
|
let mut race_state = RaceState::default();
|
||||||
|
let mut stall_countdown = Instant::now();
|
||||||
|
|
||||||
|
let mut source_retry_backoff = retry_backoff();
|
||||||
|
let mut source_client_is_online = true;
|
||||||
|
let mut source_latest_nonce_required = false;
|
||||||
|
let source_latest_nonce = futures::future::Fuse::terminated();
|
||||||
|
let source_generate_proof = futures::future::Fuse::terminated();
|
||||||
|
let source_go_offline_future = futures::future::Fuse::terminated();
|
||||||
|
|
||||||
|
let mut target_retry_backoff = retry_backoff();
|
||||||
|
let mut target_client_is_online = true;
|
||||||
|
let mut target_latest_nonce_required = false;
|
||||||
|
let target_latest_nonce = futures::future::Fuse::terminated();
|
||||||
|
let target_submit_proof = futures::future::Fuse::terminated();
|
||||||
|
let target_go_offline_future = futures::future::Fuse::terminated();
|
||||||
|
|
||||||
|
futures::pin_mut!(
|
||||||
|
race_source_updated,
|
||||||
|
source_latest_nonce,
|
||||||
|
source_generate_proof,
|
||||||
|
source_go_offline_future,
|
||||||
|
race_target_updated,
|
||||||
|
target_latest_nonce,
|
||||||
|
target_submit_proof,
|
||||||
|
target_go_offline_future,
|
||||||
|
);
|
||||||
|
|
||||||
|
loop {
|
||||||
|
futures::select! {
|
||||||
|
// when headers ids are updated
|
||||||
|
source_state = race_source_updated.next() => {
|
||||||
|
if let Some(source_state) = source_state {
|
||||||
|
if race_state.source_state.as_ref() != Some(&source_state) {
|
||||||
|
source_latest_nonce_required = true;
|
||||||
|
race_state.source_state = Some(source_state);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
target_state = race_target_updated.next() => {
|
||||||
|
if let Some(target_state) = target_state {
|
||||||
|
if race_state.target_state.as_ref() != Some(&target_state) {
|
||||||
|
target_latest_nonce_required = true;
|
||||||
|
race_state.target_state = Some(target_state);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
|
||||||
|
// when nonces are updated
|
||||||
|
latest_nonce = source_latest_nonce => {
|
||||||
|
source_latest_nonce_required = false;
|
||||||
|
|
||||||
|
source_client_is_online = process_future_result(
|
||||||
|
latest_nonce,
|
||||||
|
&mut source_retry_backoff,
|
||||||
|
|(at_block, latest_nonce)| {
|
||||||
|
log::debug!(
|
||||||
|
target: "bridge",
|
||||||
|
"Received latest nonce from {}: {:?}",
|
||||||
|
P::source_name(),
|
||||||
|
latest_nonce,
|
||||||
|
);
|
||||||
|
|
||||||
|
strategy.source_nonce_updated(at_block, latest_nonce);
|
||||||
|
},
|
||||||
|
&mut source_go_offline_future,
|
||||||
|
|delay| async_std::task::sleep(delay),
|
||||||
|
|| format!("Error retrieving latest nonce from {}", P::source_name()),
|
||||||
|
).fail_if_connection_error(FailedClient::Source)?;
|
||||||
|
},
|
||||||
|
latest_nonce = target_latest_nonce => {
|
||||||
|
target_latest_nonce_required = false;
|
||||||
|
|
||||||
|
target_client_is_online = process_future_result(
|
||||||
|
latest_nonce,
|
||||||
|
&mut target_retry_backoff,
|
||||||
|
|(_, latest_nonce)| {
|
||||||
|
log::debug!(
|
||||||
|
target: "bridge",
|
||||||
|
"Received latest nonce from {}: {:?}",
|
||||||
|
P::target_name(),
|
||||||
|
latest_nonce,
|
||||||
|
);
|
||||||
|
|
||||||
|
strategy.target_nonce_updated(latest_nonce, &mut race_state);
|
||||||
|
},
|
||||||
|
&mut target_go_offline_future,
|
||||||
|
|delay| async_std::task::sleep(delay),
|
||||||
|
|| format!("Error retrieving latest nonce from {}", P::target_name()),
|
||||||
|
).fail_if_connection_error(FailedClient::Target)?;
|
||||||
|
},
|
||||||
|
|
||||||
|
// proof generation and submission
|
||||||
|
proof = source_generate_proof => {
|
||||||
|
source_client_is_online = process_future_result(
|
||||||
|
proof,
|
||||||
|
&mut source_retry_backoff,
|
||||||
|
|(at_block, nonces_range, proof)| {
|
||||||
|
log::debug!(
|
||||||
|
target: "bridge",
|
||||||
|
"Received proof for nonces in range {:?} from {}",
|
||||||
|
nonces_range,
|
||||||
|
P::source_name(),
|
||||||
|
);
|
||||||
|
|
||||||
|
race_state.nonces_to_submit = Some((at_block, nonces_range, proof));
|
||||||
|
},
|
||||||
|
&mut source_go_offline_future,
|
||||||
|
|delay| async_std::task::sleep(delay),
|
||||||
|
|| format!("Error generating proof at {}", P::source_name()),
|
||||||
|
).fail_if_connection_error(FailedClient::Source)?;
|
||||||
|
},
|
||||||
|
proof_submit_result = target_submit_proof => {
|
||||||
|
target_client_is_online = process_future_result(
|
||||||
|
proof_submit_result,
|
||||||
|
&mut target_retry_backoff,
|
||||||
|
|nonces_range| {
|
||||||
|
log::debug!(
|
||||||
|
target: "bridge",
|
||||||
|
"Successfully submitted proof of nonces {:?} to {}",
|
||||||
|
nonces_range,
|
||||||
|
P::target_name(),
|
||||||
|
);
|
||||||
|
|
||||||
|
race_state.nonces_to_submit = None;
|
||||||
|
race_state.nonces_submitted = Some(nonces_range);
|
||||||
|
},
|
||||||
|
&mut target_go_offline_future,
|
||||||
|
|delay| async_std::task::sleep(delay),
|
||||||
|
|| format!("Error submitting proof {}", P::target_name()),
|
||||||
|
).fail_if_connection_error(FailedClient::Target)?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if stall_countdown.elapsed() > stall_timeout {
|
||||||
|
return Err(FailedClient::Both);
|
||||||
|
} else if race_state.nonces_to_submit.is_none() && race_state.nonces_submitted.is_none() && strategy.is_empty()
|
||||||
|
{
|
||||||
|
stall_countdown = Instant::now();
|
||||||
|
}
|
||||||
|
|
||||||
|
if source_client_is_online {
|
||||||
|
source_client_is_online = false;
|
||||||
|
|
||||||
|
let nonces_to_deliver = race_state.source_state.as_ref().and_then(|source_state| {
|
||||||
|
strategy
|
||||||
|
.select_nonces_to_deliver(&race_state)
|
||||||
|
.map(|nonces_range| (source_state.best_self.clone(), nonces_range))
|
||||||
|
});
|
||||||
|
|
||||||
|
if let Some((at_block, nonces_range)) = nonces_to_deliver {
|
||||||
|
log::debug!(
|
||||||
|
target: "bridge",
|
||||||
|
"Asking {} to prove nonces in range {:?}",
|
||||||
|
P::source_name(),
|
||||||
|
nonces_range,
|
||||||
|
);
|
||||||
|
source_generate_proof.set(race_source.generate_proof(at_block, nonces_range).fuse());
|
||||||
|
} else if source_latest_nonce_required {
|
||||||
|
log::debug!(target: "bridge", "Asking {} about latest generated message nonce", P::source_name());
|
||||||
|
let at_block = race_state
|
||||||
|
.source_state
|
||||||
|
.as_ref()
|
||||||
|
.expect("source_latest_nonce_required is only true when source_state is Some; qed")
|
||||||
|
.best_self
|
||||||
|
.clone();
|
||||||
|
source_latest_nonce.set(race_source.latest_nonce(at_block).fuse());
|
||||||
|
} else {
|
||||||
|
source_client_is_online = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if target_client_is_online {
|
||||||
|
target_client_is_online = false;
|
||||||
|
|
||||||
|
if let Some((at_block, nonces_range, proof)) = race_state.nonces_to_submit.as_ref() {
|
||||||
|
log::debug!(
|
||||||
|
target: "bridge",
|
||||||
|
"Going to submit proof of messages in range {:?} to {} node",
|
||||||
|
nonces_range,
|
||||||
|
P::target_name(),
|
||||||
|
);
|
||||||
|
target_submit_proof.set(
|
||||||
|
race_target
|
||||||
|
.submit_proof(at_block.clone(), nonces_range.clone(), proof.clone())
|
||||||
|
.fuse(),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
if target_latest_nonce_required {
|
||||||
|
log::debug!(target: "bridge", "Asking {} about latest nonce", P::target_name());
|
||||||
|
let at_block = race_state
|
||||||
|
.target_state
|
||||||
|
.as_ref()
|
||||||
|
.expect("target_latest_nonce_required is only true when target_state is Some; qed")
|
||||||
|
.best_self
|
||||||
|
.clone();
|
||||||
|
target_latest_nonce.set(race_target.latest_nonce(at_block).fuse());
|
||||||
|
} else {
|
||||||
|
target_client_is_online = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<SourceHeaderId, TargetHeaderId, MessageNonce, Proof> Default
|
||||||
|
for RaceState<SourceHeaderId, TargetHeaderId, MessageNonce, Proof>
|
||||||
|
{
|
||||||
|
fn default() -> Self {
|
||||||
|
RaceState {
|
||||||
|
source_state: None,
|
||||||
|
target_state: None,
|
||||||
|
nonces_to_submit: None,
|
||||||
|
nonces_submitted: None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -19,7 +19,8 @@ use crate::instances::BridgeInstance;
|
|||||||
use crate::rpc::{Substrate, SubstrateRpc};
|
use crate::rpc::{Substrate, SubstrateRpc};
|
||||||
use crate::rpc_errors::RpcError;
|
use crate::rpc_errors::RpcError;
|
||||||
use crate::substrate_types::{Hash, Header as SubstrateHeader, Number, SignedBlock as SignedSubstrateBlock};
|
use crate::substrate_types::{Hash, Header as SubstrateHeader, Number, SignedBlock as SignedSubstrateBlock};
|
||||||
use crate::sync_types::{HeaderId, SubmittedHeaders};
|
use crate::sync_types::SubmittedHeaders;
|
||||||
|
use crate::utils::HeaderId;
|
||||||
|
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use bp_eth_poa::Header as SubstrateEthereumHeader;
|
use bp_eth_poa::Header as SubstrateEthereumHeader;
|
||||||
|
|||||||
@@ -17,7 +17,9 @@
|
|||||||
use crate::ethereum_types::{
|
use crate::ethereum_types::{
|
||||||
Header as EthereumHeader, Receipt as EthereumReceipt, HEADER_ID_PROOF as ETHEREUM_HEADER_ID_PROOF,
|
Header as EthereumHeader, Receipt as EthereumReceipt, HEADER_ID_PROOF as ETHEREUM_HEADER_ID_PROOF,
|
||||||
};
|
};
|
||||||
use crate::sync_types::{HeaderId, HeadersSyncPipeline, QueuedHeader, SourceHeader};
|
use crate::sync_types::{HeadersSyncPipeline, QueuedHeader, SourceHeader};
|
||||||
|
use crate::utils::HeaderId;
|
||||||
|
|
||||||
use codec::Encode;
|
use codec::Encode;
|
||||||
|
|
||||||
pub use bp_eth_poa::{
|
pub use bp_eth_poa::{
|
||||||
|
|||||||
@@ -310,7 +310,8 @@ pub mod tests {
|
|||||||
use super::*;
|
use super::*;
|
||||||
use crate::ethereum_types::{EthereumHeadersSyncPipeline, H256};
|
use crate::ethereum_types::{EthereumHeadersSyncPipeline, H256};
|
||||||
use crate::headers::tests::{header, id};
|
use crate::headers::tests::{header, id};
|
||||||
use crate::sync_types::{HeaderId, HeaderStatus};
|
use crate::sync_types::HeaderStatus;
|
||||||
|
use crate::utils::HeaderId;
|
||||||
|
|
||||||
fn side_hash(number: u64) -> H256 {
|
fn side_hash(number: u64) -> H256 {
|
||||||
H256::from_low_u64_le(1000 + number)
|
H256::from_low_u64_le(1000 + number)
|
||||||
|
|||||||
@@ -18,10 +18,11 @@ use crate::metrics::{start as metrics_start, GlobalMetrics, MetricsParams};
|
|||||||
use crate::sync::HeadersSyncParams;
|
use crate::sync::HeadersSyncParams;
|
||||||
use crate::sync_loop_metrics::SyncLoopMetrics;
|
use crate::sync_loop_metrics::SyncLoopMetrics;
|
||||||
use crate::sync_types::{HeaderIdOf, HeaderStatus, HeadersSyncPipeline, QueuedHeader, SubmittedHeaders};
|
use crate::sync_types::{HeaderIdOf, HeaderStatus, HeadersSyncPipeline, QueuedHeader, SubmittedHeaders};
|
||||||
use crate::utils::{format_ids, retry_backoff, MaybeConnectionError, StringifiedMaybeConnectionError};
|
use crate::utils::{
|
||||||
|
format_ids, interval, process_future_result, retry_backoff, MaybeConnectionError, StringifiedMaybeConnectionError,
|
||||||
|
};
|
||||||
|
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use backoff::{backoff::Backoff, ExponentialBackoff};
|
|
||||||
use futures::{future::FutureExt, stream::StreamExt};
|
use futures::{future::FutureExt, stream::StreamExt};
|
||||||
use num_traits::{Saturating, Zero};
|
use num_traits::{Saturating, Zero};
|
||||||
use std::{
|
use std::{
|
||||||
@@ -44,9 +45,6 @@ const STALL_SYNC_TIMEOUT: Duration = Duration::from_secs(5 * 60);
|
|||||||
/// Delay after we have seen update of best source header at target node,
|
/// Delay after we have seen update of best source header at target node,
|
||||||
/// for us to treat sync stalled. ONLY when relay operates in backup mode.
|
/// for us to treat sync stalled. ONLY when relay operates in backup mode.
|
||||||
const BACKUP_STALL_SYNC_TIMEOUT: Duration = Duration::from_secs(10 * 60);
|
const BACKUP_STALL_SYNC_TIMEOUT: Duration = Duration::from_secs(10 * 60);
|
||||||
/// Delay after connection-related error happened before we'll try
|
|
||||||
/// reconnection again.
|
|
||||||
const CONNECTION_ERROR_DELAY: Duration = Duration::from_secs(10);
|
|
||||||
|
|
||||||
/// Source client trait.
|
/// Source client trait.
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
@@ -186,7 +184,7 @@ pub fn run<P: HeadersSyncPipeline, TC: TargetClient<P>>(
|
|||||||
&mut source_go_offline_future,
|
&mut source_go_offline_future,
|
||||||
|delay| async_std::task::sleep(delay),
|
|delay| async_std::task::sleep(delay),
|
||||||
|| format!("Error retrieving best header number from {}", P::SOURCE_NAME),
|
|| format!("Error retrieving best header number from {}", P::SOURCE_NAME),
|
||||||
);
|
).is_ok();
|
||||||
},
|
},
|
||||||
source_new_header = source_new_header_future => {
|
source_new_header = source_new_header_future => {
|
||||||
source_client_is_online = process_future_result(
|
source_client_is_online = process_future_result(
|
||||||
@@ -196,7 +194,7 @@ pub fn run<P: HeadersSyncPipeline, TC: TargetClient<P>>(
|
|||||||
&mut source_go_offline_future,
|
&mut source_go_offline_future,
|
||||||
|delay| async_std::task::sleep(delay),
|
|delay| async_std::task::sleep(delay),
|
||||||
|| format!("Error retrieving header from {} node", P::SOURCE_NAME),
|
|| format!("Error retrieving header from {} node", P::SOURCE_NAME),
|
||||||
);
|
).is_ok();
|
||||||
},
|
},
|
||||||
source_orphan_header = source_orphan_header_future => {
|
source_orphan_header = source_orphan_header_future => {
|
||||||
source_client_is_online = process_future_result(
|
source_client_is_online = process_future_result(
|
||||||
@@ -206,7 +204,7 @@ pub fn run<P: HeadersSyncPipeline, TC: TargetClient<P>>(
|
|||||||
&mut source_go_offline_future,
|
&mut source_go_offline_future,
|
||||||
|delay| async_std::task::sleep(delay),
|
|delay| async_std::task::sleep(delay),
|
||||||
|| format!("Error retrieving orphan header from {} node", P::SOURCE_NAME),
|
|| format!("Error retrieving orphan header from {} node", P::SOURCE_NAME),
|
||||||
);
|
).is_ok();
|
||||||
},
|
},
|
||||||
source_extra = source_extra_future => {
|
source_extra = source_extra_future => {
|
||||||
source_client_is_online = process_future_result(
|
source_client_is_online = process_future_result(
|
||||||
@@ -216,7 +214,7 @@ pub fn run<P: HeadersSyncPipeline, TC: TargetClient<P>>(
|
|||||||
&mut source_go_offline_future,
|
&mut source_go_offline_future,
|
||||||
|delay| async_std::task::sleep(delay),
|
|delay| async_std::task::sleep(delay),
|
||||||
|| format!("Error retrieving extra data from {} node", P::SOURCE_NAME),
|
|| format!("Error retrieving extra data from {} node", P::SOURCE_NAME),
|
||||||
);
|
).is_ok();
|
||||||
},
|
},
|
||||||
source_completion = source_completion_future => {
|
source_completion = source_completion_future => {
|
||||||
source_client_is_online = process_future_result(
|
source_client_is_online = process_future_result(
|
||||||
@@ -226,7 +224,7 @@ pub fn run<P: HeadersSyncPipeline, TC: TargetClient<P>>(
|
|||||||
&mut source_go_offline_future,
|
&mut source_go_offline_future,
|
||||||
|delay| async_std::task::sleep(delay),
|
|delay| async_std::task::sleep(delay),
|
||||||
|| format!("Error retrieving completion data from {} node", P::SOURCE_NAME),
|
|| format!("Error retrieving completion data from {} node", P::SOURCE_NAME),
|
||||||
);
|
).is_ok();
|
||||||
},
|
},
|
||||||
source_client = source_go_offline_future => {
|
source_client = source_go_offline_future => {
|
||||||
source_client_is_online = true;
|
source_client_is_online = true;
|
||||||
@@ -277,7 +275,7 @@ pub fn run<P: HeadersSyncPipeline, TC: TargetClient<P>>(
|
|||||||
&mut target_go_offline_future,
|
&mut target_go_offline_future,
|
||||||
|delay| async_std::task::sleep(delay),
|
|delay| async_std::task::sleep(delay),
|
||||||
|| format!("Error retrieving best known header from {} node", P::TARGET_NAME),
|
|| format!("Error retrieving best known header from {} node", P::TARGET_NAME),
|
||||||
);
|
).is_ok();
|
||||||
},
|
},
|
||||||
incomplete_headers_ids = target_incomplete_headers_future => {
|
incomplete_headers_ids = target_incomplete_headers_future => {
|
||||||
target_incomplete_headers_required = false;
|
target_incomplete_headers_required = false;
|
||||||
@@ -289,7 +287,7 @@ pub fn run<P: HeadersSyncPipeline, TC: TargetClient<P>>(
|
|||||||
&mut target_go_offline_future,
|
&mut target_go_offline_future,
|
||||||
|delay| async_std::task::sleep(delay),
|
|delay| async_std::task::sleep(delay),
|
||||||
|| format!("Error retrieving incomplete headers from {} node", P::TARGET_NAME),
|
|| format!("Error retrieving incomplete headers from {} node", P::TARGET_NAME),
|
||||||
);
|
).is_ok();
|
||||||
},
|
},
|
||||||
target_existence_status = target_existence_status_future => {
|
target_existence_status = target_existence_status_future => {
|
||||||
target_client_is_online = process_future_result(
|
target_client_is_online = process_future_result(
|
||||||
@@ -301,7 +299,7 @@ pub fn run<P: HeadersSyncPipeline, TC: TargetClient<P>>(
|
|||||||
&mut target_go_offline_future,
|
&mut target_go_offline_future,
|
||||||
|delay| async_std::task::sleep(delay),
|
|delay| async_std::task::sleep(delay),
|
||||||
|| format!("Error retrieving existence status from {} node", P::TARGET_NAME),
|
|| format!("Error retrieving existence status from {} node", P::TARGET_NAME),
|
||||||
);
|
).is_ok();
|
||||||
},
|
},
|
||||||
submitted_headers = target_submit_header_future => {
|
submitted_headers = target_submit_header_future => {
|
||||||
// following line helps Rust understand the type of `submitted_headers` :/
|
// following line helps Rust understand the type of `submitted_headers` :/
|
||||||
@@ -329,7 +327,7 @@ pub fn run<P: HeadersSyncPipeline, TC: TargetClient<P>>(
|
|||||||
&mut target_go_offline_future,
|
&mut target_go_offline_future,
|
||||||
|delay| async_std::task::sleep(delay),
|
|delay| async_std::task::sleep(delay),
|
||||||
|| format!("Error submitting headers to {} node", P::TARGET_NAME),
|
|| format!("Error submitting headers to {} node", P::TARGET_NAME),
|
||||||
);
|
).is_ok();
|
||||||
|
|
||||||
log::debug!(target: "bridge", "Header submit result: {}", submitted_headers_str);
|
log::debug!(target: "bridge", "Header submit result: {}", submitted_headers_str);
|
||||||
|
|
||||||
@@ -350,7 +348,7 @@ pub fn run<P: HeadersSyncPipeline, TC: TargetClient<P>>(
|
|||||||
&mut target_go_offline_future,
|
&mut target_go_offline_future,
|
||||||
|delay| async_std::task::sleep(delay),
|
|delay| async_std::task::sleep(delay),
|
||||||
|| format!("Error completing headers at {}", P::TARGET_NAME),
|
|| format!("Error completing headers at {}", P::TARGET_NAME),
|
||||||
);
|
).is_ok();
|
||||||
},
|
},
|
||||||
target_extra_check_result = target_extra_check_future => {
|
target_extra_check_result = target_extra_check_future => {
|
||||||
target_client_is_online = process_future_result(
|
target_client_is_online = process_future_result(
|
||||||
@@ -362,7 +360,7 @@ pub fn run<P: HeadersSyncPipeline, TC: TargetClient<P>>(
|
|||||||
&mut target_go_offline_future,
|
&mut target_go_offline_future,
|
||||||
|delay| async_std::task::sleep(delay),
|
|delay| async_std::task::sleep(delay),
|
||||||
|| format!("Error retrieving receipts requirement from {} node", P::TARGET_NAME),
|
|| format!("Error retrieving receipts requirement from {} node", P::TARGET_NAME),
|
||||||
);
|
).is_ok();
|
||||||
},
|
},
|
||||||
target_client = target_go_offline_future => {
|
target_client = target_go_offline_future => {
|
||||||
target_client_is_online = true;
|
target_client_is_online = true;
|
||||||
@@ -557,62 +555,6 @@ pub fn run<P: HeadersSyncPipeline, TC: TargetClient<P>>(
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Stream that emits item every `timeout_ms` milliseconds.
|
|
||||||
fn interval(timeout: Duration) -> impl futures::Stream<Item = ()> {
|
|
||||||
futures::stream::unfold((), move |_| async move {
|
|
||||||
async_std::task::sleep(timeout).await;
|
|
||||||
Some(((), ()))
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Process result of the future from a client.
|
|
||||||
///
|
|
||||||
/// Returns whether or not the client we're interacting with is online. In this context
|
|
||||||
/// what online means is that the client is currently not handling any other requests
|
|
||||||
/// that we've previously sent.
|
|
||||||
pub(crate) fn process_future_result<TResult, TError, TGoOfflineFuture>(
|
|
||||||
result: Result<TResult, TError>,
|
|
||||||
retry_backoff: &mut ExponentialBackoff,
|
|
||||||
on_success: impl FnOnce(TResult),
|
|
||||||
go_offline_future: &mut std::pin::Pin<&mut futures::future::Fuse<TGoOfflineFuture>>,
|
|
||||||
go_offline: impl FnOnce(Duration) -> TGoOfflineFuture,
|
|
||||||
error_pattern: impl FnOnce() -> String,
|
|
||||||
) -> bool
|
|
||||||
where
|
|
||||||
TError: std::fmt::Debug + MaybeConnectionError,
|
|
||||||
TGoOfflineFuture: FutureExt,
|
|
||||||
{
|
|
||||||
let mut client_is_online = false;
|
|
||||||
|
|
||||||
match result {
|
|
||||||
Ok(result) => {
|
|
||||||
on_success(result);
|
|
||||||
retry_backoff.reset();
|
|
||||||
client_is_online = true
|
|
||||||
}
|
|
||||||
Err(error) => {
|
|
||||||
let is_connection_error = error.is_connection_error();
|
|
||||||
let retry_delay = if is_connection_error {
|
|
||||||
retry_backoff.reset();
|
|
||||||
CONNECTION_ERROR_DELAY
|
|
||||||
} else {
|
|
||||||
retry_backoff.next_backoff().unwrap_or(CONNECTION_ERROR_DELAY)
|
|
||||||
};
|
|
||||||
go_offline_future.set(go_offline(retry_delay).fuse());
|
|
||||||
|
|
||||||
log::error!(
|
|
||||||
target: "bridge",
|
|
||||||
"{}: {:?}. Retrying in {}s",
|
|
||||||
error_pattern(),
|
|
||||||
error,
|
|
||||||
retry_delay.as_secs_f64(),
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
client_is_online
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Print synchronization progress.
|
/// Print synchronization progress.
|
||||||
fn print_sync_progress<P: HeadersSyncPipeline>(
|
fn print_sync_progress<P: HeadersSyncPipeline>(
|
||||||
progress_context: (Instant, Option<P::Number>, Option<P::Number>),
|
progress_context: (Instant, Option<P::Number>, Option<P::Number>),
|
||||||
|
|||||||
@@ -16,9 +16,9 @@
|
|||||||
|
|
||||||
#![cfg(test)]
|
#![cfg(test)]
|
||||||
|
|
||||||
use crate::sync_loop::{process_future_result, run, SourceClient, TargetClient};
|
use crate::sync_loop::{run, SourceClient, TargetClient};
|
||||||
use crate::sync_types::{HeaderId, HeadersSyncPipeline, QueuedHeader, SourceHeader, SubmittedHeaders};
|
use crate::sync_types::{HeadersSyncPipeline, QueuedHeader, SourceHeader, SubmittedHeaders};
|
||||||
use crate::utils::{retry_backoff, MaybeConnectionError};
|
use crate::utils::{process_future_result, retry_backoff, HeaderId, MaybeConnectionError};
|
||||||
|
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use backoff::backoff::Backoff;
|
use backoff::backoff::Backoff;
|
||||||
|
|||||||
@@ -14,14 +14,10 @@
|
|||||||
// You should have received a copy of the GNU General Public License
|
// You should have received a copy of the GNU General Public License
|
||||||
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
|
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
use crate::utils::format_ids;
|
use crate::utils::{format_ids, HeaderId};
|
||||||
|
|
||||||
use std::{ops::Deref, sync::Arc};
|
use std::{ops::Deref, sync::Arc};
|
||||||
|
|
||||||
/// Ethereum header Id.
|
|
||||||
#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq)]
|
|
||||||
pub struct HeaderId<Hash, Number>(pub Number, pub Hash);
|
|
||||||
|
|
||||||
/// Ethereum header synchronization status.
|
/// Ethereum header synchronization status.
|
||||||
#[derive(Debug, Clone, Copy, PartialEq)]
|
#[derive(Debug, Clone, Copy, PartialEq)]
|
||||||
pub enum HeaderStatus {
|
pub enum HeaderStatus {
|
||||||
|
|||||||
@@ -14,12 +14,16 @@
|
|||||||
// You should have received a copy of the GNU General Public License
|
// You should have received a copy of the GNU General Public License
|
||||||
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
|
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
use backoff::ExponentialBackoff;
|
use backoff::{backoff::Backoff, ExponentialBackoff};
|
||||||
|
use futures::future::FutureExt;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
/// Max delay after connection-unrelated error happened before we'll try the
|
/// Max delay after connection-unrelated error happened before we'll try the
|
||||||
/// same request again.
|
/// same request again.
|
||||||
const MAX_BACKOFF_INTERVAL: Duration = Duration::from_secs(60);
|
const MAX_BACKOFF_INTERVAL: Duration = Duration::from_secs(60);
|
||||||
|
/// Delay after connection-related error happened before we'll try
|
||||||
|
/// reconnection again.
|
||||||
|
const CONNECTION_ERROR_DELAY: Duration = Duration::from_secs(10);
|
||||||
|
|
||||||
/// Macro that returns (client, Err(error)) tuple from function if result is Err(error).
|
/// Macro that returns (client, Err(error)) tuple from function if result is Err(error).
|
||||||
#[macro_export]
|
#[macro_export]
|
||||||
@@ -43,6 +47,10 @@ macro_rules! bail_on_arg_error {
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Ethereum header Id.
|
||||||
|
#[derive(Debug, Default, Clone, Copy, Eq, Hash, PartialEq)]
|
||||||
|
pub struct HeaderId<Hash, Number>(pub Number, pub Hash);
|
||||||
|
|
||||||
/// Error type that can signal connection errors.
|
/// Error type that can signal connection errors.
|
||||||
pub trait MaybeConnectionError {
|
pub trait MaybeConnectionError {
|
||||||
/// Returns true if error (maybe) represents connection error.
|
/// Returns true if error (maybe) represents connection error.
|
||||||
@@ -114,3 +122,102 @@ pub fn format_ids<Id: std::fmt::Debug>(mut ids: impl ExactSizeIterator<Item = Id
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Stream that emits item every `timeout_ms` milliseconds.
|
||||||
|
pub fn interval(timeout: Duration) -> impl futures::Stream<Item = ()> {
|
||||||
|
futures::stream::unfold((), move |_| async move {
|
||||||
|
async_std::task::sleep(timeout).await;
|
||||||
|
Some(((), ()))
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Which client has caused error.
|
||||||
|
#[derive(Debug, Clone, Copy, PartialEq)]
|
||||||
|
pub enum FailedClient {
|
||||||
|
/// It is the source client who has caused error.
|
||||||
|
Source,
|
||||||
|
/// It is the target client who has caused error.
|
||||||
|
Target,
|
||||||
|
/// Both clients are failing, or we just encountered some other error that
|
||||||
|
/// should be treated like that.
|
||||||
|
Both,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Future process result.
|
||||||
|
#[derive(Debug, Clone, Copy)]
|
||||||
|
pub enum ProcessFutureResult {
|
||||||
|
/// Future has been processed successfully.
|
||||||
|
Success,
|
||||||
|
/// Future has failed with non-connection error.
|
||||||
|
Failed,
|
||||||
|
/// Future has failed with connection error.
|
||||||
|
ConnectionFailed,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ProcessFutureResult {
|
||||||
|
/// Returns true if result is Success.
|
||||||
|
pub fn is_ok(self) -> bool {
|
||||||
|
match self {
|
||||||
|
ProcessFutureResult::Success => true,
|
||||||
|
ProcessFutureResult::Failed | ProcessFutureResult::ConnectionFailed => false,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns Ok(true) if future has succeeded.
|
||||||
|
/// Returns Ok(false) if future has failed with non-connection error.
|
||||||
|
/// Returns Err if future is `ConnectionFailed`.
|
||||||
|
pub fn fail_if_connection_error(self, failed_client: FailedClient) -> Result<bool, FailedClient> {
|
||||||
|
match self {
|
||||||
|
ProcessFutureResult::Success => Ok(true),
|
||||||
|
ProcessFutureResult::Failed => Ok(false),
|
||||||
|
ProcessFutureResult::ConnectionFailed => Err(failed_client),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Process result of the future from a client.
|
||||||
|
pub(crate) fn process_future_result<TResult, TError, TGoOfflineFuture>(
|
||||||
|
result: Result<TResult, TError>,
|
||||||
|
retry_backoff: &mut ExponentialBackoff,
|
||||||
|
on_success: impl FnOnce(TResult),
|
||||||
|
go_offline_future: &mut std::pin::Pin<&mut futures::future::Fuse<TGoOfflineFuture>>,
|
||||||
|
go_offline: impl FnOnce(Duration) -> TGoOfflineFuture,
|
||||||
|
error_pattern: impl FnOnce() -> String,
|
||||||
|
) -> ProcessFutureResult
|
||||||
|
where
|
||||||
|
TError: std::fmt::Debug + MaybeConnectionError,
|
||||||
|
TGoOfflineFuture: FutureExt,
|
||||||
|
{
|
||||||
|
match result {
|
||||||
|
Ok(result) => {
|
||||||
|
on_success(result);
|
||||||
|
retry_backoff.reset();
|
||||||
|
ProcessFutureResult::Success
|
||||||
|
}
|
||||||
|
Err(error) if error.is_connection_error() => {
|
||||||
|
log::error!(
|
||||||
|
target: "bridge",
|
||||||
|
"{}: {:?}. Going to restart",
|
||||||
|
error_pattern(),
|
||||||
|
error,
|
||||||
|
);
|
||||||
|
|
||||||
|
retry_backoff.reset();
|
||||||
|
go_offline_future.set(go_offline(CONNECTION_ERROR_DELAY).fuse());
|
||||||
|
ProcessFutureResult::ConnectionFailed
|
||||||
|
}
|
||||||
|
Err(error) => {
|
||||||
|
let retry_delay = retry_backoff.next_backoff().unwrap_or(CONNECTION_ERROR_DELAY);
|
||||||
|
log::error!(
|
||||||
|
target: "bridge",
|
||||||
|
"{}: {:?}. Retrying in {}",
|
||||||
|
error_pattern(),
|
||||||
|
error,
|
||||||
|
retry_delay.as_secs_f64(),
|
||||||
|
);
|
||||||
|
|
||||||
|
go_offline_future.set(go_offline(retry_delay).fuse());
|
||||||
|
ProcessFutureResult::Failed
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user