From 63d6fc436ab1861833cbd056b2603093e2200dfe Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky Date: Tue, 20 Jul 2021 15:18:51 +0300 Subject: [PATCH] use tokio reactor to execute jsonrpsee futures (#1061) --- .../relays/bin-ethereum/src/rialto_client.rs | 10 +- .../bin-substrate/src/cli/send_message.rs | 5 +- .../bin-substrate/src/finality_target.rs | 7 +- .../bin-substrate/src/headers_initialize.rs | 10 +- .../bin-substrate/src/messages_source.rs | 4 +- .../bin-substrate/src/messages_target.rs | 11 +- bridges/relays/client-ethereum/Cargo.toml | 1 + bridges/relays/client-ethereum/src/client.rs | 168 +++++++----- bridges/relays/client-ethereum/src/error.rs | 18 ++ bridges/relays/client-substrate/Cargo.toml | 1 + bridges/relays/client-substrate/src/client.rs | 252 ++++++++++++------ bridges/relays/client-substrate/src/error.rs | 16 ++ .../client-substrate/src/finality_source.rs | 2 +- 13 files changed, 337 insertions(+), 168 deletions(-) diff --git a/bridges/relays/bin-ethereum/src/rialto_client.rs b/bridges/relays/bin-ethereum/src/rialto_client.rs index d9c0f265cb..1ec1e7df79 100644 --- a/bridges/relays/bin-ethereum/src/rialto_client.rs +++ b/bridges/relays/bin-ethereum/src/rialto_client.rs @@ -155,11 +155,12 @@ impl SubmitEthereumHeaders for SubstrateClient { headers: Vec, ) -> SubmittedHeaders { let ids = headers.iter().map(|header| header.id()).collect(); + let genesis_hash = *self.genesis_hash(); let submission_result = async { - self.submit_signed_extrinsic((*params.public().as_array_ref()).into(), |transaction_nonce| { + self.submit_signed_extrinsic((*params.public().as_array_ref()).into(), move |transaction_nonce| { Bytes( Rialto::sign_transaction( - *self.genesis_hash(), + genesis_hash, ¶ms, transaction_nonce, instance.build_signed_header_call(headers), @@ -257,10 +258,11 @@ impl SubmitEthereumExchangeTransactionProof for SubstrateClient { instance: Arc, proof: rialto_runtime::exchange::EthereumTransactionInclusionProof, ) -> RpcResult<()> { - self.submit_signed_extrinsic((*params.public().as_array_ref()).into(), |transaction_nonce| { + let genesis_hash = *self.genesis_hash(); + self.submit_signed_extrinsic((*params.public().as_array_ref()).into(), move |transaction_nonce| { Bytes( Rialto::sign_transaction( - *self.genesis_hash(), + genesis_hash, ¶ms, transaction_nonce, instance.build_currency_exchange_call(proof), diff --git a/bridges/relays/bin-substrate/src/cli/send_message.rs b/bridges/relays/bin-substrate/src/cli/send_message.rs index 6ec955ed20..2834474579 100644 --- a/bridges/relays/bin-substrate/src/cli/send_message.rs +++ b/bridges/relays/bin-substrate/src/cli/send_message.rs @@ -176,10 +176,11 @@ impl SendMessage { fee, })?; + let source_genesis_hash = *source_client.genesis_hash(); source_client - .submit_signed_extrinsic(source_sign.public().into(), |transaction_nonce| { + .submit_signed_extrinsic(source_sign.public().into(), move |transaction_nonce| { let signed_source_call = Source::sign_transaction( - *source_client.genesis_hash(), + source_genesis_hash, &source_sign, transaction_nonce, send_message_call, diff --git a/bridges/relays/bin-substrate/src/finality_target.rs b/bridges/relays/bin-substrate/src/finality_target.rs index ffa10cabac..6c4c384d11 100644 --- a/bridges/relays/bin-substrate/src/finality_target.rs +++ b/bridges/relays/bin-substrate/src/finality_target.rs @@ -80,10 +80,11 @@ where } async fn submit_finality_proof(&self, header: P::Header, proof: P::FinalityProof) -> Result<(), SubstrateError> { + let transactions_author = self.pipeline.transactions_author(); + let pipeline = self.pipeline.clone(); self.client - .submit_signed_extrinsic(self.pipeline.transactions_author(), move |transaction_nonce| { - self.pipeline - .make_submit_finality_proof_transaction(transaction_nonce, header, proof) + .submit_signed_extrinsic(transactions_author, move |transaction_nonce| { + pipeline.make_submit_finality_proof_transaction(transaction_nonce, header, proof) }) .await .map(drop) diff --git a/bridges/relays/bin-substrate/src/headers_initialize.rs b/bridges/relays/bin-substrate/src/headers_initialize.rs index 39d8ae7069..d3a78028e0 100644 --- a/bridges/relays/bin-substrate/src/headers_initialize.rs +++ b/bridges/relays/bin-substrate/src/headers_initialize.rs @@ -39,7 +39,9 @@ pub async fn initialize( source_client: Client, target_client: Client, target_transactions_signer: TargetChain::AccountId, - prepare_initialize_transaction: impl FnOnce(TargetChain::Index, InitializationData) -> Bytes, + prepare_initialize_transaction: impl FnOnce(TargetChain::Index, InitializationData) -> Bytes + + Send + + 'static, ) { let result = do_initialize( source_client, @@ -72,7 +74,9 @@ async fn do_initialize( source_client: Client, target_client: Client, target_transactions_signer: TargetChain::AccountId, - prepare_initialize_transaction: impl FnOnce(TargetChain::Index, InitializationData) -> Bytes, + prepare_initialize_transaction: impl FnOnce(TargetChain::Index, InitializationData) -> Bytes + + Send + + 'static, ) -> Result { let initialization_data = prepare_initialization_data(source_client).await?; log::info!( @@ -102,7 +106,7 @@ async fn prepare_initialization_data( // But now there are problems with this approach - `CurrentSetId` may return invalid value. So here // we're waiting for the next justification, read the authorities set and then try to figure out // the set id with bruteforce. - let mut justifications = source_client + let justifications = source_client .subscribe_justifications() .await .map_err(|err| format!("Failed to subscribe to {} justifications: {:?}", SourceChain::NAME, err))?; diff --git a/bridges/relays/bin-substrate/src/messages_source.rs b/bridges/relays/bin-substrate/src/messages_source.rs index 2271d3e4f1..6792848e4a 100644 --- a/bridges/relays/bin-substrate/src/messages_source.rs +++ b/bridges/relays/bin-substrate/src/messages_source.rs @@ -233,10 +233,10 @@ where generated_at_block: TargetHeaderIdOf

, proof: P::MessagesReceivingProof, ) -> Result<(), SubstrateError> { + let lane = self.lane.clone(); self.client .submit_signed_extrinsic(self.lane.source_transactions_author(), move |transaction_nonce| { - self.lane - .make_messages_receiving_proof_transaction(transaction_nonce, generated_at_block, proof) + lane.make_messages_receiving_proof_transaction(transaction_nonce, generated_at_block, proof) }) .await?; Ok(()) diff --git a/bridges/relays/bin-substrate/src/messages_target.rs b/bridges/relays/bin-substrate/src/messages_target.rs index bbee261c6d..666bfc48c2 100644 --- a/bridges/relays/bin-substrate/src/messages_target.rs +++ b/bridges/relays/bin-substrate/src/messages_target.rs @@ -219,14 +219,11 @@ where nonces: RangeInclusive, proof: P::MessagesProof, ) -> Result, SubstrateError> { + let lane = self.lane.clone(); + let nonces_clone = nonces.clone(); self.client - .submit_signed_extrinsic(self.lane.target_transactions_author(), |transaction_nonce| { - self.lane.make_messages_delivery_transaction( - transaction_nonce, - generated_at_header, - nonces.clone(), - proof, - ) + .submit_signed_extrinsic(self.lane.target_transactions_author(), move |transaction_nonce| { + lane.make_messages_delivery_transaction(transaction_nonce, generated_at_header, nonces_clone, proof) }) .await?; Ok(nonces) diff --git a/bridges/relays/client-ethereum/Cargo.toml b/bridges/relays/client-ethereum/Cargo.toml index 8ed3cc76f0..53add5cf58 100644 --- a/bridges/relays/client-ethereum/Cargo.toml +++ b/bridges/relays/client-ethereum/Cargo.toml @@ -16,4 +16,5 @@ jsonrpsee-ws-client = "0.2" libsecp256k1 = { version = "0.3.4", default-features = false, features = ["hmac"] } log = "0.4.11" relay-utils = { path = "../utils" } +tokio = "1.8" web3 = { git = "https://github.com/svyatonik/rust-web3.git", branch = "bump-deps" } diff --git a/bridges/relays/client-ethereum/src/client.rs b/bridges/relays/client-ethereum/src/client.rs index 71dac5df6d..1e5faa4868 100644 --- a/bridges/relays/client-ethereum/src/client.rs +++ b/bridges/relays/client-ethereum/src/client.rs @@ -23,7 +23,7 @@ use crate::{ConnectionParams, Error, Result}; use jsonrpsee_ws_client::{WsClient as RpcClient, WsClientBuilder as RpcClientBuilder}; use relay_utils::relay_loop::RECONNECT_DELAY; -use std::sync::Arc; +use std::{future::Future, sync::Arc}; /// Number of headers missing from the Ethereum node for us to consider node not synced. const MAJOR_SYNC_BLOCKS: u64 = 5; @@ -31,6 +31,7 @@ const MAJOR_SYNC_BLOCKS: u64 = 5; /// The client used to interact with an Ethereum node through RPC. #[derive(Clone)] pub struct Client { + tokio: Arc, params: ConnectionParams, client: Arc, } @@ -59,22 +60,23 @@ impl Client { /// Try to connect to Ethereum node. Returns Ethereum RPC client if connection has been established /// or error otherwise. pub async fn try_connect(params: ConnectionParams) -> Result { - Ok(Self { - client: Self::build_client(¶ms).await?, - params, - }) + let (tokio, client) = Self::build_client(¶ms).await?; + Ok(Self { tokio, client, params }) } /// Build client to use in connection. - async fn build_client(params: &ConnectionParams) -> Result> { + async fn build_client(params: &ConnectionParams) -> Result<(Arc, Arc)> { + let tokio = tokio::runtime::Runtime::new()?; let uri = format!("ws://{}:{}", params.host, params.port); - let client = RpcClientBuilder::default().build(&uri).await?; - Ok(Arc::new(client)) + let client = tokio + .spawn(async move { RpcClientBuilder::default().build(&uri).await }) + .await??; + Ok((Arc::new(tokio), Arc::new(client))) } /// Reopen client connection. pub async fn reconnect(&mut self) -> Result<()> { - self.client = Self::build_client(&self.params).await?; + self.client = Self::build_client(&self.params).await?.1; Ok(()) } } @@ -82,113 +84,153 @@ impl Client { impl Client { /// Returns true if client is connected to at least one peer and is in synced state. pub async fn ensure_synced(&self) -> Result<()> { - match Ethereum::syncing(&*self.client).await? { - SyncState::NotSyncing => Ok(()), - SyncState::Syncing(syncing) => { - let missing_headers = syncing.highest_block.saturating_sub(syncing.current_block); - if missing_headers > MAJOR_SYNC_BLOCKS.into() { - return Err(Error::ClientNotSynced(missing_headers)); - } + self.jsonrpsee_execute(move |client| async move { + match Ethereum::syncing(&*client).await? { + SyncState::NotSyncing => Ok(()), + SyncState::Syncing(syncing) => { + let missing_headers = syncing.highest_block.saturating_sub(syncing.current_block); + if missing_headers > MAJOR_SYNC_BLOCKS.into() { + return Err(Error::ClientNotSynced(missing_headers)); + } - Ok(()) + Ok(()) + } } - } + }) + .await } /// Estimate gas usage for the given call. pub async fn estimate_gas(&self, call_request: CallRequest) -> Result { - Ok(Ethereum::estimate_gas(&*self.client, call_request).await?) + self.jsonrpsee_execute(move |client| async move { Ok(Ethereum::estimate_gas(&*client, call_request).await?) }) + .await } /// Retrieve number of the best known block from the Ethereum node. pub async fn best_block_number(&self) -> Result { - Ok(Ethereum::block_number(&*self.client).await?.as_u64()) + self.jsonrpsee_execute(move |client| async move { Ok(Ethereum::block_number(&*client).await?.as_u64()) }) + .await } /// Retrieve number of the best known block from the Ethereum node. pub async fn header_by_number(&self, block_number: u64) -> Result

{ - let get_full_tx_objects = false; - let header = Ethereum::get_block_by_number(&*self.client, block_number, get_full_tx_objects).await?; - match header.number.is_some() && header.hash.is_some() && header.logs_bloom.is_some() { - true => Ok(header), - false => Err(Error::IncompleteHeader), - } + self.jsonrpsee_execute(move |client| async move { + let get_full_tx_objects = false; + let header = Ethereum::get_block_by_number(&*client, block_number, get_full_tx_objects).await?; + match header.number.is_some() && header.hash.is_some() && header.logs_bloom.is_some() { + true => Ok(header), + false => Err(Error::IncompleteHeader), + } + }) + .await } /// Retrieve block header by its hash from Ethereum node. pub async fn header_by_hash(&self, hash: H256) -> Result
{ - let get_full_tx_objects = false; - let header = Ethereum::get_block_by_hash(&*self.client, hash, get_full_tx_objects).await?; - match header.number.is_some() && header.hash.is_some() && header.logs_bloom.is_some() { - true => Ok(header), - false => Err(Error::IncompleteHeader), - } + self.jsonrpsee_execute(move |client| async move { + let get_full_tx_objects = false; + let header = Ethereum::get_block_by_hash(&*client, hash, get_full_tx_objects).await?; + match header.number.is_some() && header.hash.is_some() && header.logs_bloom.is_some() { + true => Ok(header), + false => Err(Error::IncompleteHeader), + } + }) + .await } /// Retrieve block header and its transactions by its number from Ethereum node. pub async fn header_by_number_with_transactions(&self, number: u64) -> Result { - let get_full_tx_objects = true; - let header = - Ethereum::get_block_by_number_with_transactions(&*self.client, number, get_full_tx_objects).await?; + self.jsonrpsee_execute(move |client| async move { + let get_full_tx_objects = true; + let header = Ethereum::get_block_by_number_with_transactions(&*client, number, get_full_tx_objects).await?; - let is_complete_header = header.number.is_some() && header.hash.is_some() && header.logs_bloom.is_some(); - if !is_complete_header { - return Err(Error::IncompleteHeader); - } + let is_complete_header = header.number.is_some() && header.hash.is_some() && header.logs_bloom.is_some(); + if !is_complete_header { + return Err(Error::IncompleteHeader); + } - let is_complete_transactions = header.transactions.iter().all(|tx| tx.raw.is_some()); - if !is_complete_transactions { - return Err(Error::IncompleteTransaction); - } + let is_complete_transactions = header.transactions.iter().all(|tx| tx.raw.is_some()); + if !is_complete_transactions { + return Err(Error::IncompleteTransaction); + } - Ok(header) + Ok(header) + }) + .await } /// Retrieve block header and its transactions by its hash from Ethereum node. pub async fn header_by_hash_with_transactions(&self, hash: H256) -> Result { - let get_full_tx_objects = true; - let header = Ethereum::get_block_by_hash_with_transactions(&*self.client, hash, get_full_tx_objects).await?; + self.jsonrpsee_execute(move |client| async move { + let get_full_tx_objects = true; + let header = Ethereum::get_block_by_hash_with_transactions(&*client, hash, get_full_tx_objects).await?; - let is_complete_header = header.number.is_some() && header.hash.is_some() && header.logs_bloom.is_some(); - if !is_complete_header { - return Err(Error::IncompleteHeader); - } + let is_complete_header = header.number.is_some() && header.hash.is_some() && header.logs_bloom.is_some(); + if !is_complete_header { + return Err(Error::IncompleteHeader); + } - let is_complete_transactions = header.transactions.iter().all(|tx| tx.raw.is_some()); - if !is_complete_transactions { - return Err(Error::IncompleteTransaction); - } + let is_complete_transactions = header.transactions.iter().all(|tx| tx.raw.is_some()); + if !is_complete_transactions { + return Err(Error::IncompleteTransaction); + } - Ok(header) + Ok(header) + }) + .await } /// Retrieve transaction by its hash from Ethereum node. pub async fn transaction_by_hash(&self, hash: H256) -> Result> { - Ok(Ethereum::transaction_by_hash(&*self.client, hash).await?) + self.jsonrpsee_execute(move |client| async move { Ok(Ethereum::transaction_by_hash(&*client, hash).await?) }) + .await } /// Retrieve transaction receipt by transaction hash. pub async fn transaction_receipt(&self, transaction_hash: H256) -> Result { - Ok(Ethereum::get_transaction_receipt(&*self.client, transaction_hash).await?) + self.jsonrpsee_execute(move |client| async move { + Ok(Ethereum::get_transaction_receipt(&*client, transaction_hash).await?) + }) + .await } /// Get the nonce of the given account. pub async fn account_nonce(&self, address: Address) -> Result { - Ok(Ethereum::get_transaction_count(&*self.client, address).await?) + self.jsonrpsee_execute( + move |client| async move { Ok(Ethereum::get_transaction_count(&*client, address).await?) }, + ) + .await } /// Submit an Ethereum transaction. /// /// The transaction must already be signed before sending it through this method. pub async fn submit_transaction(&self, signed_raw_tx: SignedRawTx) -> Result { - let transaction = Bytes(signed_raw_tx); - let tx_hash = Ethereum::submit_transaction(&*self.client, transaction).await?; - log::trace!(target: "bridge", "Sent transaction to Ethereum node: {:?}", tx_hash); - Ok(tx_hash) + self.jsonrpsee_execute(move |client| async move { + let transaction = Bytes(signed_raw_tx); + let tx_hash = Ethereum::submit_transaction(&*client, transaction).await?; + log::trace!(target: "bridge", "Sent transaction to Ethereum node: {:?}", tx_hash); + Ok(tx_hash) + }) + .await } /// Call Ethereum smart contract. pub async fn eth_call(&self, call_transaction: CallRequest) -> Result { - Ok(Ethereum::call(&*self.client, call_transaction).await?) + self.jsonrpsee_execute(move |client| async move { Ok(Ethereum::call(&*client, call_transaction).await?) }) + .await + } + + /// Execute jsonrpsee future in tokio context. + async fn jsonrpsee_execute(&self, make_jsonrpsee_future: MF) -> Result + where + MF: FnOnce(Arc) -> F + Send + 'static, + F: Future> + Send, + T: Send + 'static, + { + let client = self.client.clone(); + self.tokio + .spawn(async move { make_jsonrpsee_future(client).await }) + .await? } } diff --git a/bridges/relays/client-ethereum/src/error.rs b/bridges/relays/client-ethereum/src/error.rs index e01cdf1bf5..3650fd2fd9 100644 --- a/bridges/relays/client-ethereum/src/error.rs +++ b/bridges/relays/client-ethereum/src/error.rs @@ -28,6 +28,8 @@ pub type Result = std::result::Result; /// an Ethereum node through RPC. #[derive(Debug)] pub enum Error { + /// IO error. + Io(std::io::Error), /// An error that can occur when making an HTTP request to /// an JSON-RPC client. RpcError(RpcError), @@ -45,6 +47,8 @@ pub enum Error { /// The client we're connected to is not synced, so we can't rely on its state. Contains /// number of unsynced headers. ClientNotSynced(U256), + /// Custom logic error. + Custom(String), } impl From for Error { @@ -53,6 +57,18 @@ impl From for Error { } } +impl From for Error { + fn from(error: std::io::Error) -> Self { + Error::Io(error) + } +} + +impl From for Error { + fn from(error: tokio::task::JoinError) -> Self { + Error::Custom(format!("Failed to wait tokio task: {}", error)) + } +} + impl MaybeConnectionError for Error { fn is_connection_error(&self) -> bool { matches!( @@ -68,6 +84,7 @@ impl MaybeConnectionError for Error { impl ToString for Error { fn to_string(&self) -> String { match self { + Self::Io(e) => e.to_string(), Self::RpcError(e) => e.to_string(), Self::ResponseParseFailed(e) => e.to_string(), Self::IncompleteHeader => { @@ -80,6 +97,7 @@ impl ToString for Error { Self::ClientNotSynced(missing_headers) => { format!("Ethereum client is not synced: syncing {} headers", missing_headers) } + Self::Custom(ref e) => e.clone(), } } } diff --git a/bridges/relays/client-substrate/Cargo.toml b/bridges/relays/client-substrate/Cargo.toml index 7c3e381259..9b8a6f21e3 100644 --- a/bridges/relays/client-substrate/Cargo.toml +++ b/bridges/relays/client-substrate/Cargo.toml @@ -14,6 +14,7 @@ jsonrpsee-ws-client = "0.2" log = "0.4.11" num-traits = "0.2" rand = "0.7" +tokio = "1.8" # Bridge dependencies diff --git a/bridges/relays/client-substrate/src/client.rs b/bridges/relays/client-substrate/src/client.rs index 87917c022c..6a486a622e 100644 --- a/bridges/relays/client-substrate/src/client.rs +++ b/bridges/relays/client-substrate/src/client.rs @@ -32,13 +32,13 @@ use relay_utils::relay_loop::RECONNECT_DELAY; use sp_core::{storage::StorageKey, Bytes}; use sp_trie::StorageProof; use sp_version::RuntimeVersion; -use std::convert::TryFrom; +use std::{convert::TryFrom, future::Future}; const SUB_API_GRANDPA_AUTHORITIES: &str = "GrandpaApi_grandpa_authorities"; const MAX_SUBSCRIPTION_CAPACITY: usize = 4096; /// Opaque justifications subscription type. -pub type JustificationsSubscription = Subscription; +pub struct JustificationsSubscription(tokio::runtime::Handle, Arc>>); /// Opaque GRANDPA authorities set. pub type OpaqueGrandpaAuthoritiesSet = Vec; @@ -47,6 +47,8 @@ pub type OpaqueGrandpaAuthoritiesSet = Vec; /// /// Cloning `Client` is a cheap operation. pub struct Client { + /// Tokio runtime handle. + tokio: Arc, /// Client connection params. params: ConnectionParams, /// Substrate RPC client. @@ -62,6 +64,7 @@ pub struct Client { impl Clone for Client { fn clone(&self) -> Self { Client { + tokio: self.tokio.clone(), params: self.params.clone(), client: self.client.clone(), genesis_hash: self.genesis_hash, @@ -103,12 +106,16 @@ impl Client { /// Try to connect to Substrate node over websocket. Returns Substrate RPC client if connection /// has been established or error otherwise. pub async fn try_connect(params: ConnectionParams) -> Result { - let client = Self::build_client(params.clone()).await?; + let (tokio, client) = Self::build_client(params.clone()).await?; let number: C::BlockNumber = Zero::zero(); - let genesis_hash = Substrate::::chain_get_block_hash(&*client, number).await?; + let genesis_hash_client = client.clone(); + let genesis_hash = tokio + .spawn(async move { Substrate::::chain_get_block_hash(&*genesis_hash_client, number).await }) + .await??; Ok(Self { + tokio, params, client, genesis_hash, @@ -118,37 +125,45 @@ impl Client { /// Reopen client connection. pub async fn reconnect(&mut self) -> Result<()> { - self.client = Self::build_client(self.params.clone()).await?; + self.client = Self::build_client(self.params.clone()).await?.1; Ok(()) } /// Build client to use in connection. - async fn build_client(params: ConnectionParams) -> Result> { + async fn build_client(params: ConnectionParams) -> Result<(Arc, Arc)> { + let tokio = tokio::runtime::Runtime::new()?; let uri = format!( "{}://{}:{}", if params.secure { "wss" } else { "ws" }, params.host, params.port, ); - let client = RpcClientBuilder::default() - .max_notifs_per_subscription(MAX_SUBSCRIPTION_CAPACITY) - .build(&uri) - .await?; + let client = tokio + .spawn(async move { + RpcClientBuilder::default() + .max_notifs_per_subscription(MAX_SUBSCRIPTION_CAPACITY) + .build(&uri) + .await + }) + .await??; - Ok(Arc::new(client)) + Ok((Arc::new(tokio), Arc::new(client))) } } impl Client { /// Returns true if client is connected to at least one peer and is in synced state. pub async fn ensure_synced(&self) -> Result<()> { - let health = Substrate::::system_health(&*self.client).await?; - let is_synced = !health.is_syncing && (!health.should_have_peers || health.peers > 0); - if is_synced { - Ok(()) - } else { - Err(Error::ClientNotSynced(health)) - } + self.jsonrpsee_execute(|client| async move { + let health = Substrate::::system_health(&*client).await?; + let is_synced = !health.is_syncing && (!health.should_have_peers || health.peers > 0); + if is_synced { + Ok(()) + } else { + Err(Error::ClientNotSynced(health)) + } + }) + .await } /// Return hash of the genesis block. @@ -158,7 +173,8 @@ impl Client { /// Return hash of the best finalized block. pub async fn best_finalized_header_hash(&self) -> Result { - Ok(Substrate::::chain_get_finalized_head(&*self.client).await?) + self.jsonrpsee_execute(|client| async move { Ok(Substrate::::chain_get_finalized_head(&*client).await?) }) + .await } /// Returns the best Substrate header. @@ -166,12 +182,16 @@ impl Client { where C::Header: DeserializeOwned, { - Ok(Substrate::::chain_get_header(&*self.client, None).await?) + self.jsonrpsee_execute(|client| async move { Ok(Substrate::::chain_get_header(&*client, None).await?) }) + .await } /// Get a Substrate block from its hash. pub async fn get_block(&self, block_hash: Option) -> Result { - Ok(Substrate::::chain_get_block(&*self.client, block_hash).await?) + self.jsonrpsee_execute( + move |client| async move { Ok(Substrate::::chain_get_block(&*client, block_hash).await?) }, + ) + .await } /// Get a Substrate header by its hash. @@ -179,12 +199,18 @@ impl Client { where C::Header: DeserializeOwned, { - Ok(Substrate::::chain_get_header(&*self.client, block_hash).await?) + self.jsonrpsee_execute(move |client| async move { + Ok(Substrate::::chain_get_header(&*client, block_hash).await?) + }) + .await } /// Get a Substrate block hash by its number. pub async fn block_hash_by_number(&self, number: C::BlockNumber) -> Result { - Ok(Substrate::::chain_get_block_hash(&*self.client, number).await?) + self.jsonrpsee_execute(move |client| async move { + Ok(Substrate::::chain_get_block_hash(&*client, number).await?) + }) + .await } /// Get a Substrate header by its number. @@ -193,20 +219,25 @@ impl Client { C::Header: DeserializeOwned, { let block_hash = Self::block_hash_by_number(self, block_number).await?; - Ok(Self::header_by_hash(self, block_hash).await?) + let header_by_hash = Self::header_by_hash(self, block_hash).await?; + Ok(header_by_hash) } /// Return runtime version. pub async fn runtime_version(&self) -> Result { - Ok(Substrate::::state_runtime_version(&*self.client).await?) + self.jsonrpsee_execute(move |client| async move { Ok(Substrate::::state_runtime_version(&*client).await?) }) + .await } /// Read value from runtime storage. - pub async fn storage_value(&self, storage_key: StorageKey) -> Result> { - Substrate::::state_get_storage(&*self.client, storage_key) - .await? - .map(|encoded_value| T::decode(&mut &encoded_value.0[..]).map_err(Error::ResponseParseFailed)) - .transpose() + pub async fn storage_value(&self, storage_key: StorageKey) -> Result> { + self.jsonrpsee_execute(move |client| async move { + Substrate::::state_get_storage(&*client, storage_key) + .await? + .map(|encoded_value| T::decode(&mut &encoded_value.0[..]).map_err(Error::ResponseParseFailed)) + .transpose() + }) + .await } /// Return native tokens balance of the account. @@ -214,30 +245,39 @@ impl Client { where C: ChainWithBalances, { - let storage_key = C::account_info_storage_key(&account); - let encoded_account_data = Substrate::::state_get_storage(&*self.client, storage_key) - .await? - .ok_or(Error::AccountDoesNotExist)?; - let decoded_account_data = - AccountInfo::>::decode(&mut &encoded_account_data.0[..]) - .map_err(Error::ResponseParseFailed)?; - Ok(decoded_account_data.data.free) + self.jsonrpsee_execute(move |client| async move { + let storage_key = C::account_info_storage_key(&account); + let encoded_account_data = Substrate::::state_get_storage(&*client, storage_key) + .await? + .ok_or(Error::AccountDoesNotExist)?; + let decoded_account_data = + AccountInfo::>::decode(&mut &encoded_account_data.0[..]) + .map_err(Error::ResponseParseFailed)?; + Ok(decoded_account_data.data.free) + }) + .await } /// Get the nonce of the given Substrate account. /// /// Note: It's the caller's responsibility to make sure `account` is a valid ss58 address. pub async fn next_account_index(&self, account: C::AccountId) -> Result { - Ok(Substrate::::system_account_next_index(&*self.client, account).await?) + self.jsonrpsee_execute(move |client| async move { + Ok(Substrate::::system_account_next_index(&*client, account).await?) + }) + .await } /// Submit unsigned extrinsic for inclusion in a block. /// /// Note: The given transaction needs to be SCALE encoded beforehand. pub async fn submit_unsigned_extrinsic(&self, transaction: Bytes) -> Result { - let tx_hash = Substrate::::author_submit_extrinsic(&*self.client, transaction).await?; - log::trace!(target: "bridge", "Sent transaction to Substrate node: {:?}", tx_hash); - Ok(tx_hash) + self.jsonrpsee_execute(move |client| async move { + let tx_hash = Substrate::::author_submit_extrinsic(&*client, transaction).await?; + log::trace!(target: "bridge", "Sent transaction to Substrate node: {:?}", tx_hash); + Ok(tx_hash) + }) + .await } /// Submit an extrinsic signed by given account. @@ -250,71 +290,117 @@ impl Client { pub async fn submit_signed_extrinsic( &self, extrinsic_signer: C::AccountId, - prepare_extrinsic: impl FnOnce(C::Index) -> Bytes, + prepare_extrinsic: impl FnOnce(C::Index) -> Bytes + Send + 'static, ) -> Result { let _guard = self.submit_signed_extrinsic_lock.lock().await; let transaction_nonce = self.next_account_index(extrinsic_signer).await?; - let extrinsic = prepare_extrinsic(transaction_nonce); - let tx_hash = Substrate::::author_submit_extrinsic(&*self.client, extrinsic).await?; - log::trace!(target: "bridge", "Sent transaction to {} node: {:?}", C::NAME, tx_hash); - Ok(tx_hash) + self.jsonrpsee_execute(move |client| async move { + let extrinsic = prepare_extrinsic(transaction_nonce); + let tx_hash = Substrate::::author_submit_extrinsic(&*client, extrinsic).await?; + log::trace!(target: "bridge", "Sent transaction to {} node: {:?}", C::NAME, tx_hash); + Ok(tx_hash) + }) + .await } /// Estimate fee that will be spent on given extrinsic. pub async fn estimate_extrinsic_fee(&self, transaction: Bytes) -> Result { - let fee_details = Substrate::::payment_query_fee_details(&*self.client, transaction, None).await?; - let inclusion_fee = fee_details - .inclusion_fee - .map(|inclusion_fee| { - InclusionFee { - base_fee: C::Balance::try_from(inclusion_fee.base_fee.into_u256()) - .unwrap_or_else(|_| C::Balance::max_value()), - len_fee: C::Balance::try_from(inclusion_fee.len_fee.into_u256()) - .unwrap_or_else(|_| C::Balance::max_value()), - adjusted_weight_fee: C::Balance::try_from(inclusion_fee.adjusted_weight_fee.into_u256()) - .unwrap_or_else(|_| C::Balance::max_value()), - } - .inclusion_fee() - }) - .unwrap_or_else(Zero::zero); - Ok(inclusion_fee) + self.jsonrpsee_execute(move |client| async move { + let fee_details = Substrate::::payment_query_fee_details(&*client, transaction, None).await?; + let inclusion_fee = fee_details + .inclusion_fee + .map(|inclusion_fee| { + InclusionFee { + base_fee: C::Balance::try_from(inclusion_fee.base_fee.into_u256()) + .unwrap_or_else(|_| C::Balance::max_value()), + len_fee: C::Balance::try_from(inclusion_fee.len_fee.into_u256()) + .unwrap_or_else(|_| C::Balance::max_value()), + adjusted_weight_fee: C::Balance::try_from(inclusion_fee.adjusted_weight_fee.into_u256()) + .unwrap_or_else(|_| C::Balance::max_value()), + } + .inclusion_fee() + }) + .unwrap_or_else(Zero::zero); + Ok(inclusion_fee) + }) + .await } /// Get the GRANDPA authority set at given block. pub async fn grandpa_authorities_set(&self, block: C::Hash) -> Result { - let call = SUB_API_GRANDPA_AUTHORITIES.to_string(); - let data = Bytes(Vec::new()); + self.jsonrpsee_execute(move |client| async move { + let call = SUB_API_GRANDPA_AUTHORITIES.to_string(); + let data = Bytes(Vec::new()); - let encoded_response = Substrate::::state_call(&*self.client, call, data, Some(block)).await?; - let authority_list = encoded_response.0; + let encoded_response = Substrate::::state_call(&*client, call, data, Some(block)).await?; + let authority_list = encoded_response.0; - Ok(authority_list) + Ok(authority_list) + }) + .await } /// Execute runtime call at given block. pub async fn state_call(&self, method: String, data: Bytes, at_block: Option) -> Result { - Substrate::::state_call(&*self.client, method, data, at_block) - .await - .map_err(Into::into) + self.jsonrpsee_execute(move |client| async move { + Substrate::::state_call(&*client, method, data, at_block) + .await + .map_err(Into::into) + }) + .await } /// Returns storage proof of given storage keys. pub async fn prove_storage(&self, keys: Vec, at_block: C::Hash) -> Result { - Substrate::::state_prove_storage(&*self.client, keys, Some(at_block)) - .await - .map(|proof| StorageProof::new(proof.proof.into_iter().map(|b| b.0).collect())) - .map_err(Into::into) + self.jsonrpsee_execute(move |client| async move { + Substrate::::state_prove_storage(&*client, keys, Some(at_block)) + .await + .map(|proof| StorageProof::new(proof.proof.into_iter().map(|b| b.0).collect())) + .map_err(Into::into) + }) + .await } /// Return new justifications stream. pub async fn subscribe_justifications(&self) -> Result { - Ok(self - .client - .subscribe( - "grandpa_subscribeJustifications", - JsonRpcParams::NoParams, - "grandpa_unsubscribeJustifications", - ) - .await?) + let subscription = self + .jsonrpsee_execute(move |client| async move { + Ok(client + .subscribe( + "grandpa_subscribeJustifications", + JsonRpcParams::NoParams, + "grandpa_unsubscribeJustifications", + ) + .await?) + }) + .await?; + Ok(JustificationsSubscription( + self.tokio.handle().clone(), + Arc::new(Mutex::new(subscription)), + )) + } + + /// Execute jsonrpsee future in tokio context. + async fn jsonrpsee_execute(&self, make_jsonrpsee_future: MF) -> Result + where + MF: FnOnce(Arc) -> F + Send + 'static, + F: Future> + Send, + T: Send + 'static, + { + let client = self.client.clone(); + self.tokio + .spawn(async move { make_jsonrpsee_future(client).await }) + .await? + } +} + +impl JustificationsSubscription { + /// Return next justification from the subscription. + pub async fn next(&self) -> Result> { + let subscription = self.1.clone(); + self.0 + .spawn(async move { subscription.lock().await.next().await }) + .await? + .map_err(Error::RpcError) } } diff --git a/bridges/relays/client-substrate/src/error.rs b/bridges/relays/client-substrate/src/error.rs index f553d6867e..f06079ef5f 100644 --- a/bridges/relays/client-substrate/src/error.rs +++ b/bridges/relays/client-substrate/src/error.rs @@ -27,6 +27,8 @@ pub type Result = std::result::Result; /// a Substrate node through RPC. #[derive(Debug)] pub enum Error { + /// IO error. + Io(std::io::Error), /// An error that can occur when making a request to /// an JSON-RPC server. RpcError(RpcError), @@ -49,6 +51,7 @@ pub enum Error { impl std::error::Error for Error { fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { match self { + Self::Io(ref e) => Some(e), Self::RpcError(ref e) => Some(e), Self::ResponseParseFailed(ref e) => Some(e), Self::UninitializedBridgePallet => None, @@ -67,6 +70,18 @@ impl From for Error { } } +impl From for Error { + fn from(error: std::io::Error) -> Self { + Error::Io(error) + } +} + +impl From for Error { + fn from(error: tokio::task::JoinError) -> Self { + Error::Custom(format!("Failed to wait tokio task: {}", error)) + } +} + impl MaybeConnectionError for Error { fn is_connection_error(&self) -> bool { matches!( @@ -82,6 +97,7 @@ impl MaybeConnectionError for Error { impl std::fmt::Display for Error { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { let s = match self { + Self::Io(e) => e.to_string(), Self::RpcError(e) => e.to_string(), Self::ResponseParseFailed(e) => e.to_string(), Self::UninitializedBridgePallet => "The Substrate bridge pallet has not been initialized yet.".into(), diff --git a/bridges/relays/client-substrate/src/finality_source.rs b/bridges/relays/client-substrate/src/finality_source.rs index 546a3ede3e..81a98d2f1e 100644 --- a/bridges/relays/client-substrate/src/finality_source.rs +++ b/bridges/relays/client-substrate/src/finality_source.rs @@ -132,7 +132,7 @@ where async fn finality_proofs(&self) -> Result { Ok(unfold( self.client.clone().subscribe_justifications().await?, - move |mut subscription| async move { + move |subscription| async move { loop { let log_error = |err| { log::error!(