Allow generalising over RPC implementation (#634)

* WIP generalising RPC client

* WIP: non-object-safe RpcClientT.. aah generics everywhere

* WIP object-safe RpcClientT trait and no more extra generics

* Get core things compiling again with object-safe RpcClientT trait

* Make jsonrpsee optional and get test-runtime working again

* cargo fmt

* add RpcParams object to enforce correct formatting of rps params

* Wee tweaks

* clippy fixes

* cargo fmt

* TWeak a few types

* make sure we get jsonrpsee-types, too

* Add examples for rpc_params/RpcParams

* more doc tweaks

* remove a now unneeded dev note

* Option<Box<RawValue>> instead to avoid allocations in some cases

* update docs

* tweak RpcClientT trait docs

* Tweak docs around RpcClient and RpcClientT. Don't expose RpcClientT directly

* more doc tweaking about RpcParams and undo decision not to expose RpcParamsT

* Doc tweak

Co-authored-by: Alexandru Vasile <60601340+lexnv@users.noreply.github.com>

* more doc tweaks

* Fix doc thing

* Add an example of injecting a custom RPC client

* Fix a typo

* Address clippy things in example

* Fix a silly typo

* another clippy fix

* rpc_params to panic instead of returning a result, like serde_json::json, and deref on Rpc<T>

* fix docs

Co-authored-by: Alexandru Vasile <60601340+lexnv@users.noreply.github.com>
This commit is contained in:
James Wilson
2022-08-31 10:00:49 +01:00
committed by GitHub
parent 5ff849318b
commit 599107b432
17 changed files with 737 additions and 143 deletions
-1
View File
@@ -19,4 +19,3 @@ futures = "0.3.13"
codec = { package = "parity-scale-codec", version = "3.0.0", default-features = false, features = ["derive", "full", "bit-vec"] }
hex = "0.4.3"
tracing-subscriber = "0.3.11"
+1 -1
View File
@@ -2,7 +2,7 @@
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
// see LICENSE for license details.
//! This example should compile but should aos fail to work, since we've modified the
//! This example should compile but should fail to work, since we've modified the
//! config to not align with a Polkadot node.
use sp_keyring::AccountKeyring;
+96
View File
@@ -0,0 +1,96 @@
// Copyright 2019-2022 Parity Technologies (UK) Ltd.
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
// see LICENSE for license details.
use std::{
fmt::Write,
pin::Pin,
sync::{
Arc,
Mutex,
},
};
use subxt::{
rpc::{
RawValue,
RpcClientT,
RpcFuture,
RpcSubscription,
},
OnlineClient,
PolkadotConfig,
};
// A dummy RPC client that doesn't actually handle requests properly
// at all, but instead just logs what requests to it were made.
struct MyLoggingClient {
log: Arc<Mutex<String>>,
}
// We have to implement this fairly low level trait to turn [`MyLoggingClient`]
// into an RPC client that we can make use of in Subxt. Here we just log the requests
// made but don't forward them to any real node, and instead just return nonsense.
impl RpcClientT for MyLoggingClient {
fn request_raw<'a>(
&'a self,
method: &'a str,
params: Option<Box<RawValue>>,
) -> RpcFuture<'a, Box<RawValue>> {
writeln!(
self.log.lock().unwrap(),
"{method}({})",
params.as_ref().map(|p| p.get()).unwrap_or("[]")
)
.unwrap();
// We've logged the request; just return garbage. Because a boxed future is returned,
// you're able to run whatever async code you'd need to actually talk to a node.
let res = RawValue::from_string("[]".to_string()).unwrap();
Box::pin(std::future::ready(Ok(res)))
}
fn subscribe_raw<'a>(
&'a self,
sub: &'a str,
params: Option<Box<RawValue>>,
unsub: &'a str,
) -> RpcFuture<'a, RpcSubscription> {
writeln!(
self.log.lock().unwrap(),
"{sub}({}) (unsub: {unsub})",
params.as_ref().map(|p| p.get()).unwrap_or("[]")
)
.unwrap();
// We've logged the request; just return garbage. Because a boxed future is returned,
// and that will return a boxed Stream impl, you have a bunch of flexibility to build
// and return whatever type of Stream you see fit.
let res = RawValue::from_string("[]".to_string()).unwrap();
let stream = futures::stream::once(async move { Ok(res) });
let stream: Pin<Box<dyn futures::Stream<Item = _> + Send>> = Box::pin(stream);
Box::pin(std::future::ready(Ok(stream)))
}
}
#[subxt::subxt(runtime_metadata_path = "../artifacts/polkadot_metadata.scale")]
pub mod polkadot {}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt::init();
// Instantiate our replacement RPC client.
let log = Arc::default();
let rpc_client = MyLoggingClient {
log: Arc::clone(&log),
};
// Pass this into our OnlineClient to instantiate it. This will lead to some
// RPC calls being made to fetch chain details/metadata, which will immediately
// fail..
let _ = OnlineClient::<PolkadotConfig>::from_rpc_client(rpc_client).await;
// But, we can see that the calls were made via our custom RPC client:
println!("Log of calls made:\n\n{}", log.lock().unwrap().as_str());
Ok(())
}
+7 -1
View File
@@ -13,11 +13,17 @@ description = "Submit extrinsics (transactions) to a substrate node via RPC"
keywords = ["parity", "substrate", "blockchain"]
[features]
default = ["jsonrpsee"]
# Activate this to expose functionality only used for integration testing.
# The exposed functionality is subject to breaking changes at any point,
# and should not be relied upon.
integration-tests = []
# Jsonrpsee if the default RPC provider used in Subxt. However, it can be
# swapped out for an alternative implementation, and so is optional.
jsonrpsee = ["dep:jsonrpsee"]
[dependencies]
bitvec = { version = "1.0.0", default-features = false, features = ["alloc"] }
codec = { package = "parity-scale-codec", version = "3.0.0", default-features = false, features = ["derive", "full", "bit-vec"] }
@@ -26,7 +32,7 @@ scale-value = "0.5.0"
scale-decode = "0.3.0"
futures = "0.3.13"
hex = "0.4.3"
jsonrpsee = { version = "0.15.1", features = ["async-client", "client-ws-transport"] }
jsonrpsee = { version = "0.15.1", features = ["async-client", "client-ws-transport", "jsonrpsee-types"], optional = true }
serde = { version = "1.0.124", features = ["derive"] }
serde_json = "1.0.64"
thiserror = "1.0.24"
+53 -8
View File
@@ -12,7 +12,7 @@ use crate::{
events::EventsClient,
rpc::{
Rpc,
RpcClient,
RpcClientT,
RuntimeVersion,
},
storage::StorageClient,
@@ -52,12 +52,14 @@ struct Inner<T: Config> {
impl<T: Config> std::fmt::Debug for OnlineClient<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Client")
.field("rpc", &"<Rpc>")
.field("rpc", &"RpcClient")
.field("inner", &self.inner)
.finish()
}
}
// The default constructors assume Jsonrpsee.
#[cfg(feature = "jsonrpsee")]
impl<T: Config> OnlineClient<T> {
/// Construct a new [`OnlineClient`] using default settings which
/// point to a locally running node on `ws://127.0.0.1:9944`.
@@ -68,16 +70,20 @@ impl<T: Config> OnlineClient<T> {
/// Construct a new [`OnlineClient`], providing a URL to connect to.
pub async fn from_url(url: impl AsRef<str>) -> Result<OnlineClient<T>, Error> {
let client = crate::rpc::ws_client(url.as_ref()).await?;
let client = jsonrpsee_helpers::ws_client(url.as_ref())
.await
.map_err(|e| crate::error::RpcError(e.to_string()))?;
OnlineClient::from_rpc_client(client).await
}
}
/// Construct a new [`OnlineClient`] by providing the underlying [`RpcClient`]
/// to use to drive the connection.
pub async fn from_rpc_client(
rpc_client: impl Into<RpcClient>,
impl<T: Config> OnlineClient<T> {
/// Construct a new [`OnlineClient`] by providing an underlying [`RpcClientT`]
/// implementation to drive the connection.
pub async fn from_rpc_client<R: RpcClientT>(
rpc_client: R,
) -> Result<OnlineClient<T>, Error> {
let rpc = Rpc::new(rpc_client.into());
let rpc = Rpc::new(rpc_client);
let (genesis_hash, runtime_version, metadata) = future::join3(
rpc.genesis_hash(),
@@ -232,3 +238,42 @@ impl<T: Config> ClientRuntimeUpdater<T> {
Ok(())
}
}
// helpers for a jsonrpsee specific OnlineClient.
#[cfg(feature = "jsonrpsee")]
mod jsonrpsee_helpers {
pub use jsonrpsee::{
client_transport::ws::{
InvalidUri,
Receiver,
Sender,
Uri,
WsTransportClientBuilder,
},
core::{
client::{
Client,
ClientBuilder,
},
Error,
},
};
/// Build WS RPC client from URL
pub async fn ws_client(url: &str) -> Result<Client, Error> {
let (sender, receiver) = ws_transport(url).await?;
Ok(ClientBuilder::default()
.max_notifs_per_subscription(4096)
.build_with_tokio(sender, receiver))
}
async fn ws_transport(url: &str) -> Result<(Sender, Receiver), Error> {
let url: Uri = url
.parse()
.map_err(|e: InvalidUri| Error::Transport(e.into()))?;
WsTransportClientBuilder::default()
.build(url)
.await
.map_err(|e| Error::Transport(e.into()))
}
}
+8 -3
View File
@@ -15,7 +15,6 @@ pub use crate::metadata::{
InvalidMetadataError,
MetadataError,
};
pub use jsonrpsee::core::error::Error as RequestError;
pub use scale_value::scale::{
DecodeError,
EncodeError,
@@ -36,7 +35,7 @@ pub enum Error {
Codec(#[from] codec::Error),
/// Rpc error.
#[error("Rpc error: {0}")]
Rpc(#[from] RequestError),
Rpc(#[from] RpcError),
/// Serde serialization error
#[error("Serde json error: {0}")]
Serialization(#[from] serde_json::error::Error),
@@ -102,6 +101,12 @@ impl From<DispatchError> for Error {
}
}
/// An RPC error. Since we are generic over the RPC client that is used,
/// the error is any custom string.
#[derive(Debug, thiserror::Error)]
#[error("RPC error: {0}")]
pub struct RpcError(pub String);
/// This is our attempt to decode a runtime DispatchError. We either
/// successfully decode it into a [`ModuleError`], or we fail and keep
/// hold of the bytes, which we can attempt to decode if we have an
@@ -232,7 +237,7 @@ pub enum TransactionError {
/// block hasn't yet been finalized).
#[error("The finality subscription expired")]
FinalitySubscriptionTimeout,
/// The block hash that the tranaction was added to could not be found.
/// The block hash that the transaction was added to could not be found.
/// This is probably because the block was retracted before being finalized.
#[error("The block containing the transaction can no longer be found (perhaps it was on a non-finalized fork?)")]
BlockHashNotFound,
+3 -3
View File
@@ -8,6 +8,7 @@ use crate::{
client::OnlineClientT,
error::Error,
events::EventsClient,
rpc::Subscription,
Config,
};
use derivative::Derivative;
@@ -18,7 +19,6 @@ use futures::{
Stream,
StreamExt,
};
use jsonrpsee::core::client::Subscription;
use sp_runtime::traits::Header;
use std::{
marker::Unpin,
@@ -32,12 +32,12 @@ pub use super::{
FilterEvents,
};
/// A `jsonrpsee` Subscription. This forms a part of the `EventSubscription` type handed back
/// A Subscription. This forms a part of the `EventSubscription` type handed back
/// in codegen from `subscribe_finalized`, and is exposed to be used in codegen.
#[doc(hidden)]
pub type FinalizedEventSub<Header> = BoxStream<'static, Result<Header, Error>>;
/// A `jsonrpsee` Subscription. This forms a part of the `EventSubscription` type handed back
/// A Subscription. This forms a part of the `EventSubscription` type handed back
/// in codegen from `subscribe`, and is exposed to be used in codegen.
#[doc(hidden)]
pub type EventSub<Item> = Subscription<Item>;
+86
View File
@@ -0,0 +1,86 @@
// Copyright 2019-2022 Parity Technologies (UK) Ltd.
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
// see LICENSE for license details.
use super::{
RpcClientT,
RpcFuture,
RpcSubscription,
};
use crate::error::RpcError;
use futures::stream::{
StreamExt,
TryStreamExt,
};
use jsonrpsee::{
core::client::{
Client,
ClientT,
SubscriptionClientT,
},
types::ParamsSer,
};
use serde_json::value::{
RawValue,
Value,
};
impl RpcClientT for Client {
fn request_raw<'a>(
&'a self,
method: &'a str,
params: Option<Box<RawValue>>,
) -> RpcFuture<'a, Box<RawValue>> {
Box::pin(async move {
let params = prep_params_for_jsonrpsee(params)?;
let res = ClientT::request(self, method, Some(params))
.await
.map_err(|e| RpcError(e.to_string()))?;
Ok(res)
})
}
fn subscribe_raw<'a>(
&'a self,
sub: &'a str,
params: Option<Box<RawValue>>,
unsub: &'a str,
) -> RpcFuture<'a, RpcSubscription> {
Box::pin(async move {
let params = prep_params_for_jsonrpsee(params)?;
let sub = SubscriptionClientT::subscribe::<Box<RawValue>>(
self,
sub,
Some(params),
unsub,
)
.await
.map_err(|e| RpcError(e.to_string()))?
.map_err(|e| RpcError(e.to_string()))
.boxed();
Ok(sub)
})
}
}
// This is ugly; we have to encode to Value's to be compat with the jsonrpc interface.
// Remove and simplify this once something like https://github.com/paritytech/jsonrpsee/issues/862 is in:
fn prep_params_for_jsonrpsee(
params: Option<Box<RawValue>>,
) -> Result<ParamsSer<'static>, RpcError> {
let params = match params {
Some(params) => params,
// No params? avoid any work and bail early.
None => return Ok(ParamsSer::Array(Vec::new())),
};
let val = serde_json::to_value(&params).expect("RawValue guarantees valid JSON");
let arr = match val {
Value::Array(arr) => Ok(arr),
_ => {
Err(RpcError(format!(
"RPC Params are expected to be an array but got {params}"
)))
}
}?;
Ok(ParamsSer::Array(arr))
}
+77
View File
@@ -0,0 +1,77 @@
// Copyright 2019-2022 Parity Technologies (UK) Ltd.
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
// see LICENSE for license details.
//! RPC types and client for interacting with a substrate node.
//!
//! These is used behind the scenes by various `subxt` APIs, but can
//! also be used directly.
//!
//! - [`Rpc`] is the highest level wrapper, and the one you will run into
//! first. It contains the higher level methods for interacting with a node.
//! - [`RpcClient`] is what [`Rpc`] uses to actually talk to a node, offering
//! a [`RpcClient::request`] and [`RpcClient::subscribe`] method to do so.
//! - [`RpcClientT`] is the underlying dynamic RPC implementation. This provides
//! the low level [`RpcClientT::request_raw`] and [`RpcClientT::subscribe_raw`]
//! methods. This can be swapped out for a custom implementation, but by default
//! we'll rely on `jsonrpsee` for this.
//!
//! # Example
//!
//! Fetching storage keys
//!
//! ```no_run
//! use subxt::{ PolkadotConfig, OnlineClient, storage::StorageKey };
//!
//! #[subxt::subxt(runtime_metadata_path = "../artifacts/polkadot_metadata.scale")]
//! pub mod polkadot {}
//!
//! # #[tokio::main]
//! # async fn main() {
//! let api = OnlineClient::<PolkadotConfig>::new().await.unwrap();
//!
//! let key = polkadot::storage()
//! .xcm_pallet()
//! .version_notifiers_root()
//! .to_bytes();
//!
//! // Fetch up to 10 keys.
//! let keys = api
//! .rpc()
//! .storage_keys_paged(&key, 10, None, None)
//! .await
//! .unwrap();
//!
//! for key in keys.iter() {
//! println!("Key: 0x{}", hex::encode(&key));
//! }
//! # }
//! ```
// Allow an `rpc.rs` file in the `rpc` folder to align better
// with other file names for their types.
#![allow(clippy::module_inception)]
#[cfg(feature = "jsonrpsee")]
mod jsonrpsee_impl;
mod rpc;
mod rpc_client;
mod rpc_client_t;
// Expose the `Rpc` struct and any associated types.
pub use rpc::*;
pub use rpc_client_t::{
RawValue,
RpcClientT,
RpcFuture,
RpcSubscription,
};
pub use rpc_client::{
rpc_params,
RpcClient,
RpcParams,
Subscription,
};
+30 -69
View File
@@ -39,16 +39,12 @@
//! # }
//! ```
// jsonrpsee subscriptions are interminable.
// Allows `while let status = subscription.next().await {}`
// Related: https://github.com/paritytech/subxt/issues/66
#![allow(irrefutable_let_patterns)]
use std::{
collections::HashMap,
sync::Arc,
use super::{
rpc_params,
RpcClient,
RpcClientT,
Subscription,
};
use crate::{
error::Error,
utils::PhantomDataSendSync,
@@ -60,29 +56,6 @@ use codec::{
Encode,
};
use frame_metadata::RuntimeMetadataPrefixed;
pub use jsonrpsee::{
client_transport::ws::{
InvalidUri,
Receiver as WsReceiver,
Sender as WsSender,
Uri,
WsTransportClientBuilder,
},
core::{
client::{
Client as RpcClient,
ClientBuilder as RpcClientBuilder,
ClientT,
Subscription,
SubscriptionClientT,
},
to_json_value,
DeserializeOwned,
Error as RpcError,
JsonValue,
},
rpc_params,
};
use serde::{
Deserialize,
Serialize,
@@ -103,6 +76,7 @@ use sp_runtime::{
},
ApplyExtrinsicResult,
};
use std::collections::HashMap;
/// A number type that can be serialized both as a number or a string that encodes a number in a
/// string.
@@ -344,8 +318,7 @@ pub struct Health {
/// Client for substrate rpc interfaces
pub struct Rpc<T: Config> {
/// Rpc client for sending requests.
pub client: Arc<RpcClient>,
client: RpcClient,
_marker: PhantomDataSendSync<T>,
}
@@ -358,11 +331,20 @@ impl<T: Config> Clone for Rpc<T> {
}
}
// Expose subscribe/request, and also subscribe_raw/request_raw
// from the even-deeper `dyn RpcClientT` impl.
impl<T: Config> std::ops::Deref for Rpc<T> {
type Target = RpcClient;
fn deref(&self) -> &Self::Target {
&self.client
}
}
impl<T: Config> Rpc<T> {
/// Create a new [`Rpc`]
pub fn new(client: RpcClient) -> Self {
pub fn new<R: RpcClientT>(client: R) -> Self {
Self {
client: Arc::new(client),
client: RpcClient::new(client),
_marker: PhantomDataSendSync::new(),
}
}
@@ -445,30 +427,29 @@ impl<T: Config> Rpc<T> {
/// Fetch system properties
pub async fn system_properties(&self) -> Result<SystemProperties, Error> {
Ok(self
.client
self.client
.request("system_properties", rpc_params![])
.await?)
.await
}
/// Fetch system health
pub async fn system_health(&self) -> Result<Health, Error> {
Ok(self.client.request("system_health", rpc_params![]).await?)
self.client.request("system_health", rpc_params![]).await
}
/// Fetch system chain
pub async fn system_chain(&self) -> Result<String, Error> {
Ok(self.client.request("system_chain", rpc_params![]).await?)
self.client.request("system_chain", rpc_params![]).await
}
/// Fetch system name
pub async fn system_name(&self) -> Result<String, Error> {
Ok(self.client.request("system_name", rpc_params![]).await?)
self.client.request("system_name", rpc_params![]).await
}
/// Fetch system version
pub async fn system_version(&self) -> Result<String, Error> {
Ok(self.client.request("system_version", rpc_params![]).await?)
self.client.request("system_version", rpc_params![]).await
}
/// Fetch the current nonce for the given account ID.
@@ -476,10 +457,9 @@ impl<T: Config> Rpc<T> {
&self,
account: &T::AccountId,
) -> Result<T::Index, Error> {
Ok(self
.client
self.client
.request("system_accountNextIndex", rpc_params![account])
.await?)
.await
}
/// Get a header
@@ -650,10 +630,9 @@ impl<T: Config> Rpc<T> {
/// Generate new session keys and returns the corresponding public keys.
pub async fn rotate_keys(&self) -> Result<Bytes, Error> {
Ok(self
.client
self.client
.request("author_rotateKeys", rpc_params![])
.await?)
.await
}
/// Checks if the keystore has private keys for the given session public keys.
@@ -663,7 +642,7 @@ impl<T: Config> Rpc<T> {
/// Returns `true` iff all private keys could be found.
pub async fn has_session_keys(&self, session_keys: Bytes) -> Result<bool, Error> {
let params = rpc_params![session_keys];
Ok(self.client.request("author_hasSessionKeys", params).await?)
self.client.request("author_hasSessionKeys", params).await
}
/// Checks if the keystore has private keys for the given public key and key type.
@@ -675,7 +654,7 @@ impl<T: Config> Rpc<T> {
key_type: String,
) -> Result<bool, Error> {
let params = rpc_params![public_key, key_type];
Ok(self.client.request("author_hasKey", params).await?)
self.client.request("author_hasKey", params).await
}
/// Submits the extrinsic to the dry_run RPC, to test if it would succeed.
@@ -694,24 +673,6 @@ impl<T: Config> Rpc<T> {
}
}
/// Build WS RPC client from URL
pub async fn ws_client(url: &str) -> Result<RpcClient, RpcError> {
let (sender, receiver) = ws_transport(url).await?;
Ok(RpcClientBuilder::default()
.max_notifs_per_subscription(4096)
.build_with_tokio(sender, receiver))
}
async fn ws_transport(url: &str) -> Result<(WsSender, WsReceiver), RpcError> {
let url: Uri = url
.parse()
.map_err(|e: InvalidUri| RpcError::Transport(e.into()))?;
WsTransportClientBuilder::default()
.build(url)
.await
.map_err(|e| RpcError::Transport(e.into()))
}
fn to_hex(bytes: impl AsRef<[u8]>) -> String {
format!("0x{}", hex::encode(bytes.as_ref()))
}
+218
View File
@@ -0,0 +1,218 @@
// Copyright 2019-2022 Parity Technologies (UK) Ltd.
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
// see LICENSE for license details.
use super::{
RpcClientT,
RpcSubscription,
};
use crate::error::Error;
use futures::{
Stream,
StreamExt,
};
use serde::{
de::DeserializeOwned,
Serialize,
};
use serde_json::value::RawValue;
use std::{
pin::Pin,
sync::Arc,
task::Poll,
};
/// A concrete wrapper around an [`RpcClientT`] which exposes the udnerlying interface via some
/// higher level methods that make it a little easier to work with.
///
/// Wrapping [`RpcClientT`] in this way is simply a way to expose this additional functionality
/// without getting into issues with non-object-safe methods or no `async` in traits.
#[derive(Clone)]
pub struct RpcClient(Arc<dyn RpcClientT>);
impl RpcClient {
pub(crate) fn new<R: RpcClientT>(client: R) -> Self {
RpcClient(Arc::new(client))
}
/// Make an RPC request, given a method name and some parameters.
///
/// See [`RpcParams`] and the [`rpc_params!`] macro for an example of how to
/// construct the parameters.
pub async fn request<Res: DeserializeOwned>(
&self,
method: &str,
params: RpcParams,
) -> Result<Res, Error> {
let res = self.0.request_raw(method, params.build()).await?;
let val = serde_json::from_str(res.get())?;
Ok(val)
}
/// Subscribe to an RPC endpoint, providing the parameters and the method to call to
/// unsubscribe from it again.
///
/// See [`RpcParams`] and the [`rpc_params!`] macro for an example of how to
/// construct the parameters.
pub async fn subscribe<Res: DeserializeOwned>(
&self,
sub: &str,
params: RpcParams,
unsub: &str,
) -> Result<Subscription<Res>, Error> {
let sub = self.0.subscribe_raw(sub, params.build(), unsub).await?;
Ok(Subscription::new(sub))
}
}
impl std::fmt::Debug for RpcClient {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_tuple("RpcClient").finish()
}
}
impl std::ops::Deref for RpcClient {
type Target = dyn RpcClientT;
fn deref(&self) -> &Self::Target {
&*self.0
}
}
/// Create some [`RpcParams`] to pass to our [`RpcClient`]. [`RpcParams`]
/// simply enforces that parameters handed to our [`RpcClient`] methods
/// are the correct shape.
///
/// As with the [`serde_json::json!`] macro, this will panic if you provide
/// parameters which cannot successfully be serialized to JSON.
///
/// # Example
///
/// ```rust
/// use subxt::rpc::{ rpc_params, RpcParams };
///
/// // If you provide no params you get `None` back
/// let params: RpcParams = rpc_params![];
/// assert!(params.build().is_none());
///
/// // If you provide params you get `Some<Box<RawValue>>` back.
/// let params: RpcParams = rpc_params![1, true, "foo"];
/// assert_eq!(params.build().unwrap().get(), "[1,true,\"foo\"]");
/// ```
#[macro_export]
macro_rules! rpc_params {
($($p:expr), *) => {{
// May be unused if empty; no params.
#[allow(unused_mut)]
let mut params = $crate::rpc::RpcParams::new();
$(
params.push($p).expect("values passed to rpc_params! must be serializable to JSON");
)*
params
}}
}
pub use rpc_params;
/// This represents the parameters passed to an [`RpcClient`], and exists to
/// enforce that parameters are provided in the correct format.
///
/// Prefer to use the [`rpc_params!`] macro for simpler creation of these.
///
/// # Example
///
/// ```rust
/// use subxt::rpc::RpcParams;
///
/// let mut params = RpcParams::new();
/// params.push(1).unwrap();
/// params.push(true).unwrap();
/// params.push("foo").unwrap();
///
/// assert_eq!(params.build().unwrap().get(), "[1,true,\"foo\"]");
/// ```
#[derive(Debug, Clone, Default)]
pub struct RpcParams(Vec<u8>);
impl RpcParams {
/// Create a new empty set of [`RpcParams`].
pub fn new() -> Self {
Self(Vec::new())
}
/// Push a parameter into our [`RpcParams`]. This serializes it to JSON
/// in the process, and so will return an error if this is not possible.
pub fn push<P: Serialize>(&mut self, param: P) -> Result<(), Error> {
if self.0.is_empty() {
self.0.push(b'[');
} else {
self.0.push(b',')
}
serde_json::to_writer(&mut self.0, &param)?;
Ok(())
}
/// Build a [`RawValue`] from our params, returning `None` if no parameters
/// were provided.
pub fn build(mut self) -> Option<Box<RawValue>> {
if self.0.is_empty() {
None
} else {
self.0.push(b']');
let s = unsafe { String::from_utf8_unchecked(self.0) };
Some(RawValue::from_string(s).expect("Should be valid JSON"))
}
}
}
/// A generic RPC Subscription. This implements [`Stream`], and so most of
/// the functionality you'll need to interact with it comes from the
/// [`StreamExt`] extension trait.
pub struct Subscription<Res> {
inner: RpcSubscription,
_marker: std::marker::PhantomData<Res>,
}
impl<Res> std::fmt::Debug for Subscription<Res> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Subscription")
.field("inner", &"RpcSubscription")
.field("_marker", &self._marker)
.finish()
}
}
impl<Res> Subscription<Res> {
fn new(inner: RpcSubscription) -> Self {
Self {
inner,
_marker: std::marker::PhantomData,
}
}
}
impl<Res: DeserializeOwned> Subscription<Res> {
/// Wait for the next item from the subscription.
pub async fn next(&mut self) -> Option<Result<Res, Error>> {
StreamExt::next(self).await
}
}
impl<Res> std::marker::Unpin for Subscription<Res> {}
impl<Res: DeserializeOwned> Stream for Subscription<Res> {
type Item = Result<Res, Error>;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Option<Self::Item>> {
let res = futures::ready!(self.inner.poll_next_unpin(cx));
// Decode the inner RawValue to the type we're expecting and map
// any errors to the right shape:
let res = res.map(|r| {
r.map_err(|e| e.into()).and_then(|raw_val| {
serde_json::from_str(raw_val.get()).map_err(|e| e.into())
})
});
Poll::Ready(res)
}
}
+59
View File
@@ -0,0 +1,59 @@
// Copyright 2019-2022 Parity Technologies (UK) Ltd.
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
// see LICENSE for license details.
use crate::error::RpcError;
use futures::Stream;
use std::{
future::Future,
pin::Pin,
};
// Re-exporting for simplicity since it's used a bunch in the trait definition.
pub use serde_json::value::RawValue;
/// Any RPC client which implements this can be used in our [`super::Rpc`] type
/// to talk to a node.
///
/// This is a low level interface whose methods expect an already-serialized set of params,
/// and return an owned but still-serialized [`RawValue`], deferring deserialization to
/// the caller. This is the case because we want the methods to be object-safe (which prohibits
/// generics), and want to avoid any unnecessary allocations in serializing/deserializing
/// parameters.
pub trait RpcClientT: Send + Sync + 'static {
/// Make a raw request for which we expect a single response back from. Implementations
/// should expect that the params will either be `None`, or be an already-serialized
/// JSON array of parameters.
///
/// See [`super::RpcParams`] and the [`super::rpc_params!`] macro for an example of how to
/// construct the parameters.
///
/// Prefer to use the interface provided on [`super::RpcClient`] where possible.
fn request_raw<'a>(
&'a self,
method: &'a str,
params: Option<Box<RawValue>>,
) -> RpcFuture<'a, Box<RawValue>>;
/// Subscribe to some method. Implementations should expect that the params will
/// either be `None`, or be an already-serialized JSON array of parameters.
///
/// See [`super::RpcParams`] and the [`super::rpc_params!`] macro for an example of how to
/// construct the parameters.
///
/// Prefer to use the interface provided on [`super::RpcClient`] where possible.
fn subscribe_raw<'a>(
&'a self,
sub: &'a str,
params: Option<Box<RawValue>>,
unsub: &'a str,
) -> RpcFuture<'a, RpcSubscription>;
}
/// A boxed future that is returned from the [`RpcClientT`] methods.
pub type RpcFuture<'a, T> =
Pin<Box<dyn Future<Output = Result<T, RpcError>> + Send + 'a>>;
/// The inner subscription stream returned from our [`RpcClientT`]'s `subscription` method.
pub type RpcSubscription =
Pin<Box<dyn Stream<Item = Result<Box<RawValue>, RpcError>> + Send + 'static>>;
+1 -1
View File
@@ -318,7 +318,7 @@ pub struct KeyIter<T: Config, Client, ReturnTy> {
_marker: std::marker::PhantomData<ReturnTy>,
}
impl<'a, T: Config, Client: OnlineClientT<T>, ReturnTy> KeyIter<T, Client, ReturnTy>
impl<'a, T, Client, ReturnTy> KeyIter<T, Client, ReturnTy>
where
T: Config,
Client: OnlineClientT<T>,
+5 -1
View File
@@ -208,7 +208,11 @@ impl<T: Config, C: OfflineClientT<T>> TxClient<T, C> {
}
}
impl<T: Config, C: OnlineClientT<T>> TxClient<T, C> {
impl<T, C> TxClient<T, C>
where
T: Config,
C: OnlineClientT<T>,
{
/// Creates a raw signed extrinsic, without submitting it.
pub async fn create_signed<Call>(
&self,
+52 -50
View File
@@ -11,6 +11,7 @@ use crate::{
error::{
DispatchError,
Error,
RpcError,
TransactionError,
},
events::{
@@ -21,7 +22,10 @@ use crate::{
Phase,
StaticEvent,
},
rpc::SubstrateTxStatus,
rpc::{
Subscription,
SubstrateTxStatus,
},
Config,
};
use derivative::Derivative;
@@ -29,10 +33,6 @@ use futures::{
Stream,
StreamExt,
};
use jsonrpsee::core::{
client::Subscription as RpcSubscription,
Error as RpcError,
};
use sp_runtime::traits::Hash;
pub use sp_runtime::traits::SignedExtension;
@@ -41,7 +41,7 @@ pub use sp_runtime::traits::SignedExtension;
#[derive(Derivative)]
#[derivative(Debug(bound = "C: std::fmt::Debug"))]
pub struct TxProgress<T: Config, C> {
sub: Option<RpcSubscription<SubstrateTxStatus<T::Hash, T::Hash>>>,
sub: Option<Subscription<SubstrateTxStatus<T::Hash, T::Hash>>>,
ext_hash: T::Hash,
client: C,
}
@@ -54,7 +54,7 @@ impl<T: Config, C> Unpin for TxProgress<T, C> {}
impl<T: Config, C> TxProgress<T, C> {
/// Instantiate a new [`TxProgress`] from a custom subscription.
pub fn new(
sub: RpcSubscription<SubstrateTxStatus<T::Hash, T::Hash>>,
sub: Subscription<SubstrateTxStatus<T::Hash, T::Hash>>,
client: C,
ext_hash: T::Hash,
) -> Self {
@@ -71,7 +71,11 @@ impl<T: Config, C> TxProgress<T, C> {
}
}
impl<T: Config, C: OnlineClientT<T>> TxProgress<T, C> {
impl<T, C> TxProgress<T, C>
where
T: Config,
C: OnlineClientT<T>,
{
/// Return the next transaction status when it's emitted. This just delegates to the
/// [`futures::Stream`] implementation for [`TxProgress`], but allows you to
/// avoid importing that trait if you don't otherwise need it.
@@ -103,7 +107,7 @@ impl<T: Config, C: OnlineClientT<T>> TxProgress<T, C> {
_ => continue,
}
}
Err(RpcError::Custom("RPC subscription dropped".into()).into())
Err(RpcError("RPC subscription dropped".to_string()).into())
}
/// Wait for the transaction to be finalized, and return a [`TxInBlock`]
@@ -129,7 +133,7 @@ impl<T: Config, C: OnlineClientT<T>> TxProgress<T, C> {
_ => continue,
}
}
Err(RpcError::Custom("RPC subscription dropped".into()).into())
Err(RpcError("RPC subscription dropped".to_string()).into())
}
/// Wait for the transaction to be finalized, and for the transaction events to indicate
@@ -161,47 +165,45 @@ impl<T: Config, C: OnlineClientT<T>> Stream for TxProgress<T, C> {
None => return Poll::Ready(None),
};
sub.poll_next_unpin(cx)
.map_err(|e| e.into())
.map_ok(|status| {
match status {
SubstrateTxStatus::Future => TxStatus::Future,
SubstrateTxStatus::Ready => TxStatus::Ready,
SubstrateTxStatus::Broadcast(peers) => TxStatus::Broadcast(peers),
SubstrateTxStatus::InBlock(hash) => {
TxStatus::InBlock(TxInBlock::new(
hash,
self.ext_hash,
self.client.clone(),
))
}
SubstrateTxStatus::Retracted(hash) => TxStatus::Retracted(hash),
SubstrateTxStatus::Usurped(hash) => TxStatus::Usurped(hash),
SubstrateTxStatus::Dropped => TxStatus::Dropped,
SubstrateTxStatus::Invalid => TxStatus::Invalid,
// Only the following statuses are actually considered "final" (see the substrate
// docs on `TxStatus`). Basically, either the transaction makes it into a
// block, or we eventually give up on waiting for it to make it into a block.
// Even `Dropped`/`Invalid`/`Usurped` transactions might make it into a block eventually.
//
// As an example, a transaction that is `Invalid` on one node due to having the wrong
// nonce might still be valid on some fork on another node which ends up being finalized.
// Equally, a transaction `Dropped` from one node may still be in the transaction pool,
// and make it into a block, on another node. Likewise with `Usurped`.
SubstrateTxStatus::FinalityTimeout(hash) => {
self.sub = None;
TxStatus::FinalityTimeout(hash)
}
SubstrateTxStatus::Finalized(hash) => {
self.sub = None;
TxStatus::Finalized(TxInBlock::new(
hash,
self.ext_hash,
self.client.clone(),
))
}
sub.poll_next_unpin(cx).map_ok(|status| {
match status {
SubstrateTxStatus::Future => TxStatus::Future,
SubstrateTxStatus::Ready => TxStatus::Ready,
SubstrateTxStatus::Broadcast(peers) => TxStatus::Broadcast(peers),
SubstrateTxStatus::InBlock(hash) => {
TxStatus::InBlock(TxInBlock::new(
hash,
self.ext_hash,
self.client.clone(),
))
}
})
SubstrateTxStatus::Retracted(hash) => TxStatus::Retracted(hash),
SubstrateTxStatus::Usurped(hash) => TxStatus::Usurped(hash),
SubstrateTxStatus::Dropped => TxStatus::Dropped,
SubstrateTxStatus::Invalid => TxStatus::Invalid,
// Only the following statuses are actually considered "final" (see the substrate
// docs on `TxStatus`). Basically, either the transaction makes it into a
// block, or we eventually give up on waiting for it to make it into a block.
// Even `Dropped`/`Invalid`/`Usurped` transactions might make it into a block eventually.
//
// As an example, a transaction that is `Invalid` on one node due to having the wrong
// nonce might still be valid on some fork on another node which ends up being finalized.
// Equally, a transaction `Dropped` from one node may still be in the transaction pool,
// and make it into a block, on another node. Likewise with `Usurped`.
SubstrateTxStatus::FinalityTimeout(hash) => {
self.sub = None;
TxStatus::FinalityTimeout(hash)
}
SubstrateTxStatus::Finalized(hash) => {
self.sub = None;
TxStatus::Finalized(TxInBlock::new(
hash,
self.ext_hash,
self.client.clone(),
))
}
}
})
}
}
+1
View File
@@ -13,3 +13,4 @@ subxt = { path = "../../subxt" }
sp-core = "6.0.0"
tokio = { version = "1.8", features = ["macros", "rt-multi-thread"] }
which = "4.2.2"
jsonrpsee = { version = "0.15.1", features = ["async-client", "client-ws-transport"] }
+40 -5
View File
@@ -15,10 +15,6 @@ use std::{
thread,
time,
};
use subxt::rpc::{
self,
ClientT,
};
static SUBSTRATE_BIN_ENV_VAR: &str = "SUBSTRATE_NODE_PATH";
@@ -62,7 +58,8 @@ async fn run() {
// It might take a while for substrate node that spin up the RPC server.
// Thus, the connection might get rejected a few times.
let res = match rpc::ws_client(&format!("ws://localhost:{}", port)).await {
use client::ClientT;
let res = match client::build(&format!("ws://localhost:{}", port)).await {
Ok(c) => c.request("state_getMetadata", None).await,
Err(e) => Err(e),
};
@@ -157,3 +154,41 @@ impl Drop for KillOnDrop {
let _ = self.0.kill();
}
}
// Use jsonrpsee to obtain metadata from the node.
mod client {
pub use jsonrpsee::{
client_transport::ws::{
InvalidUri,
Receiver,
Sender,
Uri,
WsTransportClientBuilder,
},
core::{
client::{
Client,
ClientBuilder,
},
Error,
},
};
pub use jsonrpsee::core::client::ClientT;
/// Build WS RPC client from URL
pub async fn build(url: &str) -> Result<Client, Error> {
let (sender, receiver) = ws_transport(url).await?;
Ok(ClientBuilder::default().build_with_tokio(sender, receiver))
}
async fn ws_transport(url: &str) -> Result<(Sender, Receiver), Error> {
let url: Uri = url
.parse()
.map_err(|e: InvalidUri| Error::Transport(e.into()))?;
WsTransportClientBuilder::default()
.build(url)
.await
.map_err(|e| Error::Transport(e.into()))
}
}