Split RPCs into a separate crate (#1910)

* WIP extract RPCs into separate crate

* fmt

* Fix test

* Remove unused deps

* fix import

* WIP: Fix up errors and most tests. Start extracintg some tests/code to rpc crate

* MockRpcClient sync or async

* MockRpcClient only async but better type inference

* WIP MockRpcClient FnMuts and some test updates to use it

* Get all but one test working with new MockRpcClient

* WIP trying to debug failure

* WIP, Tests mostly fixed, need to add back oen more

* Get mock RPC tests working

* fmt

* fmt

* Clippy and comment tweak

* update CI to explicitly check subxt-rpc features

* clippy

* small tweaks after pass over

* feature flag rename

* update some docs

* Fix some examples

* fmt

* Fix features flags to work with web/wasm32

* Fix unused dep warning

* explicit targets in wasm CI

* Add better crate level docs

* fmt

* Address review comments

* Comment out flaky test for now and make more obvious how similar POlkadot and Substrate configs are

* Not a doc comment

* Remove unused imports
This commit is contained in:
James Wilson
2025-02-18 12:07:00 +00:00
committed by GitHub
parent 333de953ec
commit 816a86423b
50 changed files with 4575 additions and 1186 deletions
@@ -2,13 +2,13 @@
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
// see LICENSE for license details.
use super::rpc_methods::{ChainHeadRpcMethods, FollowEvent};
use crate::config::Config;
use crate::error::Error;
use futures::{FutureExt, Stream, StreamExt};
use futures::{FutureExt, Stream, StreamExt, TryStreamExt};
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use subxt_rpcs::methods::chain_head::{ChainHeadRpcMethods, FollowEvent};
/// A `Stream` whose goal is to remain subscribed to `chainHead_follow`. It will re-subscribe if the subscription
/// is ended for any reason, and it will return the current `subscription_id` as an event, along with the other
@@ -113,8 +113,10 @@ impl<Hash> FollowStream<Hash> {
.to_owned(),
));
};
// Return both:
// Map stream errors into the higher level subxt one:
let stream = stream.map_err(|e| e.into());
let stream: FollowEventStream<T::Hash> = Box::pin(stream);
// Return both:
Ok((stream, sub_id))
})
}),
@@ -215,12 +217,10 @@ impl<Hash> Stream for FollowStream<Hash> {
#[cfg(test)]
pub(super) mod test_utils {
use super::*;
use crate::backend::chain_head::rpc_methods::{
BestBlockChanged, Finalized, Initialized, NewBlock,
};
use crate::config::substrate::H256;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use subxt_rpcs::methods::chain_head::{BestBlockChanged, Finalized, Initialized, NewBlock};
/// Given some events, returns a follow stream getter that we can use in
/// place of the usual RPC method.
@@ -3,7 +3,6 @@
// see LICENSE for license details.
use super::follow_stream_unpin::{BlockRef, FollowStreamMsg, FollowStreamUnpin};
use crate::backend::chain_head::rpc_methods::{FollowEvent, Initialized, RuntimeEvent};
use crate::config::BlockHash;
use crate::error::{Error, RpcError};
use futures::stream::{Stream, StreamExt};
@@ -12,6 +11,7 @@ use std::ops::DerefMut;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};
use subxt_rpcs::methods::chain_head::{FollowEvent, Initialized, RuntimeEvent};
/// A `Stream` which builds on `FollowStreamDriver`, and allows multiple subscribers to obtain events
/// from the single underlying subscription (each being provided an `Initialized` message and all new
@@ -454,8 +454,11 @@ where
.iter()
.position(|b| b.hash() == p.hash())
else {
return Poll::Ready(Some(Err(RpcError::DisconnectedWillReconnect(
"Missed at least one block when the connection was lost".to_owned(),
return Poll::Ready(Some(Err(RpcError::ClientError(
subxt_rpcs::Error::DisconnectedWillReconnect(
"Missed at least one block when the connection was lost"
.to_owned(),
),
)
.into())));
};
@@ -739,7 +742,7 @@ mod test {
)
);
assert!(
matches!(&evs[1], Err(Error::Rpc(RpcError::DisconnectedWillReconnect(e))) if e.contains("Missed at least one block when the connection was lost"))
matches!(&evs[1], Err(Error::Rpc(RpcError::ClientError(subxt_rpcs::Error::DisconnectedWillReconnect(e)))) if e.contains("Missed at least one block when the connection was lost"))
);
assert_eq!(
evs[2].as_ref().unwrap(),
@@ -4,12 +4,12 @@
use super::follow_stream::FollowStream;
use super::ChainHeadRpcMethods;
use crate::backend::chain_head::rpc_methods::{
BestBlockChanged, Finalized, FollowEvent, Initialized, NewBlock,
};
use crate::config::{BlockHash, Config};
use crate::error::Error;
use futures::stream::{FuturesUnordered, Stream, StreamExt};
use subxt_rpcs::methods::chain_head::{
BestBlockChanged, Finalized, FollowEvent, Initialized, NewBlock,
};
use std::collections::{HashMap, HashSet};
use std::future::Future;
+32 -28
View File
@@ -16,15 +16,10 @@ mod follow_stream_driver;
mod follow_stream_unpin;
mod storage_items;
pub mod rpc_methods;
use self::follow_stream_driver::FollowStreamFinalizedHeads;
use self::rpc_methods::{
FollowEvent, MethodResponse, RuntimeEvent, StorageQuery, StorageQueryType, StorageResultType,
};
use crate::backend::{
rpc::RpcClient, utils::retry, Backend, BlockRef, BlockRefT, RuntimeVersion, StorageResponse,
StreamOf, StreamOfResults, TransactionStatus,
utils::retry, Backend, BlockRef, BlockRefT, RuntimeVersion, StorageResponse, StreamOf,
StreamOfResults, TransactionStatus,
};
use crate::config::BlockHash;
use crate::error::{Error, RpcError};
@@ -36,9 +31,18 @@ use futures::{Stream, StreamExt};
use std::collections::HashMap;
use std::task::Poll;
use storage_items::StorageItems;
use subxt_rpcs::methods::chain_head::{
FollowEvent, MethodResponse, RuntimeEvent, StorageQuery, StorageQueryType, StorageResultType,
};
use subxt_rpcs::RpcClient;
/// Re-export RPC types and methods from [`subxt_rpcs::methods::chain_head`].
pub mod rpc_methods {
pub use subxt_rpcs::methods::legacy::*;
}
// Expose the RPC methods.
pub use rpc_methods::ChainHeadRpcMethods;
pub use subxt_rpcs::methods::chain_head::ChainHeadRpcMethods;
/// Configure and build an [`ChainHeadBackend`].
pub struct ChainHeadBackendBuilder<T> {
@@ -213,7 +217,7 @@ impl<T: Config> ChainHeadBackend<T> {
let header = match res {
Ok(header) => header,
Err(e) => return Some(Err(e)),
Err(e) => return Some(Err(e.into())),
};
Some(Ok((header, block_ref.into())))
@@ -338,13 +342,18 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for ChainHeadBackend<T> {
}
async fn genesis_hash(&self) -> Result<T::Hash, Error> {
retry(|| self.methods.chainspec_v1_genesis_hash()).await
retry(|| async {
let genesis_hash = self.methods.chainspec_v1_genesis_hash().await?;
Ok(genesis_hash)
})
.await
}
async fn block_header(&self, at: T::Hash) -> Result<Option<T::Header>, Error> {
retry(|| async {
let sub_id = get_subscription_id(&self.follow_handle).await?;
self.methods.chainhead_v1_header(&sub_id, at).await
let header = self.methods.chainhead_v1_header(&sub_id, at).await?;
Ok(header)
})
.await
}
@@ -357,9 +366,7 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for ChainHeadBackend<T> {
let follow_events = self.follow_handle.subscribe().events();
let status = self.methods.chainhead_v1_body(&sub_id, at).await?;
let operation_id = match status {
MethodResponse::LimitReached => {
return Err(RpcError::request_rejected("limit reached").into())
}
MethodResponse::LimitReached => return Err(RpcError::LimitReached.into()),
MethodResponse::Started(s) => s.operation_id,
};
@@ -653,22 +660,21 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for ChainHeadBackend<T> {
let tx_progress_ev = match tx_progress.poll_next_unpin(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(None) => return Poll::Ready(err_other("No more transaction progress events, but we haven't seen a Finalized one yet")),
Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e))),
Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e.into()))),
Poll::Ready(Some(Ok(ev))) => ev,
};
// When we get one, map it to the correct format (or for finalized ev, wait for the pinned block):
use subxt_rpcs::methods::chain_head::TransactionStatus as RpcTransactionStatus;
let tx_progress_ev = match tx_progress_ev {
rpc_methods::TransactionStatus::Finalized { block } => {
RpcTransactionStatus::Finalized { block } => {
// We'll wait until we have seen this hash, to try to guarantee
// that when we return this event, the corresponding block is
// pinned and accessible.
finalized_hash = Some(block.hash);
continue;
}
rpc_methods::TransactionStatus::BestChainBlockIncluded {
block: Some(block),
} => {
RpcTransactionStatus::BestChainBlockIncluded { block: Some(block) } => {
// Look up a pinned block ref if we can, else return a non-pinned
// block that likely isn't accessible. We have no guarantee that a best
// block on the node a tx was sent to will ever be known about on the
@@ -679,20 +685,20 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for ChainHeadBackend<T> {
};
TransactionStatus::InBestBlock { hash: block_ref }
}
rpc_methods::TransactionStatus::BestChainBlockIncluded { block: None } => {
RpcTransactionStatus::BestChainBlockIncluded { block: None } => {
TransactionStatus::NoLongerInBestBlock
}
rpc_methods::TransactionStatus::Broadcasted => TransactionStatus::Broadcasted,
rpc_methods::TransactionStatus::Dropped { error, .. } => {
RpcTransactionStatus::Broadcasted => TransactionStatus::Broadcasted,
RpcTransactionStatus::Dropped { error, .. } => {
TransactionStatus::Dropped { message: error }
}
rpc_methods::TransactionStatus::Error { error } => {
RpcTransactionStatus::Error { error } => {
TransactionStatus::Error { message: error }
}
rpc_methods::TransactionStatus::Invalid { error } => {
RpcTransactionStatus::Invalid { error } => {
TransactionStatus::Invalid { message: error }
}
rpc_methods::TransactionStatus::Validated => TransactionStatus::Validated,
RpcTransactionStatus::Validated => TransactionStatus::Validated,
};
return Poll::Ready(Some(Ok(tx_progress_ev)));
}
@@ -718,9 +724,7 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for ChainHeadBackend<T> {
.chainhead_v1_call(&sub_id, at, method, call_parameters)
.await?;
let operation_id = match status {
MethodResponse::LimitReached => {
return Err(RpcError::request_rejected("limit reached").into())
}
MethodResponse::LimitReached => return Err(RpcError::LimitReached.into()),
MethodResponse::Started(s) => s.operation_id,
};
File diff suppressed because it is too large Load Diff
+10 -7
View File
@@ -4,9 +4,6 @@
use super::follow_stream_driver::FollowStreamDriverHandle;
use super::follow_stream_unpin::BlockRef;
use super::rpc_methods::{
ChainHeadRpcMethods, FollowEvent, MethodResponse, StorageQuery, StorageResult,
};
use crate::config::Config;
use crate::error::{Error, RpcError};
use futures::{FutureExt, Stream, StreamExt};
@@ -15,6 +12,9 @@ use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use subxt_rpcs::methods::chain_head::{
ChainHeadRpcMethods, FollowEvent, MethodResponse, StorageQuery, StorageResult,
};
/// Obtain a stream of storage items given some query. this handles continuing
/// and stopping under the hood, and returns a stream of `StorageResult`s.
@@ -45,9 +45,7 @@ impl<T: Config> StorageItems<T> {
.chainhead_v1_storage(&sub_id, at, queries, None)
.await?;
let operation_id: Arc<str> = match status {
MethodResponse::LimitReached => {
return Err(RpcError::request_rejected("limit reached").into())
}
MethodResponse::LimitReached => return Err(RpcError::LimitReached.into()),
MethodResponse::Started(s) => s.operation_id.into(),
};
@@ -59,7 +57,12 @@ impl<T: Config> StorageItems<T> {
let operation_id = operation_id.clone();
let methods = methods.clone();
Box::pin(async move { methods.chainhead_v1_continue(&sub_id, &operation_id).await })
Box::pin(async move {
methods
.chainhead_v1_continue(&sub_id, &operation_id)
.await?;
Ok(())
})
})
};
@@ -5,21 +5,25 @@
//! This module exposes a legacy backend implementation, which relies
//! on the legacy RPC API methods.
pub mod rpc_methods;
use self::rpc_methods::TransactionStatus as RpcTransactionStatus;
use crate::backend::utils::{retry, retry_stream};
use crate::backend::{
rpc::RpcClient, Backend, BlockRef, RuntimeVersion, StorageResponse, StreamOf, StreamOfResults,
Backend, BlockRef, RuntimeVersion, StorageResponse, StreamOf, StreamOfResults,
TransactionStatus,
};
use crate::error::RpcError;
use crate::{config::Header, Config, Error};
use async_trait::async_trait;
use futures::TryStreamExt;
use futures::{future, future::Either, stream, Future, FutureExt, Stream, StreamExt};
use std::collections::VecDeque;
use std::pin::Pin;
use std::task::{Context, Poll};
use subxt_rpcs::RpcClient;
/// Re-export legacy RPC types and methods from [`subxt_rpcs::methods::legacy`].
pub mod rpc_methods {
pub use subxt_rpcs::methods::legacy::*;
}
// Expose the RPC methods.
pub use rpc_methods::LegacyRpcMethods;
@@ -181,11 +185,19 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for LegacyBackend<T> {
}
async fn genesis_hash(&self) -> Result<T::Hash, Error> {
retry(|| self.methods.genesis_hash()).await
retry(|| async {
let hash = self.methods.genesis_hash().await?;
Ok(hash)
})
.await
}
async fn block_header(&self, at: T::Hash) -> Result<Option<T::Header>, Error> {
retry(|| self.methods.chain_get_header(Some(at))).await
retry(|| async {
let header = self.methods.chain_get_header(Some(at)).await?;
Ok(header)
})
.await
}
async fn block_body(&self, at: T::Hash) -> Result<Option<Vec<Vec<u8>>>, Error> {
@@ -227,7 +239,7 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for LegacyBackend<T> {
Box::pin(async move {
let sub = methods.state_subscribe_runtime_version().await?;
let sub = sub.map(|r| {
let sub = sub.map_err(|e| e.into()).map(|r| {
r.map(|v| RuntimeVersion {
spec_version: v.spec_version,
transaction_version: v.transaction_version,
@@ -244,8 +256,13 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for LegacyBackend<T> {
// Thus, it's technically possible that a runtime version can be missed if
// two runtime upgrades happen in quick succession, but this is very unlikely.
let stream = retry_sub.filter(|r| {
let forward = !matches!(r, Err(Error::Rpc(RpcError::DisconnectedWillReconnect(_))));
async move { forward }
let mut keep = true;
if let Err(e) = r {
if e.is_disconnected_will_reconnect() {
keep = false;
}
}
async move { keep }
});
Ok(StreamOf(Box::pin(stream)))
@@ -260,7 +277,7 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for LegacyBackend<T> {
let methods = methods.clone();
Box::pin(async move {
let sub = methods.chain_subscribe_all_heads().await?;
let sub = sub.map(|r| {
let sub = sub.map_err(|e| e.into()).map(|r| {
r.map(|h| {
let hash = h.hash();
(h, BlockRef::from_hash(hash))
@@ -283,7 +300,7 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for LegacyBackend<T> {
let methods = methods.clone();
Box::pin(async move {
let sub = methods.chain_subscribe_new_heads().await?;
let sub = sub.map(|r| {
let sub = sub.map_err(|e| e.into()).map(|r| {
r.map(|h| {
let hash = h.hash();
(h, BlockRef::from_hash(hash))
@@ -347,6 +364,7 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for LegacyBackend<T> {
let sub = sub.filter_map(|r| {
let mapped = r
.map_err(|e| e.into())
.map(|tx| {
match tx {
// We ignore these because they don't map nicely to the new API. They don't signal "end states" so this should be fine.
@@ -401,7 +419,14 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for LegacyBackend<T> {
call_parameters: Option<&[u8]>,
at: T::Hash,
) -> Result<Vec<u8>, Error> {
retry(|| self.methods.state_call(method, call_parameters, Some(at))).await
retry(|| async {
let res = self
.methods
.state_call(method, call_parameters, Some(at))
.await?;
Ok(res)
})
.await
}
}
@@ -530,14 +555,15 @@ impl<T: Config> Stream for StorageFetchDescendantKeysStream<T> {
let storage_page_size = this.storage_page_size;
let pagination_start_key = this.pagination_start_key.clone();
let keys_fut = async move {
methods
let keys = methods
.state_get_keys_paged(
&key,
storage_page_size,
pagination_start_key.as_deref(),
Some(at),
)
.await
.await?;
Ok(keys)
};
this.keys_fut = Some(Box::pin(keys_fut));
}
@@ -599,9 +625,13 @@ impl<T: Config> Stream for StorageFetchDescendantValuesStream<T> {
let at = this.keys.at;
let results_fut = async move {
let keys = keys.iter().map(|k| &**k);
let values =
retry(|| methods.state_query_storage_at(keys.clone(), Some(at)))
let values = retry(|| async {
let res = methods
.state_query_storage_at(keys.clone(), Some(at))
.await?;
Ok(res)
})
.await?;
let values: VecDeque<_> = values
.into_iter()
.flat_map(|v| {
-682
View File
@@ -1,682 +0,0 @@
// Copyright 2019-2023 Parity Technologies (UK) Ltd.
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
// see LICENSE for license details.
//! An interface to call the raw legacy RPC methods.
use crate::backend::rpc::{rpc_params, RpcClient, RpcSubscription};
use crate::metadata::Metadata;
use crate::{Config, Error};
use codec::Decode;
use derive_where::derive_where;
use primitive_types::U256;
use serde::{Deserialize, Serialize};
/// An interface to call the legacy RPC methods. This interface is instantiated with
/// some `T: Config` trait which determines some of the types that the RPC methods will
/// take or hand back.
#[derive_where(Clone, Debug)]
pub struct LegacyRpcMethods<T> {
client: RpcClient,
_marker: std::marker::PhantomData<T>,
}
impl<T: Config> LegacyRpcMethods<T> {
/// Instantiate the legacy RPC method interface.
pub fn new(client: RpcClient) -> Self {
LegacyRpcMethods {
client,
_marker: std::marker::PhantomData,
}
}
/// Fetch the raw bytes for a given storage key
pub async fn state_get_storage(
&self,
key: &[u8],
hash: Option<T::Hash>,
) -> Result<Option<StorageData>, Error> {
let params = rpc_params![to_hex(key), hash];
let data: Option<Bytes> = self.client.request("state_getStorage", params).await?;
Ok(data.map(|b| b.0))
}
/// Returns the keys with prefix with pagination support.
/// Up to `count` keys will be returned.
/// If `start_key` is passed, return next keys in storage in lexicographic order.
pub async fn state_get_keys_paged(
&self,
key: &[u8],
count: u32,
start_key: Option<&[u8]>,
at: Option<T::Hash>,
) -> Result<Vec<StorageKey>, Error> {
let start_key = start_key.map(to_hex);
let params = rpc_params![to_hex(key), count, start_key, at];
let data: Vec<Bytes> = self.client.request("state_getKeysPaged", params).await?;
Ok(data.into_iter().map(|b| b.0).collect())
}
/// Query historical storage entries in the range from the start block to the end block,
/// defaulting the end block to the current best block if it's not given. The first
/// [`StorageChangeSet`] returned has all of the values for each key, and subsequent ones
/// only contain values for any keys which have changed since the last.
pub async fn state_query_storage(
&self,
keys: impl IntoIterator<Item = &[u8]>,
from: T::Hash,
to: Option<T::Hash>,
) -> Result<Vec<StorageChangeSet<T::Hash>>, Error> {
let keys: Vec<String> = keys.into_iter().map(to_hex).collect();
let params = rpc_params![keys, from, to];
self.client
.request("state_queryStorage", params)
.await
.map_err(Into::into)
}
/// Query storage entries at some block, using the best block if none is given.
/// This essentially provides a way to ask for a batch of values given a batch of keys,
/// despite the name of the [`StorageChangeSet`] type.
pub async fn state_query_storage_at(
&self,
keys: impl IntoIterator<Item = &[u8]>,
at: Option<T::Hash>,
) -> Result<Vec<StorageChangeSet<T::Hash>>, Error> {
let keys: Vec<String> = keys.into_iter().map(to_hex).collect();
let params = rpc_params![keys, at];
self.client
.request("state_queryStorageAt", params)
.await
.map_err(Into::into)
}
/// Fetch the genesis hash
pub async fn genesis_hash(&self) -> Result<T::Hash, Error> {
let block_zero = 0u32;
let params = rpc_params![block_zero];
let genesis_hash: Option<T::Hash> =
self.client.request("chain_getBlockHash", params).await?;
genesis_hash.ok_or_else(|| "Genesis hash not found".into())
}
/// Fetch the metadata via the legacy `state_getMetadata` RPC method.
pub async fn state_get_metadata(&self, at: Option<T::Hash>) -> Result<Metadata, Error> {
let bytes: Bytes = self
.client
.request("state_getMetadata", rpc_params![at])
.await?;
let metadata = Metadata::decode(&mut &bytes[..])?;
Ok(metadata)
}
/// Fetch system health
pub async fn system_health(&self) -> Result<SystemHealth, Error> {
self.client.request("system_health", rpc_params![]).await
}
/// Fetch system chain
pub async fn system_chain(&self) -> Result<String, Error> {
self.client.request("system_chain", rpc_params![]).await
}
/// Fetch system name
pub async fn system_name(&self) -> Result<String, Error> {
self.client.request("system_name", rpc_params![]).await
}
/// Fetch system version
pub async fn system_version(&self) -> Result<String, Error> {
self.client.request("system_version", rpc_params![]).await
}
/// Fetch system properties
pub async fn system_properties(&self) -> Result<SystemProperties, Error> {
self.client
.request("system_properties", rpc_params![])
.await
}
/// Fetch next nonce for an Account
///
/// Return account nonce adjusted for extrinsics currently in transaction pool
pub async fn system_account_next_index(&self, account_id: &T::AccountId) -> Result<u64, Error>
where
T::AccountId: Serialize,
{
self.client
.request("system_accountNextIndex", rpc_params![&account_id])
.await
}
/// Get a header
pub async fn chain_get_header(
&self,
hash: Option<T::Hash>,
) -> Result<Option<T::Header>, Error> {
let params = rpc_params![hash];
let header = self.client.request("chain_getHeader", params).await?;
Ok(header)
}
/// Get a block hash, returns hash of latest _best_ block by default.
pub async fn chain_get_block_hash(
&self,
block_number: Option<BlockNumber>,
) -> Result<Option<T::Hash>, Error> {
let params = rpc_params![block_number];
let block_hash = self.client.request("chain_getBlockHash", params).await?;
Ok(block_hash)
}
/// Get a block hash of the latest finalized block
pub async fn chain_get_finalized_head(&self) -> Result<T::Hash, Error> {
let hash = self
.client
.request("chain_getFinalizedHead", rpc_params![])
.await?;
Ok(hash)
}
/// Get a Block
pub async fn chain_get_block(
&self,
hash: Option<T::Hash>,
) -> Result<Option<BlockDetails<T>>, Error> {
let params = rpc_params![hash];
let block = self.client.request("chain_getBlock", params).await?;
Ok(block)
}
/// Reexecute the specified `block_hash` and gather statistics while doing so.
///
/// This function requires the specified block and its parent to be available
/// at the queried node. If either the specified block or the parent is pruned,
/// this function will return `None`.
pub async fn dev_get_block_stats(
&self,
block_hash: T::Hash,
) -> Result<Option<BlockStats>, Error> {
let params = rpc_params![block_hash];
let stats = self.client.request("dev_getBlockStats", params).await?;
Ok(stats)
}
/// Get proof of storage entries at a specific block's state.
pub async fn state_get_read_proof(
&self,
keys: impl IntoIterator<Item = &[u8]>,
hash: Option<T::Hash>,
) -> Result<ReadProof<T::Hash>, Error> {
let keys: Vec<String> = keys.into_iter().map(to_hex).collect();
let params = rpc_params![keys, hash];
let proof = self.client.request("state_getReadProof", params).await?;
Ok(proof)
}
/// Fetch the runtime version
pub async fn state_get_runtime_version(
&self,
at: Option<T::Hash>,
) -> Result<RuntimeVersion, Error> {
let params = rpc_params![at];
let version = self
.client
.request("state_getRuntimeVersion", params)
.await?;
Ok(version)
}
/// Subscribe to all new best block headers.
pub async fn chain_subscribe_new_heads(&self) -> Result<RpcSubscription<T::Header>, Error> {
let subscription = self
.client
.subscribe(
// Despite the name, this returns a stream of all new blocks
// imported by the node that happen to be added to the current best chain
// (ie all best blocks).
"chain_subscribeNewHeads",
rpc_params![],
"chain_unsubscribeNewHeads",
)
.await?;
Ok(subscription)
}
/// Subscribe to all new block headers.
pub async fn chain_subscribe_all_heads(&self) -> Result<RpcSubscription<T::Header>, Error> {
let subscription = self
.client
.subscribe(
// Despite the name, this returns a stream of all new blocks
// imported by the node that happen to be added to the current best chain
// (ie all best blocks).
"chain_subscribeAllHeads",
rpc_params![],
"chain_unsubscribeAllHeads",
)
.await?;
Ok(subscription)
}
/// Subscribe to finalized block headers.
///
/// Note: this may not produce _every_ block in the finalized chain;
/// sometimes multiple blocks are finalized at once, and in this case only the
/// latest one is returned. the higher level APIs that use this "fill in" the
/// gaps for us.
pub async fn chain_subscribe_finalized_heads(
&self,
) -> Result<RpcSubscription<T::Header>, Error> {
let subscription = self
.client
.subscribe(
"chain_subscribeFinalizedHeads",
rpc_params![],
"chain_unsubscribeFinalizedHeads",
)
.await?;
Ok(subscription)
}
/// Subscribe to runtime version updates that produce changes in the metadata.
/// The first item emitted by the stream is the current runtime version.
pub async fn state_subscribe_runtime_version(
&self,
) -> Result<RpcSubscription<RuntimeVersion>, Error> {
let subscription = self
.client
.subscribe(
"state_subscribeRuntimeVersion",
rpc_params![],
"state_unsubscribeRuntimeVersion",
)
.await?;
Ok(subscription)
}
/// Create and submit an extrinsic and return corresponding Hash if successful
pub async fn author_submit_extrinsic(&self, extrinsic: &[u8]) -> Result<T::Hash, Error> {
let params = rpc_params![to_hex(extrinsic)];
let xt_hash = self
.client
.request("author_submitExtrinsic", params)
.await?;
Ok(xt_hash)
}
/// Create and submit an extrinsic and return a subscription to the events triggered.
pub async fn author_submit_and_watch_extrinsic(
&self,
extrinsic: &[u8],
) -> Result<RpcSubscription<TransactionStatus<T::Hash>>, Error> {
let params = rpc_params![to_hex(extrinsic)];
let subscription = self
.client
.subscribe(
"author_submitAndWatchExtrinsic",
params,
"author_unwatchExtrinsic",
)
.await?;
Ok(subscription)
}
/// Insert a key into the keystore.
pub async fn author_insert_key(
&self,
key_type: String,
suri: String,
public: Vec<u8>,
) -> Result<(), Error> {
let params = rpc_params![key_type, suri, Bytes(public)];
self.client.request("author_insertKey", params).await
}
/// Generate new session keys and returns the corresponding public keys.
pub async fn author_rotate_keys(&self) -> Result<Vec<u8>, Error> {
let bytes: Bytes = self
.client
.request("author_rotateKeys", rpc_params![])
.await?;
Ok(bytes.0)
}
/// Checks if the keystore has private keys for the given session public keys.
///
/// `session_keys` is the SCALE encoded session keys object from the runtime.
///
/// Returns `true` if all private keys could be found.
pub async fn author_has_session_keys(&self, session_keys: Vec<u8>) -> Result<bool, Error> {
let params = rpc_params![Bytes(session_keys)];
self.client.request("author_hasSessionKeys", params).await
}
/// Checks if the keystore has private keys for the given public key and key type.
///
/// Returns `true` if a private key could be found.
pub async fn author_has_key(
&self,
public_key: Vec<u8>,
key_type: String,
) -> Result<bool, Error> {
let params = rpc_params![Bytes(public_key), key_type];
self.client.request("author_hasKey", params).await
}
/// Execute a runtime API call via `state_call` RPC method.
pub async fn state_call(
&self,
function: &str,
call_parameters: Option<&[u8]>,
at: Option<T::Hash>,
) -> Result<Vec<u8>, Error> {
let call_parameters = call_parameters.unwrap_or_default();
let bytes: Bytes = self
.client
.request(
"state_call",
rpc_params![function, to_hex(call_parameters), at],
)
.await?;
Ok(bytes.0)
}
/// Submits the extrinsic to the dry_run RPC, to test if it would succeed.
///
/// Returns a [`DryRunResult`], which is the result of performing the dry run.
pub async fn dry_run(
&self,
encoded_signed: &[u8],
at: Option<T::Hash>,
) -> Result<DryRunResultBytes, Error> {
let params = rpc_params![to_hex(encoded_signed), at];
let result_bytes: Bytes = self.client.request("system_dryRun", params).await?;
Ok(DryRunResultBytes(result_bytes.0))
}
}
/// Storage key.
pub type StorageKey = Vec<u8>;
/// Storage data.
pub type StorageData = Vec<u8>;
/// Health struct returned by the RPC
#[derive(Deserialize, Clone, Debug)]
#[serde(rename_all = "camelCase")]
pub struct SystemHealth {
/// Number of connected peers
pub peers: usize,
/// Is the node syncing
pub is_syncing: bool,
/// Should this node have any peers
///
/// Might be false for local chains or when running without discovery.
pub should_have_peers: bool,
}
/// System properties; an arbitrary JSON object.
pub type SystemProperties = serde_json::Map<String, serde_json::Value>;
/// A block number
pub type BlockNumber = NumberOrHex;
/// The response from `chain_getBlock`
#[derive(Debug, Deserialize)]
#[serde(bound = "T: Config")]
pub struct BlockDetails<T: Config> {
/// The block itself.
pub block: Block<T>,
/// Block justification.
pub justifications: Option<Vec<BlockJustification>>,
}
/// Block details in the [`BlockDetails`].
#[derive(Debug, Deserialize)]
pub struct Block<T: Config> {
/// The block header.
pub header: T::Header,
/// The accompanying extrinsics.
pub extrinsics: Vec<Bytes>,
}
/// An abstraction over justification for a block's validity under a consensus algorithm.
pub type BlockJustification = (ConsensusEngineId, EncodedJustification);
/// Consensus engine unique ID.
pub type ConsensusEngineId = [u8; 4];
/// The encoded justification specific to a consensus engine.
pub type EncodedJustification = Vec<u8>;
/// This contains the runtime version information necessary to make transactions, as obtained from
/// the RPC call `state_getRuntimeVersion`,
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct RuntimeVersion {
/// Version of the runtime specification. A full-node will not attempt to use its native
/// runtime in substitute for the on-chain Wasm runtime unless all of `spec_name`,
/// `spec_version` and `authoring_version` are the same between Wasm and native.
pub spec_version: u32,
/// All existing dispatches are fully compatible when this number doesn't change. If this
/// number changes, then `spec_version` must change, also.
///
/// This number must change when an existing dispatchable (module ID, dispatch ID) is changed,
/// either through an alteration in its user-level semantics, a parameter
/// added/removed/changed, a dispatchable being removed, a module being removed, or a
/// dispatchable/module changing its index.
///
/// It need *not* change when a new module is added or when a dispatchable is added.
pub transaction_version: u32,
/// Fields unnecessary to Subxt are written out to this map.
#[serde(flatten)]
pub other: std::collections::HashMap<String, serde_json::Value>,
}
/// Possible transaction status events.
///
/// # Note
///
/// This is copied from `sp-transaction-pool` to avoid a dependency on that crate. Therefore it
/// must be kept compatible with that type from the target substrate version.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum TransactionStatus<Hash> {
/// Transaction is part of the future queue.
Future,
/// Transaction is part of the ready queue.
Ready,
/// The transaction has been broadcast to the given peers.
Broadcast(Vec<String>),
/// Transaction has been included in block with given hash.
InBlock(Hash),
/// The block this transaction was included in has been retracted.
Retracted(Hash),
/// Maximum number of finality watchers has been reached,
/// old watchers are being removed.
FinalityTimeout(Hash),
/// Transaction has been finalized by a finality-gadget, e.g GRANDPA
Finalized(Hash),
/// Transaction has been replaced in the pool, by another transaction
/// that provides the same tags. (e.g. same (sender, nonce)).
Usurped(Hash),
/// Transaction has been dropped from the pool because of the limit.
Dropped,
/// Transaction is no longer valid in the current state.
Invalid,
}
/// The decoded result returned from calling `system_dryRun` on some extrinsic.
#[derive(Debug, PartialEq, Eq)]
pub enum DryRunResult {
/// The transaction could be included in the block and executed.
Success,
/// The transaction could be included in the block, but the call failed to dispatch.
DispatchError(crate::error::DispatchError),
/// The transaction could not be included in the block.
TransactionValidityError,
}
/// The bytes representing an error dry running an extrinsic. call [`DryRunResultBytes::into_dry_run_result`]
/// to attempt to decode this into something more meaningful.
pub struct DryRunResultBytes(pub Vec<u8>);
impl DryRunResultBytes {
/// Attempt to decode the error bytes into a [`DryRunResult`] using the provided [`Metadata`].
pub fn into_dry_run_result(
self,
metadata: &crate::metadata::Metadata,
) -> Result<DryRunResult, crate::Error> {
// dryRun returns an ApplyExtrinsicResult, which is basically a
// `Result<Result<(), DispatchError>, TransactionValidityError>`.
let bytes = self.0;
// We expect at least 2 bytes. In case we got a naff response back (or
// manually constructed this struct), just error to avoid a panic:
if bytes.len() < 2 {
return Err(crate::Error::Unknown(bytes));
}
if bytes[0] == 0 && bytes[1] == 0 {
// Ok(Ok(())); transaction is valid and executed ok
Ok(DryRunResult::Success)
} else if bytes[0] == 0 && bytes[1] == 1 {
// Ok(Err(dispatch_error)); transaction is valid but execution failed
let dispatch_error =
crate::error::DispatchError::decode_from(&bytes[2..], metadata.clone())?;
Ok(DryRunResult::DispatchError(dispatch_error))
} else if bytes[0] == 1 {
// Err(transaction_error); some transaction validity error (we ignore the details at the moment)
Ok(DryRunResult::TransactionValidityError)
} else {
// unable to decode the bytes; they aren't what we expect.
Err(crate::Error::Unknown(bytes))
}
}
}
/// Storage change set
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Debug)]
#[serde(rename_all = "camelCase")]
pub struct StorageChangeSet<Hash> {
/// Block hash
pub block: Hash,
/// A list of changes; tuples of storage key and optional storage data.
pub changes: Vec<(Bytes, Option<Bytes>)>,
}
/// Statistics of a block returned by the `dev_getBlockStats` RPC.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct BlockStats {
/// The length in bytes of the storage proof produced by executing the block.
pub witness_len: u64,
/// The length in bytes of the storage proof after compaction.
pub witness_compact_len: u64,
/// Length of the block in bytes.
///
/// This information can also be acquired by downloading the whole block. This merely
/// saves some complexity on the client side.
pub block_len: u64,
/// Number of extrinsics in the block.
///
/// This information can also be acquired by downloading the whole block. This merely
/// saves some complexity on the client side.
pub num_extrinsics: u64,
}
/// ReadProof struct returned by the RPC
///
/// # Note
///
/// This is copied from `sc-rpc-api` to avoid a dependency on that crate. Therefore it
/// must be kept compatible with that type from the target substrate version.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ReadProof<Hash> {
/// Block hash used to generate the proof
pub at: Hash,
/// A proof used to prove that storage entries are included in the storage trie
pub proof: Vec<Bytes>,
}
/// A number type that can be serialized both as a number or a string that encodes a number in a
/// string.
///
/// We allow two representations of the block number as input. Either we deserialize to the type
/// that is specified in the block type or we attempt to parse given hex value.
///
/// The primary motivation for having this type is to avoid overflows when using big integers in
/// JavaScript (which we consider as an important RPC API consumer).
#[derive(Copy, Clone, Serialize, Deserialize, Debug, PartialEq, Eq)]
#[serde(untagged)]
pub enum NumberOrHex {
/// The number represented directly.
Number(u64),
/// Hex representation of the number.
Hex(U256),
}
impl NumberOrHex {
/// Converts this number into an U256.
pub fn into_u256(self) -> U256 {
match self {
NumberOrHex::Number(n) => n.into(),
NumberOrHex::Hex(h) => h,
}
}
}
impl From<NumberOrHex> for U256 {
fn from(num_or_hex: NumberOrHex) -> U256 {
num_or_hex.into_u256()
}
}
macro_rules! into_number_or_hex {
($($t: ty)+) => {
$(
impl From<$t> for NumberOrHex {
fn from(x: $t) -> Self {
NumberOrHex::Number(x.into())
}
}
)+
}
}
into_number_or_hex!(u8 u16 u32 u64);
impl From<u128> for NumberOrHex {
fn from(n: u128) -> Self {
NumberOrHex::Hex(n.into())
}
}
impl From<U256> for NumberOrHex {
fn from(n: U256) -> Self {
NumberOrHex::Hex(n)
}
}
/// A quick helper to encode some bytes to hex.
fn to_hex(bytes: impl AsRef<[u8]>) -> String {
format!("0x{}", hex::encode(bytes.as_ref()))
}
/// Hex-serialized shim for `Vec<u8>`.
#[derive(PartialEq, Eq, Clone, Serialize, Deserialize, Hash, PartialOrd, Ord, Debug)]
pub struct Bytes(#[serde(with = "impl_serde::serialize")] pub Vec<u8>);
impl std::ops::Deref for Bytes {
type Target = [u8];
fn deref(&self) -> &[u8] {
&self.0[..]
}
}
impl From<Vec<u8>> for Bytes {
fn from(s: Vec<u8>) -> Self {
Bytes(s)
}
}
+412 -785
View File
File diff suppressed because it is too large Load Diff
-68
View File
@@ -1,68 +0,0 @@
// Copyright 2019-2023 Parity Technologies (UK) Ltd.
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
// see LICENSE for license details.
use super::{RawRpcFuture, RawRpcSubscription, RpcClientT};
use crate::error::RpcError;
use futures::stream::{StreamExt, TryStreamExt};
use jsonrpsee::{
core::{
client::{Client, ClientT, SubscriptionClientT, SubscriptionKind},
traits::ToRpcParams,
},
types::SubscriptionId,
};
use serde_json::value::RawValue;
struct Params(Option<Box<RawValue>>);
impl ToRpcParams for Params {
fn to_rpc_params(self) -> Result<Option<Box<RawValue>>, serde_json::Error> {
Ok(self.0)
}
}
impl RpcClientT for Client {
fn request_raw<'a>(
&'a self,
method: &'a str,
params: Option<Box<RawValue>>,
) -> RawRpcFuture<'a, Box<RawValue>> {
Box::pin(async move {
let res = ClientT::request(self, method, Params(params))
.await
.map_err(|e| RpcError::ClientError(Box::new(e)))?;
Ok(res)
})
}
fn subscribe_raw<'a>(
&'a self,
sub: &'a str,
params: Option<Box<RawValue>>,
unsub: &'a str,
) -> RawRpcFuture<'a, RawRpcSubscription> {
Box::pin(async move {
let stream = SubscriptionClientT::subscribe::<Box<RawValue>, _>(
self,
sub,
Params(params),
unsub,
)
.await
.map_err(|e| RpcError::ClientError(Box::new(e)))?;
let id = match stream.kind() {
SubscriptionKind::Subscription(SubscriptionId::Str(id)) => {
Some(id.clone().into_owned())
}
_ => None,
};
let stream = stream
.map_err(|e| RpcError::ClientError(Box::new(e)))
.boxed();
Ok(RawRpcSubscription { stream, id })
})
}
}
-53
View File
@@ -1,53 +0,0 @@
// Copyright 2019-2023 Parity Technologies (UK) Ltd.
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
// see LICENSE for license details.
use super::{RawRpcFuture, RawRpcSubscription, RpcClientT};
use crate::error::RpcError;
use futures::stream::{StreamExt, TryStreamExt};
use serde_json::value::RawValue;
use subxt_lightclient::{LightClientRpc, LightClientRpcError};
impl RpcClientT for LightClientRpc {
fn request_raw<'a>(
&'a self,
method: &'a str,
params: Option<Box<RawValue>>,
) -> RawRpcFuture<'a, Box<RawValue>> {
Box::pin(async move {
let res = self.request(method.to_owned(), params)
.await
.map_err(lc_err_to_rpc_err)?;
Ok(res)
})
}
fn subscribe_raw<'a>(
&'a self,
sub: &'a str,
params: Option<Box<RawValue>>,
unsub: &'a str,
) -> RawRpcFuture<'a, RawRpcSubscription> {
Box::pin(async move {
let sub = self.subscribe(sub.to_owned(), params, unsub.to_owned())
.await
.map_err(lc_err_to_rpc_err)?;
let id = Some(sub.id().to_owned());
let stream = sub
.map_err(|e| RpcError::ClientError(Box::new(e)))
.boxed();
Ok(RawRpcSubscription { id, stream })
})
}
}
fn lc_err_to_rpc_err(err: LightClientRpcError) -> RpcError {
match err {
LightClientRpcError::JsonRpcError(e) => RpcError::ClientError(Box::new(e)),
LightClientRpcError::SmoldotError(e) => RpcError::RequestRejected(e),
LightClientRpcError::BackgroundTaskDropped => RpcError::SubscriptionDropped,
}
}
-76
View File
@@ -1,76 +0,0 @@
// Copyright 2019-2023 Parity Technologies (UK) Ltd.
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
// see LICENSE for license details.
//! RPC types and client for interacting with a substrate node.
//!
//! These are used behind the scenes by Subxt backend implementations, for
//! example [`crate::backend::legacy::LegacyBackend`]. If you need an RPC client,
//! then you can manually instantiate one, and then hand it to Subxt if you'd like
//! to re-use it for the Subxt connection.
//!
//! - [`RpcClientT`] is the underlying dynamic RPC implementation. This provides
//! the low level [`RpcClientT::request_raw`] and [`RpcClientT::subscribe_raw`]
//! methods.
//! - [`RpcClient`] is the higher level wrapper around this, offering
//! the [`RpcClient::request`] and [`RpcClient::subscribe`] methods.
//!
//! # Example
//!
//! Fetching the genesis hash.
//!
//! ```no_run
//! # #[tokio::main]
//! # async fn main() {
//! use subxt::{
//! client::OnlineClient,
//! config::SubstrateConfig,
//! backend::rpc::RpcClient,
//! backend::legacy::LegacyRpcMethods,
//! };
//!
//! // Instantiate a default RPC client pointing at some URL.
//! let rpc_client = RpcClient::from_url("ws://localhost:9944")
//! .await
//! .unwrap();
//!
//! // Instantiate the legacy RPC interface, providing an appropriate
//! // config so that it uses the correct types for your chain.
//! let rpc_methods = LegacyRpcMethods::<SubstrateConfig>::new(rpc_client.clone());
//!
//! // Use it to make RPC calls, here using the legacy genesis_hash method.
//! let genesis_hash = rpc_methods
//! .genesis_hash()
//! .await
//! .unwrap();
//!
//! println!("{genesis_hash}");
//!
//! // Instantiate the Subxt interface using the same client and config if you
//! // want to reuse the same connection:
//! let client = OnlineClient::<SubstrateConfig>::from_rpc_client(rpc_client);
//! # }
//! ```
// Allow an `rpc.rs` file in the `rpc` folder to align better
// with other file names for their types.
#![allow(clippy::module_inception)]
crate::macros::cfg_jsonrpsee! {
mod jsonrpsee_impl;
}
crate::macros::cfg_unstable_light_client! {
mod lightclient_impl;
}
crate::macros::cfg_reconnecting_rpc_client! {
/// reconnecting rpc client.
pub mod reconnecting_rpc_client;
}
mod rpc_client;
mod rpc_client_t;
pub use rpc_client::{rpc_params, RpcClient, RpcParams, RpcSubscription};
pub use rpc_client_t::{RawRpcFuture, RawRpcSubscription, RawValue, RpcClientT};
@@ -1,652 +0,0 @@
// Copyright 2019-2024 Parity Technologies (UK) Ltd.
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
// see LICENSE for license details.
//! # reconnecting-jsonrpsee-ws-client
//!
//! A simple reconnecting JSON-RPC WebSocket client for subxt which
//! automatically reconnects when the connection is lost but
//! it doesn't retain subscriptions and pending method calls when it reconnects.
//!
//! The logic which action to take for individual calls and subscriptions are
//! handled by the subxt backend implementations.
//!
//! # Example
//!
//! ```no_run
//! use std::time::Duration;
//! use futures::StreamExt;
//! use subxt::backend::rpc::reconnecting_rpc_client::{RpcClient, ExponentialBackoff};
//! use subxt::{OnlineClient, PolkadotConfig};
//!
//! #[tokio::main]
//! async fn main() {
//! let rpc = RpcClient::builder()
//! .retry_policy(ExponentialBackoff::from_millis(100).max_delay(Duration::from_secs(10)))
//! .build("ws://localhost:9944".to_string())
//! .await
//! .unwrap();
//!
//! let subxt_client: OnlineClient<PolkadotConfig> = OnlineClient::from_rpc_client(rpc.clone()).await.unwrap();
//! let mut blocks_sub = subxt_client.blocks().subscribe_finalized().await.unwrap();
//!
//! while let Some(block) = blocks_sub.next().await {
//! let block = match block {
//! Ok(b) => b,
//! Err(e) => {
//! if e.is_disconnected_will_reconnect() {
//! println!("The RPC connection was lost and we may have missed a few blocks");
//! continue;
//! } else {
//! panic!("Error: {}", e);
//! }
//! }
//! };
//! println!("Block #{} ({})", block.number(), block.hash());
//! }
//! }
//! ```
mod platform;
#[cfg(test)]
mod tests;
mod utils;
use std::{
pin::Pin,
sync::Arc,
task::{self, Poll},
time::Duration,
};
use super::{RawRpcFuture, RawRpcSubscription, RpcClientT};
use crate::error::RpcError as SubxtRpcError;
use finito::Retry;
use futures::{FutureExt, Stream, StreamExt, TryStreamExt};
use jsonrpsee::core::{
client::{
Client as WsClient, ClientT, Subscription as RpcSubscription, SubscriptionClientT,
SubscriptionKind,
},
traits::ToRpcParams,
};
use platform::spawn;
use serde_json::value::RawValue;
use tokio::sync::{
mpsc::{self, UnboundedReceiver, UnboundedSender},
oneshot, Notify,
};
use url::Url;
use utils::display_close_reason;
// re-exports
pub use finito::{ExponentialBackoff, FibonacciBackoff, FixedInterval};
pub use jsonrpsee::core::client::IdKind;
pub use jsonrpsee::{core::client::error::Error as RpcError, rpc_params, types::SubscriptionId};
#[cfg(feature = "native")]
pub use jsonrpsee::ws_client::{HeaderMap, PingConfig};
const LOG_TARGET: &str = "subxt-reconnecting-rpc-client";
/// Method result.
pub type MethodResult = Result<Box<RawValue>, Error>;
/// Subscription result.
pub type SubscriptionResult = Result<Box<RawValue>, DisconnectedWillReconnect>;
/// The connection was closed, reconnect initiated and the subscription was dropped.
#[derive(Debug, thiserror::Error)]
#[error("The connection was closed because of `{0:?}` and reconnect initiated")]
pub struct DisconnectedWillReconnect(String);
/// New-type pattern which implements [`ToRpcParams`] that is required by jsonrpsee.
#[derive(Debug, Clone)]
struct RpcParams(Option<Box<RawValue>>);
impl ToRpcParams for RpcParams {
fn to_rpc_params(self) -> Result<Option<Box<RawValue>>, serde_json::Error> {
Ok(self.0)
}
}
#[derive(Debug)]
enum Op {
Call {
method: String,
params: RpcParams,
send_back: oneshot::Sender<MethodResult>,
},
Subscription {
subscribe_method: String,
params: RpcParams,
unsubscribe_method: String,
send_back: oneshot::Sender<Result<Subscription, Error>>,
},
}
/// Error that can occur when for a RPC call or subscription.
#[derive(Debug, thiserror::Error)]
pub enum Error {
/// The client was dropped by the user.
#[error("The client was dropped")]
Dropped,
/// The connection was closed and reconnect initiated.
#[error(transparent)]
DisconnectedWillReconnect(#[from] DisconnectedWillReconnect),
/// Other rpc error.
#[error(transparent)]
RpcError(RpcError),
}
/// Represent a single subscription.
pub struct Subscription {
id: SubscriptionId<'static>,
stream: mpsc::UnboundedReceiver<SubscriptionResult>,
}
impl Subscription {
/// Returns the next notification from the stream.
/// This may return `None` if the subscription has been terminated,
/// which may happen if the channel becomes full or is dropped.
///
/// **Note:** This has an identical signature to the [`StreamExt::next`]
/// method (and delegates to that). Import [`StreamExt`] if you'd like
/// access to other stream combinator methods.
#[allow(clippy::should_implement_trait)]
pub async fn next(&mut self) -> Option<SubscriptionResult> {
StreamExt::next(self).await
}
/// Get the subscription ID.
pub fn id(&self) -> SubscriptionId<'static> {
self.id.clone()
}
}
impl Stream for Subscription {
type Item = SubscriptionResult;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
) -> task::Poll<Option<Self::Item>> {
match self.stream.poll_recv(cx) {
Poll::Ready(Some(msg)) => Poll::Ready(Some(msg)),
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
}
}
}
impl std::fmt::Debug for Subscription {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Subscription")
.field("id", &self.id)
.finish()
}
}
/// JSON-RPC client that reconnects automatically and may loose
/// subscription notifications when it reconnects.
#[derive(Clone, Debug)]
pub struct RpcClient {
tx: mpsc::UnboundedSender<Op>,
}
/// Builder for [`Client`].
#[derive(Clone, Debug)]
pub struct RpcClientBuilder<P> {
max_request_size: u32,
max_response_size: u32,
retry_policy: P,
#[cfg(feature = "native")]
ping_config: Option<PingConfig>,
#[cfg(feature = "native")]
// web doesn't support custom headers
// https://stackoverflow.com/a/4361358/6394734
headers: HeaderMap,
max_redirections: u32,
id_kind: IdKind,
max_log_len: u32,
max_concurrent_requests: u32,
request_timeout: Duration,
connection_timeout: Duration,
}
impl Default for RpcClientBuilder<ExponentialBackoff> {
fn default() -> Self {
Self {
max_request_size: 10 * 1024 * 1024,
max_response_size: 10 * 1024 * 1024,
retry_policy: ExponentialBackoff::from_millis(10).max_delay(Duration::from_secs(60)),
#[cfg(feature = "native")]
ping_config: Some(PingConfig::new()),
#[cfg(feature = "native")]
headers: HeaderMap::new(),
max_redirections: 5,
id_kind: IdKind::Number,
max_log_len: 1024,
max_concurrent_requests: 1024,
request_timeout: Duration::from_secs(60),
connection_timeout: Duration::from_secs(10),
}
}
}
impl RpcClientBuilder<ExponentialBackoff> {
/// Create a new builder.
pub fn new() -> Self {
Self::default()
}
}
impl<P> RpcClientBuilder<P>
where
P: Iterator<Item = Duration> + Send + Sync + 'static + Clone,
{
/// Configure the min response size a for websocket message.
///
/// Default: 10MB
pub fn max_request_size(mut self, max: u32) -> Self {
self.max_request_size = max;
self
}
/// Configure the max response size a for websocket message.
///
/// Default: 10MB
pub fn max_response_size(mut self, max: u32) -> Self {
self.max_response_size = max;
self
}
/// Set the max number of redirections to perform until a connection is regarded as failed.
///
/// Default: 5
pub fn max_redirections(mut self, redirect: u32) -> Self {
self.max_redirections = redirect;
self
}
/// Configure how many concurrent method calls are allowed.
///
/// Default: 1024
pub fn max_concurrent_requests(mut self, max: u32) -> Self {
self.max_concurrent_requests = max;
self
}
/// Configure how long until a method call is regarded as failed.
///
/// Default: 1 minute
pub fn request_timeout(mut self, timeout: Duration) -> Self {
self.request_timeout = timeout;
self
}
/// Set connection timeout for the WebSocket handshake
///
/// Default: 10 seconds
pub fn connection_timeout(mut self, timeout: Duration) -> Self {
self.connection_timeout = timeout;
self
}
/// Configure the data type of the request object ID
///
/// Default: number
pub fn id_format(mut self, kind: IdKind) -> Self {
self.id_kind = kind;
self
}
/// Set maximum length for logging calls and responses.
/// Logs bigger than this limit will be truncated.
///
/// Default: 1024
pub fn set_max_logging_length(mut self, max: u32) -> Self {
self.max_log_len = max;
self
}
#[cfg(feature = "native")]
#[cfg_attr(docsrs, doc(cfg(feature = "native")))]
/// Configure custom headers to use in the WebSocket handshake.
pub fn set_headers(mut self, headers: HeaderMap) -> Self {
self.headers = headers;
self
}
/// Configure which retry policy to use when a connection is lost.
///
/// Default: Exponential backoff 10ms
pub fn retry_policy<T>(self, retry_policy: T) -> RpcClientBuilder<T> {
RpcClientBuilder {
max_request_size: self.max_request_size,
max_response_size: self.max_response_size,
retry_policy,
#[cfg(feature = "native")]
ping_config: self.ping_config,
#[cfg(feature = "native")]
headers: self.headers,
max_redirections: self.max_redirections,
max_log_len: self.max_log_len,
id_kind: self.id_kind,
max_concurrent_requests: self.max_concurrent_requests,
request_timeout: self.request_timeout,
connection_timeout: self.connection_timeout,
}
}
#[cfg(feature = "native")]
#[cfg_attr(docsrs, doc(cfg(feature = "native")))]
/// Configure the WebSocket ping/pong interval.
///
/// Default: 30 seconds.
pub fn enable_ws_ping(mut self, ping_config: PingConfig) -> Self {
self.ping_config = Some(ping_config);
self
}
#[cfg(feature = "native")]
#[cfg_attr(docsrs, doc(cfg(feature = "native")))]
/// Disable WebSocket ping/pongs.
///
/// Default: 30 seconds.
pub fn disable_ws_ping(mut self) -> Self {
self.ping_config = None;
self
}
/// Build and connect to the target.
pub async fn build(self, url: impl AsRef<str>) -> Result<RpcClient, RpcError> {
let url = Url::parse(url.as_ref()).map_err(|e| RpcError::Transport(Box::new(e)))?;
let (tx, rx) = mpsc::unbounded_channel();
let client = Retry::new(self.retry_policy.clone(), || {
platform::ws_client(&url, &self)
})
.await?;
platform::spawn(background_task(client, rx, url, self));
Ok(RpcClient { tx })
}
}
impl RpcClient {
/// Create a builder.
pub fn builder() -> RpcClientBuilder<ExponentialBackoff> {
RpcClientBuilder::new()
}
/// Perform a JSON-RPC method call.
pub async fn request(
&self,
method: String,
params: Option<Box<RawValue>>,
) -> Result<Box<RawValue>, Error> {
let (tx, rx) = oneshot::channel();
self.tx
.send(Op::Call {
method,
params: RpcParams(params),
send_back: tx,
})
.map_err(|_| Error::Dropped)?;
rx.await.map_err(|_| Error::Dropped)?
}
/// Perform a JSON-RPC subscription.
pub async fn subscribe(
&self,
subscribe_method: String,
params: Option<Box<RawValue>>,
unsubscribe_method: String,
) -> Result<Subscription, Error> {
let (tx, rx) = oneshot::channel();
self.tx
.send(Op::Subscription {
subscribe_method,
params: RpcParams(params),
unsubscribe_method,
send_back: tx,
})
.map_err(|_| Error::Dropped)?;
rx.await.map_err(|_| Error::Dropped)?
}
}
impl RpcClientT for RpcClient {
fn request_raw<'a>(
&'a self,
method: &'a str,
params: Option<Box<RawValue>>,
) -> RawRpcFuture<'a, Box<RawValue>> {
async {
self.request(method.to_string(), params)
.await
.map_err(|e| match e {
Error::DisconnectedWillReconnect(e) => {
SubxtRpcError::DisconnectedWillReconnect(e.to_string())
}
Error::Dropped => SubxtRpcError::ClientError(Box::new(e)),
Error::RpcError(e) => SubxtRpcError::ClientError(Box::new(e)),
})
}
.boxed()
}
fn subscribe_raw<'a>(
&'a self,
sub: &'a str,
params: Option<Box<RawValue>>,
unsub: &'a str,
) -> RawRpcFuture<'a, RawRpcSubscription> {
async {
let sub = self
.subscribe(sub.to_string(), params, unsub.to_string())
.await
.map_err(|e| SubxtRpcError::ClientError(Box::new(e)))?;
let id = match sub.id() {
SubscriptionId::Num(n) => n.to_string(),
SubscriptionId::Str(s) => s.to_string(),
};
let stream = sub
// NOTE: The stream emits only one error `DisconnectWillReconnect if the connection was lost
// and safe to wrap it in a `SubxtRpcError::DisconnectWillReconnect` here
.map_err(|e: DisconnectedWillReconnect| {
SubxtRpcError::DisconnectedWillReconnect(e.to_string())
})
.boxed();
Ok(RawRpcSubscription {
stream,
id: Some(id),
})
}
.boxed()
}
}
async fn background_task<P>(
mut client: Arc<WsClient>,
mut rx: UnboundedReceiver<Op>,
url: Url,
client_builder: RpcClientBuilder<P>,
) where
P: Iterator<Item = Duration> + Send + 'static + Clone,
{
let disconnect = Arc::new(tokio::sync::Notify::new());
loop {
tokio::select! {
// An incoming JSON-RPC call to dispatch.
next_message = rx.recv() => {
match next_message {
None => break,
Some(op) => {
spawn(dispatch_call(client.clone(), op, disconnect.clone()));
}
};
}
// The connection was terminated and try to reconnect.
_ = client.on_disconnect() => {
let params = ReconnectParams {
url: &url,
client_builder: &client_builder,
close_reason: client.disconnect_reason().await,
};
client = match reconnect(params).await {
Ok(client) => client,
Err(e) => {
tracing::debug!(target: LOG_TARGET, "Failed to reconnect: {e}; terminating the connection");
break;
}
};
}
}
}
disconnect.notify_waiters();
}
async fn dispatch_call(client: Arc<WsClient>, op: Op, on_disconnect: Arc<tokio::sync::Notify>) {
match op {
Op::Call {
method,
params,
send_back,
} => {
match client.request::<Box<RawValue>, _>(&method, params).await {
Ok(rp) => {
// Fails only if the request is dropped by the client.
let _ = send_back.send(Ok(rp));
}
Err(RpcError::RestartNeeded(e)) => {
// Fails only if the request is dropped by the client.
let _ = send_back.send(Err(DisconnectedWillReconnect(e.to_string()).into()));
}
Err(e) => {
// Fails only if the request is dropped by the client.
let _ = send_back.send(Err(Error::RpcError(e)));
}
}
}
Op::Subscription {
subscribe_method,
params,
unsubscribe_method,
send_back,
} => {
match client
.subscribe::<Box<RawValue>, _>(
&subscribe_method,
params.clone(),
&unsubscribe_method,
)
.await
{
Ok(sub) => {
let (tx, rx) = mpsc::unbounded_channel();
let sub_id = match sub.kind() {
SubscriptionKind::Subscription(id) => id.clone().into_owned(),
_ => unreachable!("No method subscriptions possible in this crate; qed"),
};
platform::spawn(subscription_handler(
tx.clone(),
sub,
on_disconnect.clone(),
client.clone(),
));
let stream = Subscription {
id: sub_id,
stream: rx,
};
// Fails only if the request is dropped by the client.
let _ = send_back.send(Ok(stream));
}
Err(RpcError::RestartNeeded(e)) => {
// Fails only if the request is dropped by the client.
let _ = send_back.send(Err(DisconnectedWillReconnect(e.to_string()).into()));
}
Err(e) => {
// Fails only if the request is dropped.
let _ = send_back.send(Err(Error::RpcError(e)));
}
}
}
}
}
/// Handler for each individual subscription.
async fn subscription_handler(
sub_tx: UnboundedSender<SubscriptionResult>,
mut rpc_sub: RpcSubscription<Box<RawValue>>,
client_closed: Arc<Notify>,
client: Arc<WsClient>,
) {
loop {
tokio::select! {
next_msg = rpc_sub.next() => {
let Some(notif) = next_msg else {
let close = client.disconnect_reason().await;
_ = sub_tx.send(Err(DisconnectedWillReconnect(close.to_string())));
break;
};
let msg = notif.expect("RawValue is valid JSON; qed");
// Fails only if subscription was closed by the user.
if sub_tx.send(Ok(msg)).is_err() {
break;
}
}
// This channel indices whether the subscription was closed by user.
_ = sub_tx.closed() => {
break;
}
// This channel indicates whether the main task has been closed.
// at this point no further messages are processed.
_ = client_closed.notified() => {
break;
}
}
}
}
struct ReconnectParams<'a, P> {
url: &'a Url,
client_builder: &'a RpcClientBuilder<P>,
close_reason: RpcError,
}
async fn reconnect<P>(params: ReconnectParams<'_, P>) -> Result<Arc<WsClient>, RpcError>
where
P: Iterator<Item = Duration> + Send + 'static + Clone,
{
let ReconnectParams {
url,
client_builder,
close_reason,
} = params;
let retry_policy = client_builder.retry_policy.clone();
tracing::debug!(target: LOG_TARGET, "Connection to {url} was closed: `{}`; starting to reconnect", display_close_reason(&close_reason));
let client = Retry::new(retry_policy.clone(), || {
platform::ws_client(url, client_builder)
})
.await?;
tracing::debug!(target: LOG_TARGET, "Connection to {url} was successfully re-established");
Ok(client)
}
@@ -1,84 +0,0 @@
// Copyright 2019-2024 Parity Technologies (UK) Ltd.
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
// see LICENSE for license details.
use crate::backend::rpc::reconnecting_rpc_client::{RpcClientBuilder, RpcError};
use jsonrpsee::core::client::Client;
use std::sync::Arc;
use url::Url;
#[cfg(feature = "native")]
pub use tokio::spawn;
#[cfg(feature = "web")]
pub use wasm_bindgen_futures::spawn_local as spawn;
#[cfg(feature = "native")]
pub async fn ws_client<P>(
url: &Url,
builder: &RpcClientBuilder<P>,
) -> Result<Arc<Client>, RpcError> {
use jsonrpsee::ws_client::WsClientBuilder;
let RpcClientBuilder {
max_request_size,
max_response_size,
ping_config,
headers,
max_redirections,
id_kind,
max_concurrent_requests,
max_log_len,
request_timeout,
connection_timeout,
..
} = builder;
let mut ws_client_builder = WsClientBuilder::new()
.max_request_size(*max_request_size)
.max_response_size(*max_response_size)
.set_headers(headers.clone())
.max_redirections(*max_redirections as usize)
.max_buffer_capacity_per_subscription(tokio::sync::Semaphore::MAX_PERMITS)
.max_concurrent_requests(*max_concurrent_requests as usize)
.set_max_logging_length(*max_log_len)
.set_tcp_no_delay(true)
.request_timeout(*request_timeout)
.connection_timeout(*connection_timeout)
.id_format(*id_kind);
if let Some(ping) = ping_config {
ws_client_builder = ws_client_builder.enable_ws_ping(*ping);
}
let client = ws_client_builder.build(url.as_str()).await?;
Ok(Arc::new(client))
}
#[cfg(feature = "web")]
pub async fn ws_client<P>(
url: &Url,
builder: &RpcClientBuilder<P>,
) -> Result<Arc<Client>, RpcError> {
use jsonrpsee::wasm_client::WasmClientBuilder;
let RpcClientBuilder {
id_kind,
max_concurrent_requests,
max_log_len,
request_timeout,
..
} = builder;
let ws_client_builder = WasmClientBuilder::new()
.max_buffer_capacity_per_subscription(tokio::sync::Semaphore::MAX_PERMITS)
.max_concurrent_requests(*max_concurrent_requests as usize)
.set_max_logging_length(*max_log_len)
.request_timeout(*request_timeout)
.id_format(*id_kind);
let client = ws_client_builder.build(url.as_str()).await?;
Ok(Arc::new(client))
}
@@ -1,270 +0,0 @@
// Copyright 2019-2024 Parity Technologies (UK) Ltd.
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
// see LICENSE for license details.
use super::*;
use futures::{future::Either, FutureExt};
use jsonrpsee::core::BoxError;
use jsonrpsee::server::{
http, stop_channel, ws, ConnectionGuard, ConnectionState, HttpRequest, HttpResponse, RpcModule,
RpcServiceBuilder, ServerConfig, SubscriptionMessage,
};
#[tokio::test]
async fn call_works() {
let (_handle, addr) = run_server().await.unwrap();
let client = RpcClient::builder().build(addr).await.unwrap();
assert!(client.request("say_hello".to_string(), None).await.is_ok(),)
}
#[tokio::test]
async fn sub_works() {
let (_handle, addr) = run_server().await.unwrap();
let client = RpcClient::builder()
.retry_policy(ExponentialBackoff::from_millis(50))
.build(addr)
.await
.unwrap();
let mut sub = client
.subscribe(
"subscribe_lo".to_string(),
None,
"unsubscribe_lo".to_string(),
)
.await
.unwrap();
assert!(sub.next().await.is_some());
}
#[tokio::test]
async fn sub_with_reconnect() {
let (handle, addr) = run_server().await.unwrap();
let client = RpcClient::builder().build(addr.clone()).await.unwrap();
let mut sub = client
.subscribe(
"subscribe_lo".to_string(),
None,
"unsubscribe_lo".to_string(),
)
.await
.unwrap();
let _ = handle.send(());
// Hack to wait for the server to restart.
tokio::time::sleep(Duration::from_millis(100)).await;
assert!(matches!(sub.next().await, Some(Ok(_))));
assert!(matches!(
sub.next().await,
Some(Err(DisconnectedWillReconnect(_)))
));
// Restart the server.
let (_handle, _) = run_server_with_settings(Some(&addr), false).await.unwrap();
// Hack to wait for the server to restart.
tokio::time::sleep(Duration::from_millis(100)).await;
// Subscription should work after reconnect.
let mut sub = client
.subscribe(
"subscribe_lo".to_string(),
None,
"unsubscribe_lo".to_string(),
)
.await
.unwrap();
assert!(matches!(sub.next().await, Some(Ok(_))));
}
#[tokio::test]
async fn call_with_reconnect() {
let (handle, addr) = run_server_with_settings(None, true).await.unwrap();
let client = Arc::new(RpcClient::builder().build(addr.clone()).await.unwrap());
let req_fut = client.request("say_hello".to_string(), None).boxed();
let timeout_fut = tokio::time::sleep(Duration::from_secs(5));
// If the call isn't replied in 5 secs then it's regarded as it's still pending.
let req_fut = match futures::future::select(Box::pin(timeout_fut), req_fut).await {
Either::Left((_, f)) => f,
Either::Right(_) => panic!("RPC call finished"),
};
// Close the connection with a pending call.
let _ = handle.send(());
// Restart the server
let (_handle, _) = run_server_with_settings(Some(&addr), false).await.unwrap();
// Hack to wait for the server to restart.
tokio::time::sleep(Duration::from_millis(100)).await;
// This call should fail because reconnect.
assert!(req_fut.await.is_err());
// Future call should work after reconnect.
assert!(client.request("say_hello".to_string(), None).await.is_ok());
}
async fn run_server() -> Result<(tokio::sync::broadcast::Sender<()>, String), BoxError> {
run_server_with_settings(None, false).await
}
async fn run_server_with_settings(
url: Option<&str>,
dont_respond_to_method_calls: bool,
) -> Result<(tokio::sync::broadcast::Sender<()>, String), BoxError> {
use jsonrpsee::server::HttpRequest;
let sockaddr = match url {
Some(url) => url.strip_prefix("ws://").unwrap(),
None => "127.0.0.1:0",
};
let mut i = 0;
let listener = loop {
if let Ok(l) = tokio::net::TcpListener::bind(sockaddr).await {
break l;
}
tokio::time::sleep(Duration::from_millis(100)).await;
if i >= 10 {
panic!("Addr already in use");
}
i += 1;
};
let mut module = RpcModule::new(());
if dont_respond_to_method_calls {
module.register_async_method("say_hello", |_, _, _| async {
futures::future::pending::<()>().await;
"timeout"
})?;
} else {
module.register_async_method("say_hello", |_, _, _| async { "lo" })?;
}
module.register_subscription(
"subscribe_lo",
"subscribe_lo",
"unsubscribe_lo",
|_params, pending, _ctx, _| async move {
let sink = pending.accept().await.unwrap();
let i = 0;
loop {
if sink
.send(SubscriptionMessage::from_json(&i).unwrap())
.await
.is_err()
{
break;
}
tokio::time::sleep(std::time::Duration::from_secs(6)).await;
}
},
)?;
let (tx, mut rx) = tokio::sync::broadcast::channel(4);
let tx2 = tx.clone();
let (stop_handle, server_handle) = stop_channel();
let addr = listener.local_addr().expect("Could not find local addr");
tokio::spawn(async move {
loop {
let sock = tokio::select! {
res = listener.accept() => {
match res {
Ok((stream, _remote_addr)) => stream,
Err(e) => {
tracing::error!("Failed to accept connection: {:?}", e);
continue;
}
}
}
_ = rx.recv() => {
break
}
};
let module = module.clone();
let rx2 = tx2.subscribe();
let tx2 = tx2.clone();
let stop_handle2 = stop_handle.clone();
let svc = tower::service_fn(move |req: HttpRequest<hyper::body::Incoming>| {
let module = module.clone();
let tx = tx2.clone();
let stop_handle = stop_handle2.clone();
let conn_permit = ConnectionGuard::new(1).try_acquire().unwrap();
if ws::is_upgrade_request(&req) {
let rpc_service = RpcServiceBuilder::new();
let conn = ConnectionState::new(stop_handle, 1, conn_permit);
async move {
let mut rx = tx.subscribe();
let (rp, conn_fut) =
ws::connect(req, ServerConfig::default(), module, conn, rpc_service)
.await
.unwrap();
tokio::spawn(async move {
tokio::select! {
_ = conn_fut => (),
_ = rx.recv() => {},
}
});
Ok::<_, BoxError>(rp)
}
.boxed()
} else {
async { Ok(http::response::denied()) }.boxed()
}
});
tokio::spawn(serve_with_graceful_shutdown(sock, svc, rx2));
}
drop(server_handle);
});
Ok((tx, format!("ws://{}", addr)))
}
async fn serve_with_graceful_shutdown<S, B, I>(
io: I,
service: S,
mut rx: tokio::sync::broadcast::Receiver<()>,
) where
S: tower::Service<HttpRequest<hyper::body::Incoming>, Response = HttpResponse<B>>
+ Clone
+ Send
+ 'static,
S::Future: Send,
S::Response: Send,
S::Error: Into<BoxError>,
B: http_body::Body<Data = hyper::body::Bytes> + Send + 'static,
B::Error: Into<BoxError>,
I: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + Unpin + 'static,
{
if let Err(e) =
jsonrpsee::server::serve_with_graceful_shutdown(io, service, rx.recv().map(|_| ())).await
{
tracing::error!("Error while serving: {:?}", e);
}
}
@@ -1,14 +0,0 @@
// Copyright 2019-2024 Parity Technologies (UK) Ltd.
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
// see LICENSE for license details.
//! Utils.
use crate::backend::rpc::reconnecting_rpc_client::RpcError;
pub fn display_close_reason(err: &RpcError) -> String {
match err {
RpcError::RestartNeeded(e) => e.to_string(),
other => other.to_string(),
}
}
-292
View File
@@ -1,292 +0,0 @@
// Copyright 2019-2023 Parity Technologies (UK) Ltd.
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
// see LICENSE for license details.
use super::{RawRpcSubscription, RpcClientT};
use crate::error::Error;
use futures::{Stream, StreamExt};
use serde::{de::DeserializeOwned, Serialize};
use serde_json::value::RawValue;
use std::{pin::Pin, sync::Arc, task::Poll};
/// A concrete wrapper around an [`RpcClientT`] which provides some higher level helper methods,
/// is cheaply cloneable, and can be handed to things like [`crate::client::OnlineClient`] to
/// instantiate it.
#[derive(Clone)]
pub struct RpcClient {
client: Arc<dyn RpcClientT>,
}
impl RpcClient {
#[cfg(feature = "jsonrpsee")]
#[cfg_attr(docsrs, doc(cfg(feature = "jsonrpsee")))]
/// Create a default RPC client pointed at some URL, currently based on [`jsonrpsee`].
///
/// Errors if an insecure URL is provided. In this case, use [`RpcClient::from_insecure_url`] instead.
pub async fn from_url<U: AsRef<str>>(url: U) -> Result<Self, Error> {
crate::utils::validate_url_is_secure(url.as_ref())?;
RpcClient::from_insecure_url(url).await
}
#[cfg(feature = "jsonrpsee")]
/// Create a default RPC client pointed at some URL, currently based on [`jsonrpsee`].
///
/// Allows insecure URLs without SSL encryption, e.g. (http:// and ws:// URLs).
pub async fn from_insecure_url<U: AsRef<str>>(url: U) -> Result<Self, Error> {
let client = jsonrpsee_helpers::client(url.as_ref())
.await
.map_err(|e| crate::error::RpcError::ClientError(Box::new(e)))?;
Ok(Self::new(client))
}
/// Create a new [`RpcClient`] from an arbitrary [`RpcClientT`] implementation.
pub fn new<R: RpcClientT>(client: R) -> Self {
RpcClient {
client: Arc::new(client),
}
}
/// Make an RPC request, given a method name and some parameters.
///
/// See [`RpcParams`] and the [`rpc_params!`] macro for an example of how to
/// construct the parameters.
pub async fn request<Res: DeserializeOwned>(
&self,
method: &str,
params: RpcParams,
) -> Result<Res, Error> {
let res = self.client.request_raw(method, params.build()).await?;
let val = serde_json::from_str(res.get())?;
Ok(val)
}
/// Subscribe to an RPC endpoint, providing the parameters and the method to call to
/// unsubscribe from it again.
///
/// See [`RpcParams`] and the [`rpc_params!`] macro for an example of how to
/// construct the parameters.
pub async fn subscribe<Res: DeserializeOwned>(
&self,
sub: &str,
params: RpcParams,
unsub: &str,
) -> Result<RpcSubscription<Res>, Error> {
let sub = self
.client
.subscribe_raw(sub, params.build(), unsub)
.await?;
Ok(RpcSubscription::new(sub))
}
}
impl<C: RpcClientT> From<C> for RpcClient {
fn from(client: C) -> Self {
RpcClient::new(client)
}
}
impl std::fmt::Debug for RpcClient {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_tuple("RpcClient").finish()
}
}
impl std::ops::Deref for RpcClient {
type Target = dyn RpcClientT;
fn deref(&self) -> &Self::Target {
&*self.client
}
}
/// Create some [`RpcParams`] to pass to our [`RpcClient`]. [`RpcParams`]
/// simply enforces that parameters handed to our [`RpcClient`] methods
/// are the correct shape.
///
/// As with the [`serde_json::json!`] macro, this will panic if you provide
/// parameters which cannot successfully be serialized to JSON.
///
/// # Example
///
/// ```rust
/// use subxt::backend::rpc::{ rpc_params, RpcParams };
///
/// // If you provide no params you get `None` back
/// let params: RpcParams = rpc_params![];
/// assert!(params.build().is_none());
///
/// // If you provide params you get `Some<Box<RawValue>>` back.
/// let params: RpcParams = rpc_params![1, true, "foo"];
/// assert_eq!(params.build().unwrap().get(), "[1,true,\"foo\"]");
/// ```
#[macro_export]
macro_rules! rpc_params {
($($p:expr), *) => {{
// May be unused if empty; no params.
#[allow(unused_mut)]
let mut params = $crate::backend::rpc::RpcParams::new();
$(
params.push($p).expect("values passed to rpc_params! must be serializable to JSON");
)*
params
}}
}
pub use rpc_params;
/// This represents the parameters passed to an [`RpcClient`], and exists to
/// enforce that parameters are provided in the correct format.
///
/// Prefer to use the [`rpc_params!`] macro for simpler creation of these.
///
/// # Example
///
/// ```rust
/// use subxt::backend::rpc::RpcParams;
///
/// let mut params = RpcParams::new();
/// params.push(1).unwrap();
/// params.push(true).unwrap();
/// params.push("foo").unwrap();
///
/// assert_eq!(params.build().unwrap().get(), "[1,true,\"foo\"]");
/// ```
#[derive(Debug, Clone, Default)]
pub struct RpcParams(Vec<u8>);
impl RpcParams {
/// Create a new empty set of [`RpcParams`].
pub fn new() -> Self {
Self(Vec::new())
}
/// Push a parameter into our [`RpcParams`]. This serializes it to JSON
/// in the process, and so will return an error if this is not possible.
pub fn push<P: Serialize>(&mut self, param: P) -> Result<(), Error> {
if self.0.is_empty() {
self.0.push(b'[');
} else {
self.0.push(b',')
}
serde_json::to_writer(&mut self.0, &param)?;
Ok(())
}
/// Build a [`RawValue`] from our params, returning `None` if no parameters
/// were provided.
pub fn build(mut self) -> Option<Box<RawValue>> {
if self.0.is_empty() {
None
} else {
self.0.push(b']');
let s = unsafe { String::from_utf8_unchecked(self.0) };
Some(RawValue::from_string(s).expect("Should be valid JSON"))
}
}
}
/// A generic RPC Subscription. This implements [`Stream`], and so most of
/// the functionality you'll need to interact with it comes from the
/// [`StreamExt`] extension trait.
pub struct RpcSubscription<Res> {
inner: RawRpcSubscription,
_marker: std::marker::PhantomData<Res>,
}
impl<Res> std::fmt::Debug for RpcSubscription<Res> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RpcSubscription")
.field("inner", &"RawRpcSubscription")
.field("_marker", &self._marker)
.finish()
}
}
impl<Res> RpcSubscription<Res> {
/// Creates a new [`RpcSubscription`].
pub fn new(inner: RawRpcSubscription) -> Self {
Self {
inner,
_marker: std::marker::PhantomData,
}
}
/// Obtain the ID associated with this subscription.
pub fn subscription_id(&self) -> Option<&str> {
self.inner.id.as_deref()
}
}
impl<Res: DeserializeOwned> RpcSubscription<Res> {
/// Returns the next item in the stream. This is just a wrapper around
/// [`StreamExt::next()`] so that you can avoid the extra import.
pub async fn next(&mut self) -> Option<Result<Res, Error>> {
StreamExt::next(self).await
}
}
impl<Res> std::marker::Unpin for RpcSubscription<Res> {}
impl<Res: DeserializeOwned> Stream for RpcSubscription<Res> {
type Item = Result<Res, Error>;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Option<Self::Item>> {
let res = futures::ready!(self.inner.stream.poll_next_unpin(cx));
// Decode the inner RawValue to the type we're expecting and map
// any errors to the right shape:
let res = res.map(|r| {
r.map_err(|e| e.into())
.and_then(|raw_val| serde_json::from_str(raw_val.get()).map_err(|e| e.into()))
});
Poll::Ready(res)
}
}
// helpers for a jsonrpsee specific RPC client.
#[cfg(all(feature = "jsonrpsee", feature = "native"))]
mod jsonrpsee_helpers {
pub use jsonrpsee::{
client_transport::ws::{self, EitherStream, Url, WsTransportClientBuilder},
core::client::{Client, Error},
};
use tokio_util::compat::Compat;
pub type Sender = ws::Sender<Compat<EitherStream>>;
pub type Receiver = ws::Receiver<Compat<EitherStream>>;
/// Build WS RPC client from URL
pub async fn client(url: &str) -> Result<Client, Error> {
let (sender, receiver) = ws_transport(url).await?;
Ok(Client::builder()
.max_buffer_capacity_per_subscription(4096)
.build_with_tokio(sender, receiver))
}
async fn ws_transport(url: &str) -> Result<(Sender, Receiver), Error> {
let url = Url::parse(url).map_err(|e| Error::Transport(e.into()))?;
WsTransportClientBuilder::default()
.build(url)
.await
.map_err(|e| Error::Transport(e.into()))
}
}
// helpers for a jsonrpsee specific RPC client.
#[cfg(all(feature = "jsonrpsee", feature = "web", target_arch = "wasm32"))]
mod jsonrpsee_helpers {
pub use jsonrpsee::{
client_transport::web,
core::client::{Client, ClientBuilder, Error},
};
/// Build web RPC client from URL
pub async fn client(url: &str) -> Result<Client, Error> {
let (sender, receiver) = web::connect(url)
.await
.map_err(|e| Error::Transport(e.into()))?;
Ok(ClientBuilder::default()
.max_buffer_capacity_per_subscription(4096)
.build_with_wasm(sender, receiver))
}
}
-103
View File
@@ -1,103 +0,0 @@
// Copyright 2019-2023 Parity Technologies (UK) Ltd.
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
// see LICENSE for license details.
use crate::error::RpcError;
use futures::Stream;
use std::{future::Future, pin::Pin};
// Re-exporting for simplicity since it's used a bunch in the trait definition.
pub use serde_json::value::RawValue;
/// A trait describing low level JSON-RPC interactions. Implementations of this can be
/// used to instantiate a [`super::RpcClient`], which can be passed to [`crate::OnlineClient`]
/// or used for lower level RPC calls via eg [`crate::backend::legacy::LegacyRpcMethods`].
///
/// This is a low level interface whose methods expect an already-serialized set of params,
/// and return an owned but still-serialized [`RawValue`], deferring deserialization to
/// the caller. This is the case because we want the methods to be object-safe (which prohibits
/// generics), and want to avoid any unnecessary allocations in serializing/deserializing
/// parameters.
///
/// # Panics
///
/// Implementations are free to panic if the `RawValue`'s passed to `request_raw` or
/// `subscribe_raw` are not JSON arrays. Internally, we ensure that this is always the case.
pub trait RpcClientT: Send + Sync + 'static {
/// Make a raw request for which we expect a single response back from. Implementations
/// should expect that the params will either be `None`, or be an already-serialized
/// JSON array of parameters.
///
/// See [`super::RpcParams`] and the [`super::rpc_params!`] macro for an example of how to
/// construct the parameters.
///
/// Prefer to use the interface provided on [`super::RpcClient`] where possible.
fn request_raw<'a>(
&'a self,
method: &'a str,
params: Option<Box<RawValue>>,
) -> RawRpcFuture<'a, Box<RawValue>>;
/// Subscribe to some method. Implementations should expect that the params will
/// either be `None`, or be an already-serialized JSON array of parameters.
///
/// See [`super::RpcParams`] and the [`super::rpc_params!`] macro for an example of how to
/// construct the parameters.
///
/// Prefer to use the interface provided on [`super::RpcClient`] where possible.
fn subscribe_raw<'a>(
&'a self,
sub: &'a str,
params: Option<Box<RawValue>>,
unsub: &'a str,
) -> RawRpcFuture<'a, RawRpcSubscription>;
}
/// A boxed future that is returned from the [`RpcClientT`] methods.
pub type RawRpcFuture<'a, T> = Pin<Box<dyn Future<Output = Result<T, RpcError>> + Send + 'a>>;
/// The RPC subscription returned from [`RpcClientT`]'s `subscription` method.
pub struct RawRpcSubscription {
/// The subscription stream.
pub stream: Pin<Box<dyn Stream<Item = Result<Box<RawValue>, RpcError>> + Send + 'static>>,
/// The ID associated with the subscription.
pub id: Option<String>,
}
impl<T: RpcClientT> RpcClientT for std::sync::Arc<T> {
fn request_raw<'a>(
&'a self,
method: &'a str,
params: Option<Box<RawValue>>,
) -> RawRpcFuture<'a, Box<RawValue>> {
(**self).request_raw(method, params)
}
fn subscribe_raw<'a>(
&'a self,
sub: &'a str,
params: Option<Box<RawValue>>,
unsub: &'a str,
) -> RawRpcFuture<'a, RawRpcSubscription> {
(**self).subscribe_raw(sub, params, unsub)
}
}
impl<T: RpcClientT> RpcClientT for Box<T> {
fn request_raw<'a>(
&'a self,
method: &'a str,
params: Option<Box<RawValue>>,
) -> RawRpcFuture<'a, Box<RawValue>> {
(**self).request_raw(method, params)
}
fn subscribe_raw<'a>(
&'a self,
sub: &'a str,
params: Option<Box<RawValue>>,
unsub: &'a str,
) -> RawRpcFuture<'a, RawRpcSubscription> {
(**self).subscribe_raw(sub, params, unsub)
}
}
+13 -9
View File
@@ -118,14 +118,20 @@ where
}
// TODO: https://github.com/paritytech/subxt/issues/1567
// This is a hack because if a reconnection occurs
// the order of pending calls is not guaranteed.
// This is a hack because, in the event of a disconnection,
// we may not get the correct subscription ID back on reconnecting.
//
// Such that it's possible the a pending future completes
// before `chainHead_follow` is established with fresh
// subscription id.
// This is because we have a race between this future and the
// separate chainHead subscription, which runs in a different task.
// if this future is too quick, it'll be given back an old
// subscription ID from the chainHead subscription which has yet
// to reconnect and establish a new subscription ID.
//
if e.is_rejected() && rejected_retries < REJECTED_MAX_RETRIES {
// In the event of a wrong subscription Id being used, we happen to
// hand back an `RpcError::LimitReached`, and so can retry when we
// specifically hit that error to see if we get a new subscription ID
// eventually.
if e.is_rpc_limit_reached() && rejected_retries < REJECTED_MAX_RETRIES {
rejected_retries += 1;
continue;
}
@@ -182,9 +188,7 @@ mod tests {
use crate::backend::StreamOf;
fn disconnect_err() -> Error {
Error::Rpc(crate::error::RpcError::DisconnectedWillReconnect(
String::new(),
))
Error::Rpc(subxt_rpcs::Error::DisconnectedWillReconnect(String::new()).into())
}
fn custom_err() -> Error {