Use jsonrpsee subscriptions (#1533)

* splitted Substrate RPC trait

* introduce subscription methods

* removed commented code

* removed commented code
This commit is contained in:
Svyatoslav Nikolsky
2022-08-02 13:40:41 +03:00
committed by Bastian Köcher
parent 77af92b17b
commit e7a7396616
2 changed files with 119 additions and 80 deletions
+37 -46
View File
@@ -18,7 +18,11 @@
use crate::{
chain::{Chain, ChainWithBalances, TransactionStatusOf},
rpc::SubstrateClient,
rpc::{
SubstrateAuthorClient, SubstrateChainClient, SubstrateFrameSystemClient,
SubstrateGrandpaClient, SubstrateStateClient, SubstrateSystemClient,
SubstrateTransactionPaymentClient,
},
ConnectionParams, Error, HashOf, HeaderIdOf, Result,
};
@@ -29,8 +33,7 @@ use codec::{Decode, Encode};
use frame_system::AccountInfo;
use futures::{SinkExt, StreamExt};
use jsonrpsee::{
core::{client::SubscriptionClientT, DeserializeOwned},
types::params::ParamsSer,
core::DeserializeOwned,
ws_client::{WsClient as RpcClient, WsClientBuilder as RpcClientBuilder},
};
use num_traits::{Bounded, Zero};
@@ -153,8 +156,7 @@ impl<C: Chain> Client<C> {
let genesis_hash_client = client.clone();
let genesis_hash = tokio
.spawn(async move {
SubstrateClient::<C>::chain_get_block_hash(&*genesis_hash_client, Some(number))
.await
SubstrateChainClient::<C>::block_hash(&*genesis_hash_client, Some(number)).await
})
.await??;
@@ -212,7 +214,7 @@ impl<C: Chain> Client<C> {
/// Returns true if client is connected to at least one peer and is in synced state.
pub async fn ensure_synced(&self) -> Result<()> {
self.jsonrpsee_execute(|client| async move {
let health = SubstrateClient::<C>::system_health(&*client).await?;
let health = SubstrateSystemClient::<C>::health(&*client).await?;
let is_synced = !health.is_syncing && (!health.should_have_peers || health.peers > 0);
if is_synced {
Ok(())
@@ -231,7 +233,7 @@ impl<C: Chain> Client<C> {
/// Return hash of the best finalized block.
pub async fn best_finalized_header_hash(&self) -> Result<C::Hash> {
self.jsonrpsee_execute(|client| async move {
Ok(SubstrateClient::<C>::chain_get_finalized_head(&*client).await?)
Ok(SubstrateChainClient::<C>::finalized_head(&*client).await?)
})
.await
}
@@ -252,7 +254,7 @@ impl<C: Chain> Client<C> {
C::Header: DeserializeOwned,
{
self.jsonrpsee_execute(|client| async move {
Ok(SubstrateClient::<C>::chain_get_header(&*client, None).await?)
Ok(SubstrateChainClient::<C>::header(&*client, None).await?)
})
.await
}
@@ -260,7 +262,7 @@ impl<C: Chain> Client<C> {
/// Get a Substrate block from its hash.
pub async fn get_block(&self, block_hash: Option<C::Hash>) -> Result<C::SignedBlock> {
self.jsonrpsee_execute(move |client| async move {
Ok(SubstrateClient::<C>::chain_get_block(&*client, block_hash).await?)
Ok(SubstrateChainClient::<C>::block(&*client, block_hash).await?)
})
.await
}
@@ -271,7 +273,7 @@ impl<C: Chain> Client<C> {
C::Header: DeserializeOwned,
{
self.jsonrpsee_execute(move |client| async move {
Ok(SubstrateClient::<C>::chain_get_header(&*client, Some(block_hash)).await?)
Ok(SubstrateChainClient::<C>::header(&*client, Some(block_hash)).await?)
})
.await
}
@@ -279,7 +281,7 @@ impl<C: Chain> Client<C> {
/// Get a Substrate block hash by its number.
pub async fn block_hash_by_number(&self, number: C::BlockNumber) -> Result<C::Hash> {
self.jsonrpsee_execute(move |client| async move {
Ok(SubstrateClient::<C>::chain_get_block_hash(&*client, Some(number)).await?)
Ok(SubstrateChainClient::<C>::block_hash(&*client, Some(number)).await?)
})
.await
}
@@ -297,7 +299,7 @@ impl<C: Chain> Client<C> {
/// Return runtime version.
pub async fn runtime_version(&self) -> Result<RuntimeVersion> {
self.jsonrpsee_execute(move |client| async move {
Ok(SubstrateClient::<C>::state_runtime_version(&*client).await?)
Ok(SubstrateStateClient::<C>::runtime_version(&*client).await?)
})
.await
}
@@ -341,7 +343,7 @@ impl<C: Chain> Client<C> {
block_hash: Option<C::Hash>,
) -> Result<Option<StorageData>> {
self.jsonrpsee_execute(move |client| async move {
Ok(SubstrateClient::<C>::state_get_storage(&*client, storage_key, block_hash).await?)
Ok(SubstrateStateClient::<C>::storage(&*client, storage_key, block_hash).await?)
})
.await
}
@@ -354,7 +356,7 @@ impl<C: Chain> Client<C> {
self.jsonrpsee_execute(move |client| async move {
let storage_key = C::account_info_storage_key(&account);
let encoded_account_data =
SubstrateClient::<C>::state_get_storage(&*client, storage_key, None)
SubstrateStateClient::<C>::storage(&*client, storage_key, None)
.await?
.ok_or(Error::AccountDoesNotExist)?;
let decoded_account_data = AccountInfo::<C::Index, AccountData<C::Balance>>::decode(
@@ -371,7 +373,7 @@ impl<C: Chain> Client<C> {
/// Note: It's the caller's responsibility to make sure `account` is a valid SS58 address.
pub async fn next_account_index(&self, account: C::AccountId) -> Result<C::Index> {
self.jsonrpsee_execute(move |client| async move {
Ok(SubstrateClient::<C>::system_account_next_index(&*client, account).await?)
Ok(SubstrateFrameSystemClient::<C>::account_next_index(&*client, account).await?)
})
.await
}
@@ -381,7 +383,7 @@ impl<C: Chain> Client<C> {
/// Note: The given transaction needs to be SCALE encoded beforehand.
pub async fn submit_unsigned_extrinsic(&self, transaction: Bytes) -> Result<C::Hash> {
self.jsonrpsee_execute(move |client| async move {
let tx_hash = SubstrateClient::<C>::author_submit_extrinsic(&*client, transaction)
let tx_hash = SubstrateAuthorClient::<C>::submit_extrinsic(&*client, transaction)
.await
.map_err(|e| {
log::error!(target: "bridge", "Failed to send transaction to {} node: {:?}", C::NAME, e);
@@ -418,12 +420,12 @@ impl<C: Chain> Client<C> {
self.jsonrpsee_execute(move |client| async move {
let extrinsic = prepare_extrinsic(best_header_id, transaction_nonce)?;
let tx_hash = SubstrateClient::<C>::author_submit_extrinsic(&*client, extrinsic)
let tx_hash = SubstrateAuthorClient::<C>::submit_extrinsic(&*client, extrinsic)
.await
.map_err(|e| {
log::error!(target: "bridge", "Failed to send transaction to {} node: {:?}", C::NAME, e);
e
})?;
log::error!(target: "bridge", "Failed to send transaction to {} node: {:?}", C::NAME, e);
e
})?;
log::trace!(target: "bridge", "Sent transaction to {} node: {:?}", C::NAME, tx_hash);
Ok(tx_hash)
})
@@ -445,18 +447,13 @@ impl<C: Chain> Client<C> {
.jsonrpsee_execute(move |client| async move {
let extrinsic = prepare_extrinsic(best_header_id, transaction_nonce)?;
let tx_hash = C::Hasher::hash(&extrinsic.0);
let subscription = client
.subscribe(
"author_submitAndWatchExtrinsic",
Some(ParamsSer::Array(vec![jsonrpsee::core::to_json_value(extrinsic)
.map_err(|e| Error::RpcError(e.into()))?])),
"author_unwatchExtrinsic",
)
.await
.map_err(|e| {
log::error!(target: "bridge", "Failed to send transaction to {} node: {:?}", C::NAME, e);
e
})?;
let subscription =
SubstrateAuthorClient::<C>::submit_and_watch_extrinsic(&*client, extrinsic)
.await
.map_err(|e| {
log::error!(target: "bridge", "Failed to send transaction to {} node: {:?}", C::NAME, e);
e
})?;
log::trace!(target: "bridge", "Sent transaction to {} node: {:?}", C::NAME, tx_hash);
Ok(subscription)
})
@@ -474,7 +471,7 @@ impl<C: Chain> Client<C> {
/// Returns pending extrinsics from transaction pool.
pub async fn pending_extrinsics(&self) -> Result<Vec<Bytes>> {
self.jsonrpsee_execute(move |client| async move {
Ok(SubstrateClient::<C>::author_pending_extrinsics(&*client).await?)
Ok(SubstrateAuthorClient::<C>::pending_extrinsics(&*client).await?)
})
.await
}
@@ -490,7 +487,7 @@ impl<C: Chain> Client<C> {
let data = Bytes((TransactionSource::External, transaction, at_block).encode());
let encoded_response =
SubstrateClient::<C>::state_call(&*client, call, data, Some(at_block)).await?;
SubstrateStateClient::<C>::call(&*client, call, data, Some(at_block)).await?;
let validity = TransactionValidity::decode(&mut &encoded_response.0[..])
.map_err(Error::ResponseParseFailed)?;
@@ -506,7 +503,7 @@ impl<C: Chain> Client<C> {
) -> Result<InclusionFee<C::Balance>> {
self.jsonrpsee_execute(move |client| async move {
let fee_details =
SubstrateClient::<C>::payment_query_fee_details(&*client, transaction, None)
SubstrateTransactionPaymentClient::<C>::fee_details(&*client, transaction, None)
.await?;
let inclusion_fee = fee_details
.inclusion_fee
@@ -540,7 +537,7 @@ impl<C: Chain> Client<C> {
let data = Bytes(Vec::new());
let encoded_response =
SubstrateClient::<C>::state_call(&*client, call, data, Some(block)).await?;
SubstrateStateClient::<C>::call(&*client, call, data, Some(block)).await?;
let authority_list = encoded_response.0;
Ok(authority_list)
@@ -556,7 +553,7 @@ impl<C: Chain> Client<C> {
at_block: Option<C::Hash>,
) -> Result<Bytes> {
self.jsonrpsee_execute(move |client| async move {
SubstrateClient::<C>::state_call(&*client, method, data, at_block)
SubstrateStateClient::<C>::call(&*client, method, data, at_block)
.await
.map_err(Into::into)
})
@@ -570,7 +567,7 @@ impl<C: Chain> Client<C> {
at_block: C::Hash,
) -> Result<StorageProof> {
self.jsonrpsee_execute(move |client| async move {
SubstrateClient::<C>::state_prove_storage(&*client, keys, Some(at_block))
SubstrateStateClient::<C>::prove_storage(&*client, keys, Some(at_block))
.await
.map(|proof| {
StorageProof::new(proof.proof.into_iter().map(|b| b.0).collect::<Vec<_>>())
@@ -583,7 +580,7 @@ impl<C: Chain> Client<C> {
/// Return `tokenDecimals` property from the set of chain properties.
pub async fn token_decimals(&self) -> Result<Option<u64>> {
self.jsonrpsee_execute(move |client| async move {
let system_properties = SubstrateClient::<C>::system_properties(&*client).await?;
let system_properties = SubstrateSystemClient::<C>::properties(&*client).await?;
Ok(system_properties.get("tokenDecimals").and_then(|v| v.as_u64()))
})
.await
@@ -593,13 +590,7 @@ impl<C: Chain> Client<C> {
pub async fn subscribe_grandpa_justifications(&self) -> Result<Subscription<Bytes>> {
let subscription = self
.jsonrpsee_execute(move |client| async move {
Ok(client
.subscribe(
"grandpa_subscribeJustifications",
None,
"grandpa_unsubscribeJustifications",
)
.await?)
Ok(SubstrateGrandpaClient::<C>::subscribe_justifications(&*client).await?)
})
.await?;
let (sender, receiver) = futures::channel::mpsc::channel(MAX_SUBSCRIPTION_CAPACITY);