use tokio reactor to execute jsonrpsee futures (#1061)

This commit is contained in:
Svyatoslav Nikolsky
2021-07-20 15:18:51 +03:00
committed by Bastian Köcher
parent 08fd53adef
commit 63d6fc436a
13 changed files with 337 additions and 168 deletions
@@ -155,11 +155,12 @@ impl SubmitEthereumHeaders for SubstrateClient<Rialto> {
headers: Vec<QueuedEthereumHeader>,
) -> SubmittedHeaders<EthereumHeaderId, RpcError> {
let ids = headers.iter().map(|header| header.id()).collect();
let genesis_hash = *self.genesis_hash();
let submission_result = async {
self.submit_signed_extrinsic((*params.public().as_array_ref()).into(), |transaction_nonce| {
self.submit_signed_extrinsic((*params.public().as_array_ref()).into(), move |transaction_nonce| {
Bytes(
Rialto::sign_transaction(
*self.genesis_hash(),
genesis_hash,
&params,
transaction_nonce,
instance.build_signed_header_call(headers),
@@ -257,10 +258,11 @@ impl SubmitEthereumExchangeTransactionProof for SubstrateClient<Rialto> {
instance: Arc<dyn BridgeInstance>,
proof: rialto_runtime::exchange::EthereumTransactionInclusionProof,
) -> RpcResult<()> {
self.submit_signed_extrinsic((*params.public().as_array_ref()).into(), |transaction_nonce| {
let genesis_hash = *self.genesis_hash();
self.submit_signed_extrinsic((*params.public().as_array_ref()).into(), move |transaction_nonce| {
Bytes(
Rialto::sign_transaction(
*self.genesis_hash(),
genesis_hash,
&params,
transaction_nonce,
instance.build_currency_exchange_call(proof),
@@ -176,10 +176,11 @@ impl SendMessage {
fee,
})?;
let source_genesis_hash = *source_client.genesis_hash();
source_client
.submit_signed_extrinsic(source_sign.public().into(), |transaction_nonce| {
.submit_signed_extrinsic(source_sign.public().into(), move |transaction_nonce| {
let signed_source_call = Source::sign_transaction(
*source_client.genesis_hash(),
source_genesis_hash,
&source_sign,
transaction_nonce,
send_message_call,
@@ -80,10 +80,11 @@ where
}
async fn submit_finality_proof(&self, header: P::Header, proof: P::FinalityProof) -> Result<(), SubstrateError> {
let transactions_author = self.pipeline.transactions_author();
let pipeline = self.pipeline.clone();
self.client
.submit_signed_extrinsic(self.pipeline.transactions_author(), move |transaction_nonce| {
self.pipeline
.make_submit_finality_proof_transaction(transaction_nonce, header, proof)
.submit_signed_extrinsic(transactions_author, move |transaction_nonce| {
pipeline.make_submit_finality_proof_transaction(transaction_nonce, header, proof)
})
.await
.map(drop)
@@ -39,7 +39,9 @@ pub async fn initialize<SourceChain: Chain, TargetChain: Chain>(
source_client: Client<SourceChain>,
target_client: Client<TargetChain>,
target_transactions_signer: TargetChain::AccountId,
prepare_initialize_transaction: impl FnOnce(TargetChain::Index, InitializationData<SourceChain::Header>) -> Bytes,
prepare_initialize_transaction: impl FnOnce(TargetChain::Index, InitializationData<SourceChain::Header>) -> Bytes
+ Send
+ 'static,
) {
let result = do_initialize(
source_client,
@@ -72,7 +74,9 @@ async fn do_initialize<SourceChain: Chain, TargetChain: Chain>(
source_client: Client<SourceChain>,
target_client: Client<TargetChain>,
target_transactions_signer: TargetChain::AccountId,
prepare_initialize_transaction: impl FnOnce(TargetChain::Index, InitializationData<SourceChain::Header>) -> Bytes,
prepare_initialize_transaction: impl FnOnce(TargetChain::Index, InitializationData<SourceChain::Header>) -> Bytes
+ Send
+ 'static,
) -> Result<TargetChain::Hash, String> {
let initialization_data = prepare_initialization_data(source_client).await?;
log::info!(
@@ -102,7 +106,7 @@ async fn prepare_initialization_data<SourceChain: Chain>(
// But now there are problems with this approach - `CurrentSetId` may return invalid value. So here
// we're waiting for the next justification, read the authorities set and then try to figure out
// the set id with bruteforce.
let mut justifications = source_client
let justifications = source_client
.subscribe_justifications()
.await
.map_err(|err| format!("Failed to subscribe to {} justifications: {:?}", SourceChain::NAME, err))?;
@@ -233,10 +233,10 @@ where
generated_at_block: TargetHeaderIdOf<P>,
proof: P::MessagesReceivingProof,
) -> Result<(), SubstrateError> {
let lane = self.lane.clone();
self.client
.submit_signed_extrinsic(self.lane.source_transactions_author(), move |transaction_nonce| {
self.lane
.make_messages_receiving_proof_transaction(transaction_nonce, generated_at_block, proof)
lane.make_messages_receiving_proof_transaction(transaction_nonce, generated_at_block, proof)
})
.await?;
Ok(())
@@ -219,14 +219,11 @@ where
nonces: RangeInclusive<MessageNonce>,
proof: P::MessagesProof,
) -> Result<RangeInclusive<MessageNonce>, SubstrateError> {
let lane = self.lane.clone();
let nonces_clone = nonces.clone();
self.client
.submit_signed_extrinsic(self.lane.target_transactions_author(), |transaction_nonce| {
self.lane.make_messages_delivery_transaction(
transaction_nonce,
generated_at_header,
nonces.clone(),
proof,
)
.submit_signed_extrinsic(self.lane.target_transactions_author(), move |transaction_nonce| {
lane.make_messages_delivery_transaction(transaction_nonce, generated_at_header, nonces_clone, proof)
})
.await?;
Ok(nonces)
@@ -16,4 +16,5 @@ jsonrpsee-ws-client = "0.2"
libsecp256k1 = { version = "0.3.4", default-features = false, features = ["hmac"] }
log = "0.4.11"
relay-utils = { path = "../utils" }
tokio = "1.8"
web3 = { git = "https://github.com/svyatonik/rust-web3.git", branch = "bump-deps" }
+64 -22
View File
@@ -23,7 +23,7 @@ use crate::{ConnectionParams, Error, Result};
use jsonrpsee_ws_client::{WsClient as RpcClient, WsClientBuilder as RpcClientBuilder};
use relay_utils::relay_loop::RECONNECT_DELAY;
use std::sync::Arc;
use std::{future::Future, sync::Arc};
/// Number of headers missing from the Ethereum node for us to consider node not synced.
const MAJOR_SYNC_BLOCKS: u64 = 5;
@@ -31,6 +31,7 @@ const MAJOR_SYNC_BLOCKS: u64 = 5;
/// The client used to interact with an Ethereum node through RPC.
#[derive(Clone)]
pub struct Client {
tokio: Arc<tokio::runtime::Runtime>,
params: ConnectionParams,
client: Arc<RpcClient>,
}
@@ -59,22 +60,23 @@ impl Client {
/// Try to connect to Ethereum node. Returns Ethereum RPC client if connection has been established
/// or error otherwise.
pub async fn try_connect(params: ConnectionParams) -> Result<Self> {
Ok(Self {
client: Self::build_client(&params).await?,
params,
})
let (tokio, client) = Self::build_client(&params).await?;
Ok(Self { tokio, client, params })
}
/// Build client to use in connection.
async fn build_client(params: &ConnectionParams) -> Result<Arc<RpcClient>> {
async fn build_client(params: &ConnectionParams) -> Result<(Arc<tokio::runtime::Runtime>, Arc<RpcClient>)> {
let tokio = tokio::runtime::Runtime::new()?;
let uri = format!("ws://{}:{}", params.host, params.port);
let client = RpcClientBuilder::default().build(&uri).await?;
Ok(Arc::new(client))
let client = tokio
.spawn(async move { RpcClientBuilder::default().build(&uri).await })
.await??;
Ok((Arc::new(tokio), Arc::new(client)))
}
/// Reopen client connection.
pub async fn reconnect(&mut self) -> Result<()> {
self.client = Self::build_client(&self.params).await?;
self.client = Self::build_client(&self.params).await?.1;
Ok(())
}
}
@@ -82,7 +84,8 @@ impl Client {
impl Client {
/// Returns true if client is connected to at least one peer and is in synced state.
pub async fn ensure_synced(&self) -> Result<()> {
match Ethereum::syncing(&*self.client).await? {
self.jsonrpsee_execute(move |client| async move {
match Ethereum::syncing(&*client).await? {
SyncState::NotSyncing => Ok(()),
SyncState::Syncing(syncing) => {
let missing_headers = syncing.highest_block.saturating_sub(syncing.current_block);
@@ -93,43 +96,53 @@ impl Client {
Ok(())
}
}
})
.await
}
/// Estimate gas usage for the given call.
pub async fn estimate_gas(&self, call_request: CallRequest) -> Result<U256> {
Ok(Ethereum::estimate_gas(&*self.client, call_request).await?)
self.jsonrpsee_execute(move |client| async move { Ok(Ethereum::estimate_gas(&*client, call_request).await?) })
.await
}
/// Retrieve number of the best known block from the Ethereum node.
pub async fn best_block_number(&self) -> Result<u64> {
Ok(Ethereum::block_number(&*self.client).await?.as_u64())
self.jsonrpsee_execute(move |client| async move { Ok(Ethereum::block_number(&*client).await?.as_u64()) })
.await
}
/// Retrieve number of the best known block from the Ethereum node.
pub async fn header_by_number(&self, block_number: u64) -> Result<Header> {
self.jsonrpsee_execute(move |client| async move {
let get_full_tx_objects = false;
let header = Ethereum::get_block_by_number(&*self.client, block_number, get_full_tx_objects).await?;
let header = Ethereum::get_block_by_number(&*client, block_number, get_full_tx_objects).await?;
match header.number.is_some() && header.hash.is_some() && header.logs_bloom.is_some() {
true => Ok(header),
false => Err(Error::IncompleteHeader),
}
})
.await
}
/// Retrieve block header by its hash from Ethereum node.
pub async fn header_by_hash(&self, hash: H256) -> Result<Header> {
self.jsonrpsee_execute(move |client| async move {
let get_full_tx_objects = false;
let header = Ethereum::get_block_by_hash(&*self.client, hash, get_full_tx_objects).await?;
let header = Ethereum::get_block_by_hash(&*client, hash, get_full_tx_objects).await?;
match header.number.is_some() && header.hash.is_some() && header.logs_bloom.is_some() {
true => Ok(header),
false => Err(Error::IncompleteHeader),
}
})
.await
}
/// Retrieve block header and its transactions by its number from Ethereum node.
pub async fn header_by_number_with_transactions(&self, number: u64) -> Result<HeaderWithTransactions> {
self.jsonrpsee_execute(move |client| async move {
let get_full_tx_objects = true;
let header =
Ethereum::get_block_by_number_with_transactions(&*self.client, number, get_full_tx_objects).await?;
let header = Ethereum::get_block_by_number_with_transactions(&*client, number, get_full_tx_objects).await?;
let is_complete_header = header.number.is_some() && header.hash.is_some() && header.logs_bloom.is_some();
if !is_complete_header {
@@ -142,12 +155,15 @@ impl Client {
}
Ok(header)
})
.await
}
/// Retrieve block header and its transactions by its hash from Ethereum node.
pub async fn header_by_hash_with_transactions(&self, hash: H256) -> Result<HeaderWithTransactions> {
self.jsonrpsee_execute(move |client| async move {
let get_full_tx_objects = true;
let header = Ethereum::get_block_by_hash_with_transactions(&*self.client, hash, get_full_tx_objects).await?;
let header = Ethereum::get_block_by_hash_with_transactions(&*client, hash, get_full_tx_objects).await?;
let is_complete_header = header.number.is_some() && header.hash.is_some() && header.logs_bloom.is_some();
if !is_complete_header {
@@ -160,35 +176,61 @@ impl Client {
}
Ok(header)
})
.await
}
/// Retrieve transaction by its hash from Ethereum node.
pub async fn transaction_by_hash(&self, hash: H256) -> Result<Option<Transaction>> {
Ok(Ethereum::transaction_by_hash(&*self.client, hash).await?)
self.jsonrpsee_execute(move |client| async move { Ok(Ethereum::transaction_by_hash(&*client, hash).await?) })
.await
}
/// Retrieve transaction receipt by transaction hash.
pub async fn transaction_receipt(&self, transaction_hash: H256) -> Result<Receipt> {
Ok(Ethereum::get_transaction_receipt(&*self.client, transaction_hash).await?)
self.jsonrpsee_execute(move |client| async move {
Ok(Ethereum::get_transaction_receipt(&*client, transaction_hash).await?)
})
.await
}
/// Get the nonce of the given account.
pub async fn account_nonce(&self, address: Address) -> Result<U256> {
Ok(Ethereum::get_transaction_count(&*self.client, address).await?)
self.jsonrpsee_execute(
move |client| async move { Ok(Ethereum::get_transaction_count(&*client, address).await?) },
)
.await
}
/// Submit an Ethereum transaction.
///
/// The transaction must already be signed before sending it through this method.
pub async fn submit_transaction(&self, signed_raw_tx: SignedRawTx) -> Result<TransactionHash> {
self.jsonrpsee_execute(move |client| async move {
let transaction = Bytes(signed_raw_tx);
let tx_hash = Ethereum::submit_transaction(&*self.client, transaction).await?;
let tx_hash = Ethereum::submit_transaction(&*client, transaction).await?;
log::trace!(target: "bridge", "Sent transaction to Ethereum node: {:?}", tx_hash);
Ok(tx_hash)
})
.await
}
/// Call Ethereum smart contract.
pub async fn eth_call(&self, call_transaction: CallRequest) -> Result<Bytes> {
Ok(Ethereum::call(&*self.client, call_transaction).await?)
self.jsonrpsee_execute(move |client| async move { Ok(Ethereum::call(&*client, call_transaction).await?) })
.await
}
/// Execute jsonrpsee future in tokio context.
async fn jsonrpsee_execute<MF, F, T>(&self, make_jsonrpsee_future: MF) -> Result<T>
where
MF: FnOnce(Arc<RpcClient>) -> F + Send + 'static,
F: Future<Output = Result<T>> + Send,
T: Send + 'static,
{
let client = self.client.clone();
self.tokio
.spawn(async move { make_jsonrpsee_future(client).await })
.await?
}
}
@@ -28,6 +28,8 @@ pub type Result<T> = std::result::Result<T, Error>;
/// an Ethereum node through RPC.
#[derive(Debug)]
pub enum Error {
/// IO error.
Io(std::io::Error),
/// An error that can occur when making an HTTP request to
/// an JSON-RPC client.
RpcError(RpcError),
@@ -45,6 +47,8 @@ pub enum Error {
/// The client we're connected to is not synced, so we can't rely on its state. Contains
/// number of unsynced headers.
ClientNotSynced(U256),
/// Custom logic error.
Custom(String),
}
impl From<RpcError> for Error {
@@ -53,6 +57,18 @@ impl From<RpcError> for Error {
}
}
impl From<std::io::Error> for Error {
fn from(error: std::io::Error) -> Self {
Error::Io(error)
}
}
impl From<tokio::task::JoinError> for Error {
fn from(error: tokio::task::JoinError) -> Self {
Error::Custom(format!("Failed to wait tokio task: {}", error))
}
}
impl MaybeConnectionError for Error {
fn is_connection_error(&self) -> bool {
matches!(
@@ -68,6 +84,7 @@ impl MaybeConnectionError for Error {
impl ToString for Error {
fn to_string(&self) -> String {
match self {
Self::Io(e) => e.to_string(),
Self::RpcError(e) => e.to_string(),
Self::ResponseParseFailed(e) => e.to_string(),
Self::IncompleteHeader => {
@@ -80,6 +97,7 @@ impl ToString for Error {
Self::ClientNotSynced(missing_headers) => {
format!("Ethereum client is not synced: syncing {} headers", missing_headers)
}
Self::Custom(ref e) => e.clone(),
}
}
}
@@ -14,6 +14,7 @@ jsonrpsee-ws-client = "0.2"
log = "0.4.11"
num-traits = "0.2"
rand = "0.7"
tokio = "1.8"
# Bridge dependencies
+116 -30
View File
@@ -32,13 +32,13 @@ use relay_utils::relay_loop::RECONNECT_DELAY;
use sp_core::{storage::StorageKey, Bytes};
use sp_trie::StorageProof;
use sp_version::RuntimeVersion;
use std::convert::TryFrom;
use std::{convert::TryFrom, future::Future};
const SUB_API_GRANDPA_AUTHORITIES: &str = "GrandpaApi_grandpa_authorities";
const MAX_SUBSCRIPTION_CAPACITY: usize = 4096;
/// Opaque justifications subscription type.
pub type JustificationsSubscription = Subscription<Bytes>;
pub struct JustificationsSubscription(tokio::runtime::Handle, Arc<Mutex<Subscription<Bytes>>>);
/// Opaque GRANDPA authorities set.
pub type OpaqueGrandpaAuthoritiesSet = Vec<u8>;
@@ -47,6 +47,8 @@ pub type OpaqueGrandpaAuthoritiesSet = Vec<u8>;
///
/// Cloning `Client` is a cheap operation.
pub struct Client<C: Chain> {
/// Tokio runtime handle.
tokio: Arc<tokio::runtime::Runtime>,
/// Client connection params.
params: ConnectionParams,
/// Substrate RPC client.
@@ -62,6 +64,7 @@ pub struct Client<C: Chain> {
impl<C: Chain> Clone for Client<C> {
fn clone(&self) -> Self {
Client {
tokio: self.tokio.clone(),
params: self.params.clone(),
client: self.client.clone(),
genesis_hash: self.genesis_hash,
@@ -103,12 +106,16 @@ impl<C: Chain> Client<C> {
/// Try to connect to Substrate node over websocket. Returns Substrate RPC client if connection
/// has been established or error otherwise.
pub async fn try_connect(params: ConnectionParams) -> Result<Self> {
let client = Self::build_client(params.clone()).await?;
let (tokio, client) = Self::build_client(params.clone()).await?;
let number: C::BlockNumber = Zero::zero();
let genesis_hash = Substrate::<C>::chain_get_block_hash(&*client, number).await?;
let genesis_hash_client = client.clone();
let genesis_hash = tokio
.spawn(async move { Substrate::<C>::chain_get_block_hash(&*genesis_hash_client, number).await })
.await??;
Ok(Self {
tokio,
params,
client,
genesis_hash,
@@ -118,37 +125,45 @@ impl<C: Chain> Client<C> {
/// Reopen client connection.
pub async fn reconnect(&mut self) -> Result<()> {
self.client = Self::build_client(self.params.clone()).await?;
self.client = Self::build_client(self.params.clone()).await?.1;
Ok(())
}
/// Build client to use in connection.
async fn build_client(params: ConnectionParams) -> Result<Arc<RpcClient>> {
async fn build_client(params: ConnectionParams) -> Result<(Arc<tokio::runtime::Runtime>, Arc<RpcClient>)> {
let tokio = tokio::runtime::Runtime::new()?;
let uri = format!(
"{}://{}:{}",
if params.secure { "wss" } else { "ws" },
params.host,
params.port,
);
let client = RpcClientBuilder::default()
let client = tokio
.spawn(async move {
RpcClientBuilder::default()
.max_notifs_per_subscription(MAX_SUBSCRIPTION_CAPACITY)
.build(&uri)
.await?;
.await
})
.await??;
Ok(Arc::new(client))
Ok((Arc::new(tokio), Arc::new(client)))
}
}
impl<C: Chain> Client<C> {
/// Returns true if client is connected to at least one peer and is in synced state.
pub async fn ensure_synced(&self) -> Result<()> {
let health = Substrate::<C>::system_health(&*self.client).await?;
self.jsonrpsee_execute(|client| async move {
let health = Substrate::<C>::system_health(&*client).await?;
let is_synced = !health.is_syncing && (!health.should_have_peers || health.peers > 0);
if is_synced {
Ok(())
} else {
Err(Error::ClientNotSynced(health))
}
})
.await
}
/// Return hash of the genesis block.
@@ -158,7 +173,8 @@ impl<C: Chain> Client<C> {
/// Return hash of the best finalized block.
pub async fn best_finalized_header_hash(&self) -> Result<C::Hash> {
Ok(Substrate::<C>::chain_get_finalized_head(&*self.client).await?)
self.jsonrpsee_execute(|client| async move { Ok(Substrate::<C>::chain_get_finalized_head(&*client).await?) })
.await
}
/// Returns the best Substrate header.
@@ -166,12 +182,16 @@ impl<C: Chain> Client<C> {
where
C::Header: DeserializeOwned,
{
Ok(Substrate::<C>::chain_get_header(&*self.client, None).await?)
self.jsonrpsee_execute(|client| async move { Ok(Substrate::<C>::chain_get_header(&*client, None).await?) })
.await
}
/// Get a Substrate block from its hash.
pub async fn get_block(&self, block_hash: Option<C::Hash>) -> Result<C::SignedBlock> {
Ok(Substrate::<C>::chain_get_block(&*self.client, block_hash).await?)
self.jsonrpsee_execute(
move |client| async move { Ok(Substrate::<C>::chain_get_block(&*client, block_hash).await?) },
)
.await
}
/// Get a Substrate header by its hash.
@@ -179,12 +199,18 @@ impl<C: Chain> Client<C> {
where
C::Header: DeserializeOwned,
{
Ok(Substrate::<C>::chain_get_header(&*self.client, block_hash).await?)
self.jsonrpsee_execute(move |client| async move {
Ok(Substrate::<C>::chain_get_header(&*client, block_hash).await?)
})
.await
}
/// Get a Substrate block hash by its number.
pub async fn block_hash_by_number(&self, number: C::BlockNumber) -> Result<C::Hash> {
Ok(Substrate::<C>::chain_get_block_hash(&*self.client, number).await?)
self.jsonrpsee_execute(move |client| async move {
Ok(Substrate::<C>::chain_get_block_hash(&*client, number).await?)
})
.await
}
/// Get a Substrate header by its number.
@@ -193,20 +219,25 @@ impl<C: Chain> Client<C> {
C::Header: DeserializeOwned,
{
let block_hash = Self::block_hash_by_number(self, block_number).await?;
Ok(Self::header_by_hash(self, block_hash).await?)
let header_by_hash = Self::header_by_hash(self, block_hash).await?;
Ok(header_by_hash)
}
/// Return runtime version.
pub async fn runtime_version(&self) -> Result<RuntimeVersion> {
Ok(Substrate::<C>::state_runtime_version(&*self.client).await?)
self.jsonrpsee_execute(move |client| async move { Ok(Substrate::<C>::state_runtime_version(&*client).await?) })
.await
}
/// Read value from runtime storage.
pub async fn storage_value<T: Decode>(&self, storage_key: StorageKey) -> Result<Option<T>> {
Substrate::<C>::state_get_storage(&*self.client, storage_key)
pub async fn storage_value<T: Send + Decode + 'static>(&self, storage_key: StorageKey) -> Result<Option<T>> {
self.jsonrpsee_execute(move |client| async move {
Substrate::<C>::state_get_storage(&*client, storage_key)
.await?
.map(|encoded_value| T::decode(&mut &encoded_value.0[..]).map_err(Error::ResponseParseFailed))
.transpose()
})
.await
}
/// Return native tokens balance of the account.
@@ -214,30 +245,39 @@ impl<C: Chain> Client<C> {
where
C: ChainWithBalances,
{
self.jsonrpsee_execute(move |client| async move {
let storage_key = C::account_info_storage_key(&account);
let encoded_account_data = Substrate::<C>::state_get_storage(&*self.client, storage_key)
let encoded_account_data = Substrate::<C>::state_get_storage(&*client, storage_key)
.await?
.ok_or(Error::AccountDoesNotExist)?;
let decoded_account_data =
AccountInfo::<C::Index, AccountData<C::Balance>>::decode(&mut &encoded_account_data.0[..])
.map_err(Error::ResponseParseFailed)?;
Ok(decoded_account_data.data.free)
})
.await
}
/// Get the nonce of the given Substrate account.
///
/// Note: It's the caller's responsibility to make sure `account` is a valid ss58 address.
pub async fn next_account_index(&self, account: C::AccountId) -> Result<C::Index> {
Ok(Substrate::<C>::system_account_next_index(&*self.client, account).await?)
self.jsonrpsee_execute(move |client| async move {
Ok(Substrate::<C>::system_account_next_index(&*client, account).await?)
})
.await
}
/// Submit unsigned extrinsic for inclusion in a block.
///
/// Note: The given transaction needs to be SCALE encoded beforehand.
pub async fn submit_unsigned_extrinsic(&self, transaction: Bytes) -> Result<C::Hash> {
let tx_hash = Substrate::<C>::author_submit_extrinsic(&*self.client, transaction).await?;
self.jsonrpsee_execute(move |client| async move {
let tx_hash = Substrate::<C>::author_submit_extrinsic(&*client, transaction).await?;
log::trace!(target: "bridge", "Sent transaction to Substrate node: {:?}", tx_hash);
Ok(tx_hash)
})
.await
}
/// Submit an extrinsic signed by given account.
@@ -250,19 +290,23 @@ impl<C: Chain> Client<C> {
pub async fn submit_signed_extrinsic(
&self,
extrinsic_signer: C::AccountId,
prepare_extrinsic: impl FnOnce(C::Index) -> Bytes,
prepare_extrinsic: impl FnOnce(C::Index) -> Bytes + Send + 'static,
) -> Result<C::Hash> {
let _guard = self.submit_signed_extrinsic_lock.lock().await;
let transaction_nonce = self.next_account_index(extrinsic_signer).await?;
self.jsonrpsee_execute(move |client| async move {
let extrinsic = prepare_extrinsic(transaction_nonce);
let tx_hash = Substrate::<C>::author_submit_extrinsic(&*self.client, extrinsic).await?;
let tx_hash = Substrate::<C>::author_submit_extrinsic(&*client, extrinsic).await?;
log::trace!(target: "bridge", "Sent transaction to {} node: {:?}", C::NAME, tx_hash);
Ok(tx_hash)
})
.await
}
/// Estimate fee that will be spent on given extrinsic.
pub async fn estimate_extrinsic_fee(&self, transaction: Bytes) -> Result<C::Balance> {
let fee_details = Substrate::<C>::payment_query_fee_details(&*self.client, transaction, None).await?;
self.jsonrpsee_execute(move |client| async move {
let fee_details = Substrate::<C>::payment_query_fee_details(&*client, transaction, None).await?;
let inclusion_fee = fee_details
.inclusion_fee
.map(|inclusion_fee| {
@@ -278,43 +322,85 @@ impl<C: Chain> Client<C> {
})
.unwrap_or_else(Zero::zero);
Ok(inclusion_fee)
})
.await
}
/// Get the GRANDPA authority set at given block.
pub async fn grandpa_authorities_set(&self, block: C::Hash) -> Result<OpaqueGrandpaAuthoritiesSet> {
self.jsonrpsee_execute(move |client| async move {
let call = SUB_API_GRANDPA_AUTHORITIES.to_string();
let data = Bytes(Vec::new());
let encoded_response = Substrate::<C>::state_call(&*self.client, call, data, Some(block)).await?;
let encoded_response = Substrate::<C>::state_call(&*client, call, data, Some(block)).await?;
let authority_list = encoded_response.0;
Ok(authority_list)
})
.await
}
/// Execute runtime call at given block.
pub async fn state_call(&self, method: String, data: Bytes, at_block: Option<C::Hash>) -> Result<Bytes> {
Substrate::<C>::state_call(&*self.client, method, data, at_block)
self.jsonrpsee_execute(move |client| async move {
Substrate::<C>::state_call(&*client, method, data, at_block)
.await
.map_err(Into::into)
})
.await
}
/// Returns storage proof of given storage keys.
pub async fn prove_storage(&self, keys: Vec<StorageKey>, at_block: C::Hash) -> Result<StorageProof> {
Substrate::<C>::state_prove_storage(&*self.client, keys, Some(at_block))
self.jsonrpsee_execute(move |client| async move {
Substrate::<C>::state_prove_storage(&*client, keys, Some(at_block))
.await
.map(|proof| StorageProof::new(proof.proof.into_iter().map(|b| b.0).collect()))
.map_err(Into::into)
})
.await
}
/// Return new justifications stream.
pub async fn subscribe_justifications(&self) -> Result<JustificationsSubscription> {
Ok(self
.client
let subscription = self
.jsonrpsee_execute(move |client| async move {
Ok(client
.subscribe(
"grandpa_subscribeJustifications",
JsonRpcParams::NoParams,
"grandpa_unsubscribeJustifications",
)
.await?)
})
.await?;
Ok(JustificationsSubscription(
self.tokio.handle().clone(),
Arc::new(Mutex::new(subscription)),
))
}
/// Execute jsonrpsee future in tokio context.
async fn jsonrpsee_execute<MF, F, T>(&self, make_jsonrpsee_future: MF) -> Result<T>
where
MF: FnOnce(Arc<RpcClient>) -> F + Send + 'static,
F: Future<Output = Result<T>> + Send,
T: Send + 'static,
{
let client = self.client.clone();
self.tokio
.spawn(async move { make_jsonrpsee_future(client).await })
.await?
}
}
impl JustificationsSubscription {
/// Return next justification from the subscription.
pub async fn next(&self) -> Result<Option<Bytes>> {
let subscription = self.1.clone();
self.0
.spawn(async move { subscription.lock().await.next().await })
.await?
.map_err(Error::RpcError)
}
}
@@ -27,6 +27,8 @@ pub type Result<T> = std::result::Result<T, Error>;
/// a Substrate node through RPC.
#[derive(Debug)]
pub enum Error {
/// IO error.
Io(std::io::Error),
/// An error that can occur when making a request to
/// an JSON-RPC server.
RpcError(RpcError),
@@ -49,6 +51,7 @@ pub enum Error {
impl std::error::Error for Error {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
Self::Io(ref e) => Some(e),
Self::RpcError(ref e) => Some(e),
Self::ResponseParseFailed(ref e) => Some(e),
Self::UninitializedBridgePallet => None,
@@ -67,6 +70,18 @@ impl From<RpcError> for Error {
}
}
impl From<std::io::Error> for Error {
fn from(error: std::io::Error) -> Self {
Error::Io(error)
}
}
impl From<tokio::task::JoinError> for Error {
fn from(error: tokio::task::JoinError) -> Self {
Error::Custom(format!("Failed to wait tokio task: {}", error))
}
}
impl MaybeConnectionError for Error {
fn is_connection_error(&self) -> bool {
matches!(
@@ -82,6 +97,7 @@ impl MaybeConnectionError for Error {
impl std::fmt::Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
let s = match self {
Self::Io(e) => e.to_string(),
Self::RpcError(e) => e.to_string(),
Self::ResponseParseFailed(e) => e.to_string(),
Self::UninitializedBridgePallet => "The Substrate bridge pallet has not been initialized yet.".into(),
@@ -132,7 +132,7 @@ where
async fn finality_proofs(&self) -> Result<Self::FinalityProofsStream, Error> {
Ok(unfold(
self.client.clone().subscribe_justifications().await?,
move |mut subscription| async move {
move |subscription| async move {
loop {
let log_error = |err| {
log::error!(