diff --git a/bridges/relays/ethereum/Cargo.toml b/bridges/relays/ethereum/Cargo.toml index 4bdcf930c1..0505ff8bc1 100644 --- a/bridges/relays/ethereum/Cargo.toml +++ b/bridges/relays/ethereum/Cargo.toml @@ -18,7 +18,6 @@ ethabi-derive = "12.0" ethereum-tx-sign = "3.0" futures = "0.3.5" hex = "0.4" -jsonrpsee = { git = "https://github.com/paritytech/jsonrpsee.git", default-features = false, features = ["http"] } linked-hash-map = "0.5.3" log = "0.4.8" num-traits = "0.2" @@ -31,6 +30,12 @@ sp-bridge-eth-poa = { path = "../../primitives/ethereum-poa" } time = "0.2" web3 = { version = "0.12.0", default-features = false } +[dependencies.jsonrpsee] +git = "https://github.com/svyatonik/jsonrpsee.git" +branch = "shared-client-in-rpc-api" +default-features = false +features = ["http"] + # Substrate Based Dependencies [dependencies.frame-system] version = "2.0.0-rc3" diff --git a/bridges/relays/ethereum/src/ethereum_client.rs b/bridges/relays/ethereum/src/ethereum_client.rs index 5dba31344b..fcd627549a 100644 --- a/bridges/relays/ethereum/src/ethereum_client.rs +++ b/bridges/relays/ethereum/src/ethereum_client.rs @@ -15,33 +15,30 @@ // along with Parity Bridges Common. If not, see . use crate::ethereum_types::{ - Address, Bytes, CallRequest, EthereumHeaderId, Header, Receipt, TransactionHash, H256, U256, U64, + Address, Bytes, CallRequest, EthereumHeaderId, Header, Receipt, SignedRawTx, TransactionHash, H256, U256, }; +use crate::rpc::{Ethereum, EthereumRpc}; +use crate::rpc_errors::{EthereumNodeError, RpcError}; use crate::substrate_types::{GrandpaJustification, Hash as SubstrateHash, QueuedSubstrateHeader, SubstrateHeaderId}; -use crate::sync_types::{HeaderId, MaybeConnectionError}; -use crate::{bail_on_arg_error, bail_on_error}; +use crate::sync_types::HeaderId; + +use async_trait::async_trait; use codec::{Decode, Encode}; use ethabi::FunctionOutputDecoder; -use jsonrpsee::common::Params; -use jsonrpsee::raw::{RawClient, RawClientError}; -use jsonrpsee::transport::http::{HttpTransportClient, RequestError}; +use jsonrpsee::raw::RawClient; +use jsonrpsee::transport::http::HttpTransportClient; +use jsonrpsee::Client; use parity_crypto::publickey::KeyPair; -use serde::de::DeserializeOwned; -use serde_json::{from_value, to_value}; + use std::collections::HashSet; // to encode/decode contract calls ethabi_contract::use_contract!(bridge_contract, "res/substrate-bridge-abi.json"); -/// Proof of hash serialization success. -const HASH_SERIALIZATION_PROOF: &'static str = "hash serialization never fails; qed"; -/// Proof of integer serialization success. -const INT_SERIALIZATION_PROOF: &'static str = "integer serialization never fails; qed"; -/// Proof of bool serialization success. -const BOOL_SERIALIZATION_PROOF: &'static str = "bool serialization never fails; qed"; +type Result = std::result::Result; /// Ethereum connection params. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct EthereumConnectionParams { /// Ethereum RPC host. pub host: String, @@ -86,381 +83,283 @@ impl Default for EthereumSigningParams { } } -/// Ethereum client type. -pub type Client = RawClient; - -/// All possible errors that can occur during interacting with Ethereum node. -#[derive(Debug)] -pub enum Error { - /// Request start failed. - StartRequestFailed(RequestError), - /// Error serializing request. - RequestSerialization(serde_json::Error), - /// Request not found (should never occur?). - RequestNotFound, - /// Failed to receive response. - ResponseRetrievalFailed(RawClientError), - /// Failed to parse response. - ResponseParseFailed(String), - /// We have received header with missing number and hash fields. - IncompleteHeader, - /// We have received receipt with missing gas_used field. - IncompleteReceipt, - /// Invalid Substrate block number received from Ethereum node. - InvalidSubstrateBlockNumber, +/// The client used to interact with an Ethereum node through RPC. +pub struct EthereumRpcClient { + client: Client, } -impl MaybeConnectionError for Error { - fn is_connection_error(&self) -> bool { - match *self { - Error::StartRequestFailed(_) | Error::ResponseRetrievalFailed(_) => true, - _ => false, +impl EthereumRpcClient { + /// Create a new Ethereum RPC Client. + pub fn new(params: EthereumConnectionParams) -> Self { + let uri = format!("http://{}:{}", params.host, params.port); + let transport = HttpTransportClient::new(&uri); + let raw_client = RawClient::new(transport); + let client: Client = raw_client.into(); + + Self { client } + } +} + +#[async_trait] +impl EthereumRpc for EthereumRpcClient { + async fn estimate_gas(&self, call_request: CallRequest) -> Result { + Ok(Ethereum::estimate_gas(&self.client, call_request).await?) + } + + async fn best_block_number(&self) -> Result { + Ok(Ethereum::block_number(&self.client).await?.as_u64()) + } + + async fn header_by_number(&self, block_number: u64) -> Result
{ + let get_full_tx_objects = false; + let header = Ethereum::get_block_by_number(&self.client, block_number, get_full_tx_objects).await?; + match header.number.is_some() && header.hash.is_some() && header.logs_bloom.is_some() { + true => Ok(header), + false => Err(RpcError::Ethereum(EthereumNodeError::IncompleteHeader)), } } -} -/// Returns client that is able to call RPCs on Ethereum node. -pub fn client(params: EthereumConnectionParams) -> Client { - let uri = format!("http://{}:{}", params.host, params.port); - let transport = HttpTransportClient::new(&uri); - RawClient::new(transport) -} - -/// Retrieve best known block number from Ethereum node. -pub async fn best_block_number(client: Client) -> (Client, Result) { - let (client, result) = call_rpc::(client, "eth_blockNumber", Params::None).await; - (client, result.map(|x| x.as_u64())) -} - -/// Retrieve block header by its number from Ethereum node. -pub async fn header_by_number(client: Client, number: u64) -> (Client, Result) { - let (client, header) = call_rpc( - client, - "eth_getBlockByNumber", - Params::Array(vec![ - to_value(U64::from(number)).expect(INT_SERIALIZATION_PROOF), - to_value(false).expect(BOOL_SERIALIZATION_PROOF), - ]), - ) - .await; - ( - client, - header.and_then(|header: Header| { - match header.number.is_some() && header.hash.is_some() && header.logs_bloom.is_some() { - true => Ok(header), - false => Err(Error::IncompleteHeader), - } - }), - ) -} - -/// Retrieve block header by its hash from Ethereum node. -pub async fn header_by_hash(client: Client, hash: H256) -> (Client, Result) { - let (client, header) = call_rpc( - client, - "eth_getBlockByHash", - Params::Array(vec![ - to_value(hash).expect(HASH_SERIALIZATION_PROOF), - to_value(false).expect(BOOL_SERIALIZATION_PROOF), - ]), - ) - .await; - ( - client, - header.and_then(|header: Header| { - match header.number.is_some() && header.hash.is_some() && header.logs_bloom.is_some() { - true => Ok(header), - false => Err(Error::IncompleteHeader), - } - }), - ) -} - -/// Retrieve transactions receipts for given block. -pub async fn transactions_receipts( - mut client: Client, - id: EthereumHeaderId, - transactions: Vec, -) -> (Client, Result<(EthereumHeaderId, Vec), Error>) { - let mut transactions_receipts = Vec::with_capacity(transactions.len()); - for transaction in transactions { - let (next_client, transaction_receipt) = bail_on_error!(transaction_receipt(client, transaction).await); - transactions_receipts.push(transaction_receipt); - client = next_client; + async fn header_by_hash(&self, hash: H256) -> Result
{ + let header = Ethereum::get_block_by_hash(&self.client, hash).await?; + match header.number.is_some() && header.hash.is_some() && header.logs_bloom.is_some() { + true => Ok(header), + false => Err(RpcError::Ethereum(EthereumNodeError::IncompleteHeader)), + } + } + + async fn transaction_receipt(&self, transaction_hash: H256) -> Result { + let receipt = Ethereum::get_transaction_receipt(&self.client, transaction_hash).await?; + + match receipt.gas_used { + Some(_) => Ok(receipt), + None => Err(RpcError::Ethereum(EthereumNodeError::IncompleteReceipt)), + } + } + + async fn account_nonce(&self, address: Address) -> Result { + Ok(Ethereum::get_transaction_count(&self.client, address).await?) + } + + async fn submit_transaction(&self, signed_raw_tx: SignedRawTx) -> Result { + let transaction = Bytes(signed_raw_tx); + Ok(Ethereum::submit_transaction(&self.client, transaction).await?) + } + + async fn eth_call(&self, call_transaction: CallRequest) -> Result { + Ok(Ethereum::call(&self.client, call_transaction).await?) } - (client, Ok((id, transactions_receipts))) } -/// Retrieve transaction receipt by transaction hash. -async fn transaction_receipt(client: Client, hash: H256) -> (Client, Result) { - let (client, receipt) = call_rpc::( - client, - "eth_getTransactionReceipt", - Params::Array(vec![to_value(hash).expect(HASH_SERIALIZATION_PROOF)]), - ) - .await; - ( - client, - receipt.and_then(|receipt| match receipt.gas_used.is_some() { - true => Ok(receipt), - false => Err(Error::IncompleteReceipt), - }), - ) +/// A trait which contains methods that work by using multiple low-level RPCs, or more complicated +/// interactions involving, for example, an Ethereum contract. +#[async_trait] +pub trait EthereumHighLevelRpc: EthereumRpc { + /// Returns best Substrate block that PoA chain knows of. + async fn best_substrate_block(&self, contract_address: Address) -> Result; + + /// Returns true if Substrate header is known to Ethereum node. + async fn substrate_header_known( + &self, + contract_address: Address, + id: SubstrateHeaderId, + ) -> Result<(SubstrateHeaderId, bool)>; + + /// Submits Substrate headers to Ethereum contract. + async fn submit_substrate_headers( + &self, + params: EthereumSigningParams, + contract_address: Address, + headers: Vec, + ) -> Result>; + + /// Returns ids of incomplete Substrate headers. + async fn incomplete_substrate_headers(&self, contract_address: Address) -> Result>; + + /// Complete Substrate header. + async fn complete_substrate_header( + &self, + params: EthereumSigningParams, + contract_address: Address, + id: SubstrateHeaderId, + justification: GrandpaJustification, + ) -> Result; + + /// Submit ethereum transaction. + async fn submit_ethereum_transaction( + &self, + params: &EthereumSigningParams, + contract_address: Option
, + nonce: Option, + double_gas: bool, + encoded_call: Vec, + ) -> Result<()>; + + /// Retrieve transactions receipts for given block. + async fn transaction_receipts( + &self, + id: EthereumHeaderId, + transactions: Vec, + ) -> Result<(EthereumHeaderId, Vec)>; } -/// Returns best Substrate block that PoA chain knows of. -pub async fn best_substrate_block( - client: Client, - contract_address: Address, -) -> (Client, Result) { - let (encoded_call, call_decoder) = bridge_contract::functions::best_known_header::call(); - let call_request = bail_on_arg_error!( - to_value(CallRequest { +#[async_trait] +impl EthereumHighLevelRpc for EthereumRpcClient { + async fn best_substrate_block(&self, contract_address: Address) -> Result { + let (encoded_call, call_decoder) = bridge_contract::functions::best_known_header::call(); + let call_request = CallRequest { to: Some(contract_address), data: Some(encoded_call.into()), ..Default::default() - }) - .map_err(|e| Error::RequestSerialization(e)), - client - ); - let (client, call_result) = - bail_on_error!(call_rpc::(client, "eth_call", Params::Array(vec![call_request]),).await); - let (number, raw_hash) = match call_decoder.decode(&call_result.0) { - Ok((raw_number, raw_hash)) => (raw_number, raw_hash), - Err(error) => return (client, Err(Error::ResponseParseFailed(format!("{}", error)))), - }; - let hash = match SubstrateHash::decode(&mut &raw_hash[..]) { - Ok(hash) => hash, - Err(error) => return (client, Err(Error::ResponseParseFailed(format!("{}", error)))), - }; + }; - if number != number.low_u32().into() { - return (client, Err(Error::InvalidSubstrateBlockNumber)); + let call_result = self.eth_call(call_request).await?; + let (number, raw_hash) = call_decoder.decode(&call_result.0)?; + let hash = SubstrateHash::decode(&mut &raw_hash[..])?; + + if number != number.low_u32().into() { + return Err(RpcError::Ethereum(EthereumNodeError::InvalidSubstrateBlockNumber)); + } + + Ok(HeaderId(number.low_u32(), hash)) } - (client, Ok(HeaderId(number.low_u32(), hash))) -} - -/// Returns true if Substrate header is known to Ethereum node. -pub async fn substrate_header_known( - client: Client, - contract_address: Address, - id: SubstrateHeaderId, -) -> (Client, Result<(SubstrateHeaderId, bool), Error>) { - let (encoded_call, call_decoder) = bridge_contract::functions::is_known_header::call(id.1); - let call_request = bail_on_arg_error!( - to_value(CallRequest { + async fn substrate_header_known( + &self, + contract_address: Address, + id: SubstrateHeaderId, + ) -> Result<(SubstrateHeaderId, bool)> { + let (encoded_call, call_decoder) = bridge_contract::functions::is_known_header::call(id.1); + let call_request = CallRequest { to: Some(contract_address), data: Some(encoded_call.into()), ..Default::default() - }) - .map_err(|e| Error::RequestSerialization(e)), - client - ); - let (client, call_result) = - bail_on_error!(call_rpc::(client, "eth_call", Params::Array(vec![call_request]),).await); - match call_decoder.decode(&call_result.0) { - Ok(is_known_block) => (client, Ok((id, is_known_block))), - Err(error) => (client, Err(Error::ResponseParseFailed(format!("{}", error)))), + }; + + let call_result = self.eth_call(call_request).await?; + let is_known_block = call_decoder.decode(&call_result.0)?; + + Ok((id, is_known_block)) } -} -/// Submits Substrate headers to Ethereum contract. -pub async fn submit_substrate_headers( - client: Client, - params: EthereumSigningParams, - contract_address: Address, - headers: Vec, -) -> (Client, Result, Error>) { - let (mut client, mut nonce) = - bail_on_error!(account_nonce(client, params.signer.address().as_fixed_bytes().into()).await); + async fn submit_substrate_headers( + &self, + params: EthereumSigningParams, + contract_address: Address, + headers: Vec, + ) -> Result> { + let address: Address = params.signer.address().as_fixed_bytes().into(); + let mut nonce = self.account_nonce(address).await?; - let ids = headers.iter().map(|header| header.id()).collect(); - for header in headers { - client = bail_on_error!( - submit_ethereum_transaction( - client, + let ids = headers.iter().map(|header| header.id()).collect(); + for header in headers { + self.submit_ethereum_transaction( ¶ms, Some(contract_address), Some(nonce), false, - bridge_contract::functions::import_header::encode_input(header.header().encode(),), + bridge_contract::functions::import_header::encode_input(header.header().encode()), ) - .await - ) - .0; + .await?; - nonce += 1.into(); + nonce += 1.into(); + } + + Ok(ids) } - (client, Ok(ids)) -} - -/// Returns ids of incomplete Substrate headers. -pub async fn incomplete_substrate_headers( - client: Client, - contract_address: Address, -) -> (Client, Result, Error>) { - let (encoded_call, call_decoder) = bridge_contract::functions::incomplete_headers::call(); - let call_request = bail_on_arg_error!( - to_value(CallRequest { + async fn incomplete_substrate_headers(&self, contract_address: Address) -> Result> { + let (encoded_call, call_decoder) = bridge_contract::functions::incomplete_headers::call(); + let call_request = CallRequest { to: Some(contract_address), data: Some(encoded_call.into()), ..Default::default() - }) - .map_err(|e| Error::RequestSerialization(e)), - client - ); - let (client, call_result) = - bail_on_error!(call_rpc::(client, "eth_call", Params::Array(vec![call_request]),).await); - match call_decoder.decode(&call_result.0) { - Ok((incomplete_headers_numbers, incomplete_headers_hashes)) => ( - client, - Ok(incomplete_headers_numbers - .into_iter() - .zip(incomplete_headers_hashes) - .filter_map(|(number, hash)| { - if number != number.low_u32().into() { - return None; - } + }; - Some(HeaderId(number.low_u32(), hash)) - }) - .collect()), - ), - Err(error) => (client, Err(Error::ResponseParseFailed(format!("{}", error)))), - } -} + let call_result = self.eth_call(call_request).await?; -/// Complete Substrate header. -pub async fn complete_substrate_header( - client: Client, - params: EthereumSigningParams, - contract_address: Address, - id: SubstrateHeaderId, - justification: GrandpaJustification, -) -> (Client, Result) { - let (client, _) = bail_on_error!( - submit_ethereum_transaction( - client, - ¶ms, - Some(contract_address), - None, - false, - bridge_contract::functions::import_finality_proof::encode_input(id.0, id.1, justification,), - ) - .await - ); + // Q: Is is correct to call these "incomplete_ids"? + let (incomplete_headers_numbers, incomplete_headers_hashes) = call_decoder.decode(&call_result.0)?; + let incomplete_ids = incomplete_headers_numbers + .into_iter() + .zip(incomplete_headers_hashes) + .filter_map(|(number, hash)| { + if number != number.low_u32().into() { + return None; + } - (client, Ok(id)) -} + Some(HeaderId(number.low_u32(), hash)) + }) + .collect(); -/// Deploy bridge contract. -pub async fn deploy_bridge_contract( - client: Client, - params: &EthereumSigningParams, - contract_code: Vec, - initial_header: Vec, - initial_set_id: u64, - initial_authorities: Vec, -) -> (Client, Result<(), Error>) { - submit_ethereum_transaction( - client, - params, - None, - None, - false, - bridge_contract::constructor(contract_code, initial_header, initial_set_id, initial_authorities), - ) - .await -} - -/// Submit ethereum transaction. -async fn submit_ethereum_transaction( - client: Client, - params: &EthereumSigningParams, - contract_address: Option
, - nonce: Option, - double_gas: bool, - encoded_call: Vec, -) -> (Client, Result<(), Error>) { - let (client, nonce) = match nonce { - Some(nonce) => (client, nonce), - None => bail_on_error!(account_nonce(client, params.signer.address().as_fixed_bytes().into()).await), - }; - let (client, gas) = bail_on_error!( - estimate_gas( - client, - CallRequest { - to: contract_address, - data: Some(encoded_call.clone().into()), - ..Default::default() - } - ) - .await - ); - let raw_transaction = ethereum_tx_sign::RawTransaction { - nonce, - to: contract_address, - value: U256::zero(), - gas: if double_gas { gas.saturating_mul(2.into()) } else { gas }, - gas_price: params.gas_price, - data: encoded_call, - } - .sign(¶ms.signer.secret().as_fixed_bytes().into(), ¶ms.chain_id); - let transaction = bail_on_arg_error!( - to_value(Bytes(raw_transaction)).map_err(|e| Error::RequestSerialization(e)), - client - ); - let (client, _) = bail_on_error!( - call_rpc::(client, "eth_submitTransaction", Params::Array(vec![transaction])).await - ); - (client, Ok(())) -} - -/// Get account nonce. -async fn account_nonce(client: Client, caller_address: Address) -> (Client, Result) { - let caller_address = bail_on_arg_error!( - to_value(caller_address).map_err(|e| Error::RequestSerialization(e)), - client - ); - call_rpc(client, "eth_getTransactionCount", Params::Array(vec![caller_address])).await -} - -/// Estimate gas usage for call. -async fn estimate_gas(client: Client, call_request: CallRequest) -> (Client, Result) { - let call_request = bail_on_arg_error!( - to_value(call_request).map_err(|e| Error::RequestSerialization(e)), - client - ); - call_rpc(client, "eth_estimateGas", Params::Array(vec![call_request])).await -} - -/// Calls RPC on Ethereum node. -async fn call_rpc( - mut client: Client, - method: &'static str, - params: Params, -) -> (Client, Result) { - async fn do_call_rpc( - client: &mut Client, - method: &'static str, - params: Params, - ) -> Result { - let request_id = client - .start_request(method, params) - .await - .map_err(Error::StartRequestFailed)?; - // WARN: if there'll be need for executing >1 request at a time, we should avoid - // calling request_by_id - let response = client - .request_by_id(request_id) - .ok_or(Error::RequestNotFound)? - .await - .map_err(Error::ResponseRetrievalFailed)?; - from_value(response).map_err(|e| Error::ResponseParseFailed(format!("{}", e))) + Ok(incomplete_ids) } - let result = do_call_rpc(&mut client, method, params).await; - (client, result) + async fn complete_substrate_header( + &self, + params: EthereumSigningParams, + contract_address: Address, + id: SubstrateHeaderId, + justification: GrandpaJustification, + ) -> Result { + let _ = self + .submit_ethereum_transaction( + ¶ms, + Some(contract_address), + None, + false, + bridge_contract::functions::import_finality_proof::encode_input(id.0, id.1, justification), + ) + .await?; + + Ok(id) + } + + async fn submit_ethereum_transaction( + &self, + params: &EthereumSigningParams, + contract_address: Option
, + nonce: Option, + double_gas: bool, + encoded_call: Vec, + ) -> Result<()> { + let nonce = if let Some(n) = nonce { + n + } else { + let address: Address = params.signer.address().as_fixed_bytes().into(); + self.account_nonce(address).await? + }; + + let call_request = CallRequest { + to: contract_address, + data: Some(encoded_call.clone().into()), + ..Default::default() + }; + let gas = self.estimate_gas(call_request).await?; + + let raw_transaction = ethereum_tx_sign::RawTransaction { + nonce, + to: contract_address, + value: U256::zero(), + gas: if double_gas { gas.saturating_mul(2.into()) } else { gas }, + gas_price: params.gas_price, + data: encoded_call, + } + .sign(¶ms.signer.secret().as_fixed_bytes().into(), ¶ms.chain_id); + + let _ = self.submit_transaction(raw_transaction).await?; + Ok(()) + } + + async fn transaction_receipts( + &self, + id: EthereumHeaderId, + transactions: Vec, + ) -> Result<(EthereumHeaderId, Vec)> { + let mut transaction_receipts = Vec::with_capacity(transactions.len()); + for transaction in transactions { + let transaction_receipt = self.transaction_receipt(transaction).await?; + transaction_receipts.push(transaction_receipt); + } + Ok((id, transaction_receipts)) + } } diff --git a/bridges/relays/ethereum/src/ethereum_deploy_contract.rs b/bridges/relays/ethereum/src/ethereum_deploy_contract.rs index eb9f66be92..0e573bda06 100644 --- a/bridges/relays/ethereum/src/ethereum_deploy_contract.rs +++ b/bridges/relays/ethereum/src/ethereum_deploy_contract.rs @@ -14,9 +14,13 @@ // You should have received a copy of the GNU General Public License // along with Parity Bridges Common. If not, see . -use crate::ethereum_client::{self, EthereumConnectionParams, EthereumSigningParams}; -use crate::substrate_client::{self, SubstrateConnectionParams}; +use crate::ethereum_client::{ + bridge_contract, EthereumConnectionParams, EthereumHighLevelRpc, EthereumRpcClient, EthereumSigningParams, +}; +use crate::rpc::SubstrateRpc; +use crate::substrate_client::{SubstrateConnectionParams, SubstrateRpcClient}; use crate::substrate_types::{Hash as SubstrateHash, Header as SubstrateHeader}; + use codec::{Decode, Encode}; use num_traits::Zero; @@ -59,18 +63,16 @@ pub fn run(params: EthereumDeployContractParams) { let mut local_pool = futures::executor::LocalPool::new(); let result = local_pool.run_until(async move { - let eth_client = ethereum_client::client(params.eth); - let sub_client = substrate_client::client(params.sub); + let eth_client = EthereumRpcClient::new(params.eth); + let sub_client = SubstrateRpcClient::new(params.sub).await?; - let (sub_client, initial_header) = prepare_initial_header(sub_client, params.sub_initial_header).await; - let (initial_header_hash, initial_header) = initial_header?; + let (initial_header_hash, initial_header) = prepare_initial_header(&sub_client, params.sub_initial_header).await?; let initial_set_id = params.sub_initial_authorities_set_id.unwrap_or(0); - let (_, initial_set) = prepare_initial_authorities_set( - sub_client, + let initial_set = prepare_initial_authorities_set( + &sub_client, initial_header_hash, params.sub_initial_authorities_set, - ).await; - let initial_set = initial_set?; + ).await?; log::info!( target: "bridge", @@ -81,14 +83,14 @@ pub fn run(params: EthereumDeployContractParams) { hex::encode(&initial_set), ); - ethereum_client::deploy_bridge_contract( - eth_client, + deploy_bridge_contract( + ð_client, ¶ms.eth_sign, params.eth_contract_code, initial_header, initial_set_id, initial_set, - ).await.1.map_err(|error| format!("Error deploying contract: {:?}", error)) + ).await }); if let Err(error) = result { @@ -98,39 +100,54 @@ pub fn run(params: EthereumDeployContractParams) { /// Prepare initial header. async fn prepare_initial_header( - sub_client: substrate_client::Client, + sub_client: &SubstrateRpcClient, sub_initial_header: Option>, -) -> (substrate_client::Client, Result<(SubstrateHash, Vec), String>) { +) -> Result<(SubstrateHash, Vec), String> { match sub_initial_header { Some(raw_initial_header) => match SubstrateHeader::decode(&mut &raw_initial_header[..]) { - Ok(initial_header) => (sub_client, Ok((initial_header.hash(), raw_initial_header))), - Err(error) => (sub_client, Err(format!("Error decoding initial header: {}", error))), + Ok(initial_header) => Ok((initial_header.hash(), raw_initial_header)), + Err(error) => Err(format!("Error decoding initial header: {}", error)), }, None => { - let (sub_client, initial_header) = substrate_client::header_by_number(sub_client, Zero::zero()).await; - ( - sub_client, - initial_header - .map(|header| (header.hash(), header.encode())) - .map_err(|error| format!("Error reading Substrate genesis header: {:?}", error)), - ) + let initial_header = sub_client.header_by_number(Zero::zero()).await; + initial_header + .map(|header| (header.hash(), header.encode())) + .map_err(|error| format!("Error reading Substrate genesis header: {:?}", error)) } } } /// Prepare initial GRANDPA authorities set. async fn prepare_initial_authorities_set( - sub_client: substrate_client::Client, + sub_client: &SubstrateRpcClient, sub_initial_header_hash: SubstrateHash, sub_initial_authorities_set: Option>, -) -> (substrate_client::Client, Result, String>) { - let (sub_client, initial_authorities_set) = match sub_initial_authorities_set { - Some(initial_authorities_set) => (sub_client, Ok(initial_authorities_set)), - None => substrate_client::grandpa_authorities_set(sub_client, sub_initial_header_hash).await, +) -> Result, String> { + let initial_authorities_set = match sub_initial_authorities_set { + Some(initial_authorities_set) => Ok(initial_authorities_set), + None => sub_client.grandpa_authorities_set(sub_initial_header_hash).await, }; - ( - sub_client, - initial_authorities_set.map_err(|error| format!("Error reading GRANDPA authorities set: {:?}", error)), - ) + initial_authorities_set.map_err(|error| format!("Error reading GRANDPA authorities set: {:?}", error)) +} + +/// Deploy bridge contract to Ethereum chain. +async fn deploy_bridge_contract( + eth_client: &EthereumRpcClient, + params: &EthereumSigningParams, + contract_code: Vec, + initial_header: Vec, + initial_set_id: u64, + initial_authorities: Vec, +) -> Result<(), String> { + eth_client + .submit_ethereum_transaction( + params, + None, + None, + false, + bridge_contract::constructor(contract_code, initial_header, initial_set_id, initial_authorities), + ) + .await + .map_err(|error| format!("Error deploying contract: {:?}", error)) } diff --git a/bridges/relays/ethereum/src/ethereum_sync_loop.rs b/bridges/relays/ethereum/src/ethereum_sync_loop.rs index 95f415a753..26804d4de0 100644 --- a/bridges/relays/ethereum/src/ethereum_sync_loop.rs +++ b/bridges/relays/ethereum/src/ethereum_sync_loop.rs @@ -16,17 +16,23 @@ //! Ethereum PoA -> Substrate synchronization. -use crate::ethereum_client::{self, EthereumConnectionParams}; +use crate::ethereum_client::{EthereumConnectionParams, EthereumHighLevelRpc, EthereumRpcClient}; use crate::ethereum_types::{EthereumHeaderId, EthereumHeadersSyncPipeline, Header, QueuedEthereumHeader, Receipt}; -use crate::substrate_client::{self, SubstrateConnectionParams, SubstrateSigningParams}; +use crate::rpc::{EthereumRpc, SubstrateRpc}; +use crate::rpc_errors::RpcError; +use crate::substrate_client::{ + SubmitEthereumHeaders, SubstrateConnectionParams, SubstrateRpcClient, SubstrateSigningParams, +}; +use crate::substrate_types::into_substrate_ethereum_header; use crate::sync::{HeadersSyncParams, TargetTransactionMode}; -use crate::sync_loop::{OwnedSourceFutureOutput, OwnedTargetFutureOutput, SourceClient, TargetClient}; +use crate::sync_loop::{SourceClient, TargetClient}; +use crate::sync_types::SourceHeader; use async_trait::async_trait; -use futures::future::FutureExt; -use std::{collections::HashSet, time::Duration}; use web3::types::H256; +use std::{collections::HashSet, time::Duration}; + /// Interval at which we check new Ethereum headers when we are synced/almost synced. const ETHEREUM_TICK_INTERVAL: Duration = Duration::from_secs(10); /// Interval at which we check new Substrate blocks. @@ -45,6 +51,7 @@ const MAX_SUBMITTED_HEADERS: usize = 128; const PRUNE_DEPTH: u32 = 4096; /// Ethereum synchronization parameters. +#[derive(Clone)] pub struct EthereumSyncParams { /// Ethereum connection params. pub eth: EthereumConnectionParams, @@ -77,159 +84,124 @@ impl Default for EthereumSyncParams { /// Ethereum client as headers source. struct EthereumHeadersSource { /// Ethereum node client. - client: ethereum_client::Client, + client: EthereumRpcClient, } -type EthereumFutureOutput = OwnedSourceFutureOutput; +impl EthereumHeadersSource { + fn new(client: EthereumRpcClient) -> Self { + Self { client } + } +} #[async_trait] impl SourceClient for EthereumHeadersSource { - type Error = ethereum_client::Error; + type Error = RpcError; - async fn best_block_number(self) -> EthereumFutureOutput { - ethereum_client::best_block_number(self.client) - .map(|(client, result)| (EthereumHeadersSource { client }, result)) - .await + async fn best_block_number(&self) -> Result { + self.client.best_block_number().await } - async fn header_by_hash(self, hash: H256) -> EthereumFutureOutput
{ - ethereum_client::header_by_hash(self.client, hash) - .map(|(client, result)| (EthereumHeadersSource { client }, result)) - .await + async fn header_by_hash(&self, hash: H256) -> Result { + self.client.header_by_hash(hash).await } - async fn header_by_number(self, number: u64) -> EthereumFutureOutput
{ - ethereum_client::header_by_number(self.client, number) - .map(|(client, result)| (EthereumHeadersSource { client }, result)) - .await + async fn header_by_number(&self, number: u64) -> Result { + self.client.header_by_number(number).await } - async fn header_completion(self, id: EthereumHeaderId) -> EthereumFutureOutput<(EthereumHeaderId, Option<()>)> { - (self, Ok((id, None))) + async fn header_completion(&self, id: EthereumHeaderId) -> Result<(EthereumHeaderId, Option<()>), Self::Error> { + Ok((id, None)) } async fn header_extra( - self, + &self, id: EthereumHeaderId, header: QueuedEthereumHeader, - ) -> EthereumFutureOutput<(EthereumHeaderId, Vec)> { - ethereum_client::transactions_receipts(self.client, id, header.header().transactions.clone()) - .map(|(client, result)| (EthereumHeadersSource { client }, result)) + ) -> Result<(EthereumHeaderId, Vec), Self::Error> { + self.client + .transaction_receipts(id, header.header().transactions.clone()) .await } } -/// Substrate client as Ethereum headers target. struct SubstrateHeadersTarget { /// Substrate node client. - client: substrate_client::Client, + client: SubstrateRpcClient, /// Whether we want to submit signed (true), or unsigned (false) transactions. sign_transactions: bool, /// Substrate signing params. sign_params: SubstrateSigningParams, } -type SubstrateFutureOutput = OwnedTargetFutureOutput; +impl SubstrateHeadersTarget { + fn new(client: SubstrateRpcClient, sign_transactions: bool, sign_params: SubstrateSigningParams) -> Self { + Self { + client, + sign_transactions, + sign_params, + } + } +} #[async_trait] impl TargetClient for SubstrateHeadersTarget { - type Error = substrate_client::Error; + type Error = RpcError; - async fn best_header_id(self) -> SubstrateFutureOutput { - let (sign_transactions, sign_params) = (self.sign_transactions, self.sign_params); - substrate_client::best_ethereum_block(self.client) - .map(move |(client, result)| { - ( - SubstrateHeadersTarget { - client, - sign_transactions, - sign_params, - }, - result, - ) - }) + async fn best_header_id(&self) -> Result { + self.client.best_ethereum_block().await + } + + async fn is_known_header(&self, id: EthereumHeaderId) -> Result<(EthereumHeaderId, bool), Self::Error> { + Ok((id, self.client.ethereum_header_known(id).await?)) + } + + async fn submit_headers(&self, headers: Vec) -> Result, Self::Error> { + let (sign_params, sign_transactions) = (self.sign_params.clone(), self.sign_transactions.clone()); + self.client + .submit_ethereum_headers(sign_params, headers, sign_transactions) .await } - async fn is_known_header(self, id: EthereumHeaderId) -> SubstrateFutureOutput<(EthereumHeaderId, bool)> { - let (sign_transactions, sign_params) = (self.sign_transactions, self.sign_params); - substrate_client::ethereum_header_known(self.client, id) - .map(move |(client, result)| { - ( - SubstrateHeadersTarget { - client, - sign_transactions, - sign_params, - }, - result, - ) - }) - .await + async fn incomplete_headers_ids(&self) -> Result, Self::Error> { + Ok(HashSet::new()) } - async fn submit_headers(self, headers: Vec) -> SubstrateFutureOutput> { - let (sign_transactions, sign_params) = (self.sign_transactions, self.sign_params); - substrate_client::submit_ethereum_headers(self.client, sign_params.clone(), headers, sign_transactions) - .map(move |(client, result)| { - ( - SubstrateHeadersTarget { - client, - sign_transactions, - sign_params, - }, - result, - ) - }) - .await + async fn complete_header(&self, id: EthereumHeaderId, _completion: ()) -> Result { + Ok(id) } - async fn incomplete_headers_ids(self) -> SubstrateFutureOutput> { - (self, Ok(HashSet::new())) - } - - async fn complete_header(self, id: EthereumHeaderId, _completion: ()) -> SubstrateFutureOutput { - (self, Ok(id)) - } - - async fn requires_extra(self, header: QueuedEthereumHeader) -> SubstrateFutureOutput<(EthereumHeaderId, bool)> { + async fn requires_extra(&self, header: QueuedEthereumHeader) -> Result<(EthereumHeaderId, bool), Self::Error> { // we can minimize number of receipts_check calls by checking header // logs bloom here, but it may give us false positives (when authorities // source is contract, we never need any logs) - let (sign_transactions, sign_params) = (self.sign_transactions, self.sign_params); - substrate_client::ethereum_receipts_required(self.client, header) - .map(move |(client, result)| { - ( - SubstrateHeadersTarget { - client, - sign_transactions, - sign_params, - }, - result, - ) - }) - .await + let id = header.header().id(); + let sub_eth_header = into_substrate_ethereum_header(header.header()); + Ok((id, self.client.ethereum_receipts_required(sub_eth_header).await?)) } } /// Run Ethereum headers synchronization. -pub fn run(params: EthereumSyncParams) { - let eth_client = ethereum_client::client(params.eth); - let sub_client = substrate_client::client(params.sub); +pub fn run(params: EthereumSyncParams) -> Result<(), RpcError> { + let sub_params = params.clone(); + + let eth_client = EthereumRpcClient::new(params.eth); + let sub_client = async_std::task::block_on(async { SubstrateRpcClient::new(sub_params.sub).await })?; let sign_sub_transactions = match params.sync_params.target_tx_mode { TargetTransactionMode::Signed | TargetTransactionMode::Backup => true, TargetTransactionMode::Unsigned => false, }; + let source = EthereumHeadersSource::new(eth_client); + let target = SubstrateHeadersTarget::new(sub_client, sign_sub_transactions, params.sub_sign); + crate::sync_loop::run( - EthereumHeadersSource { client: eth_client }, + source, ETHEREUM_TICK_INTERVAL, - SubstrateHeadersTarget { - client: sub_client, - sign_transactions: sign_sub_transactions, - sign_params: params.sub_sign, - }, + target, SUBSTRATE_TICK_INTERVAL, params.sync_params, ); + + Ok(()) } diff --git a/bridges/relays/ethereum/src/main.rs b/bridges/relays/ethereum/src/main.rs index c17582e23c..5786ff23db 100644 --- a/bridges/relays/ethereum/src/main.rs +++ b/bridges/relays/ethereum/src/main.rs @@ -46,28 +46,38 @@ fn main() { let matches = clap::App::from_yaml(yaml).get_matches(); match matches.subcommand() { ("eth-to-sub", Some(eth_to_sub_matches)) => { - ethereum_sync_loop::run(match ethereum_sync_params(ð_to_sub_matches) { + if ethereum_sync_loop::run(match ethereum_sync_params(ð_to_sub_matches) { Ok(ethereum_sync_params) => ethereum_sync_params, Err(err) => { log::error!(target: "bridge", "Error parsing parameters: {}", err); return; } - }); + }) + .is_err() + { + log::error!(target: "bridge", "Unable to get Substrate genesis block for Ethereum sync."); + return; + }; } ("sub-to-eth", Some(sub_to_eth_matches)) => { - substrate_sync_loop::run(match substrate_sync_params(&sub_to_eth_matches) { + if substrate_sync_loop::run(match substrate_sync_params(&sub_to_eth_matches) { Ok(substrate_sync_params) => substrate_sync_params, Err(err) => { log::error!(target: "bridge", "Error parsing parameters: {}", err); return; } - }); + }) + .is_err() + { + log::error!(target: "bridge", "Unable to get Substrate genesis block for Substrate sync."); + return; + }; } ("eth-deploy-contract", Some(eth_deploy_matches)) => { ethereum_deploy_contract::run(match ethereum_deploy_contract_params(ð_deploy_matches) { Ok(ethereum_deploy_matches) => ethereum_deploy_matches, Err(err) => { - log::error!(target: "bridge", "Error parsing parameters: {}", err); + log::error!(target: "bridge", "Error during contract deployment: {}", err); return; } }); diff --git a/bridges/relays/ethereum/src/rpc.rs b/bridges/relays/ethereum/src/rpc.rs index 70c775a268..1dc253406c 100644 --- a/bridges/relays/ethereum/src/rpc.rs +++ b/bridges/relays/ethereum/src/rpc.rs @@ -16,69 +16,61 @@ //! RPC Module +#![warn(missing_docs)] + +// The compiler doesn't think we're using the +// code from rpc_api! #![allow(dead_code)] #![allow(unused_variables)] -#[warn(missing_docs)] use std::result; -use crate::ethereum_client::EthereumConnectionParams; use crate::ethereum_types::{ Address as EthAddress, Bytes, CallRequest, EthereumHeaderId, Header as EthereumHeader, Receipt, SignedRawTx, TransactionHash as EthereumTxHash, H256, U256, U64, }; -use crate::rpc_errors::{EthereumNodeError, RpcError}; -use crate::substrate_client::SubstrateConnectionParams; +use crate::rpc_errors::RpcError; use crate::substrate_types::{ Hash as SubstrateHash, Header as SubstrateHeader, Number as SubBlockNumber, SignedBlock as SubstrateBlock, }; -use crate::sync_types::HeaderId; use async_trait::async_trait; -use codec::{Decode, Encode}; -use jsonrpsee::raw::client::RawClient; -use jsonrpsee::transport::http::HttpTransportClient; use sp_bridge_eth_poa::Header as SubstrateEthereumHeader; -const ETH_API_BEST_BLOCK: &str = "EthereumHeadersApi_best_block"; -const ETH_API_IMPORT_REQUIRES_RECEIPTS: &str = "EthereumHeadersApi_is_import_requires_receipts"; -const ETH_API_IS_KNOWN_BLOCK: &str = "EthereumHeadersApi_is_known_block"; -const SUB_API_GRANDPA_AUTHORITIES: &str = "GrandpaApi_grandpa_authorities"; - type Result = result::Result; type GrandpaAuthorityList = Vec; jsonrpsee::rpc_api! { - Ethereum { - #[rpc(method = "eth_estimateGas")] + pub(crate) Ethereum { + #[rpc(method = "eth_estimateGas", positional_params)] fn estimate_gas(call_request: CallRequest) -> U256; - #[rpc(method = "eth_blockNumber")] + #[rpc(method = "eth_blockNumber", positional_params)] fn block_number() -> U64; - #[rpc(method = "eth_getBlockByNumber")] - fn get_block_by_number(block_number: u64) -> EthereumHeader; - #[rpc(method = "eth_getBlockByHash")] + #[rpc(method = "eth_getBlockByNumber", positional_params)] + fn get_block_by_number(block_number: U64, full_tx_objs: bool) -> EthereumHeader; + #[rpc(method = "eth_getBlockByHash", positional_params)] fn get_block_by_hash(hash: H256) -> EthereumHeader; - #[rpc(method = "eth_getTransactionReceipt")] + #[rpc(method = "eth_getTransactionReceipt", positional_params)] fn get_transaction_receipt(transaction_hash: H256) -> Receipt; - #[rpc(method = "eth_getTransactionCount")] + #[rpc(method = "eth_getTransactionCount", positional_params)] fn get_transaction_count(address: EthAddress) -> U256; - #[rpc(method = "eth_submitTransaction")] + #[rpc(method = "eth_submitTransaction", positional_params)] fn submit_transaction(transaction: Bytes) -> EthereumTxHash; - #[rpc(method = "eth_call")] + #[rpc(method = "eth_call", positional_params)] fn call(transaction_call: CallRequest) -> Bytes; } - Substrate { - #[rpc(method = "chain_getHeader")] + pub(crate) Substrate { + #[rpc(method = "chain_getHeader", positional_params)] fn chain_get_header(block_hash: Option) -> SubstrateHeader; - #[rpc(method = "chain_getBlock")] + #[rpc(method = "chain_getBlock", positional_params)] fn chain_get_block(block_hash: Option) -> SubstrateBlock; - #[rpc(method = "chain_getBlockHash")] + #[rpc(method = "chain_getBlockHash", positional_params)] fn chain_get_block_hash(block_number: Option) -> SubstrateHash; - #[rpc(method = "system_accountNextIndex")] + #[rpc(method = "system_accountNextIndex", positional_params)] fn system_account_next_index(account_id: node_primitives::AccountId) -> node_primitives::Index; - #[rpc(method = "author_submitExtrinsic")] + #[rpc(method = "author_submitExtrinsic", positional_params)] fn author_submit_extrinsic(extrinsic: Bytes) -> SubstrateHash; - #[rpc(method = "state_call")] + #[rpc(method = "state_call", positional_params)] fn state_call(method: String, data: Bytes, at_block: Option) -> Bytes; } } @@ -87,215 +79,52 @@ jsonrpsee::rpc_api! { #[async_trait] pub trait EthereumRpc { /// Estimate gas usage for the given call. - async fn estimate_gas(&mut self, call_request: CallRequest) -> Result; + async fn estimate_gas(&self, call_request: CallRequest) -> Result; /// Retrieve number of the best known block from the Ethereum node. - async fn best_block_number(&mut self) -> Result; + async fn best_block_number(&self) -> Result; /// Retrieve block header by its number from Ethereum node. - async fn header_by_number(&mut self, block_number: u64) -> Result; + async fn header_by_number(&self, block_number: u64) -> Result; /// Retrieve block header by its hash from Ethereum node. - async fn header_by_hash(&mut self, hash: H256) -> Result; + async fn header_by_hash(&self, hash: H256) -> Result; /// Retrieve transaction receipt by transaction hash. - async fn transaction_receipt(&mut self, transaction_hash: H256) -> Result; + async fn transaction_receipt(&self, transaction_hash: H256) -> Result; /// Get the nonce of the given account. - async fn account_nonce(&mut self, address: EthAddress) -> Result; + async fn account_nonce(&self, address: EthAddress) -> Result; /// Submit an Ethereum transaction. /// /// The transaction must already be signed before sending it through this method. - async fn submit_transaction(&mut self, signed_raw_tx: SignedRawTx) -> Result; + async fn submit_transaction(&self, signed_raw_tx: SignedRawTx) -> Result; /// Submit a call to an Ethereum smart contract. - async fn eth_call(&mut self, call_transaction: CallRequest) -> Result; -} - -/// The client used to interact with an Ethereum node through RPC. -pub struct EthereumRpcClient { - client: RawClient, -} - -impl EthereumRpcClient { - /// Create a new Ethereum RPC Client. - pub fn new(params: EthereumConnectionParams) -> Self { - let uri = format!("http://{}:{}", params.host, params.port); - let transport = HttpTransportClient::new(&uri); - let client = RawClient::new(transport); - - Self { client } - } -} - -#[async_trait] -impl EthereumRpc for EthereumRpcClient { - async fn estimate_gas(&mut self, call_request: CallRequest) -> Result { - Ok(Ethereum::estimate_gas(&mut self.client, call_request).await?) - } - - async fn best_block_number(&mut self) -> Result { - Ok(Ethereum::block_number(&mut self.client).await?.as_u64()) - } - - async fn header_by_number(&mut self, block_number: u64) -> Result { - let header = Ethereum::get_block_by_number(&mut self.client, block_number).await?; - match header.number.is_some() && header.hash.is_some() && header.logs_bloom.is_some() { - true => Ok(header), - false => Err(RpcError::Ethereum(EthereumNodeError::IncompleteHeader)), - } - } - - async fn header_by_hash(&mut self, hash: H256) -> Result { - let header = Ethereum::get_block_by_hash(&mut self.client, hash).await?; - match header.number.is_some() && header.hash.is_some() && header.logs_bloom.is_some() { - true => Ok(header), - false => Err(RpcError::Ethereum(EthereumNodeError::IncompleteHeader)), - } - } - - async fn transaction_receipt(&mut self, transaction_hash: H256) -> Result { - let receipt = Ethereum::get_transaction_receipt(&mut self.client, transaction_hash).await?; - - match receipt.gas_used { - Some(_) => Ok(receipt), - None => Err(RpcError::Ethereum(EthereumNodeError::IncompleteReceipt)), - } - } - - async fn account_nonce(&mut self, address: EthAddress) -> Result { - Ok(Ethereum::get_transaction_count(&mut self.client, address).await?) - } - - async fn submit_transaction(&mut self, signed_raw_tx: SignedRawTx) -> Result { - let transaction = Bytes(signed_raw_tx); - Ok(Ethereum::submit_transaction(&mut self.client, transaction).await?) - } - - async fn eth_call(&mut self, call_transaction: CallRequest) -> Result { - Ok(Ethereum::call(&mut self.client, call_transaction).await?) - } + async fn eth_call(&self, call_transaction: CallRequest) -> Result; } /// The API for the supported Substrate RPC methods. #[async_trait] pub trait SubstrateRpc { /// Returns the best Substrate header. - async fn best_header(&mut self) -> Result; + async fn best_header(&self) -> Result; /// Get a Substrate block from its hash. - async fn get_block(&mut self, block_hash: Option) -> Result; + async fn get_block(&self, block_hash: Option) -> Result; /// Get a Substrate header by its hash. - async fn header_by_hash(&mut self, hash: SubstrateHash) -> Result; + async fn header_by_hash(&self, hash: SubstrateHash) -> Result; /// Get a Substrate block hash by its number. - async fn block_hash_by_number(&mut self, number: SubBlockNumber) -> Result; + async fn block_hash_by_number(&self, number: SubBlockNumber) -> Result; /// Get a Substrate header by its number. - async fn header_by_number(&mut self, block_number: SubBlockNumber) -> Result; + async fn header_by_number(&self, block_number: SubBlockNumber) -> Result; /// Get the nonce of the given Substrate account. /// /// Note: It's the caller's responsibility to make sure `account` is a valid ss58 address. - async fn next_account_index(&mut self, account: node_primitives::AccountId) -> Result; + async fn next_account_index(&self, account: node_primitives::AccountId) -> Result; /// Returns best Ethereum block that Substrate runtime knows of. - async fn best_ethereum_block(&mut self) -> Result; + async fn best_ethereum_block(&self) -> Result; /// Returns whether or not transactions receipts are required for Ethereum header submission. - async fn ethereum_receipts_required(&mut self, header: SubstrateEthereumHeader) -> Result; + async fn ethereum_receipts_required(&self, header: SubstrateEthereumHeader) -> Result; /// Returns whether or not the given Ethereum header is known to the Substrate runtime. - async fn ethereum_header_known(&mut self, header_id: EthereumHeaderId) -> Result; + async fn ethereum_header_known(&self, header_id: EthereumHeaderId) -> Result; /// Submit an extrinsic for inclusion in a block. /// /// Note: The given transaction does not need be SCALE encoded beforehand. - async fn submit_extrinsic(&mut self, transaction: Bytes) -> Result; + async fn submit_extrinsic(&self, transaction: Bytes) -> Result; /// Get the GRANDPA authority set at given block. - async fn grandpa_authorities_set(&mut self, block: SubstrateHash) -> Result; -} - -/// The client used to interact with a Substrate node through RPC. -pub struct SubstrateRpcClient { - client: RawClient, -} - -impl SubstrateRpcClient { - /// Create a new Substrate RPC Client. - pub fn new(params: SubstrateConnectionParams) -> Self { - let uri = format!("http://{}:{}", params.host, params.port); - let transport = HttpTransportClient::new(&uri); - let client = RawClient::new(transport); - - Self { client } - } -} - -#[async_trait] -impl SubstrateRpc for SubstrateRpcClient { - async fn best_header(&mut self) -> Result { - Ok(Substrate::chain_get_header(&mut self.client, None).await?) - } - - async fn get_block(&mut self, block_hash: Option) -> Result { - Ok(Substrate::chain_get_block(&mut self.client, block_hash).await?) - } - - async fn header_by_hash(&mut self, block_hash: SubstrateHash) -> Result { - Ok(Substrate::chain_get_header(&mut self.client, block_hash).await?) - } - - async fn block_hash_by_number(&mut self, number: SubBlockNumber) -> Result { - Ok(Substrate::chain_get_block_hash(&mut self.client, number).await?) - } - - async fn header_by_number(&mut self, block_number: SubBlockNumber) -> Result { - let block_hash = Self::block_hash_by_number(self, block_number).await?; - Ok(Self::header_by_hash(self, block_hash).await?) - } - - async fn next_account_index(&mut self, account: node_primitives::AccountId) -> Result { - Ok(Substrate::system_account_next_index(&mut self.client, account).await?) - } - - async fn best_ethereum_block(&mut self) -> Result { - let call = ETH_API_BEST_BLOCK.to_string(); - let data = Bytes("0x".into()); - - let encoded_response = Substrate::state_call(&mut self.client, call, data, None).await?; - let decoded_response: (u64, sp_bridge_eth_poa::H256) = Decode::decode(&mut &encoded_response.0[..])?; - - let best_header_id = HeaderId(decoded_response.0, decoded_response.1); - Ok(best_header_id) - } - - async fn ethereum_receipts_required(&mut self, header: SubstrateEthereumHeader) -> Result { - let call = ETH_API_IMPORT_REQUIRES_RECEIPTS.to_string(); - let data = Bytes(header.encode()); - - let encoded_response = Substrate::state_call(&mut self.client, call, data, None).await?; - let receipts_required: bool = Decode::decode(&mut &encoded_response.0[..])?; - - // Gonna make it the responsibility of the caller to return (receipts_required, id) - Ok(receipts_required) - } - - // The Substrate module could prune old headers. So this function could return false even - // if header is synced. And we'll mark corresponding Ethereum header as Orphan. - // - // But when we read the best header from Substrate next time, we will know that - // there's a better header. This Orphan will either be marked as synced, or - // eventually pruned. - async fn ethereum_header_known(&mut self, header_id: EthereumHeaderId) -> Result { - let call = ETH_API_IS_KNOWN_BLOCK.to_string(); - let data = Bytes(header_id.1.encode()); - - let encoded_response = Substrate::state_call(&mut self.client, call, data, None).await?; - let is_known_block: bool = Decode::decode(&mut &encoded_response.0[..])?; - - // Gonna make it the responsibility of the caller to return (is_known_block, id) - Ok(is_known_block) - } - - async fn submit_extrinsic(&mut self, transaction: Bytes) -> Result { - let encoded_transaction = Bytes(transaction.0.encode()); - Ok(Substrate::author_submit_extrinsic(&mut self.client, encoded_transaction).await?) - } - - async fn grandpa_authorities_set(&mut self, block: SubstrateHash) -> Result { - let call = SUB_API_GRANDPA_AUTHORITIES.to_string(); - let data = Bytes(block.as_bytes().to_vec()); - - let encoded_response = Substrate::state_call(&mut self.client, call, data, None).await?; - let authority_list = encoded_response.0; - - Ok(authority_list) - } + async fn grandpa_authorities_set(&self, block: SubstrateHash) -> Result; } diff --git a/bridges/relays/ethereum/src/rpc_errors.rs b/bridges/relays/ethereum/src/rpc_errors.rs index 86758b5599..34295df756 100644 --- a/bridges/relays/ethereum/src/rpc_errors.rs +++ b/bridges/relays/ethereum/src/rpc_errors.rs @@ -14,14 +14,11 @@ // You should have received a copy of the GNU General Public License // along with Parity Bridges Common. If not, see . -#![allow(dead_code)] +use crate::sync_types::MaybeConnectionError; -use jsonrpsee::raw::client::RawClientError; -use jsonrpsee::transport::http::RequestError; +use jsonrpsee::client::RequestError; use serde_json; -type RpcHttpError = RawClientError; - /// Contains common errors that can occur when /// interacting with a Substrate or Ethereum node /// through RPC. @@ -35,9 +32,18 @@ pub enum RpcError { Substrate(SubstrateNodeError), /// An error that can occur when making an HTTP request to /// an JSON-RPC client. - Request(RpcHttpError), - /// The response from the client could not be SCALE decoded. - Decoding(codec::Error), + Request(RequestError), +} + +impl From for String { + fn from(err: RpcError) -> Self { + match err { + RpcError::Serialization(e) => e.to_string(), + RpcError::Ethereum(e) => e.to_string(), + RpcError::Substrate(e) => e.to_string(), + RpcError::Request(e) => e.to_string(), + } + } } impl From for RpcError { @@ -58,15 +64,30 @@ impl From for RpcError { } } -impl From for RpcError { - fn from(err: RpcHttpError) -> Self { +impl From for RpcError { + fn from(err: RequestError) -> Self { Self::Request(err) } } +impl From for RpcError { + fn from(err: ethabi::Error) -> Self { + Self::Ethereum(EthereumNodeError::ResponseParseFailed(format!("{}", err))) + } +} + +impl MaybeConnectionError for RpcError { + fn is_connection_error(&self) -> bool { + match *self { + RpcError::Request(RequestError::TransportError(_)) => true, + _ => false, + } + } +} + impl From for RpcError { fn from(err: codec::Error) -> Self { - Self::Decoding(err) + Self::Substrate(SubstrateNodeError::Decoding(err)) } } @@ -85,14 +106,30 @@ pub enum EthereumNodeError { InvalidSubstrateBlockNumber, } +impl ToString for EthereumNodeError { + fn to_string(&self) -> String { + match self { + Self::ResponseParseFailed(e) => e, + Self::IncompleteHeader => "Incomplete Ethereum Header Received", + Self::IncompleteReceipt => "Incomplete Ethereum Receipt Recieved", + Self::InvalidSubstrateBlockNumber => "Received an invalid Substrate block from Ethereum Node", + } + .to_string() + } +} + /// Errors that can occur only when interacting with /// a Substrate node through RPC. #[derive(Debug)] pub enum SubstrateNodeError { - /// Request start failed. - StartRequestFailed(RequestError), - /// Error serializing request. - RequestSerialization(serde_json::Error), - /// Failed to parse response. - ResponseParseFailed, + /// The response from the client could not be SCALE decoded. + Decoding(codec::Error), +} + +impl ToString for SubstrateNodeError { + fn to_string(&self) -> String { + match self { + Self::Decoding(e) => e.what().to_string(), + } + } } diff --git a/bridges/relays/ethereum/src/substrate_client.rs b/bridges/relays/ethereum/src/substrate_client.rs index e7067cbda3..cd77545b26 100644 --- a/bridges/relays/ethereum/src/substrate_client.rs +++ b/bridges/relays/ethereum/src/substrate_client.rs @@ -15,24 +15,34 @@ // along with Parity Bridges Common. If not, see . use crate::ethereum_types::{Bytes, EthereumHeaderId, QueuedEthereumHeader, H256}; +use crate::rpc::{Substrate, SubstrateRpc}; +use crate::rpc_errors::RpcError; use crate::substrate_types::{ - into_substrate_ethereum_header, into_substrate_ethereum_receipts, GrandpaJustification, Hash, - Header as SubstrateHeader, Number, SignedBlock as SignedSubstrateBlock, SubstrateHeaderId, + into_substrate_ethereum_header, into_substrate_ethereum_receipts, Hash, Header as SubstrateHeader, Number, + SignedBlock as SignedSubstrateBlock, }; -use crate::sync_types::{HeaderId, MaybeConnectionError, SourceHeader}; -use crate::{bail_on_arg_error, bail_on_error}; +use crate::sync_types::HeaderId; + +use async_trait::async_trait; use codec::{Decode, Encode}; -use jsonrpsee::common::Params; -use jsonrpsee::raw::{RawClient, RawClientError}; -use jsonrpsee::transport::http::{HttpTransportClient, RequestError}; +use jsonrpsee::raw::RawClient; +use jsonrpsee::transport::http::HttpTransportClient; +use jsonrpsee::Client; use num_traits::Zero; -use serde::de::DeserializeOwned; -use serde_json::{from_value, to_value, Value}; +use sp_bridge_eth_poa::Header as SubstrateEthereumHeader; use sp_core::crypto::Pair; use sp_runtime::traits::IdentifyAccount; +const ETH_API_IMPORT_REQUIRES_RECEIPTS: &str = "EthereumHeadersApi_is_import_requires_receipts"; +const ETH_API_IS_KNOWN_BLOCK: &str = "EthereumHeadersApi_is_known_block"; +const ETH_API_BEST_BLOCK: &str = "EthereumHeadersApi_best_block"; +const SUB_API_GRANDPA_AUTHORITIES: &str = "GrandpaApi_grandpa_authorities"; + +type Result = std::result::Result; +type GrandpaAuthorityList = Vec; + /// Substrate connection params. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct SubstrateConnectionParams { /// Substrate RPC host. pub host: String, @@ -71,301 +81,177 @@ impl Default for SubstrateSigningParams { } /// Substrate client type. -pub struct Client { +pub struct SubstrateRpcClient { /// Substrate RPC client. - rpc_client: RawClient, + client: Client, /// Genesis block hash. - genesis_hash: Option, + genesis_hash: H256, } -/// All possible errors that can occur during interacting with Ethereum node. -#[derive(Debug)] -pub enum Error { - /// Request start failed. - StartRequestFailed(RequestError), - /// Error serializing request. - RequestSerialization(serde_json::Error), - /// Request not found (should never occur?). - RequestNotFound, - /// Failed to receive response. - ResponseRetrievalFailed(RawClientError), - /// Failed to parse response. - ResponseParseFailed, -} +impl SubstrateRpcClient { + /// Returns client that is able to call RPCs on Substrate node. + pub async fn new(params: SubstrateConnectionParams) -> Result { + let uri = format!("http://{}:{}", params.host, params.port); + let transport = HttpTransportClient::new(&uri); + let raw_client = RawClient::new(transport); + let client: Client = raw_client.into(); -impl MaybeConnectionError for Error { - fn is_connection_error(&self) -> bool { - match *self { - Error::StartRequestFailed(_) | Error::ResponseRetrievalFailed(_) => true, - _ => false, - } + let number: Number = Zero::zero(); + let genesis_hash = Substrate::chain_get_block_hash(&client, number).await?; + + Ok(Self { client, genesis_hash }) } } -/// Returns client that is able to call RPCs on Substrate node. -pub fn client(params: SubstrateConnectionParams) -> Client { - let uri = format!("http://{}:{}", params.host, params.port); - let transport = HttpTransportClient::new(&uri); - Client { - rpc_client: RawClient::new(transport), - genesis_hash: None, +#[async_trait] +impl SubstrateRpc for SubstrateRpcClient { + async fn best_header(&self) -> Result { + Ok(Substrate::chain_get_header(&self.client, None).await?) } -} -/// Returns best Substrate header. -pub async fn best_header(client: Client) -> (Client, Result) { - call_rpc(client, "chain_getHeader", Params::None, rpc_returns_value).await -} + async fn get_block(&self, block_hash: Option) -> Result { + Ok(Substrate::chain_get_block(&self.client, block_hash).await?) + } -/// Returns Substrate header by hash. -pub async fn header_by_hash(client: Client, hash: Hash) -> (Client, Result) { - let hash = bail_on_arg_error!(to_value(hash).map_err(|e| Error::RequestSerialization(e)), client); - call_rpc(client, "chain_getHeader", Params::Array(vec![hash]), rpc_returns_value).await -} + async fn header_by_hash(&self, block_hash: Hash) -> Result { + Ok(Substrate::chain_get_header(&self.client, block_hash).await?) + } -/// Returns Substrate header by number. -pub async fn header_by_number(client: Client, number: Number) -> (Client, Result) { - let (client, hash) = bail_on_error!(block_hash_by_number(client, number).await); - header_by_hash(client, hash).await -} + async fn block_hash_by_number(&self, number: Number) -> Result { + Ok(Substrate::chain_get_block_hash(&self.client, number).await?) + } -/// Returns best Ethereum block that Substrate runtime knows of. -pub async fn best_ethereum_block(client: Client) -> (Client, Result) { - let (client, result) = call_rpc( - client, - "state_call", - Params::Array(vec![ - serde_json::Value::String("EthereumHeadersApi_best_block".into()), - serde_json::Value::String("0x".into()), - ]), - rpc_returns_encoded_value, - ) - .await; - (client, result.map(|(num, hash)| HeaderId(num, hash))) -} + async fn header_by_number(&self, block_number: Number) -> Result { + let block_hash = Self::block_hash_by_number(self, block_number).await?; + Ok(Self::header_by_hash(self, block_hash).await?) + } -/// Returns true if transactions receipts are required for Ethereum header submission. -pub async fn ethereum_receipts_required( - client: Client, - header: QueuedEthereumHeader, -) -> (Client, Result<(EthereumHeaderId, bool), Error>) { - let id = header.header().id(); - let header = into_substrate_ethereum_header(header.header()); - let encoded_header = bail_on_arg_error!( - to_value(Bytes(header.encode())).map_err(|e| Error::RequestSerialization(e)), - client - ); - let (client, receipts_required) = call_rpc( - client, - "state_call", - Params::Array(vec![ - serde_json::Value::String("EthereumHeadersApi_is_import_requires_receipts".into()), - encoded_header, - ]), - rpc_returns_encoded_value, - ) - .await; - ( - client, - receipts_required.map(|receipts_required| (id, receipts_required)), - ) -} + async fn next_account_index(&self, account: node_primitives::AccountId) -> Result { + Ok(Substrate::system_account_next_index(&self.client, account).await?) + } -/// Returns true if Ethereum header is known to Substrate runtime. -pub async fn ethereum_header_known( - client: Client, - id: EthereumHeaderId, -) -> (Client, Result<(EthereumHeaderId, bool), Error>) { - // Substrate module could prune old headers. So this fn could return false even + async fn best_ethereum_block(&self) -> Result { + let call = ETH_API_BEST_BLOCK.to_string(); + let data = Bytes("0x".into()); + + let encoded_response = Substrate::state_call(&self.client, call, data, None).await?; + let decoded_response: (u64, sp_bridge_eth_poa::H256) = Decode::decode(&mut &encoded_response.0[..])?; + + let best_header_id = HeaderId(decoded_response.0, decoded_response.1); + Ok(best_header_id) + } + + async fn ethereum_receipts_required(&self, header: SubstrateEthereumHeader) -> Result { + let call = ETH_API_IMPORT_REQUIRES_RECEIPTS.to_string(); + let data = Bytes(header.encode()); + + let encoded_response = Substrate::state_call(&self.client, call, data, None).await?; + let receipts_required: bool = Decode::decode(&mut &encoded_response.0[..])?; + + Ok(receipts_required) + } + + // The Substrate module could prune old headers. So this function could return false even // if header is synced. And we'll mark corresponding Ethereum header as Orphan. // - // But when we'll read best header from Substrate next time, we will know that - // there's a better header => this Orphan will either be marked as synced, or + // But when we read the best header from Substrate next time, we will know that + // there's a better header. This Orphan will either be marked as synced, or // eventually pruned. - let encoded_id = bail_on_arg_error!( - to_value(Bytes(id.1.encode())).map_err(|e| Error::RequestSerialization(e)), - client - ); - let (client, is_known_block) = call_rpc( - client, - "state_call", - Params::Array(vec![ - serde_json::Value::String("EthereumHeadersApi_is_known_block".into()), - encoded_id, - ]), - rpc_returns_encoded_value, - ) - .await; - (client, is_known_block.map(|is_known_block| (id, is_known_block))) -} + async fn ethereum_header_known(&self, header_id: EthereumHeaderId) -> Result { + let call = ETH_API_IS_KNOWN_BLOCK.to_string(); + let data = Bytes(header_id.1.encode()); -/// Submits Ethereum header to Substrate runtime. -pub async fn submit_ethereum_headers( - client: Client, - params: SubstrateSigningParams, - headers: Vec, - sign_transactions: bool, -) -> (Client, Result, Error>) { - match sign_transactions { - true => submit_signed_ethereum_headers(client, params, headers).await, - false => submit_unsigned_ethereum_headers(client, headers).await, + let encoded_response = Substrate::state_call(&self.client, call, data, None).await?; + let is_known_block: bool = Decode::decode(&mut &encoded_response.0[..])?; + + Ok(is_known_block) + } + + async fn submit_extrinsic(&self, transaction: Bytes) -> Result { + Ok(Substrate::author_submit_extrinsic(&self.client, transaction).await?) + } + + async fn grandpa_authorities_set(&self, block: Hash) -> Result { + let call = SUB_API_GRANDPA_AUTHORITIES.to_string(); + let data = Bytes(block.as_bytes().to_vec()); + + let encoded_response = Substrate::state_call(&self.client, call, data, None).await?; + let authority_list = encoded_response.0; + + Ok(authority_list) } } -/// Submits signed Ethereum header to Substrate runtime. -pub async fn submit_signed_ethereum_headers( - client: Client, - params: SubstrateSigningParams, - headers: Vec, -) -> (Client, Result, Error>) { - let ids = headers.iter().map(|header| header.id()).collect(); - let (client, genesis_hash) = match client.genesis_hash { - Some(genesis_hash) => (client, genesis_hash), - None => { - let (mut client, genesis_hash) = bail_on_error!(block_hash_by_number(client, Zero::zero()).await); - client.genesis_hash = Some(genesis_hash); - (client, genesis_hash) +/// A trait for RPC calls which are used to submit Ethereum headers to a Substrate +/// runtime. These are typically calls which use a combination of other low-level RPC +/// calls. +#[async_trait] +pub trait SubmitEthereumHeaders: SubstrateRpc { + /// Submits Ethereum header to Substrate runtime. + async fn submit_ethereum_headers( + &self, + params: SubstrateSigningParams, + headers: Vec, + sign_transactions: bool, + ) -> Result>; + + /// Submits signed Ethereum header to Substrate runtime. + async fn submit_signed_ethereum_headers( + &self, + params: SubstrateSigningParams, + headers: Vec, + ) -> Result>; + + /// Submits unsigned Ethereum header to Substrate runtime. + async fn submit_unsigned_ethereum_headers( + &self, + headers: Vec, + ) -> Result>; +} + +#[async_trait] +impl SubmitEthereumHeaders for SubstrateRpcClient { + async fn submit_ethereum_headers( + &self, + params: SubstrateSigningParams, + headers: Vec, + sign_transactions: bool, + ) -> Result> { + if sign_transactions { + self.submit_signed_ethereum_headers(params, headers).await + } else { + self.submit_unsigned_ethereum_headers(headers).await } - }; - let account_id = params.signer.public().as_array_ref().clone().into(); - let (client, nonce) = bail_on_error!(next_account_index(client, account_id).await); - - let transaction = create_signed_submit_transaction(headers, ¶ms.signer, nonce, genesis_hash); - let encoded_transaction = bail_on_arg_error!( - to_value(Bytes(transaction.encode())).map_err(|e| Error::RequestSerialization(e)), - client - ); - let (client, _) = bail_on_error!( - call_rpc( - client, - "author_submitExtrinsic", - Params::Array(vec![encoded_transaction]), - |_| Ok(()), - ) - .await - ); - - (client, Ok(ids)) -} - -/// Submits unsigned Ethereum header to Substrate runtime. -pub async fn submit_unsigned_ethereum_headers( - mut client: Client, - headers: Vec, -) -> (Client, Result, Error>) { - let ids = headers.iter().map(|header| header.id()).collect(); - for header in headers { - let transaction = create_unsigned_submit_transaction(header); - - let encoded_transaction = bail_on_arg_error!( - to_value(Bytes(transaction.encode())).map_err(|e| Error::RequestSerialization(e)), - client - ); - let (used_client, _) = bail_on_error!( - call_rpc( - client, - "author_submitExtrinsic", - Params::Array(vec![encoded_transaction]), - |_| Ok(()), - ) - .await - ); - - client = used_client; } - (client, Ok(ids)) -} + async fn submit_signed_ethereum_headers( + &self, + params: SubstrateSigningParams, + headers: Vec, + ) -> Result> { + let ids = headers.iter().map(|header| header.id()).collect(); -/// Get GRANDPA justification for given block. -pub async fn grandpa_justification( - client: Client, - id: SubstrateHeaderId, -) -> (Client, Result<(SubstrateHeaderId, Option), Error>) { - let hash = bail_on_arg_error!(to_value(id.1).map_err(|e| Error::RequestSerialization(e)), client); - let (client, signed_block) = call_rpc(client, "chain_getBlock", Params::Array(vec![hash]), rpc_returns_value).await; - ( - client, - signed_block.map(|signed_block: SignedSubstrateBlock| (id, signed_block.justification)), - ) -} + let account_id = params.signer.public().as_array_ref().clone().into(); + let nonce = self.next_account_index(account_id).await?; -/// Get GRANDPA authorities set at given block. -pub async fn grandpa_authorities_set(client: Client, block: Hash) -> (Client, Result, Error>) { - let block = bail_on_arg_error!(to_value(block).map_err(|e| Error::RequestSerialization(e)), client); - call_rpc( - client, - "state_call", - Params::Array(vec![ - serde_json::Value::String("GrandpaApi_grandpa_authorities".into()), - block, - ]), - rpc_returns_bytes, - ) - .await -} + let transaction = create_signed_submit_transaction(headers, ¶ms.signer, nonce, self.genesis_hash); + let _ = self.submit_extrinsic(Bytes(transaction.encode())).await?; -/// Get Substrate block hash by its number. -async fn block_hash_by_number(client: Client, number: Number) -> (Client, Result) { - let number = bail_on_arg_error!(to_value(number).map_err(|e| Error::RequestSerialization(e)), client); - call_rpc( - client, - "chain_getBlockHash", - Params::Array(vec![number]), - rpc_returns_value, - ) - .await -} - -/// Get substrate account nonce. -async fn next_account_index( - client: Client, - account: node_primitives::AccountId, -) -> (Client, Result) { - use sp_core::crypto::Ss58Codec; - - let account = bail_on_arg_error!( - to_value(account.to_ss58check()).map_err(|e| Error::RequestSerialization(e)), - client - ); - let (client, index) = call_rpc(client, "system_accountNextIndex", Params::Array(vec![account]), |v| { - rpc_returns_value::(v) - }) - .await; - (client, index.map(|index| index as _)) -} - -/// Calls RPC on Substrate node that returns Bytes. -async fn call_rpc( - mut client: Client, - method: &'static str, - params: Params, - decode_value: impl Fn(Value) -> Result, -) -> (Client, Result) { - async fn do_call_rpc( - client: &mut Client, - method: &'static str, - params: Params, - decode_value: impl Fn(Value) -> Result, - ) -> Result { - let request_id = client - .rpc_client - .start_request(method, params) - .await - .map_err(Error::StartRequestFailed)?; - // WARN: if there'll be need for executing >1 request at a time, we should avoid - // calling request_by_id - let response = client - .rpc_client - .request_by_id(request_id) - .ok_or(Error::RequestNotFound)? - .await - .map_err(Error::ResponseRetrievalFailed)?; - decode_value(response) + Ok(ids) } - let result = do_call_rpc(&mut client, method, params, decode_value).await; - (client, result) + async fn submit_unsigned_ethereum_headers( + &self, + headers: Vec, + ) -> Result> { + let ids = headers.iter().map(|header| header.id()).collect(); + for header in headers { + let transaction = create_unsigned_submit_transaction(header); + let _ = self.submit_extrinsic(Bytes(transaction.encode())).await?; + } + + Ok(ids) + } } /// Create signed Substrate transaction for submitting Ethereum headers. @@ -429,20 +315,3 @@ fn create_unsigned_submit_transaction(header: QueuedEthereumHeader) -> bridge_no bridge_node_runtime::UncheckedExtrinsic::new_unsigned(function) } - -/// When RPC method returns encoded value. -fn rpc_returns_encoded_value(value: Value) -> Result { - let encoded_response: Bytes = from_value(value).map_err(|_| Error::ResponseParseFailed)?; - Decode::decode(&mut &encoded_response.0[..]).map_err(|_| Error::ResponseParseFailed) -} - -/// When RPC method returns value. -fn rpc_returns_value(value: Value) -> Result { - from_value(value).map_err(|_| Error::ResponseParseFailed) -} - -/// When RPC method returns raw bytes. -fn rpc_returns_bytes(value: Value) -> Result, Error> { - let encoded_response: Bytes = from_value(value).map_err(|_| Error::ResponseParseFailed)?; - Ok(encoded_response.0) -} diff --git a/bridges/relays/ethereum/src/substrate_sync_loop.rs b/bridges/relays/ethereum/src/substrate_sync_loop.rs index 7ca80f9672..ede547a9b5 100644 --- a/bridges/relays/ethereum/src/substrate_sync_loop.rs +++ b/bridges/relays/ethereum/src/substrate_sync_loop.rs @@ -16,18 +16,22 @@ //! Substrate -> Ethereum synchronization. -use crate::ethereum_client::{self, EthereumConnectionParams, EthereumSigningParams}; +use crate::ethereum_client::{ + EthereumConnectionParams, EthereumHighLevelRpc, EthereumRpcClient, EthereumSigningParams, +}; use crate::ethereum_types::Address; -use crate::substrate_client::{self, SubstrateConnectionParams}; +use crate::rpc::SubstrateRpc; +use crate::rpc_errors::RpcError; +use crate::substrate_client::{SubstrateConnectionParams, SubstrateRpcClient}; use crate::substrate_types::{ GrandpaJustification, Hash, Header, Number, QueuedSubstrateHeader, SubstrateHeaderId, SubstrateHeadersSyncPipeline, }; use crate::sync::{HeadersSyncParams, TargetTransactionMode}; -use crate::sync_loop::{OwnedSourceFutureOutput, OwnedTargetFutureOutput, SourceClient, TargetClient}; +use crate::sync_loop::{SourceClient, TargetClient}; use crate::sync_types::SourceHeader; use async_trait::async_trait; -use futures::future::FutureExt; + use std::{collections::HashSet, time::Duration}; /// Interval at which we check new Substrate headers when we are synced/almost synced. @@ -42,7 +46,7 @@ const MAX_SUBMITTED_HEADERS: usize = 4; const PRUNE_DEPTH: u32 = 256; /// Substrate synchronization parameters. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct SubstrateSyncParams { /// Ethereum connection params. pub eth: EthereumConnectionParams, @@ -84,170 +88,125 @@ impl Default for SubstrateSyncParams { /// Substrate client as headers source. struct SubstrateHeadersSource { /// Substrate node client. - client: substrate_client::Client, + client: SubstrateRpcClient, } -type SubstrateFutureOutput = OwnedSourceFutureOutput; +impl SubstrateHeadersSource { + fn new(client: SubstrateRpcClient) -> Self { + Self { client } + } +} #[async_trait] impl SourceClient for SubstrateHeadersSource { - type Error = substrate_client::Error; + type Error = RpcError; - async fn best_block_number(self) -> SubstrateFutureOutput { - substrate_client::best_header(self.client) - .map(|(client, result)| (SubstrateHeadersSource { client }, result.map(|header| header.number))) - .await + async fn best_block_number(&self) -> Result { + Ok(self.client.best_header().await?.number) } - async fn header_by_hash(self, hash: Hash) -> SubstrateFutureOutput
{ - substrate_client::header_by_hash(self.client, hash) - .map(|(client, result)| (SubstrateHeadersSource { client }, result)) - .await + async fn header_by_hash(&self, hash: Hash) -> Result { + self.client.header_by_hash(hash).await } - async fn header_by_number(self, number: Number) -> SubstrateFutureOutput
{ - substrate_client::header_by_number(self.client, number) - .map(|(client, result)| (SubstrateHeadersSource { client }, result)) - .await + async fn header_by_number(&self, number: Number) -> Result { + self.client.header_by_number(number).await } async fn header_completion( - self, + &self, id: SubstrateHeaderId, - ) -> SubstrateFutureOutput<(SubstrateHeaderId, Option)> { - substrate_client::grandpa_justification(self.client, id) - .map(|(client, result)| (SubstrateHeadersSource { client }, result)) - .await + ) -> Result<(SubstrateHeaderId, Option), Self::Error> { + let hash = id.1; + let signed_block = self.client.get_block(Some(hash)).await?; + let grandpa_justification = signed_block.justification; + + Ok((id, grandpa_justification)) } async fn header_extra( - self, + &self, id: SubstrateHeaderId, _header: QueuedSubstrateHeader, - ) -> SubstrateFutureOutput<(SubstrateHeaderId, ())> { - (self, Ok((id, ()))) + ) -> Result<(SubstrateHeaderId, ()), Self::Error> { + Ok((id, ())) } } /// Ethereum client as Substrate headers target. struct EthereumHeadersTarget { /// Ethereum node client. - client: ethereum_client::Client, + client: EthereumRpcClient, /// Bridge contract address. contract: Address, /// Ethereum signing params. sign_params: EthereumSigningParams, } -type EthereumFutureOutput = OwnedTargetFutureOutput; +impl EthereumHeadersTarget { + fn new(client: EthereumRpcClient, contract: Address, sign_params: EthereumSigningParams) -> Self { + Self { + client, + contract, + sign_params, + } + } +} #[async_trait] impl TargetClient for EthereumHeadersTarget { - type Error = ethereum_client::Error; + type Error = RpcError; - async fn best_header_id(self) -> EthereumFutureOutput { - let (contract, sign_params) = (self.contract, self.sign_params); - ethereum_client::best_substrate_block(self.client, contract) - .map(move |(client, result)| { - ( - EthereumHeadersTarget { - client, - contract, - sign_params, - }, - result, - ) - }) + async fn best_header_id(&self) -> Result { + self.client.best_substrate_block(self.contract).await + } + + async fn is_known_header(&self, id: SubstrateHeaderId) -> Result<(SubstrateHeaderId, bool), Self::Error> { + self.client.substrate_header_known(self.contract, id).await + } + + async fn submit_headers(&self, headers: Vec) -> Result, Self::Error> { + self.client + .submit_substrate_headers(self.sign_params.clone(), self.contract, headers) .await } - async fn is_known_header(self, id: SubstrateHeaderId) -> EthereumFutureOutput<(SubstrateHeaderId, bool)> { - let (contract, sign_params) = (self.contract, self.sign_params); - ethereum_client::substrate_header_known(self.client, contract, id) - .map(move |(client, result)| { - ( - EthereumHeadersTarget { - client, - contract, - sign_params, - }, - result, - ) - }) - .await - } - - async fn submit_headers(self, headers: Vec) -> EthereumFutureOutput> { - let (contract, sign_params) = (self.contract, self.sign_params); - ethereum_client::submit_substrate_headers(self.client, sign_params.clone(), contract, headers) - .map(move |(client, result)| { - ( - EthereumHeadersTarget { - client, - contract, - sign_params, - }, - result, - ) - }) - .await - } - - async fn incomplete_headers_ids(self) -> EthereumFutureOutput> { - let (contract, sign_params) = (self.contract, self.sign_params); - ethereum_client::incomplete_substrate_headers(self.client, contract) - .map(move |(client, result)| { - ( - EthereumHeadersTarget { - client, - contract, - sign_params, - }, - result, - ) - }) - .await + async fn incomplete_headers_ids(&self) -> Result, Self::Error> { + self.client.incomplete_substrate_headers(self.contract).await } async fn complete_header( - self, + &self, id: SubstrateHeaderId, completion: GrandpaJustification, - ) -> EthereumFutureOutput { - let (contract, sign_params) = (self.contract, self.sign_params); - ethereum_client::complete_substrate_header(self.client, sign_params.clone(), contract, id, completion) - .map(move |(client, result)| { - ( - EthereumHeadersTarget { - client, - contract, - sign_params, - }, - result, - ) - }) + ) -> Result { + self.client + .complete_substrate_header(self.sign_params.clone(), self.contract, id, completion) .await } - async fn requires_extra(self, header: QueuedSubstrateHeader) -> EthereumFutureOutput<(SubstrateHeaderId, bool)> { - (self, Ok((header.header().id(), false))) + async fn requires_extra(&self, header: QueuedSubstrateHeader) -> Result<(SubstrateHeaderId, bool), Self::Error> { + Ok((header.header().id(), false)) } } /// Run Substrate headers synchronization. -pub fn run(params: SubstrateSyncParams) { - let eth_client = ethereum_client::client(params.eth); - let sub_client = substrate_client::client(params.sub); +pub fn run(params: SubstrateSyncParams) -> Result<(), RpcError> { + let sub_params = params.clone(); + + let eth_client = EthereumRpcClient::new(params.eth); + let sub_client = async_std::task::block_on(async { SubstrateRpcClient::new(sub_params.sub).await })?; + + let target = EthereumHeadersTarget::new(eth_client, params.eth_contract_address, params.eth_sign); + let source = SubstrateHeadersSource::new(sub_client); crate::sync_loop::run( - SubstrateHeadersSource { client: sub_client }, + source, SUBSTRATE_TICK_INTERVAL, - EthereumHeadersTarget { - client: eth_client, - contract: params.eth_contract_address, - sign_params: params.eth_sign, - }, + target, ETHEREUM_TICK_INTERVAL, params.sync_params, ); + + Ok(()) } diff --git a/bridges/relays/ethereum/src/sync.rs b/bridges/relays/ethereum/src/sync.rs index e1fc372e14..7f15865cc5 100644 --- a/bridges/relays/ethereum/src/sync.rs +++ b/bridges/relays/ethereum/src/sync.rs @@ -19,7 +19,7 @@ use crate::sync_types::{HeaderId, HeaderStatus, HeadersSyncPipeline, QueuedHeade use num_traits::{One, Saturating}; /// Common sync params. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct HeadersSyncParams { /// Maximal number of ethereum headers to pre-download. pub max_future_headers_to_download: usize, @@ -37,7 +37,7 @@ pub struct HeadersSyncParams { } /// Target transaction mode. -#[derive(Debug, PartialEq)] +#[derive(Debug, PartialEq, Clone)] pub enum TargetTransactionMode { /// Submit new headers using signed transactions. Signed, diff --git a/bridges/relays/ethereum/src/sync_loop.rs b/bridges/relays/ethereum/src/sync_loop.rs index 978db60359..28ee40650f 100644 --- a/bridges/relays/ethereum/src/sync_loop.rs +++ b/bridges/relays/ethereum/src/sync_loop.rs @@ -43,11 +43,6 @@ const BACKUP_STALL_SYNC_TIMEOUT: Duration = Duration::from_secs(10 * 60); /// reconnection again. const CONNECTION_ERROR_DELAY: Duration = Duration::from_secs(10); -/// Type alias for all SourceClient futures. -pub type OwnedSourceFutureOutput = (Client, Result>::Error>); -/// Type alias for all TargetClient futures. -pub type OwnedTargetFutureOutput = (Client, Result>::Error>); - /// Source client trait. #[async_trait] pub trait SourceClient: Sized { @@ -55,26 +50,26 @@ pub trait SourceClient: Sized { type Error: std::fmt::Debug + MaybeConnectionError; /// Get best block number. - async fn best_block_number(self) -> OwnedSourceFutureOutput; + async fn best_block_number(&self) -> Result; /// Get header by hash. - async fn header_by_hash(self, hash: P::Hash) -> OwnedSourceFutureOutput; + async fn header_by_hash(&self, hash: P::Hash) -> Result; /// Get canonical header by number. - async fn header_by_number(self, number: P::Number) -> OwnedSourceFutureOutput; + async fn header_by_number(&self, number: P::Number) -> Result; /// Get completion data by header hash. async fn header_completion( - self, + &self, id: HeaderId, - ) -> OwnedSourceFutureOutput, Option)>; + ) -> Result<(HeaderId, Option), Self::Error>; /// Get extra data by header hash. async fn header_extra( - self, + &self, id: HeaderId, header: QueuedHeader

, - ) -> OwnedSourceFutureOutput, P::Extra)>; + ) -> Result<(HeaderId, P::Extra), Self::Error>; } /// Target client trait. @@ -84,35 +79,35 @@ pub trait TargetClient: Sized { type Error: std::fmt::Debug + MaybeConnectionError; /// Returns ID of best header known to the target node. - async fn best_header_id(self) -> OwnedTargetFutureOutput>; + async fn best_header_id(&self) -> Result, Self::Error>; /// Returns true if header is known to the target node. async fn is_known_header( - self, + &self, id: HeaderId, - ) -> OwnedTargetFutureOutput, bool)>; + ) -> Result<(HeaderId, bool), Self::Error>; /// Submit headers. async fn submit_headers( - self, + &self, headers: Vec>, - ) -> OwnedTargetFutureOutput>>; + ) -> Result>, Self::Error>; /// Returns ID of headers that require to be 'completed' before children can be submitted. - async fn incomplete_headers_ids(self) -> OwnedTargetFutureOutput>>; + async fn incomplete_headers_ids(&self) -> Result>, Self::Error>; /// Submit completion data for header. async fn complete_header( - self, + &self, id: HeaderId, completion: P::Completion, - ) -> OwnedTargetFutureOutput>; + ) -> Result, Self::Error>; /// Returns true if header requires extra data to be submitted. async fn requires_extra( - self, + &self, header: QueuedHeader

, - ) -> OwnedTargetFutureOutput, bool)>; + ) -> Result<(HeaderId, bool), Self::Error>; } /// Run headers synchronization. @@ -131,7 +126,7 @@ pub fn run( let mut stall_countdown = None; let mut last_update_time = Instant::now(); - let mut source_maybe_client = None; + let mut source_client_is_online = false; let mut source_best_block_number_required = false; let source_best_block_number_future = source_client.best_block_number().fuse(); let source_new_header_future = futures::future::Fuse::terminated(); @@ -141,7 +136,7 @@ pub fn run( let source_go_offline_future = futures::future::Fuse::terminated(); let source_tick_stream = interval(source_tick).fuse(); - let mut target_maybe_client = None; + let mut target_client_is_online = false; let mut target_best_block_required = false; let mut target_incomplete_headers_required = true; let target_best_block_future = target_client.best_header_id().fuse(); @@ -173,77 +168,65 @@ pub fn run( loop { futures::select! { - (source_client, source_best_block_number) = source_best_block_number_future => { + source_best_block_number = source_best_block_number_future => { source_best_block_number_required = false; - process_future_result( - &mut source_maybe_client, - source_client, + source_client_is_online = process_future_result( source_best_block_number, |source_best_block_number| sync.source_best_header_number_response(source_best_block_number), &mut source_go_offline_future, - |source_client| delay(CONNECTION_ERROR_DELAY, source_client), + || async_std::task::sleep(CONNECTION_ERROR_DELAY), || format!("Error retrieving best header number from {}", P::SOURCE_NAME), ); }, - (source_client, source_new_header) = source_new_header_future => { - process_future_result( - &mut source_maybe_client, - source_client, + source_new_header = source_new_header_future => { + source_client_is_online = process_future_result( source_new_header, |source_new_header| sync.headers_mut().header_response(source_new_header), &mut source_go_offline_future, - |source_client| delay(CONNECTION_ERROR_DELAY, source_client), + || async_std::task::sleep(CONNECTION_ERROR_DELAY), || format!("Error retrieving header from {} node", P::SOURCE_NAME), ); }, - (source_client, source_orphan_header) = source_orphan_header_future => { - process_future_result( - &mut source_maybe_client, - source_client, + source_orphan_header = source_orphan_header_future => { + source_client_is_online = process_future_result( source_orphan_header, |source_orphan_header| sync.headers_mut().header_response(source_orphan_header), &mut source_go_offline_future, - |source_client| delay(CONNECTION_ERROR_DELAY, source_client), + || async_std::task::sleep(CONNECTION_ERROR_DELAY), || format!("Error retrieving orphan header from {} node", P::SOURCE_NAME), ); }, - (source_client, source_extra) = source_extra_future => { - process_future_result( - &mut source_maybe_client, - source_client, + source_extra = source_extra_future => { + source_client_is_online = process_future_result( source_extra, |(header, extra)| sync.headers_mut().extra_response(&header, extra), &mut source_go_offline_future, - |source_client| delay(CONNECTION_ERROR_DELAY, source_client), + || async_std::task::sleep(CONNECTION_ERROR_DELAY), || format!("Error retrieving extra data from {} node", P::SOURCE_NAME), ); }, - (source_client, source_completion) = source_completion_future => { - process_future_result( - &mut source_maybe_client, - source_client, + source_completion = source_completion_future => { + source_client_is_online = process_future_result( source_completion, |(header, completion)| sync.headers_mut().completion_response(&header, completion), &mut source_go_offline_future, - |source_client| delay(CONNECTION_ERROR_DELAY, source_client), + || async_std::task::sleep(CONNECTION_ERROR_DELAY), || format!("Error retrieving completion data from {} node", P::SOURCE_NAME), ); }, source_client = source_go_offline_future => { - source_maybe_client = Some(source_client); + source_client_is_online = true; }, _ = source_tick_stream.next() => { if sync.is_almost_synced() { source_best_block_number_required = true; } }, - (target_client, target_best_block) = target_best_block_future => { + target_best_block = target_best_block_future => { target_best_block_required = false; - process_future_result( - &mut target_maybe_client, - target_client, + target_client_is_online = process_future_result( target_best_block, |target_best_block| { let head_updated = sync.target_best_header_response(target_best_block); @@ -279,73 +262,63 @@ pub fn run( } }, &mut target_go_offline_future, - |target_client| delay(CONNECTION_ERROR_DELAY, target_client), + || async_std::task::sleep(CONNECTION_ERROR_DELAY), || format!("Error retrieving best known header from {} node", P::TARGET_NAME), ); }, - (target_client, incomplete_headers_ids) = target_incomplete_headers_future => { + incomplete_headers_ids = target_incomplete_headers_future => { target_incomplete_headers_required = false; - process_future_result( - &mut target_maybe_client, - target_client, + target_client_is_online = process_future_result( incomplete_headers_ids, |incomplete_headers_ids| sync.headers_mut().incomplete_headers_response(incomplete_headers_ids), &mut target_go_offline_future, - |target_client| delay(CONNECTION_ERROR_DELAY, target_client), + || async_std::task::sleep(CONNECTION_ERROR_DELAY), || format!("Error retrieving incomplete headers from {} node", P::TARGET_NAME), ); }, - (target_client, target_existence_status) = target_existence_status_future => { - process_future_result( - &mut target_maybe_client, - target_client, + target_existence_status = target_existence_status_future => { + target_client_is_online = process_future_result( target_existence_status, |(target_header, target_existence_status)| sync .headers_mut() .maybe_orphan_response(&target_header, target_existence_status), &mut target_go_offline_future, - |target_client| delay(CONNECTION_ERROR_DELAY, target_client), + || async_std::task::sleep(CONNECTION_ERROR_DELAY), || format!("Error retrieving existence status from {} node", P::TARGET_NAME), ); }, - (target_client, target_submit_header_result) = target_submit_header_future => { - process_future_result( - &mut target_maybe_client, - target_client, + target_submit_header_result = target_submit_header_future => { + target_client_is_online = process_future_result( target_submit_header_result, |submitted_headers| sync.headers_mut().headers_submitted(submitted_headers), &mut target_go_offline_future, - |target_client| delay(CONNECTION_ERROR_DELAY, target_client), + || async_std::task::sleep(CONNECTION_ERROR_DELAY), || format!("Error submitting headers to {} node", P::TARGET_NAME), ); }, - (target_client, target_complete_header_result) = target_complete_header_future => { - process_future_result( - &mut target_maybe_client, - target_client, + target_complete_header_result = target_complete_header_future => { + target_client_is_online = process_future_result( target_complete_header_result, |completed_header| sync.headers_mut().header_completed(&completed_header), &mut target_go_offline_future, - |target_client| delay(CONNECTION_ERROR_DELAY, target_client), + || async_std::task::sleep(CONNECTION_ERROR_DELAY), || format!("Error completing headers at {}", P::TARGET_NAME), ); }, - (target_client, target_extra_check_result) = target_extra_check_future => { - process_future_result( - &mut target_maybe_client, - target_client, + target_extra_check_result = target_extra_check_future => { + target_client_is_online = process_future_result( target_extra_check_result, |(header, extra_check_result)| sync .headers_mut() .maybe_extra_response(&header, extra_check_result), &mut target_go_offline_future, - |target_client| delay(CONNECTION_ERROR_DELAY, target_client), + || async_std::task::sleep(CONNECTION_ERROR_DELAY), || format!("Error retrieving receipts requirement from {} node", P::TARGET_NAME), ); }, target_client = target_go_offline_future => { - target_maybe_client = Some(target_client); + target_client_is_online = true; }, _ = target_tick_stream.next() => { target_best_block_required = true; @@ -356,15 +329,35 @@ pub fn run( // print progress progress_context = print_sync_progress(progress_context, &sync); - // if target client is available: wait, or call required target methods - if let Some(target_client) = target_maybe_client.take() { - // the priority is to: - // 1) get best block - it stops us from downloading/submitting new blocks + we call it rarely; - // 2) get incomplete headers - it stops us from submitting new blocks + we call it rarely; - // 3) complete headers - it stops us from submitting new blocks; - // 4) check if we need extra data from source - it stops us from downloading/submitting new blocks; - // 5) check existence - it stops us from submitting new blocks; - // 6) submit header + // If the target client is accepting requests we update the requests that + // we want it to run + if target_client_is_online { + // NOTE: Is is important to reset this so that we only have one + // request being processed by the client at a time. This prevents + // race conditions like receiving two transactions with the same + // nonce from the client. + target_client_is_online = false; + + // The following is how we prioritize requests: + // + // 1. Get best block + // - Stops us from downloading or submitting new blocks + // - Only called rarely + // + // 2. Get incomplete headers + // - Stops us from submitting new blocks + // - Only called rarely + // + // 3. Get complete headers + // - Stops us from submitting new blocks + // + // 4. Check if we need extra data from source + // - Stops us from downloading or submitting new blocks + // + // 5. Check existence of header + // - Stops us from submitting new blocks + // + // 6. Submit header if target_best_block_required { log::debug!(target: "bridge", "Asking {} about best block", P::TARGET_NAME); @@ -424,18 +417,35 @@ pub fn run( stall_countdown = Some(Instant::now()); } } else { - target_maybe_client = Some(target_client); + target_client_is_online = true; } } - // if source client is available: wait, or call required source methods - if let Some(source_client) = source_maybe_client.take() { - // the priority is to: - // 1) get best block - it stops us from downloading new blocks + we call it rarely; - // 2) download completion data - it stops us from submitting new blocks; - // 3) download extra data - it stops us from submitting new blocks; - // 4) download missing headers - it stops us from downloading/submitting new blocks; - // 5) downloading new headers + // If the source client is accepting requests we update the requests that + // we want it to run + if source_client_is_online { + // NOTE: Is is important to reset this so that we only have one + // request being processed by the client at a time. This prevents + // race conditions like receiving two transactions with the same + // nonce from the client. + source_client_is_online = false; + + // The following is how we prioritize requests: + // + // 1. Get best block + // - Stops us from downloading or submitting new blocks + // - Only called rarely + // + // 2. Download completion data + // - Stops us from submitting new blocks + // + // 3. Download extra data + // - Stops us from submitting new blocks + // + // 4. Download missing headers + // - Stops us from downloading or submitting new blocks + // + // 5. Downloading new headers if source_best_block_number_required { log::debug!(target: "bridge", "Asking {} node about best block number", P::SOURCE_NAME); @@ -488,55 +498,56 @@ pub fn run( source_new_header_future.set(source_client.header_by_number(id).fuse()); } else { - source_maybe_client = Some(source_client); + source_client_is_online = true; } } } }); } -/// Future that resolves into given value after given timeout. -async fn delay(timeout: Duration, retval: T) -> T { - async_std::task::sleep(timeout).await; - retval -} - /// Stream that emits item every `timeout_ms` milliseconds. fn interval(timeout: Duration) -> impl futures::Stream { futures::stream::unfold((), move |_| async move { - delay(timeout, ()).await; + async_std::task::sleep(timeout).await; Some(((), ())) }) } -/// Process result of the future that may have been caused by connection failure. -fn process_future_result( - maybe_client: &mut Option, - client: TClient, +/// Process result of the future from a client. +/// +/// Returns whether or not the client we're interacting with is online. In this context +/// what online means is that the client is currently not handling any other requests +/// that we've previously sent. +fn process_future_result( result: Result, on_success: impl FnOnce(TResult), go_offline_future: &mut std::pin::Pin<&mut futures::future::Fuse>, - go_offline: impl FnOnce(TClient) -> TGoOfflineFuture, + go_offline: impl FnOnce() -> TGoOfflineFuture, error_pattern: impl FnOnce() -> String, -) where +) -> bool +where TError: std::fmt::Debug + MaybeConnectionError, TGoOfflineFuture: FutureExt, { + let mut client_is_online = false; + match result { Ok(result) => { - *maybe_client = Some(client); on_success(result); + client_is_online = true } Err(error) => { if error.is_connection_error() { - go_offline_future.set(go_offline(client).fuse()); + go_offline_future.set(go_offline().fuse()); } else { - *maybe_client = Some(client); + client_is_online = true } log::error!(target: "bridge", "{}: {:?}", error_pattern(), error); } } + + client_is_online } /// Print synchronization progress.