diff --git a/bridges/relays/ethereum/src/ethereum_client.rs b/bridges/relays/ethereum/src/ethereum_client.rs index ae7e630c33..bfcb7d9cb2 100644 --- a/bridges/relays/ethereum/src/ethereum_client.rs +++ b/bridges/relays/ethereum/src/ethereum_client.rs @@ -21,8 +21,8 @@ use crate::ethereum_types::{ use crate::rpc::{Ethereum, EthereumRpc}; use crate::rpc_errors::{EthereumNodeError, RpcError}; use crate::substrate_types::{GrandpaJustification, Hash as SubstrateHash, QueuedSubstrateHeader, SubstrateHeaderId}; -use crate::sync_types::{HeaderId, SubmittedHeaders}; -use crate::utils::MaybeConnectionError; +use crate::sync_types::SubmittedHeaders; +use crate::utils::{HeaderId, MaybeConnectionError}; use async_trait::async_trait; use codec::{Decode, Encode}; diff --git a/bridges/relays/ethereum/src/ethereum_deploy_contract.rs b/bridges/relays/ethereum/src/ethereum_deploy_contract.rs index cce91a5519..94ce807067 100644 --- a/bridges/relays/ethereum/src/ethereum_deploy_contract.rs +++ b/bridges/relays/ethereum/src/ethereum_deploy_contract.rs @@ -21,7 +21,7 @@ use crate::instances::BridgeInstance; use crate::rpc::SubstrateRpc; use crate::substrate_client::{SubstrateConnectionParams, SubstrateRpcClient}; use crate::substrate_types::{Hash as SubstrateHash, Header as SubstrateHeader, SubstrateHeaderId}; -use crate::sync_types::HeaderId; +use crate::utils::HeaderId; use codec::{Decode, Encode}; use num_traits::Zero; diff --git a/bridges/relays/ethereum/src/ethereum_exchange.rs b/bridges/relays/ethereum/src/ethereum_exchange.rs index bcaebdd9c5..b045b36356 100644 --- a/bridges/relays/ethereum/src/ethereum_exchange.rs +++ b/bridges/relays/ethereum/src/ethereum_exchange.rs @@ -34,7 +34,7 @@ use crate::substrate_client::{ SubmitEthereumExchangeTransactionProof, SubstrateConnectionParams, SubstrateRpcClient, SubstrateSigningParams, }; use crate::substrate_types::into_substrate_ethereum_receipt; -use crate::sync_types::HeaderId; +use crate::utils::HeaderId; use async_trait::async_trait; use bp_currency_exchange::MaybeLockFundsTransaction; diff --git a/bridges/relays/ethereum/src/ethereum_types.rs b/bridges/relays/ethereum/src/ethereum_types.rs index 8dcfb00fc9..bdb960c8cd 100644 --- a/bridges/relays/ethereum/src/ethereum_types.rs +++ b/bridges/relays/ethereum/src/ethereum_types.rs @@ -15,7 +15,8 @@ // along with Parity Bridges Common. If not, see . use crate::substrate_types::{into_substrate_ethereum_header, into_substrate_ethereum_receipts}; -use crate::sync_types::{HeaderId, HeadersSyncPipeline, QueuedHeader, SourceHeader}; +use crate::sync_types::{HeadersSyncPipeline, QueuedHeader, SourceHeader}; +use crate::utils::HeaderId; use codec::Encode; pub use web3::types::{Address, Bytes, CallRequest, H256, U128, U256, U64}; diff --git a/bridges/relays/ethereum/src/exchange.rs b/bridges/relays/ethereum/src/exchange.rs index 89f30a2dae..e236b9d3cf 100644 --- a/bridges/relays/ethereum/src/exchange.rs +++ b/bridges/relays/ethereum/src/exchange.rs @@ -54,7 +54,7 @@ pub trait SourceBlock { type Transaction: SourceTransaction; /// Return hash of the block. - fn id(&self) -> crate::sync_types::HeaderId; + fn id(&self) -> crate::utils::HeaderId; /// Return block transactions iterator. fn transactions(&self) -> Vec; } @@ -81,7 +81,7 @@ pub type TransactionOf

= <

::Block as SourceBlo pub type TransactionHashOf

= as SourceTransaction>::Hash; /// Header id. -pub type HeaderId

= crate::sync_types::HeaderId, BlockNumberOf

>; +pub type HeaderId

= crate::utils::HeaderId, BlockNumberOf

>; /// Source client API. #[async_trait] @@ -443,7 +443,7 @@ async fn wait_header_finalized( #[cfg(test)] pub(crate) mod tests { use super::*; - use crate::sync_types::HeaderId; + use crate::utils::HeaderId; use parking_lot::Mutex; use std::{ diff --git a/bridges/relays/ethereum/src/headers.rs b/bridges/relays/ethereum/src/headers.rs index 63c8e07351..991d935548 100644 --- a/bridges/relays/ethereum/src/headers.rs +++ b/bridges/relays/ethereum/src/headers.rs @@ -14,7 +14,9 @@ // You should have received a copy of the GNU General Public License // along with Parity Bridges Common. If not, see . -use crate::sync_types::{HeaderId, HeaderIdOf, HeaderStatus, HeadersSyncPipeline, QueuedHeader, SourceHeader}; +use crate::sync_types::{HeaderIdOf, HeaderStatus, HeadersSyncPipeline, QueuedHeader, SourceHeader}; +use crate::utils::HeaderId; + use linked_hash_map::LinkedHashMap; use num_traits::{One, Zero}; use std::{ @@ -777,7 +779,7 @@ fn queued_incomplete_header( pub(crate) mod tests { use super::*; use crate::ethereum_types::{EthereumHeaderId, EthereumHeadersSyncPipeline, Header, H256}; - use crate::sync_types::{HeaderId, QueuedHeader}; + use crate::sync_types::QueuedHeader; pub(crate) fn header(number: u64) -> QueuedHeader { QueuedHeader::new(Header { diff --git a/bridges/relays/ethereum/src/main.rs b/bridges/relays/ethereum/src/main.rs index a842c2b559..82dfb08209 100644 --- a/bridges/relays/ethereum/src/main.rs +++ b/bridges/relays/ethereum/src/main.rs @@ -27,6 +27,10 @@ mod exchange_loop; mod exchange_loop_metrics; mod headers; mod instances; +mod message_lane; +mod message_lane_loop; +mod message_race_delivery; +mod message_race_loop; mod metrics; mod rpc; mod rpc_errors; diff --git a/bridges/relays/ethereum/src/message_lane.rs b/bridges/relays/ethereum/src/message_lane.rs new file mode 100644 index 0000000000..7ca8d2e9fb --- /dev/null +++ b/bridges/relays/ethereum/src/message_lane.rs @@ -0,0 +1,55 @@ +// Copyright 2019-2020 Parity Technologies (UK) Ltd. +// This file is part of Parity Bridges Common. + +// Parity Bridges Common is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity Bridges Common is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity Bridges Common. If not, see . + +//! One-way message lane types. Within single one-way lane we have three 'races' where we try to: +//! +//! 1) relay new messages from source to target node; +//! 2) relay proof-of-receiving from target to source node; +//! 3) relay proof-of-processing from target no source node. + +use crate::utils::HeaderId; + +use std::fmt::Debug; + +/// One-way message lane. +pub trait MessageLane { + /// Name of the messages source. + const SOURCE_NAME: &'static str; + /// Name of the messages target. + const TARGET_NAME: &'static str; + + /// Message nonce type. + type MessageNonce: Clone + Copy + Debug + Default + From + Ord + std::ops::Add; + + /// Messages proof. + type MessagesProof: Clone; + + /// Number of the source header. + type SourceHeaderNumber: Clone + Debug + Default + Ord + PartialEq; + /// Hash of the source header. + type SourceHeaderHash: Clone + Debug + Default + PartialEq; + + /// Number of the target header. + type TargetHeaderNumber: Clone + Debug + Default + Ord + PartialEq; + /// Hash of the target header. + type TargetHeaderHash: Clone + Debug + Default + PartialEq; +} + +/// Source header id within given one-way message lane. +pub type SourceHeaderIdOf

= HeaderId<

::SourceHeaderHash,

::SourceHeaderNumber>; + +/// Target header id within given one-way message lane. +pub type TargetHeaderIdOf

= HeaderId<

::TargetHeaderHash,

::TargetHeaderNumber>; diff --git a/bridges/relays/ethereum/src/message_lane_loop.rs b/bridges/relays/ethereum/src/message_lane_loop.rs new file mode 100644 index 0000000000..03cc2929b3 --- /dev/null +++ b/bridges/relays/ethereum/src/message_lane_loop.rs @@ -0,0 +1,591 @@ +// Copyright 2019-2020 Parity Technologies (UK) Ltd. +// This file is part of Parity Bridges Common. + +// Parity Bridges Common is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity Bridges Common is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity Bridges Common. If not, see . + +//! Message delivery loop. Designed to work with message-lane pallet. +//! +//! Single relay instance delivers messages of single lane in single direction. +//! To serve two-way lane, you would need two instances of relay. +//! To serve N two-way lanes, you would need N*2 instances of relay. +//! +//! Please keep in mind that the best header in this file is actually best +//! finalized header. I.e. when talking about headers in lane context, we +//! only care about finalized headers. + +// Until there'll be actual message-lane in the runtime. +#![allow(dead_code)] + +use crate::message_lane::{MessageLane, SourceHeaderIdOf, TargetHeaderIdOf}; +use crate::message_race_delivery::run as run_message_delivery_race; +use crate::utils::{interval, process_future_result, retry_backoff, FailedClient, MaybeConnectionError}; + +use async_trait::async_trait; +use futures::{channel::mpsc::unbounded, future::FutureExt, stream::StreamExt}; +use std::{fmt::Debug, future::Future, ops::RangeInclusive, time::Duration}; + +/// Source client trait. +#[async_trait(?Send)] +pub trait SourceClient: Clone { + /// Type of error this clients returns. + type Error: std::fmt::Debug + MaybeConnectionError; + + /// Try to reconnect to source node. + fn reconnect(self) -> Self; + + /// Returns state of the client. + async fn state(&self) -> Result, Self::Error>; + + /// Get nonce of instance of latest generated message. + async fn latest_generated_nonce( + &self, + id: SourceHeaderIdOf

, + ) -> Result<(SourceHeaderIdOf

, P::MessageNonce), Self::Error>; + + /// Prove messages in inclusive range [begin; end]. + async fn prove_messages( + &self, + id: SourceHeaderIdOf

, + nonces: RangeInclusive, + ) -> Result<(SourceHeaderIdOf

, RangeInclusive, P::MessagesProof), Self::Error>; +} + +/// Target client trait. +#[async_trait(?Send)] +pub trait TargetClient: Clone { + /// Type of error this clients returns. + type Error: std::fmt::Debug + MaybeConnectionError; + + /// Try to reconnect to source node. + fn reconnect(self) -> Self; + + /// Returns state of the client. + async fn state(&self) -> Result, Self::Error>; + + /// Get nonce of latest message, which receival has been confirmed. + async fn latest_received_nonce( + &self, + id: TargetHeaderIdOf

, + ) -> Result<(TargetHeaderIdOf

, P::MessageNonce), Self::Error>; + + /// Submit messages proof. + async fn submit_messages_proof( + &self, + generated_at_header: SourceHeaderIdOf

, + nonces: RangeInclusive, + proof: P::MessagesProof, + ) -> Result, Self::Error>; +} + +/// State of the client. +#[derive(Clone, Debug, Default, PartialEq)] +pub struct ClientState { + /// Best header id of this chain. + pub best_self: SelfHeaderId, + /// Best header id of the peer chain. + pub best_peer: PeerHeaderId, +} + +/// State of source client in one-way message lane. +pub type SourceClientState

= ClientState, TargetHeaderIdOf

>; + +/// State of target client in one-way message lane. +pub type TargetClientState

= ClientState, SourceHeaderIdOf

>; + +/// Both clients state. +#[derive(Debug, Default)] +pub struct ClientsState { + /// Source client state. + pub source: Option>, + /// Target client state. + pub target: Option>, +} + +/// Run message lane service loop. +pub fn run( + mut source_client: impl SourceClient

, + source_tick: Duration, + mut target_client: impl TargetClient

, + target_tick: Duration, + reconnect_delay: Duration, + stall_timeout: Duration, + exit_signal: impl Future, +) { + let mut local_pool = futures::executor::LocalPool::new(); + let exit_signal = exit_signal.shared(); + + local_pool.run_until(async move { + loop { + let result = run_until_connection_lost( + source_client.clone(), + source_tick, + target_client.clone(), + target_tick, + stall_timeout, + exit_signal.clone(), + ) + .await; + + match result { + Ok(()) => break, + Err(failed_client) => { + async_std::task::sleep(reconnect_delay).await; + if failed_client == FailedClient::Both || failed_client == FailedClient::Source { + source_client = source_client.reconnect(); + } + if failed_client == FailedClient::Both || failed_client == FailedClient::Target { + target_client = target_client.reconnect(); + } + } + } + + log::debug!( + target: "bridge", + "Restarting lane {} -> {}", + P::SOURCE_NAME, + P::TARGET_NAME, + ); + } + }); +} + +/// Run one-way message delivery loop until connection with target or source node is lost, or exit signal is received. +async fn run_until_connection_lost, TC: TargetClient

>( + source_client: SC, + source_tick: Duration, + target_client: TC, + target_tick: Duration, + stall_timeout: Duration, + exit_signal: impl Future, +) -> Result<(), FailedClient> { + let mut source_retry_backoff = retry_backoff(); + let mut source_client_is_online = false; + let mut source_state_required = true; + let source_state = source_client.state().fuse(); + let source_go_offline_future = futures::future::Fuse::terminated(); + let source_tick_stream = interval(source_tick).fuse(); + + let mut target_retry_backoff = retry_backoff(); + let mut target_client_is_online = false; + let mut target_state_required = true; + let target_state = target_client.state().fuse(); + let target_go_offline_future = futures::future::Fuse::terminated(); + let target_tick_stream = interval(target_tick).fuse(); + + let ( + (delivery_source_state_sender, delivery_source_state_receiver), + (delivery_target_state_sender, delivery_target_state_receiver), + ) = (unbounded(), unbounded()); + let delivery_race_loop = run_message_delivery_race( + source_client.clone(), + delivery_source_state_receiver, + target_client.clone(), + delivery_target_state_receiver, + stall_timeout, + ) + .fuse(); + + let exit_signal = exit_signal.fuse(); + + futures::pin_mut!( + source_state, + source_go_offline_future, + source_tick_stream, + target_state, + target_go_offline_future, + target_tick_stream, + delivery_race_loop, + exit_signal + ); + + loop { + futures::select! { + new_source_state = source_state => { + source_state_required = false; + + source_client_is_online = process_future_result( + new_source_state, + &mut source_retry_backoff, + |new_source_state| { + log::debug!( + target: "bridge", + "Received state from {} node: {:?}", + P::SOURCE_NAME, + new_source_state, + ); + let _ = delivery_source_state_sender.unbounded_send(new_source_state); + }, + &mut source_go_offline_future, + |delay| async_std::task::sleep(delay), + || format!("Error retrieving state from {} node", P::SOURCE_NAME), + ).fail_if_connection_error(FailedClient::Source)?; + }, + _ = source_go_offline_future => { + source_client_is_online = true; + }, + _ = source_tick_stream.next() => { + source_state_required = true; + }, + new_target_state = target_state => { + target_state_required = false; + + target_client_is_online = process_future_result( + new_target_state, + &mut target_retry_backoff, + |new_target_state| { + log::debug!( + target: "bridge", + "Received state from {} node: {:?}", + P::TARGET_NAME, + new_target_state, + ); + let _ = delivery_target_state_sender.unbounded_send(new_target_state); + }, + &mut target_go_offline_future, + |delay| async_std::task::sleep(delay), + || format!("Error retrieving state from {} node", P::TARGET_NAME), + ).fail_if_connection_error(FailedClient::Target)?; + }, + _ = target_go_offline_future => { + target_client_is_online = true; + }, + _ = target_tick_stream.next() => { + target_state_required = true; + }, + + delivery_error = delivery_race_loop => { + match delivery_error { + Ok(_) => unreachable!("only ends with error; qed"), + Err(err) => return Err(err), + } + }, + + () = exit_signal => { + return Ok(()); + } + } + + if source_client_is_online && source_state_required { + log::debug!(target: "bridge", "Asking {} node about its state", P::SOURCE_NAME); + source_state.set(source_client.state().fuse()); + source_client_is_online = false; + } + + if target_client_is_online && target_state_required { + log::debug!(target: "bridge", "Asking {} node about its state", P::TARGET_NAME); + target_state.set(target_client.state().fuse()); + target_client_is_online = false; + } + } +} + +#[cfg(test)] +pub(crate) mod tests { + use super::*; + use crate::utils::HeaderId; + use futures::stream::StreamExt; + use parking_lot::Mutex; + use std::sync::Arc; + + pub fn header_id(number: TestSourceHeaderNumber) -> HeaderId { + HeaderId(number, number) + } + + pub type TestMessageNonce = u64; + pub type TestMessagesProof = RangeInclusive; + + pub type TestSourceHeaderNumber = u64; + pub type TestSourceHeaderHash = u64; + + pub type TestTargetHeaderNumber = u64; + pub type TestTargetHeaderHash = u64; + + #[derive(Debug)] + pub enum TestError { + Logic, + Connection, + } + + impl MaybeConnectionError for TestError { + fn is_connection_error(&self) -> bool { + match *self { + TestError::Logic => false, + TestError::Connection => true, + } + } + } + + pub struct TestMessageLane; + + impl MessageLane for TestMessageLane { + const SOURCE_NAME: &'static str = "TestSource"; + const TARGET_NAME: &'static str = "TestTarget"; + + type MessageNonce = TestMessageNonce; + type MessagesProof = TestMessagesProof; + + type SourceHeaderNumber = TestSourceHeaderNumber; + type SourceHeaderHash = TestSourceHeaderHash; + + type TargetHeaderNumber = TestTargetHeaderNumber; + type TargetHeaderHash = TestTargetHeaderHash; + } + + #[derive(Debug, Default, Clone)] + pub struct TestClientData { + is_source_fails: bool, + is_source_reconnected: bool, + source_state: SourceClientState, + source_latest_generated_nonce: TestMessageNonce, + is_target_fails: bool, + is_target_reconnected: bool, + target_state: SourceClientState, + target_latest_received_nonce: TestMessageNonce, + submitted_messages_proofs: Vec, + } + + #[derive(Clone)] + pub struct TestSourceClient { + data: Arc>, + tick: Arc, + } + + #[async_trait(?Send)] + impl SourceClient for TestSourceClient { + type Error = TestError; + + fn reconnect(self) -> Self { + { + let mut data = self.data.lock(); + (self.tick)(&mut *data); + data.is_source_reconnected = true; + } + self + } + + async fn state(&self) -> Result, Self::Error> { + let mut data = self.data.lock(); + (self.tick)(&mut *data); + if data.is_source_fails { + return Err(TestError::Connection); + } + Ok(data.source_state.clone()) + } + + /// Get nonce of instance of latest generated message. + async fn latest_generated_nonce( + &self, + id: SourceHeaderIdOf, + ) -> Result<(SourceHeaderIdOf, TestMessageNonce), Self::Error> { + let mut data = self.data.lock(); + (self.tick)(&mut *data); + if data.is_source_fails { + return Err(TestError::Connection); + } + Ok((id, data.source_latest_generated_nonce)) + } + + async fn prove_messages( + &self, + id: SourceHeaderIdOf, + nonces: RangeInclusive, + ) -> Result< + ( + SourceHeaderIdOf, + RangeInclusive, + TestMessagesProof, + ), + Self::Error, + > { + Ok((id, nonces.clone(), nonces)) + } + } + + #[derive(Clone)] + pub struct TestTargetClient { + data: Arc>, + tick: Arc, + } + + #[async_trait(?Send)] + impl TargetClient for TestTargetClient { + type Error = TestError; + + fn reconnect(self) -> Self { + { + let mut data = self.data.lock(); + (self.tick)(&mut *data); + data.is_target_reconnected = true; + } + self + } + + async fn state(&self) -> Result, Self::Error> { + let mut data = self.data.lock(); + (self.tick)(&mut *data); + if data.is_target_fails { + return Err(TestError::Connection); + } + Ok(data.target_state.clone()) + } + + async fn latest_received_nonce( + &self, + id: TargetHeaderIdOf, + ) -> Result<(TargetHeaderIdOf, TestMessageNonce), Self::Error> { + let mut data = self.data.lock(); + (self.tick)(&mut *data); + if data.is_target_fails { + return Err(TestError::Connection); + } + Ok((id, data.target_latest_received_nonce)) + } + + /// Submit messages proof. + async fn submit_messages_proof( + &self, + _generated_at_header: SourceHeaderIdOf, + nonces: RangeInclusive, + proof: TestMessagesProof, + ) -> Result, Self::Error> { + let mut data = self.data.lock(); + (self.tick)(&mut *data); + if data.is_target_fails { + return Err(TestError::Connection); + } + data.target_state.best_self = + HeaderId(data.target_state.best_self.0 + 1, data.target_state.best_self.1 + 1); + data.target_latest_received_nonce = *proof.end(); + data.submitted_messages_proofs.push(proof); + Ok(nonces) + } + } + + fn run_loop_test( + data: TestClientData, + source_tick: Arc, + target_tick: Arc, + exit_signal: impl Future, + ) -> TestClientData { + async_std::task::block_on(async { + let data = Arc::new(Mutex::new(data)); + + let source_client = TestSourceClient { + data: data.clone(), + tick: source_tick, + }; + let target_client = TestTargetClient { + data: data.clone(), + tick: target_tick, + }; + run( + source_client, + Duration::from_millis(100), + target_client, + Duration::from_millis(100), + Duration::from_millis(0), + Duration::from_secs(60), + exit_signal, + ); + + let result = data.lock().clone(); + result + }) + } + + #[test] + fn message_lane_loop_is_able_to_recover_from_connection_errors() { + // with this configuration, source client will return Err, making source client + // reconnect. Then the target client will fail with Err + reconnect. Then we finally + // able to deliver messages. + let (exit_sender, exit_receiver) = unbounded(); + let result = run_loop_test( + TestClientData { + is_source_fails: true, + source_state: ClientState { + best_self: HeaderId(0, 0), + best_peer: HeaderId(0, 0), + }, + source_latest_generated_nonce: 1, + target_state: ClientState { + best_self: HeaderId(0, 0), + best_peer: HeaderId(0, 0), + }, + target_latest_received_nonce: 0, + ..Default::default() + }, + Arc::new(|data: &mut TestClientData| { + if data.is_source_reconnected { + data.is_source_fails = false; + data.is_target_fails = true; + } + }), + Arc::new(move |data: &mut TestClientData| { + if data.is_target_reconnected { + data.is_target_fails = false; + } + if data.target_state.best_peer.0 < 10 { + data.target_state.best_peer = + HeaderId(data.target_state.best_peer.0 + 1, data.target_state.best_peer.0 + 1); + } + if !data.submitted_messages_proofs.is_empty() { + exit_sender.unbounded_send(()).unwrap(); + } + }), + exit_receiver.into_future().map(|(_, _)| ()), + ); + + assert_eq!(result.submitted_messages_proofs, vec![1..=1],); + } + + #[test] + fn message_lane_loop_works() { + // with this configuration, target client must first sync headers [1; 10] and + // then submit proof-of-messages [0; 10] at once + let (exit_sender, exit_receiver) = unbounded(); + let result = run_loop_test( + TestClientData { + source_state: ClientState { + best_self: HeaderId(10, 10), + best_peer: HeaderId(0, 0), + }, + source_latest_generated_nonce: 10, + target_state: ClientState { + best_self: HeaderId(0, 0), + best_peer: HeaderId(0, 0), + }, + target_latest_received_nonce: 0, + ..Default::default() + }, + Arc::new(|_: &mut TestClientData| {}), + Arc::new(move |data: &mut TestClientData| { + if data.target_state.best_peer.0 < 10 { + data.target_state.best_peer = + HeaderId(data.target_state.best_peer.0 + 1, data.target_state.best_peer.0 + 1); + } + if data + .submitted_messages_proofs + .last() + .map(|last| *last.end() == 10) + .unwrap_or(false) + { + exit_sender.unbounded_send(()).unwrap(); + } + }), + exit_receiver.into_future().map(|(_, _)| ()), + ); + + assert_eq!(result.submitted_messages_proofs, vec![1..=4, 5..=8, 9..=10],); + } +} diff --git a/bridges/relays/ethereum/src/message_race_delivery.rs b/bridges/relays/ethereum/src/message_race_delivery.rs new file mode 100644 index 0000000000..a003990978 --- /dev/null +++ b/bridges/relays/ethereum/src/message_race_delivery.rs @@ -0,0 +1,385 @@ +// Copyright 2019-2020 Parity Technologies (UK) Ltd. +// This file is part of Parity Bridges Common. + +// Parity Bridges Common is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity Bridges Common is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +//! Message delivery race delivers proof-of-messages from lane.source to lane.target. + +use crate::message_lane::{MessageLane, SourceHeaderIdOf, TargetHeaderIdOf}; +use crate::message_lane_loop::{ + SourceClient as MessageLaneSourceClient, SourceClientState, TargetClient as MessageLaneTargetClient, + TargetClientState, +}; +use crate::message_race_loop::{MessageRace, RaceState, RaceStrategy, SourceClient, TargetClient}; +use crate::utils::FailedClient; + +use async_trait::async_trait; +use futures::stream::FusedStream; +use std::{collections::VecDeque, marker::PhantomData, ops::RangeInclusive, time::Duration}; + +/// Maximal number of messages to relay in single transaction. +const MAX_MESSAGES_TO_RELAY_IN_SINGLE_TX: u32 = 4; + +/// Run message delivery race. +pub async fn run( + source_client: impl MessageLaneSourceClient

, + source_state_updates: impl FusedStream>, + target_client: impl MessageLaneTargetClient

, + target_state_updates: impl FusedStream>, + stall_timeout: Duration, +) -> Result<(), FailedClient> { + crate::message_race_loop::run( + MessageDeliveryRaceSource { + client: source_client, + _phantom: Default::default(), + }, + source_state_updates, + MessageDeliveryRaceTarget { + client: target_client, + _phantom: Default::default(), + }, + target_state_updates, + stall_timeout, + MessageDeliveryStrategy::

::default(), + ) + .await +} + +/// Message delivery race. +struct MessageDeliveryRace

(std::marker::PhantomData

); + +impl MessageRace for MessageDeliveryRace

{ + type SourceHeaderId = SourceHeaderIdOf

; + type TargetHeaderId = TargetHeaderIdOf

; + + type MessageNonce = P::MessageNonce; + type Proof = P::MessagesProof; + + fn source_name() -> String { + format!("{}::MessagesDelivery", P::SOURCE_NAME) + } + + fn target_name() -> String { + format!("{}::MessagesDelivery", P::TARGET_NAME) + } +} + +/// Message delivery race source, which is a source of the lane. +struct MessageDeliveryRaceSource { + client: C, + _phantom: PhantomData

, +} + +#[async_trait(?Send)] +impl SourceClient> for MessageDeliveryRaceSource +where + P: MessageLane, + C: MessageLaneSourceClient

, +{ + type Error = C::Error; + + async fn latest_nonce( + &self, + at_block: SourceHeaderIdOf

, + ) -> Result<(SourceHeaderIdOf

, P::MessageNonce), Self::Error> { + self.client.latest_generated_nonce(at_block).await + } + + async fn generate_proof( + &self, + at_block: SourceHeaderIdOf

, + nonces: RangeInclusive, + ) -> Result<(SourceHeaderIdOf

, RangeInclusive, P::MessagesProof), Self::Error> { + self.client.prove_messages(at_block, nonces).await + } +} + +/// Message delivery race target, which is a target of the lane. +struct MessageDeliveryRaceTarget { + client: C, + _phantom: PhantomData

, +} + +#[async_trait(?Send)] +impl TargetClient> for MessageDeliveryRaceTarget +where + P: MessageLane, + C: MessageLaneTargetClient

, +{ + type Error = C::Error; + + async fn latest_nonce( + &self, + at_block: TargetHeaderIdOf

, + ) -> Result<(TargetHeaderIdOf

, P::MessageNonce), Self::Error> { + self.client.latest_received_nonce(at_block).await + } + + async fn submit_proof( + &self, + generated_at_block: SourceHeaderIdOf

, + nonces: RangeInclusive, + proof: P::MessagesProof, + ) -> Result, Self::Error> { + self.client + .submit_messages_proof(generated_at_block, nonces, proof) + .await + } +} + +/// Message delivery strategy. +struct MessageDeliveryStrategy { + /// All queued nonces. + source_queue: VecDeque<(SourceHeaderIdOf

, P::MessageNonce)>, + /// Best nonce known to target node. + target_nonce: P::MessageNonce, + /// Unused generic types dump. + _phantom: PhantomData

, +} + +impl Default for MessageDeliveryStrategy

{ + fn default() -> Self { + MessageDeliveryStrategy { + source_queue: VecDeque::new(), + target_nonce: Default::default(), + _phantom: Default::default(), + } + } +} + +impl RaceStrategy, TargetHeaderIdOf

, P::MessageNonce, P::MessagesProof> + for MessageDeliveryStrategy

+{ + fn is_empty(&self) -> bool { + self.source_queue.is_empty() + } + + fn source_nonce_updated(&mut self, at_block: SourceHeaderIdOf

, nonce: P::MessageNonce) { + if nonce <= self.target_nonce { + return; + } + + match self.source_queue.back() { + Some((_, prev_nonce)) if *prev_nonce < nonce => (), + Some(_) => return, + None => (), + } + + self.source_queue.push_back((at_block, nonce)) + } + + fn target_nonce_updated( + &mut self, + nonce: P::MessageNonce, + race_state: &mut RaceState, TargetHeaderIdOf

, P::MessageNonce, P::MessagesProof>, + ) { + if nonce < self.target_nonce { + return; + } + + while let Some(true) = self + .source_queue + .front() + .map(|(_, source_nonce)| *source_nonce <= nonce) + { + self.source_queue.pop_front(); + } + + let need_to_select_new_nonces = race_state + .nonces_to_submit + .as_ref() + .map(|(_, nonces, _)| *nonces.end() <= nonce) + .unwrap_or(false); + if need_to_select_new_nonces { + race_state.nonces_to_submit = None; + } + + let need_new_nonces_to_submit = race_state + .nonces_submitted + .as_ref() + .map(|nonces| *nonces.end() <= nonce) + .unwrap_or(false); + if need_new_nonces_to_submit { + race_state.nonces_submitted = None; + } + + self.target_nonce = nonce; + } + + fn select_nonces_to_deliver( + &mut self, + race_state: &RaceState, TargetHeaderIdOf

, P::MessageNonce, P::MessagesProof>, + ) -> Option> { + // if we have already selected nonces that we want to submit, do nothing + if race_state.nonces_to_submit.is_some() { + return None; + } + + // if we already submitted some nonces, do nothing + if race_state.nonces_submitted.is_some() { + return None; + } + + // 1) we want to deliver all nonces, starting from `target_nonce + 1` + // 2) we want to deliver at most `MAX_MESSAGES_TO_RELAY_IN_SINGLE_TX` nonces in this batch + // 3) we can't deliver new nonce until header, that has emitted this nonce, is finalized + // by target client + let nonces_begin = self.target_nonce + 1.into(); + let best_header_at_target = &race_state.target_state.as_ref()?.best_peer; + let mut nonces_end = None; + for i in 0..MAX_MESSAGES_TO_RELAY_IN_SINGLE_TX { + let nonce = nonces_begin + i.into(); + + // if queue is empty, we don't need to prove anything + let (first_queued_at, first_queued_nonce) = match self.source_queue.front() { + Some((first_queued_at, first_queued_nonce)) => (first_queued_at.clone(), *first_queued_nonce), + None => break, + }; + + // if header that has queued the message is not yet finalized at bridged chain, + // we can't prove anything + if first_queued_at.0 > best_header_at_target.0 { + break; + } + + // ok, we may deliver this nonce + nonces_end = Some(nonce); + + // probably remove it from the queue? + if nonce == first_queued_nonce { + self.source_queue.pop_front(); + } + } + + nonces_end.map(|nonces_end| RangeInclusive::new(nonces_begin, nonces_end)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::message_lane_loop::{ + tests::{header_id, TestMessageLane, TestMessageNonce, TestMessagesProof}, + ClientState, + }; + + #[test] + fn strategy_is_empty_works() { + let mut strategy = MessageDeliveryStrategy::::default(); + assert_eq!(strategy.is_empty(), true); + strategy.source_nonce_updated(header_id(1), 1); + assert_eq!(strategy.is_empty(), false); + } + + #[test] + fn source_nonce_is_never_lower_than_known_target_nonce() { + let mut strategy = MessageDeliveryStrategy::::default(); + strategy.target_nonce_updated(10, &mut Default::default()); + strategy.source_nonce_updated(header_id(1), 5); + assert_eq!(strategy.source_queue, vec![]); + } + + #[test] + fn source_nonce_is_never_lower_than_latest_known_source_nonce() { + let mut strategy = MessageDeliveryStrategy::::default(); + strategy.source_nonce_updated(header_id(1), 5); + strategy.source_nonce_updated(header_id(2), 3); + strategy.source_nonce_updated(header_id(2), 5); + assert_eq!(strategy.source_queue, vec![(header_id(1), 5)]); + } + + #[test] + fn target_nonce_is_never_lower_than_latest_known_target_nonce() { + let mut strategy = MessageDeliveryStrategy::::default(); + strategy.target_nonce_updated(10, &mut Default::default()); + strategy.target_nonce_updated(5, &mut Default::default()); + assert_eq!(strategy.target_nonce, 10); + } + + #[test] + fn updated_target_nonce_removes_queued_entries() { + let mut strategy = MessageDeliveryStrategy::::default(); + strategy.source_nonce_updated(header_id(1), 5); + strategy.source_nonce_updated(header_id(2), 10); + strategy.source_nonce_updated(header_id(3), 15); + strategy.source_nonce_updated(header_id(4), 20); + strategy.target_nonce_updated(15, &mut Default::default()); + assert_eq!(strategy.source_queue, vec![(header_id(4), 20)]); + } + + #[test] + fn selected_nonces_are_dropped_on_target_nonce_update() { + let mut state = RaceState::default(); + let mut strategy = MessageDeliveryStrategy::::default(); + state.nonces_to_submit = Some((header_id(1), 5..=10, 5..=10)); + strategy.target_nonce_updated(7, &mut state); + assert!(state.nonces_to_submit.is_some()); + strategy.target_nonce_updated(10, &mut state); + assert!(state.nonces_to_submit.is_none()); + } + + #[test] + fn submitted_nonces_are_dropped_on_target_nonce_update() { + let mut state = RaceState::default(); + let mut strategy = MessageDeliveryStrategy::::default(); + state.nonces_submitted = Some(5..=10); + strategy.target_nonce_updated(7, &mut state); + assert!(state.nonces_submitted.is_some()); + strategy.target_nonce_updated(10, &mut state); + assert!(state.nonces_submitted.is_none()); + } + + #[test] + fn nothing_is_selected_if_something_is_already_selected() { + let mut state = RaceState::default(); + let mut strategy = MessageDeliveryStrategy::::default(); + state.nonces_to_submit = Some((header_id(1), 1..=10, 1..=10)); + strategy.source_nonce_updated(header_id(1), 10); + assert_eq!(strategy.select_nonces_to_deliver(&state), None); + } + + #[test] + fn nothing_is_selected_if_something_is_already_submitted() { + let mut state = RaceState::default(); + let mut strategy = MessageDeliveryStrategy::::default(); + state.nonces_submitted = Some(1..=10); + strategy.source_nonce_updated(header_id(1), 10); + assert_eq!(strategy.select_nonces_to_deliver(&state), None); + } + + #[test] + fn select_nonces_to_deliver_works() { + let mut state = RaceState::<_, _, TestMessageNonce, TestMessagesProof>::default(); + let mut strategy = MessageDeliveryStrategy::::default(); + strategy.source_nonce_updated(header_id(1), 1); + strategy.source_nonce_updated(header_id(2), 2); + strategy.source_nonce_updated(header_id(3), 6); + strategy.source_nonce_updated(header_id(5), 8); + + state.target_state = Some(ClientState { + best_self: header_id(0), + best_peer: header_id(4), + }); + assert_eq!(strategy.select_nonces_to_deliver(&state), Some(1..=4)); + strategy.target_nonce_updated(4, &mut state); + assert_eq!(strategy.select_nonces_to_deliver(&state), Some(5..=6)); + strategy.target_nonce_updated(6, &mut state); + assert_eq!(strategy.select_nonces_to_deliver(&state), None); + + state.target_state = Some(ClientState { + best_self: header_id(0), + best_peer: header_id(5), + }); + assert_eq!(strategy.select_nonces_to_deliver(&state), Some(7..=8)); + strategy.target_nonce_updated(8, &mut state); + assert_eq!(strategy.select_nonces_to_deliver(&state), None); + } +} diff --git a/bridges/relays/ethereum/src/message_race_loop.rs b/bridges/relays/ethereum/src/message_race_loop.rs new file mode 100644 index 0000000000..9d1657edb4 --- /dev/null +++ b/bridges/relays/ethereum/src/message_race_loop.rs @@ -0,0 +1,359 @@ +// Copyright 2019-2020 Parity Technologies (UK) Ltd. +// This file is part of Parity Bridges Common. + +// Parity Bridges Common is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity Bridges Common is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +//! Loop that is serving single race within message lane. This could be +//! message delivery race, receiving confirmations race or processing +//! confirmations race. +//! +//! The idea of the race is simple - we have `nonce`-s on source and target +//! nodes. We're trying to prove that the source node has this nonce (and +//! associated data - like messages, lane state, etc) to the target node by +//! generating and submitting proof. + +// Until there'll be actual message-lane in the runtime. +#![allow(dead_code)] + +use crate::message_lane_loop::ClientState; +use crate::utils::{process_future_result, retry_backoff, FailedClient, MaybeConnectionError}; + +use async_trait::async_trait; +use futures::{ + future::FutureExt, + stream::{FusedStream, StreamExt}, +}; +use std::{ + fmt::Debug, + ops::RangeInclusive, + time::{Duration, Instant}, +}; + +/// One of races within lane. +pub trait MessageRace { + /// Header id of the race source. + type SourceHeaderId: Debug + Clone + PartialEq; + /// Header id of the race source. + type TargetHeaderId: Debug + Clone + PartialEq; + + /// Message nonce used in the race. + type MessageNonce: Debug + Clone; + /// Proof that is generated and delivered in this race. + type Proof: Clone; + + /// Name of the race source. + fn source_name() -> String; + /// Name of the race target. + fn target_name() -> String; +} + +/// State of race source client. +type SourceClientState

= ClientState<

::SourceHeaderId,

::TargetHeaderId>; + +/// State of race target client. +type TargetClientState

= ClientState<

::TargetHeaderId,

::SourceHeaderId>; + +/// One of message lane clients, which is source client for the race. +#[async_trait(?Send)] +pub trait SourceClient { + /// Type of error this clients returns. + type Error: std::fmt::Debug + MaybeConnectionError; + + /// Return latest nonce that is known to the source client. + async fn latest_nonce( + &self, + at_block: P::SourceHeaderId, + ) -> Result<(P::SourceHeaderId, P::MessageNonce), Self::Error>; + /// Generate proof for delivering to the target client. + async fn generate_proof( + &self, + at_block: P::SourceHeaderId, + nonces: RangeInclusive, + ) -> Result<(P::SourceHeaderId, RangeInclusive, P::Proof), Self::Error>; +} + +/// One of message lane clients, which is target client for the race. +#[async_trait(?Send)] +pub trait TargetClient { + /// Type of error this clients returns. + type Error: std::fmt::Debug + MaybeConnectionError; + + /// Return latest nonce that is known to the target client. + async fn latest_nonce( + &self, + at_block: P::TargetHeaderId, + ) -> Result<(P::TargetHeaderId, P::MessageNonce), Self::Error>; + /// Submit proof to the target client. + async fn submit_proof( + &self, + generated_at_block: P::SourceHeaderId, + nonces: RangeInclusive, + proof: P::Proof, + ) -> Result, Self::Error>; +} + +/// Race strategy. +pub trait RaceStrategy { + /// Should return true if nothing has to be synced. + fn is_empty(&self) -> bool; + /// Called when latest nonce is updated at source node of the race. + fn source_nonce_updated(&mut self, at_block: SourceHeaderId, nonce: MessageNonce); + /// Called when latest nonce is updated at target node of the race. + fn target_nonce_updated( + &mut self, + nonce: MessageNonce, + race_state: &mut RaceState, + ); + /// Should return `Some(nonces)` if we need to deliver proof of `nonces` (and associated + /// data) from source to target node. + fn select_nonces_to_deliver( + &mut self, + race_state: &RaceState, + ) -> Option>; +} + +/// State of the race. +pub struct RaceState { + /// Source state, if known. + pub source_state: Option>, + /// Target state, if known. + pub target_state: Option>, + /// Range of nonces that we have selected to submit. + pub nonces_to_submit: Option<(SourceHeaderId, RangeInclusive, Proof)>, + /// Range of nonces that is currently submitted. + pub nonces_submitted: Option>, +} + +/// Run race loop until connection with target or source node is lost. +pub async fn run( + race_source: impl SourceClient

, + race_source_updated: impl FusedStream>, + race_target: impl TargetClient

, + race_target_updated: impl FusedStream>, + stall_timeout: Duration, + mut strategy: impl RaceStrategy, +) -> Result<(), FailedClient> { + let mut race_state = RaceState::default(); + let mut stall_countdown = Instant::now(); + + let mut source_retry_backoff = retry_backoff(); + let mut source_client_is_online = true; + let mut source_latest_nonce_required = false; + let source_latest_nonce = futures::future::Fuse::terminated(); + let source_generate_proof = futures::future::Fuse::terminated(); + let source_go_offline_future = futures::future::Fuse::terminated(); + + let mut target_retry_backoff = retry_backoff(); + let mut target_client_is_online = true; + let mut target_latest_nonce_required = false; + let target_latest_nonce = futures::future::Fuse::terminated(); + let target_submit_proof = futures::future::Fuse::terminated(); + let target_go_offline_future = futures::future::Fuse::terminated(); + + futures::pin_mut!( + race_source_updated, + source_latest_nonce, + source_generate_proof, + source_go_offline_future, + race_target_updated, + target_latest_nonce, + target_submit_proof, + target_go_offline_future, + ); + + loop { + futures::select! { + // when headers ids are updated + source_state = race_source_updated.next() => { + if let Some(source_state) = source_state { + if race_state.source_state.as_ref() != Some(&source_state) { + source_latest_nonce_required = true; + race_state.source_state = Some(source_state); + } + } + }, + target_state = race_target_updated.next() => { + if let Some(target_state) = target_state { + if race_state.target_state.as_ref() != Some(&target_state) { + target_latest_nonce_required = true; + race_state.target_state = Some(target_state); + } + } + }, + + // when nonces are updated + latest_nonce = source_latest_nonce => { + source_latest_nonce_required = false; + + source_client_is_online = process_future_result( + latest_nonce, + &mut source_retry_backoff, + |(at_block, latest_nonce)| { + log::debug!( + target: "bridge", + "Received latest nonce from {}: {:?}", + P::source_name(), + latest_nonce, + ); + + strategy.source_nonce_updated(at_block, latest_nonce); + }, + &mut source_go_offline_future, + |delay| async_std::task::sleep(delay), + || format!("Error retrieving latest nonce from {}", P::source_name()), + ).fail_if_connection_error(FailedClient::Source)?; + }, + latest_nonce = target_latest_nonce => { + target_latest_nonce_required = false; + + target_client_is_online = process_future_result( + latest_nonce, + &mut target_retry_backoff, + |(_, latest_nonce)| { + log::debug!( + target: "bridge", + "Received latest nonce from {}: {:?}", + P::target_name(), + latest_nonce, + ); + + strategy.target_nonce_updated(latest_nonce, &mut race_state); + }, + &mut target_go_offline_future, + |delay| async_std::task::sleep(delay), + || format!("Error retrieving latest nonce from {}", P::target_name()), + ).fail_if_connection_error(FailedClient::Target)?; + }, + + // proof generation and submission + proof = source_generate_proof => { + source_client_is_online = process_future_result( + proof, + &mut source_retry_backoff, + |(at_block, nonces_range, proof)| { + log::debug!( + target: "bridge", + "Received proof for nonces in range {:?} from {}", + nonces_range, + P::source_name(), + ); + + race_state.nonces_to_submit = Some((at_block, nonces_range, proof)); + }, + &mut source_go_offline_future, + |delay| async_std::task::sleep(delay), + || format!("Error generating proof at {}", P::source_name()), + ).fail_if_connection_error(FailedClient::Source)?; + }, + proof_submit_result = target_submit_proof => { + target_client_is_online = process_future_result( + proof_submit_result, + &mut target_retry_backoff, + |nonces_range| { + log::debug!( + target: "bridge", + "Successfully submitted proof of nonces {:?} to {}", + nonces_range, + P::target_name(), + ); + + race_state.nonces_to_submit = None; + race_state.nonces_submitted = Some(nonces_range); + }, + &mut target_go_offline_future, + |delay| async_std::task::sleep(delay), + || format!("Error submitting proof {}", P::target_name()), + ).fail_if_connection_error(FailedClient::Target)?; + } + } + + if stall_countdown.elapsed() > stall_timeout { + return Err(FailedClient::Both); + } else if race_state.nonces_to_submit.is_none() && race_state.nonces_submitted.is_none() && strategy.is_empty() + { + stall_countdown = Instant::now(); + } + + if source_client_is_online { + source_client_is_online = false; + + let nonces_to_deliver = race_state.source_state.as_ref().and_then(|source_state| { + strategy + .select_nonces_to_deliver(&race_state) + .map(|nonces_range| (source_state.best_self.clone(), nonces_range)) + }); + + if let Some((at_block, nonces_range)) = nonces_to_deliver { + log::debug!( + target: "bridge", + "Asking {} to prove nonces in range {:?}", + P::source_name(), + nonces_range, + ); + source_generate_proof.set(race_source.generate_proof(at_block, nonces_range).fuse()); + } else if source_latest_nonce_required { + log::debug!(target: "bridge", "Asking {} about latest generated message nonce", P::source_name()); + let at_block = race_state + .source_state + .as_ref() + .expect("source_latest_nonce_required is only true when source_state is Some; qed") + .best_self + .clone(); + source_latest_nonce.set(race_source.latest_nonce(at_block).fuse()); + } else { + source_client_is_online = true; + } + } + + if target_client_is_online { + target_client_is_online = false; + + if let Some((at_block, nonces_range, proof)) = race_state.nonces_to_submit.as_ref() { + log::debug!( + target: "bridge", + "Going to submit proof of messages in range {:?} to {} node", + nonces_range, + P::target_name(), + ); + target_submit_proof.set( + race_target + .submit_proof(at_block.clone(), nonces_range.clone(), proof.clone()) + .fuse(), + ); + } + if target_latest_nonce_required { + log::debug!(target: "bridge", "Asking {} about latest nonce", P::target_name()); + let at_block = race_state + .target_state + .as_ref() + .expect("target_latest_nonce_required is only true when target_state is Some; qed") + .best_self + .clone(); + target_latest_nonce.set(race_target.latest_nonce(at_block).fuse()); + } else { + target_client_is_online = true; + } + } + } +} + +impl Default + for RaceState +{ + fn default() -> Self { + RaceState { + source_state: None, + target_state: None, + nonces_to_submit: None, + nonces_submitted: None, + } + } +} diff --git a/bridges/relays/ethereum/src/substrate_client.rs b/bridges/relays/ethereum/src/substrate_client.rs index c2bc56ac26..7004dd8b14 100644 --- a/bridges/relays/ethereum/src/substrate_client.rs +++ b/bridges/relays/ethereum/src/substrate_client.rs @@ -19,7 +19,8 @@ use crate::instances::BridgeInstance; use crate::rpc::{Substrate, SubstrateRpc}; use crate::rpc_errors::RpcError; use crate::substrate_types::{Hash, Header as SubstrateHeader, Number, SignedBlock as SignedSubstrateBlock}; -use crate::sync_types::{HeaderId, SubmittedHeaders}; +use crate::sync_types::SubmittedHeaders; +use crate::utils::HeaderId; use async_trait::async_trait; use bp_eth_poa::Header as SubstrateEthereumHeader; diff --git a/bridges/relays/ethereum/src/substrate_types.rs b/bridges/relays/ethereum/src/substrate_types.rs index d03e8a86fd..fdd2880230 100644 --- a/bridges/relays/ethereum/src/substrate_types.rs +++ b/bridges/relays/ethereum/src/substrate_types.rs @@ -17,7 +17,9 @@ use crate::ethereum_types::{ Header as EthereumHeader, Receipt as EthereumReceipt, HEADER_ID_PROOF as ETHEREUM_HEADER_ID_PROOF, }; -use crate::sync_types::{HeaderId, HeadersSyncPipeline, QueuedHeader, SourceHeader}; +use crate::sync_types::{HeadersSyncPipeline, QueuedHeader, SourceHeader}; +use crate::utils::HeaderId; + use codec::Encode; pub use bp_eth_poa::{ diff --git a/bridges/relays/ethereum/src/sync.rs b/bridges/relays/ethereum/src/sync.rs index 2538b04c32..e86e9be4ae 100644 --- a/bridges/relays/ethereum/src/sync.rs +++ b/bridges/relays/ethereum/src/sync.rs @@ -310,7 +310,8 @@ pub mod tests { use super::*; use crate::ethereum_types::{EthereumHeadersSyncPipeline, H256}; use crate::headers::tests::{header, id}; - use crate::sync_types::{HeaderId, HeaderStatus}; + use crate::sync_types::HeaderStatus; + use crate::utils::HeaderId; fn side_hash(number: u64) -> H256 { H256::from_low_u64_le(1000 + number) diff --git a/bridges/relays/ethereum/src/sync_loop.rs b/bridges/relays/ethereum/src/sync_loop.rs index 4b82bfdf12..62a9b06a83 100644 --- a/bridges/relays/ethereum/src/sync_loop.rs +++ b/bridges/relays/ethereum/src/sync_loop.rs @@ -18,10 +18,11 @@ use crate::metrics::{start as metrics_start, GlobalMetrics, MetricsParams}; use crate::sync::HeadersSyncParams; use crate::sync_loop_metrics::SyncLoopMetrics; use crate::sync_types::{HeaderIdOf, HeaderStatus, HeadersSyncPipeline, QueuedHeader, SubmittedHeaders}; -use crate::utils::{format_ids, retry_backoff, MaybeConnectionError, StringifiedMaybeConnectionError}; +use crate::utils::{ + format_ids, interval, process_future_result, retry_backoff, MaybeConnectionError, StringifiedMaybeConnectionError, +}; use async_trait::async_trait; -use backoff::{backoff::Backoff, ExponentialBackoff}; use futures::{future::FutureExt, stream::StreamExt}; use num_traits::{Saturating, Zero}; use std::{ @@ -44,9 +45,6 @@ const STALL_SYNC_TIMEOUT: Duration = Duration::from_secs(5 * 60); /// Delay after we have seen update of best source header at target node, /// for us to treat sync stalled. ONLY when relay operates in backup mode. const BACKUP_STALL_SYNC_TIMEOUT: Duration = Duration::from_secs(10 * 60); -/// Delay after connection-related error happened before we'll try -/// reconnection again. -const CONNECTION_ERROR_DELAY: Duration = Duration::from_secs(10); /// Source client trait. #[async_trait] @@ -186,7 +184,7 @@ pub fn run>( &mut source_go_offline_future, |delay| async_std::task::sleep(delay), || format!("Error retrieving best header number from {}", P::SOURCE_NAME), - ); + ).is_ok(); }, source_new_header = source_new_header_future => { source_client_is_online = process_future_result( @@ -196,7 +194,7 @@ pub fn run>( &mut source_go_offline_future, |delay| async_std::task::sleep(delay), || format!("Error retrieving header from {} node", P::SOURCE_NAME), - ); + ).is_ok(); }, source_orphan_header = source_orphan_header_future => { source_client_is_online = process_future_result( @@ -206,7 +204,7 @@ pub fn run>( &mut source_go_offline_future, |delay| async_std::task::sleep(delay), || format!("Error retrieving orphan header from {} node", P::SOURCE_NAME), - ); + ).is_ok(); }, source_extra = source_extra_future => { source_client_is_online = process_future_result( @@ -216,7 +214,7 @@ pub fn run>( &mut source_go_offline_future, |delay| async_std::task::sleep(delay), || format!("Error retrieving extra data from {} node", P::SOURCE_NAME), - ); + ).is_ok(); }, source_completion = source_completion_future => { source_client_is_online = process_future_result( @@ -226,7 +224,7 @@ pub fn run>( &mut source_go_offline_future, |delay| async_std::task::sleep(delay), || format!("Error retrieving completion data from {} node", P::SOURCE_NAME), - ); + ).is_ok(); }, source_client = source_go_offline_future => { source_client_is_online = true; @@ -277,7 +275,7 @@ pub fn run>( &mut target_go_offline_future, |delay| async_std::task::sleep(delay), || format!("Error retrieving best known header from {} node", P::TARGET_NAME), - ); + ).is_ok(); }, incomplete_headers_ids = target_incomplete_headers_future => { target_incomplete_headers_required = false; @@ -289,7 +287,7 @@ pub fn run>( &mut target_go_offline_future, |delay| async_std::task::sleep(delay), || format!("Error retrieving incomplete headers from {} node", P::TARGET_NAME), - ); + ).is_ok(); }, target_existence_status = target_existence_status_future => { target_client_is_online = process_future_result( @@ -301,7 +299,7 @@ pub fn run>( &mut target_go_offline_future, |delay| async_std::task::sleep(delay), || format!("Error retrieving existence status from {} node", P::TARGET_NAME), - ); + ).is_ok(); }, submitted_headers = target_submit_header_future => { // following line helps Rust understand the type of `submitted_headers` :/ @@ -329,7 +327,7 @@ pub fn run>( &mut target_go_offline_future, |delay| async_std::task::sleep(delay), || format!("Error submitting headers to {} node", P::TARGET_NAME), - ); + ).is_ok(); log::debug!(target: "bridge", "Header submit result: {}", submitted_headers_str); @@ -350,7 +348,7 @@ pub fn run>( &mut target_go_offline_future, |delay| async_std::task::sleep(delay), || format!("Error completing headers at {}", P::TARGET_NAME), - ); + ).is_ok(); }, target_extra_check_result = target_extra_check_future => { target_client_is_online = process_future_result( @@ -362,7 +360,7 @@ pub fn run>( &mut target_go_offline_future, |delay| async_std::task::sleep(delay), || format!("Error retrieving receipts requirement from {} node", P::TARGET_NAME), - ); + ).is_ok(); }, target_client = target_go_offline_future => { target_client_is_online = true; @@ -557,62 +555,6 @@ pub fn run>( }); } -/// Stream that emits item every `timeout_ms` milliseconds. -fn interval(timeout: Duration) -> impl futures::Stream { - futures::stream::unfold((), move |_| async move { - async_std::task::sleep(timeout).await; - Some(((), ())) - }) -} - -/// Process result of the future from a client. -/// -/// Returns whether or not the client we're interacting with is online. In this context -/// what online means is that the client is currently not handling any other requests -/// that we've previously sent. -pub(crate) fn process_future_result( - result: Result, - retry_backoff: &mut ExponentialBackoff, - on_success: impl FnOnce(TResult), - go_offline_future: &mut std::pin::Pin<&mut futures::future::Fuse>, - go_offline: impl FnOnce(Duration) -> TGoOfflineFuture, - error_pattern: impl FnOnce() -> String, -) -> bool -where - TError: std::fmt::Debug + MaybeConnectionError, - TGoOfflineFuture: FutureExt, -{ - let mut client_is_online = false; - - match result { - Ok(result) => { - on_success(result); - retry_backoff.reset(); - client_is_online = true - } - Err(error) => { - let is_connection_error = error.is_connection_error(); - let retry_delay = if is_connection_error { - retry_backoff.reset(); - CONNECTION_ERROR_DELAY - } else { - retry_backoff.next_backoff().unwrap_or(CONNECTION_ERROR_DELAY) - }; - go_offline_future.set(go_offline(retry_delay).fuse()); - - log::error!( - target: "bridge", - "{}: {:?}. Retrying in {}s", - error_pattern(), - error, - retry_delay.as_secs_f64(), - ); - } - } - - client_is_online -} - /// Print synchronization progress. fn print_sync_progress( progress_context: (Instant, Option, Option), diff --git a/bridges/relays/ethereum/src/sync_loop_tests.rs b/bridges/relays/ethereum/src/sync_loop_tests.rs index 13206f4ad2..8e2a4380eb 100644 --- a/bridges/relays/ethereum/src/sync_loop_tests.rs +++ b/bridges/relays/ethereum/src/sync_loop_tests.rs @@ -16,9 +16,9 @@ #![cfg(test)] -use crate::sync_loop::{process_future_result, run, SourceClient, TargetClient}; -use crate::sync_types::{HeaderId, HeadersSyncPipeline, QueuedHeader, SourceHeader, SubmittedHeaders}; -use crate::utils::{retry_backoff, MaybeConnectionError}; +use crate::sync_loop::{run, SourceClient, TargetClient}; +use crate::sync_types::{HeadersSyncPipeline, QueuedHeader, SourceHeader, SubmittedHeaders}; +use crate::utils::{process_future_result, retry_backoff, HeaderId, MaybeConnectionError}; use async_trait::async_trait; use backoff::backoff::Backoff; diff --git a/bridges/relays/ethereum/src/sync_types.rs b/bridges/relays/ethereum/src/sync_types.rs index 4f9dbb3e6b..3970fb5629 100644 --- a/bridges/relays/ethereum/src/sync_types.rs +++ b/bridges/relays/ethereum/src/sync_types.rs @@ -14,14 +14,10 @@ // You should have received a copy of the GNU General Public License // along with Parity Bridges Common. If not, see . -use crate::utils::format_ids; +use crate::utils::{format_ids, HeaderId}; use std::{ops::Deref, sync::Arc}; -/// Ethereum header Id. -#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq)] -pub struct HeaderId(pub Number, pub Hash); - /// Ethereum header synchronization status. #[derive(Debug, Clone, Copy, PartialEq)] pub enum HeaderStatus { diff --git a/bridges/relays/ethereum/src/utils.rs b/bridges/relays/ethereum/src/utils.rs index dfe3b3966f..e149f1884b 100644 --- a/bridges/relays/ethereum/src/utils.rs +++ b/bridges/relays/ethereum/src/utils.rs @@ -14,12 +14,16 @@ // You should have received a copy of the GNU General Public License // along with Parity Bridges Common. If not, see . -use backoff::ExponentialBackoff; +use backoff::{backoff::Backoff, ExponentialBackoff}; +use futures::future::FutureExt; use std::time::Duration; /// Max delay after connection-unrelated error happened before we'll try the /// same request again. const MAX_BACKOFF_INTERVAL: Duration = Duration::from_secs(60); +/// Delay after connection-related error happened before we'll try +/// reconnection again. +const CONNECTION_ERROR_DELAY: Duration = Duration::from_secs(10); /// Macro that returns (client, Err(error)) tuple from function if result is Err(error). #[macro_export] @@ -43,6 +47,10 @@ macro_rules! bail_on_arg_error { }; } +/// Ethereum header Id. +#[derive(Debug, Default, Clone, Copy, Eq, Hash, PartialEq)] +pub struct HeaderId(pub Number, pub Hash); + /// Error type that can signal connection errors. pub trait MaybeConnectionError { /// Returns true if error (maybe) represents connection error. @@ -114,3 +122,102 @@ pub fn format_ids(mut ids: impl ExactSizeIterator impl futures::Stream { + futures::stream::unfold((), move |_| async move { + async_std::task::sleep(timeout).await; + Some(((), ())) + }) +} + +/// Which client has caused error. +#[derive(Debug, Clone, Copy, PartialEq)] +pub enum FailedClient { + /// It is the source client who has caused error. + Source, + /// It is the target client who has caused error. + Target, + /// Both clients are failing, or we just encountered some other error that + /// should be treated like that. + Both, +} + +/// Future process result. +#[derive(Debug, Clone, Copy)] +pub enum ProcessFutureResult { + /// Future has been processed successfully. + Success, + /// Future has failed with non-connection error. + Failed, + /// Future has failed with connection error. + ConnectionFailed, +} + +impl ProcessFutureResult { + /// Returns true if result is Success. + pub fn is_ok(self) -> bool { + match self { + ProcessFutureResult::Success => true, + ProcessFutureResult::Failed | ProcessFutureResult::ConnectionFailed => false, + } + } + + /// Returns Ok(true) if future has succeeded. + /// Returns Ok(false) if future has failed with non-connection error. + /// Returns Err if future is `ConnectionFailed`. + pub fn fail_if_connection_error(self, failed_client: FailedClient) -> Result { + match self { + ProcessFutureResult::Success => Ok(true), + ProcessFutureResult::Failed => Ok(false), + ProcessFutureResult::ConnectionFailed => Err(failed_client), + } + } +} + +/// Process result of the future from a client. +pub(crate) fn process_future_result( + result: Result, + retry_backoff: &mut ExponentialBackoff, + on_success: impl FnOnce(TResult), + go_offline_future: &mut std::pin::Pin<&mut futures::future::Fuse>, + go_offline: impl FnOnce(Duration) -> TGoOfflineFuture, + error_pattern: impl FnOnce() -> String, +) -> ProcessFutureResult +where + TError: std::fmt::Debug + MaybeConnectionError, + TGoOfflineFuture: FutureExt, +{ + match result { + Ok(result) => { + on_success(result); + retry_backoff.reset(); + ProcessFutureResult::Success + } + Err(error) if error.is_connection_error() => { + log::error!( + target: "bridge", + "{}: {:?}. Going to restart", + error_pattern(), + error, + ); + + retry_backoff.reset(); + go_offline_future.set(go_offline(CONNECTION_ERROR_DELAY).fuse()); + ProcessFutureResult::ConnectionFailed + } + Err(error) => { + let retry_delay = retry_backoff.next_backoff().unwrap_or(CONNECTION_ERROR_DELAY); + log::error!( + target: "bridge", + "{}: {:?}. Retrying in {}", + error_pattern(), + error, + retry_delay.as_secs_f64(), + ); + + go_offline_future.set(go_offline(retry_delay).fuse()); + ProcessFutureResult::Failed + } + } +}