diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 00d8ef480d..31cd434a18 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -19,4 +19,3 @@ futures = "0.3.13" codec = { package = "parity-scale-codec", version = "3.0.0", default-features = false, features = ["derive", "full", "bit-vec"] } hex = "0.4.3" tracing-subscriber = "0.3.11" - diff --git a/examples/examples/custom_config.rs b/examples/examples/custom_config.rs index 5b2c00d75c..d9b2c95f5a 100644 --- a/examples/examples/custom_config.rs +++ b/examples/examples/custom_config.rs @@ -2,7 +2,7 @@ // This file is dual-licensed as Apache-2.0 or GPL-3.0. // see LICENSE for license details. -//! This example should compile but should aos fail to work, since we've modified the +//! This example should compile but should fail to work, since we've modified the //! config to not align with a Polkadot node. use sp_keyring::AccountKeyring; diff --git a/examples/examples/custom_rpc_client.rs b/examples/examples/custom_rpc_client.rs new file mode 100644 index 0000000000..9329b67aad --- /dev/null +++ b/examples/examples/custom_rpc_client.rs @@ -0,0 +1,96 @@ +// Copyright 2019-2022 Parity Technologies (UK) Ltd. +// This file is dual-licensed as Apache-2.0 or GPL-3.0. +// see LICENSE for license details. + +use std::{ + fmt::Write, + pin::Pin, + sync::{ + Arc, + Mutex, + }, +}; +use subxt::{ + rpc::{ + RawValue, + RpcClientT, + RpcFuture, + RpcSubscription, + }, + OnlineClient, + PolkadotConfig, +}; + +// A dummy RPC client that doesn't actually handle requests properly +// at all, but instead just logs what requests to it were made. +struct MyLoggingClient { + log: Arc>, +} + +// We have to implement this fairly low level trait to turn [`MyLoggingClient`] +// into an RPC client that we can make use of in Subxt. Here we just log the requests +// made but don't forward them to any real node, and instead just return nonsense. +impl RpcClientT for MyLoggingClient { + fn request_raw<'a>( + &'a self, + method: &'a str, + params: Option>, + ) -> RpcFuture<'a, Box> { + writeln!( + self.log.lock().unwrap(), + "{method}({})", + params.as_ref().map(|p| p.get()).unwrap_or("[]") + ) + .unwrap(); + + // We've logged the request; just return garbage. Because a boxed future is returned, + // you're able to run whatever async code you'd need to actually talk to a node. + let res = RawValue::from_string("[]".to_string()).unwrap(); + Box::pin(std::future::ready(Ok(res))) + } + + fn subscribe_raw<'a>( + &'a self, + sub: &'a str, + params: Option>, + unsub: &'a str, + ) -> RpcFuture<'a, RpcSubscription> { + writeln!( + self.log.lock().unwrap(), + "{sub}({}) (unsub: {unsub})", + params.as_ref().map(|p| p.get()).unwrap_or("[]") + ) + .unwrap(); + + // We've logged the request; just return garbage. Because a boxed future is returned, + // and that will return a boxed Stream impl, you have a bunch of flexibility to build + // and return whatever type of Stream you see fit. + let res = RawValue::from_string("[]".to_string()).unwrap(); + let stream = futures::stream::once(async move { Ok(res) }); + let stream: Pin + Send>> = Box::pin(stream); + Box::pin(std::future::ready(Ok(stream))) + } +} + +#[subxt::subxt(runtime_metadata_path = "../artifacts/polkadot_metadata.scale")] +pub mod polkadot {} + +#[tokio::main] +async fn main() -> Result<(), Box> { + tracing_subscriber::fmt::init(); + + // Instantiate our replacement RPC client. + let log = Arc::default(); + let rpc_client = MyLoggingClient { + log: Arc::clone(&log), + }; + + // Pass this into our OnlineClient to instantiate it. This will lead to some + // RPC calls being made to fetch chain details/metadata, which will immediately + // fail.. + let _ = OnlineClient::::from_rpc_client(rpc_client).await; + + // But, we can see that the calls were made via our custom RPC client: + println!("Log of calls made:\n\n{}", log.lock().unwrap().as_str()); + Ok(()) +} diff --git a/subxt/Cargo.toml b/subxt/Cargo.toml index 4c11208c44..9eb2309b66 100644 --- a/subxt/Cargo.toml +++ b/subxt/Cargo.toml @@ -13,11 +13,17 @@ description = "Submit extrinsics (transactions) to a substrate node via RPC" keywords = ["parity", "substrate", "blockchain"] [features] +default = ["jsonrpsee"] + # Activate this to expose functionality only used for integration testing. # The exposed functionality is subject to breaking changes at any point, # and should not be relied upon. integration-tests = [] +# Jsonrpsee if the default RPC provider used in Subxt. However, it can be +# swapped out for an alternative implementation, and so is optional. +jsonrpsee = ["dep:jsonrpsee"] + [dependencies] bitvec = { version = "1.0.0", default-features = false, features = ["alloc"] } codec = { package = "parity-scale-codec", version = "3.0.0", default-features = false, features = ["derive", "full", "bit-vec"] } @@ -26,7 +32,7 @@ scale-value = "0.5.0" scale-decode = "0.3.0" futures = "0.3.13" hex = "0.4.3" -jsonrpsee = { version = "0.15.1", features = ["async-client", "client-ws-transport"] } +jsonrpsee = { version = "0.15.1", features = ["async-client", "client-ws-transport", "jsonrpsee-types"], optional = true } serde = { version = "1.0.124", features = ["derive"] } serde_json = "1.0.64" thiserror = "1.0.24" diff --git a/subxt/src/client/online_client.rs b/subxt/src/client/online_client.rs index 7043d58cfc..fa86fda667 100644 --- a/subxt/src/client/online_client.rs +++ b/subxt/src/client/online_client.rs @@ -12,7 +12,7 @@ use crate::{ events::EventsClient, rpc::{ Rpc, - RpcClient, + RpcClientT, RuntimeVersion, }, storage::StorageClient, @@ -52,12 +52,14 @@ struct Inner { impl std::fmt::Debug for OnlineClient { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("Client") - .field("rpc", &"") + .field("rpc", &"RpcClient") .field("inner", &self.inner) .finish() } } +// The default constructors assume Jsonrpsee. +#[cfg(feature = "jsonrpsee")] impl OnlineClient { /// Construct a new [`OnlineClient`] using default settings which /// point to a locally running node on `ws://127.0.0.1:9944`. @@ -68,16 +70,20 @@ impl OnlineClient { /// Construct a new [`OnlineClient`], providing a URL to connect to. pub async fn from_url(url: impl AsRef) -> Result, Error> { - let client = crate::rpc::ws_client(url.as_ref()).await?; + let client = jsonrpsee_helpers::ws_client(url.as_ref()) + .await + .map_err(|e| crate::error::RpcError(e.to_string()))?; OnlineClient::from_rpc_client(client).await } +} - /// Construct a new [`OnlineClient`] by providing the underlying [`RpcClient`] - /// to use to drive the connection. - pub async fn from_rpc_client( - rpc_client: impl Into, +impl OnlineClient { + /// Construct a new [`OnlineClient`] by providing an underlying [`RpcClientT`] + /// implementation to drive the connection. + pub async fn from_rpc_client( + rpc_client: R, ) -> Result, Error> { - let rpc = Rpc::new(rpc_client.into()); + let rpc = Rpc::new(rpc_client); let (genesis_hash, runtime_version, metadata) = future::join3( rpc.genesis_hash(), @@ -232,3 +238,42 @@ impl ClientRuntimeUpdater { Ok(()) } } + +// helpers for a jsonrpsee specific OnlineClient. +#[cfg(feature = "jsonrpsee")] +mod jsonrpsee_helpers { + pub use jsonrpsee::{ + client_transport::ws::{ + InvalidUri, + Receiver, + Sender, + Uri, + WsTransportClientBuilder, + }, + core::{ + client::{ + Client, + ClientBuilder, + }, + Error, + }, + }; + + /// Build WS RPC client from URL + pub async fn ws_client(url: &str) -> Result { + let (sender, receiver) = ws_transport(url).await?; + Ok(ClientBuilder::default() + .max_notifs_per_subscription(4096) + .build_with_tokio(sender, receiver)) + } + + async fn ws_transport(url: &str) -> Result<(Sender, Receiver), Error> { + let url: Uri = url + .parse() + .map_err(|e: InvalidUri| Error::Transport(e.into()))?; + WsTransportClientBuilder::default() + .build(url) + .await + .map_err(|e| Error::Transport(e.into())) + } +} diff --git a/subxt/src/error.rs b/subxt/src/error.rs index 85c61e98b7..04f1ed9134 100644 --- a/subxt/src/error.rs +++ b/subxt/src/error.rs @@ -15,7 +15,6 @@ pub use crate::metadata::{ InvalidMetadataError, MetadataError, }; -pub use jsonrpsee::core::error::Error as RequestError; pub use scale_value::scale::{ DecodeError, EncodeError, @@ -36,7 +35,7 @@ pub enum Error { Codec(#[from] codec::Error), /// Rpc error. #[error("Rpc error: {0}")] - Rpc(#[from] RequestError), + Rpc(#[from] RpcError), /// Serde serialization error #[error("Serde json error: {0}")] Serialization(#[from] serde_json::error::Error), @@ -102,6 +101,12 @@ impl From for Error { } } +/// An RPC error. Since we are generic over the RPC client that is used, +/// the error is any custom string. +#[derive(Debug, thiserror::Error)] +#[error("RPC error: {0}")] +pub struct RpcError(pub String); + /// This is our attempt to decode a runtime DispatchError. We either /// successfully decode it into a [`ModuleError`], or we fail and keep /// hold of the bytes, which we can attempt to decode if we have an @@ -232,7 +237,7 @@ pub enum TransactionError { /// block hasn't yet been finalized). #[error("The finality subscription expired")] FinalitySubscriptionTimeout, - /// The block hash that the tranaction was added to could not be found. + /// The block hash that the transaction was added to could not be found. /// This is probably because the block was retracted before being finalized. #[error("The block containing the transaction can no longer be found (perhaps it was on a non-finalized fork?)")] BlockHashNotFound, diff --git a/subxt/src/events/event_subscription.rs b/subxt/src/events/event_subscription.rs index 704a31c82f..c0d42e9be4 100644 --- a/subxt/src/events/event_subscription.rs +++ b/subxt/src/events/event_subscription.rs @@ -8,6 +8,7 @@ use crate::{ client::OnlineClientT, error::Error, events::EventsClient, + rpc::Subscription, Config, }; use derivative::Derivative; @@ -18,7 +19,6 @@ use futures::{ Stream, StreamExt, }; -use jsonrpsee::core::client::Subscription; use sp_runtime::traits::Header; use std::{ marker::Unpin, @@ -32,12 +32,12 @@ pub use super::{ FilterEvents, }; -/// A `jsonrpsee` Subscription. This forms a part of the `EventSubscription` type handed back +/// A Subscription. This forms a part of the `EventSubscription` type handed back /// in codegen from `subscribe_finalized`, and is exposed to be used in codegen. #[doc(hidden)] pub type FinalizedEventSub
= BoxStream<'static, Result>; -/// A `jsonrpsee` Subscription. This forms a part of the `EventSubscription` type handed back +/// A Subscription. This forms a part of the `EventSubscription` type handed back /// in codegen from `subscribe`, and is exposed to be used in codegen. #[doc(hidden)] pub type EventSub = Subscription; diff --git a/subxt/src/rpc/jsonrpsee_impl.rs b/subxt/src/rpc/jsonrpsee_impl.rs new file mode 100644 index 0000000000..6133faabf1 --- /dev/null +++ b/subxt/src/rpc/jsonrpsee_impl.rs @@ -0,0 +1,86 @@ +// Copyright 2019-2022 Parity Technologies (UK) Ltd. +// This file is dual-licensed as Apache-2.0 or GPL-3.0. +// see LICENSE for license details. + +use super::{ + RpcClientT, + RpcFuture, + RpcSubscription, +}; +use crate::error::RpcError; +use futures::stream::{ + StreamExt, + TryStreamExt, +}; +use jsonrpsee::{ + core::client::{ + Client, + ClientT, + SubscriptionClientT, + }, + types::ParamsSer, +}; +use serde_json::value::{ + RawValue, + Value, +}; + +impl RpcClientT for Client { + fn request_raw<'a>( + &'a self, + method: &'a str, + params: Option>, + ) -> RpcFuture<'a, Box> { + Box::pin(async move { + let params = prep_params_for_jsonrpsee(params)?; + let res = ClientT::request(self, method, Some(params)) + .await + .map_err(|e| RpcError(e.to_string()))?; + Ok(res) + }) + } + + fn subscribe_raw<'a>( + &'a self, + sub: &'a str, + params: Option>, + unsub: &'a str, + ) -> RpcFuture<'a, RpcSubscription> { + Box::pin(async move { + let params = prep_params_for_jsonrpsee(params)?; + let sub = SubscriptionClientT::subscribe::>( + self, + sub, + Some(params), + unsub, + ) + .await + .map_err(|e| RpcError(e.to_string()))? + .map_err(|e| RpcError(e.to_string())) + .boxed(); + Ok(sub) + }) + } +} + +// This is ugly; we have to encode to Value's to be compat with the jsonrpc interface. +// Remove and simplify this once something like https://github.com/paritytech/jsonrpsee/issues/862 is in: +fn prep_params_for_jsonrpsee( + params: Option>, +) -> Result, RpcError> { + let params = match params { + Some(params) => params, + // No params? avoid any work and bail early. + None => return Ok(ParamsSer::Array(Vec::new())), + }; + let val = serde_json::to_value(¶ms).expect("RawValue guarantees valid JSON"); + let arr = match val { + Value::Array(arr) => Ok(arr), + _ => { + Err(RpcError(format!( + "RPC Params are expected to be an array but got {params}" + ))) + } + }?; + Ok(ParamsSer::Array(arr)) +} diff --git a/subxt/src/rpc/mod.rs b/subxt/src/rpc/mod.rs new file mode 100644 index 0000000000..ad08b55d59 --- /dev/null +++ b/subxt/src/rpc/mod.rs @@ -0,0 +1,77 @@ +// Copyright 2019-2022 Parity Technologies (UK) Ltd. +// This file is dual-licensed as Apache-2.0 or GPL-3.0. +// see LICENSE for license details. + +//! RPC types and client for interacting with a substrate node. +//! +//! These is used behind the scenes by various `subxt` APIs, but can +//! also be used directly. +//! +//! - [`Rpc`] is the highest level wrapper, and the one you will run into +//! first. It contains the higher level methods for interacting with a node. +//! - [`RpcClient`] is what [`Rpc`] uses to actually talk to a node, offering +//! a [`RpcClient::request`] and [`RpcClient::subscribe`] method to do so. +//! - [`RpcClientT`] is the underlying dynamic RPC implementation. This provides +//! the low level [`RpcClientT::request_raw`] and [`RpcClientT::subscribe_raw`] +//! methods. This can be swapped out for a custom implementation, but by default +//! we'll rely on `jsonrpsee` for this. +//! +//! # Example +//! +//! Fetching storage keys +//! +//! ```no_run +//! use subxt::{ PolkadotConfig, OnlineClient, storage::StorageKey }; +//! +//! #[subxt::subxt(runtime_metadata_path = "../artifacts/polkadot_metadata.scale")] +//! pub mod polkadot {} +//! +//! # #[tokio::main] +//! # async fn main() { +//! let api = OnlineClient::::new().await.unwrap(); +//! +//! let key = polkadot::storage() +//! .xcm_pallet() +//! .version_notifiers_root() +//! .to_bytes(); +//! +//! // Fetch up to 10 keys. +//! let keys = api +//! .rpc() +//! .storage_keys_paged(&key, 10, None, None) +//! .await +//! .unwrap(); +//! +//! for key in keys.iter() { +//! println!("Key: 0x{}", hex::encode(&key)); +//! } +//! # } +//! ``` + +// Allow an `rpc.rs` file in the `rpc` folder to align better +// with other file names for their types. +#![allow(clippy::module_inception)] + +#[cfg(feature = "jsonrpsee")] +mod jsonrpsee_impl; + +mod rpc; +mod rpc_client; +mod rpc_client_t; + +// Expose the `Rpc` struct and any associated types. +pub use rpc::*; + +pub use rpc_client_t::{ + RawValue, + RpcClientT, + RpcFuture, + RpcSubscription, +}; + +pub use rpc_client::{ + rpc_params, + RpcClient, + RpcParams, + Subscription, +}; diff --git a/subxt/src/rpc.rs b/subxt/src/rpc/rpc.rs similarity index 91% rename from subxt/src/rpc.rs rename to subxt/src/rpc/rpc.rs index 49a4979413..cc650bea69 100644 --- a/subxt/src/rpc.rs +++ b/subxt/src/rpc/rpc.rs @@ -39,16 +39,12 @@ //! # } //! ``` -// jsonrpsee subscriptions are interminable. -// Allows `while let status = subscription.next().await {}` -// Related: https://github.com/paritytech/subxt/issues/66 -#![allow(irrefutable_let_patterns)] - -use std::{ - collections::HashMap, - sync::Arc, +use super::{ + rpc_params, + RpcClient, + RpcClientT, + Subscription, }; - use crate::{ error::Error, utils::PhantomDataSendSync, @@ -60,29 +56,6 @@ use codec::{ Encode, }; use frame_metadata::RuntimeMetadataPrefixed; -pub use jsonrpsee::{ - client_transport::ws::{ - InvalidUri, - Receiver as WsReceiver, - Sender as WsSender, - Uri, - WsTransportClientBuilder, - }, - core::{ - client::{ - Client as RpcClient, - ClientBuilder as RpcClientBuilder, - ClientT, - Subscription, - SubscriptionClientT, - }, - to_json_value, - DeserializeOwned, - Error as RpcError, - JsonValue, - }, - rpc_params, -}; use serde::{ Deserialize, Serialize, @@ -103,6 +76,7 @@ use sp_runtime::{ }, ApplyExtrinsicResult, }; +use std::collections::HashMap; /// A number type that can be serialized both as a number or a string that encodes a number in a /// string. @@ -344,8 +318,7 @@ pub struct Health { /// Client for substrate rpc interfaces pub struct Rpc { - /// Rpc client for sending requests. - pub client: Arc, + client: RpcClient, _marker: PhantomDataSendSync, } @@ -358,11 +331,20 @@ impl Clone for Rpc { } } +// Expose subscribe/request, and also subscribe_raw/request_raw +// from the even-deeper `dyn RpcClientT` impl. +impl std::ops::Deref for Rpc { + type Target = RpcClient; + fn deref(&self) -> &Self::Target { + &self.client + } +} + impl Rpc { /// Create a new [`Rpc`] - pub fn new(client: RpcClient) -> Self { + pub fn new(client: R) -> Self { Self { - client: Arc::new(client), + client: RpcClient::new(client), _marker: PhantomDataSendSync::new(), } } @@ -445,30 +427,29 @@ impl Rpc { /// Fetch system properties pub async fn system_properties(&self) -> Result { - Ok(self - .client + self.client .request("system_properties", rpc_params![]) - .await?) + .await } /// Fetch system health pub async fn system_health(&self) -> Result { - Ok(self.client.request("system_health", rpc_params![]).await?) + self.client.request("system_health", rpc_params![]).await } /// Fetch system chain pub async fn system_chain(&self) -> Result { - Ok(self.client.request("system_chain", rpc_params![]).await?) + self.client.request("system_chain", rpc_params![]).await } /// Fetch system name pub async fn system_name(&self) -> Result { - Ok(self.client.request("system_name", rpc_params![]).await?) + self.client.request("system_name", rpc_params![]).await } /// Fetch system version pub async fn system_version(&self) -> Result { - Ok(self.client.request("system_version", rpc_params![]).await?) + self.client.request("system_version", rpc_params![]).await } /// Fetch the current nonce for the given account ID. @@ -476,10 +457,9 @@ impl Rpc { &self, account: &T::AccountId, ) -> Result { - Ok(self - .client + self.client .request("system_accountNextIndex", rpc_params![account]) - .await?) + .await } /// Get a header @@ -650,10 +630,9 @@ impl Rpc { /// Generate new session keys and returns the corresponding public keys. pub async fn rotate_keys(&self) -> Result { - Ok(self - .client + self.client .request("author_rotateKeys", rpc_params![]) - .await?) + .await } /// Checks if the keystore has private keys for the given session public keys. @@ -663,7 +642,7 @@ impl Rpc { /// Returns `true` iff all private keys could be found. pub async fn has_session_keys(&self, session_keys: Bytes) -> Result { let params = rpc_params![session_keys]; - Ok(self.client.request("author_hasSessionKeys", params).await?) + self.client.request("author_hasSessionKeys", params).await } /// Checks if the keystore has private keys for the given public key and key type. @@ -675,7 +654,7 @@ impl Rpc { key_type: String, ) -> Result { let params = rpc_params![public_key, key_type]; - Ok(self.client.request("author_hasKey", params).await?) + self.client.request("author_hasKey", params).await } /// Submits the extrinsic to the dry_run RPC, to test if it would succeed. @@ -694,24 +673,6 @@ impl Rpc { } } -/// Build WS RPC client from URL -pub async fn ws_client(url: &str) -> Result { - let (sender, receiver) = ws_transport(url).await?; - Ok(RpcClientBuilder::default() - .max_notifs_per_subscription(4096) - .build_with_tokio(sender, receiver)) -} - -async fn ws_transport(url: &str) -> Result<(WsSender, WsReceiver), RpcError> { - let url: Uri = url - .parse() - .map_err(|e: InvalidUri| RpcError::Transport(e.into()))?; - WsTransportClientBuilder::default() - .build(url) - .await - .map_err(|e| RpcError::Transport(e.into())) -} - fn to_hex(bytes: impl AsRef<[u8]>) -> String { format!("0x{}", hex::encode(bytes.as_ref())) } diff --git a/subxt/src/rpc/rpc_client.rs b/subxt/src/rpc/rpc_client.rs new file mode 100644 index 0000000000..96560c60b7 --- /dev/null +++ b/subxt/src/rpc/rpc_client.rs @@ -0,0 +1,218 @@ +// Copyright 2019-2022 Parity Technologies (UK) Ltd. +// This file is dual-licensed as Apache-2.0 or GPL-3.0. +// see LICENSE for license details. + +use super::{ + RpcClientT, + RpcSubscription, +}; +use crate::error::Error; +use futures::{ + Stream, + StreamExt, +}; +use serde::{ + de::DeserializeOwned, + Serialize, +}; +use serde_json::value::RawValue; +use std::{ + pin::Pin, + sync::Arc, + task::Poll, +}; + +/// A concrete wrapper around an [`RpcClientT`] which exposes the udnerlying interface via some +/// higher level methods that make it a little easier to work with. +/// +/// Wrapping [`RpcClientT`] in this way is simply a way to expose this additional functionality +/// without getting into issues with non-object-safe methods or no `async` in traits. +#[derive(Clone)] +pub struct RpcClient(Arc); + +impl RpcClient { + pub(crate) fn new(client: R) -> Self { + RpcClient(Arc::new(client)) + } + + /// Make an RPC request, given a method name and some parameters. + /// + /// See [`RpcParams`] and the [`rpc_params!`] macro for an example of how to + /// construct the parameters. + pub async fn request( + &self, + method: &str, + params: RpcParams, + ) -> Result { + let res = self.0.request_raw(method, params.build()).await?; + let val = serde_json::from_str(res.get())?; + Ok(val) + } + + /// Subscribe to an RPC endpoint, providing the parameters and the method to call to + /// unsubscribe from it again. + /// + /// See [`RpcParams`] and the [`rpc_params!`] macro for an example of how to + /// construct the parameters. + pub async fn subscribe( + &self, + sub: &str, + params: RpcParams, + unsub: &str, + ) -> Result, Error> { + let sub = self.0.subscribe_raw(sub, params.build(), unsub).await?; + Ok(Subscription::new(sub)) + } +} + +impl std::fmt::Debug for RpcClient { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_tuple("RpcClient").finish() + } +} + +impl std::ops::Deref for RpcClient { + type Target = dyn RpcClientT; + fn deref(&self) -> &Self::Target { + &*self.0 + } +} + +/// Create some [`RpcParams`] to pass to our [`RpcClient`]. [`RpcParams`] +/// simply enforces that parameters handed to our [`RpcClient`] methods +/// are the correct shape. +/// +/// As with the [`serde_json::json!`] macro, this will panic if you provide +/// parameters which cannot successfully be serialized to JSON. +/// +/// # Example +/// +/// ```rust +/// use subxt::rpc::{ rpc_params, RpcParams }; +/// +/// // If you provide no params you get `None` back +/// let params: RpcParams = rpc_params![]; +/// assert!(params.build().is_none()); +/// +/// // If you provide params you get `Some>` back. +/// let params: RpcParams = rpc_params![1, true, "foo"]; +/// assert_eq!(params.build().unwrap().get(), "[1,true,\"foo\"]"); +/// ``` +#[macro_export] +macro_rules! rpc_params { + ($($p:expr), *) => {{ + // May be unused if empty; no params. + #[allow(unused_mut)] + let mut params = $crate::rpc::RpcParams::new(); + $( + params.push($p).expect("values passed to rpc_params! must be serializable to JSON"); + )* + params + }} +} +pub use rpc_params; + +/// This represents the parameters passed to an [`RpcClient`], and exists to +/// enforce that parameters are provided in the correct format. +/// +/// Prefer to use the [`rpc_params!`] macro for simpler creation of these. +/// +/// # Example +/// +/// ```rust +/// use subxt::rpc::RpcParams; +/// +/// let mut params = RpcParams::new(); +/// params.push(1).unwrap(); +/// params.push(true).unwrap(); +/// params.push("foo").unwrap(); +/// +/// assert_eq!(params.build().unwrap().get(), "[1,true,\"foo\"]"); +/// ``` +#[derive(Debug, Clone, Default)] +pub struct RpcParams(Vec); + +impl RpcParams { + /// Create a new empty set of [`RpcParams`]. + pub fn new() -> Self { + Self(Vec::new()) + } + /// Push a parameter into our [`RpcParams`]. This serializes it to JSON + /// in the process, and so will return an error if this is not possible. + pub fn push(&mut self, param: P) -> Result<(), Error> { + if self.0.is_empty() { + self.0.push(b'['); + } else { + self.0.push(b',') + } + serde_json::to_writer(&mut self.0, ¶m)?; + Ok(()) + } + /// Build a [`RawValue`] from our params, returning `None` if no parameters + /// were provided. + pub fn build(mut self) -> Option> { + if self.0.is_empty() { + None + } else { + self.0.push(b']'); + let s = unsafe { String::from_utf8_unchecked(self.0) }; + Some(RawValue::from_string(s).expect("Should be valid JSON")) + } + } +} + +/// A generic RPC Subscription. This implements [`Stream`], and so most of +/// the functionality you'll need to interact with it comes from the +/// [`StreamExt`] extension trait. +pub struct Subscription { + inner: RpcSubscription, + _marker: std::marker::PhantomData, +} + +impl std::fmt::Debug for Subscription { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Subscription") + .field("inner", &"RpcSubscription") + .field("_marker", &self._marker) + .finish() + } +} + +impl Subscription { + fn new(inner: RpcSubscription) -> Self { + Self { + inner, + _marker: std::marker::PhantomData, + } + } +} + +impl Subscription { + /// Wait for the next item from the subscription. + pub async fn next(&mut self) -> Option> { + StreamExt::next(self).await + } +} + +impl std::marker::Unpin for Subscription {} + +impl Stream for Subscription { + type Item = Result; + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + let res = futures::ready!(self.inner.poll_next_unpin(cx)); + + // Decode the inner RawValue to the type we're expecting and map + // any errors to the right shape: + let res = res.map(|r| { + r.map_err(|e| e.into()).and_then(|raw_val| { + serde_json::from_str(raw_val.get()).map_err(|e| e.into()) + }) + }); + + Poll::Ready(res) + } +} diff --git a/subxt/src/rpc/rpc_client_t.rs b/subxt/src/rpc/rpc_client_t.rs new file mode 100644 index 0000000000..17f4ec9ab9 --- /dev/null +++ b/subxt/src/rpc/rpc_client_t.rs @@ -0,0 +1,59 @@ +// Copyright 2019-2022 Parity Technologies (UK) Ltd. +// This file is dual-licensed as Apache-2.0 or GPL-3.0. +// see LICENSE for license details. + +use crate::error::RpcError; +use futures::Stream; +use std::{ + future::Future, + pin::Pin, +}; + +// Re-exporting for simplicity since it's used a bunch in the trait definition. +pub use serde_json::value::RawValue; + +/// Any RPC client which implements this can be used in our [`super::Rpc`] type +/// to talk to a node. +/// +/// This is a low level interface whose methods expect an already-serialized set of params, +/// and return an owned but still-serialized [`RawValue`], deferring deserialization to +/// the caller. This is the case because we want the methods to be object-safe (which prohibits +/// generics), and want to avoid any unnecessary allocations in serializing/deserializing +/// parameters. +pub trait RpcClientT: Send + Sync + 'static { + /// Make a raw request for which we expect a single response back from. Implementations + /// should expect that the params will either be `None`, or be an already-serialized + /// JSON array of parameters. + /// + /// See [`super::RpcParams`] and the [`super::rpc_params!`] macro for an example of how to + /// construct the parameters. + /// + /// Prefer to use the interface provided on [`super::RpcClient`] where possible. + fn request_raw<'a>( + &'a self, + method: &'a str, + params: Option>, + ) -> RpcFuture<'a, Box>; + + /// Subscribe to some method. Implementations should expect that the params will + /// either be `None`, or be an already-serialized JSON array of parameters. + /// + /// See [`super::RpcParams`] and the [`super::rpc_params!`] macro for an example of how to + /// construct the parameters. + /// + /// Prefer to use the interface provided on [`super::RpcClient`] where possible. + fn subscribe_raw<'a>( + &'a self, + sub: &'a str, + params: Option>, + unsub: &'a str, + ) -> RpcFuture<'a, RpcSubscription>; +} + +/// A boxed future that is returned from the [`RpcClientT`] methods. +pub type RpcFuture<'a, T> = + Pin> + Send + 'a>>; + +/// The inner subscription stream returned from our [`RpcClientT`]'s `subscription` method. +pub type RpcSubscription = + Pin, RpcError>> + Send + 'static>>; diff --git a/subxt/src/storage/storage_client.rs b/subxt/src/storage/storage_client.rs index af3635f330..e6777b567b 100644 --- a/subxt/src/storage/storage_client.rs +++ b/subxt/src/storage/storage_client.rs @@ -318,7 +318,7 @@ pub struct KeyIter { _marker: std::marker::PhantomData, } -impl<'a, T: Config, Client: OnlineClientT, ReturnTy> KeyIter +impl<'a, T, Client, ReturnTy> KeyIter where T: Config, Client: OnlineClientT, diff --git a/subxt/src/tx/tx_client.rs b/subxt/src/tx/tx_client.rs index 28d4e5bf28..47ba2ee94b 100644 --- a/subxt/src/tx/tx_client.rs +++ b/subxt/src/tx/tx_client.rs @@ -208,7 +208,11 @@ impl> TxClient { } } -impl> TxClient { +impl TxClient +where + T: Config, + C: OnlineClientT, +{ /// Creates a raw signed extrinsic, without submitting it. pub async fn create_signed( &self, diff --git a/subxt/src/tx/tx_progress.rs b/subxt/src/tx/tx_progress.rs index 4143468928..9cbf752164 100644 --- a/subxt/src/tx/tx_progress.rs +++ b/subxt/src/tx/tx_progress.rs @@ -11,6 +11,7 @@ use crate::{ error::{ DispatchError, Error, + RpcError, TransactionError, }, events::{ @@ -21,7 +22,10 @@ use crate::{ Phase, StaticEvent, }, - rpc::SubstrateTxStatus, + rpc::{ + Subscription, + SubstrateTxStatus, + }, Config, }; use derivative::Derivative; @@ -29,10 +33,6 @@ use futures::{ Stream, StreamExt, }; -use jsonrpsee::core::{ - client::Subscription as RpcSubscription, - Error as RpcError, -}; use sp_runtime::traits::Hash; pub use sp_runtime::traits::SignedExtension; @@ -41,7 +41,7 @@ pub use sp_runtime::traits::SignedExtension; #[derive(Derivative)] #[derivative(Debug(bound = "C: std::fmt::Debug"))] pub struct TxProgress { - sub: Option>>, + sub: Option>>, ext_hash: T::Hash, client: C, } @@ -54,7 +54,7 @@ impl Unpin for TxProgress {} impl TxProgress { /// Instantiate a new [`TxProgress`] from a custom subscription. pub fn new( - sub: RpcSubscription>, + sub: Subscription>, client: C, ext_hash: T::Hash, ) -> Self { @@ -71,7 +71,11 @@ impl TxProgress { } } -impl> TxProgress { +impl TxProgress +where + T: Config, + C: OnlineClientT, +{ /// Return the next transaction status when it's emitted. This just delegates to the /// [`futures::Stream`] implementation for [`TxProgress`], but allows you to /// avoid importing that trait if you don't otherwise need it. @@ -103,7 +107,7 @@ impl> TxProgress { _ => continue, } } - Err(RpcError::Custom("RPC subscription dropped".into()).into()) + Err(RpcError("RPC subscription dropped".to_string()).into()) } /// Wait for the transaction to be finalized, and return a [`TxInBlock`] @@ -129,7 +133,7 @@ impl> TxProgress { _ => continue, } } - Err(RpcError::Custom("RPC subscription dropped".into()).into()) + Err(RpcError("RPC subscription dropped".to_string()).into()) } /// Wait for the transaction to be finalized, and for the transaction events to indicate @@ -161,47 +165,45 @@ impl> Stream for TxProgress { None => return Poll::Ready(None), }; - sub.poll_next_unpin(cx) - .map_err(|e| e.into()) - .map_ok(|status| { - match status { - SubstrateTxStatus::Future => TxStatus::Future, - SubstrateTxStatus::Ready => TxStatus::Ready, - SubstrateTxStatus::Broadcast(peers) => TxStatus::Broadcast(peers), - SubstrateTxStatus::InBlock(hash) => { - TxStatus::InBlock(TxInBlock::new( - hash, - self.ext_hash, - self.client.clone(), - )) - } - SubstrateTxStatus::Retracted(hash) => TxStatus::Retracted(hash), - SubstrateTxStatus::Usurped(hash) => TxStatus::Usurped(hash), - SubstrateTxStatus::Dropped => TxStatus::Dropped, - SubstrateTxStatus::Invalid => TxStatus::Invalid, - // Only the following statuses are actually considered "final" (see the substrate - // docs on `TxStatus`). Basically, either the transaction makes it into a - // block, or we eventually give up on waiting for it to make it into a block. - // Even `Dropped`/`Invalid`/`Usurped` transactions might make it into a block eventually. - // - // As an example, a transaction that is `Invalid` on one node due to having the wrong - // nonce might still be valid on some fork on another node which ends up being finalized. - // Equally, a transaction `Dropped` from one node may still be in the transaction pool, - // and make it into a block, on another node. Likewise with `Usurped`. - SubstrateTxStatus::FinalityTimeout(hash) => { - self.sub = None; - TxStatus::FinalityTimeout(hash) - } - SubstrateTxStatus::Finalized(hash) => { - self.sub = None; - TxStatus::Finalized(TxInBlock::new( - hash, - self.ext_hash, - self.client.clone(), - )) - } + sub.poll_next_unpin(cx).map_ok(|status| { + match status { + SubstrateTxStatus::Future => TxStatus::Future, + SubstrateTxStatus::Ready => TxStatus::Ready, + SubstrateTxStatus::Broadcast(peers) => TxStatus::Broadcast(peers), + SubstrateTxStatus::InBlock(hash) => { + TxStatus::InBlock(TxInBlock::new( + hash, + self.ext_hash, + self.client.clone(), + )) } - }) + SubstrateTxStatus::Retracted(hash) => TxStatus::Retracted(hash), + SubstrateTxStatus::Usurped(hash) => TxStatus::Usurped(hash), + SubstrateTxStatus::Dropped => TxStatus::Dropped, + SubstrateTxStatus::Invalid => TxStatus::Invalid, + // Only the following statuses are actually considered "final" (see the substrate + // docs on `TxStatus`). Basically, either the transaction makes it into a + // block, or we eventually give up on waiting for it to make it into a block. + // Even `Dropped`/`Invalid`/`Usurped` transactions might make it into a block eventually. + // + // As an example, a transaction that is `Invalid` on one node due to having the wrong + // nonce might still be valid on some fork on another node which ends up being finalized. + // Equally, a transaction `Dropped` from one node may still be in the transaction pool, + // and make it into a block, on another node. Likewise with `Usurped`. + SubstrateTxStatus::FinalityTimeout(hash) => { + self.sub = None; + TxStatus::FinalityTimeout(hash) + } + SubstrateTxStatus::Finalized(hash) => { + self.sub = None; + TxStatus::Finalized(TxInBlock::new( + hash, + self.ext_hash, + self.client.clone(), + )) + } + } + }) } } diff --git a/testing/test-runtime/Cargo.toml b/testing/test-runtime/Cargo.toml index 83e0808b24..2742ffd333 100644 --- a/testing/test-runtime/Cargo.toml +++ b/testing/test-runtime/Cargo.toml @@ -13,3 +13,4 @@ subxt = { path = "../../subxt" } sp-core = "6.0.0" tokio = { version = "1.8", features = ["macros", "rt-multi-thread"] } which = "4.2.2" +jsonrpsee = { version = "0.15.1", features = ["async-client", "client-ws-transport"] } diff --git a/testing/test-runtime/build.rs b/testing/test-runtime/build.rs index c36ded937c..607433fa59 100644 --- a/testing/test-runtime/build.rs +++ b/testing/test-runtime/build.rs @@ -15,10 +15,6 @@ use std::{ thread, time, }; -use subxt::rpc::{ - self, - ClientT, -}; static SUBSTRATE_BIN_ENV_VAR: &str = "SUBSTRATE_NODE_PATH"; @@ -62,7 +58,8 @@ async fn run() { // It might take a while for substrate node that spin up the RPC server. // Thus, the connection might get rejected a few times. - let res = match rpc::ws_client(&format!("ws://localhost:{}", port)).await { + use client::ClientT; + let res = match client::build(&format!("ws://localhost:{}", port)).await { Ok(c) => c.request("state_getMetadata", None).await, Err(e) => Err(e), }; @@ -157,3 +154,41 @@ impl Drop for KillOnDrop { let _ = self.0.kill(); } } + +// Use jsonrpsee to obtain metadata from the node. +mod client { + pub use jsonrpsee::{ + client_transport::ws::{ + InvalidUri, + Receiver, + Sender, + Uri, + WsTransportClientBuilder, + }, + core::{ + client::{ + Client, + ClientBuilder, + }, + Error, + }, + }; + + pub use jsonrpsee::core::client::ClientT; + + /// Build WS RPC client from URL + pub async fn build(url: &str) -> Result { + let (sender, receiver) = ws_transport(url).await?; + Ok(ClientBuilder::default().build_with_tokio(sender, receiver)) + } + + async fn ws_transport(url: &str) -> Result<(Sender, Receiver), Error> { + let url: Uri = url + .parse() + .map_err(|e: InvalidUri| Error::Transport(e.into()))?; + WsTransportClientBuilder::default() + .build(url) + .await + .map_err(|e| Error::Transport(e.into())) + } +}