From c8b090751bf9067a717deb139e9eea530751c760 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Tue, 30 Nov 2021 17:04:29 +0100 Subject: [PATCH] PoC async rpc client --- Cargo.toml | 2 +- src/client.rs | 2 +- src/rpc.rs | 216 +++++++++++++++--------------------------- test-runtime/build.rs | 7 +- 4 files changed, 81 insertions(+), 146 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index e7141690f1..9b8a089454 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,7 +24,7 @@ chameleon = "0.1.0" scale-info = { version = "1.0.0", features = ["bit-vec"] } futures = "0.3.13" hex = "0.4.3" -jsonrpsee = { version = "0.5.1", features = ["macros", "ws-client", "http-client"] } +jsonrpsee = { git = "https://github.com/paritytech/jsonrpsee/", branch = "extract-async-client", features = ["macros", "client"] } log = "0.4.14" num-traits = { version = "0.2.14", default-features = false } serde = { version = "1.0.124", features = ["derive"] } diff --git a/src/client.rs b/src/client.rs index c243d0fc91..50899f398e 100644 --- a/src/client.rs +++ b/src/client.rs @@ -91,7 +91,7 @@ impl ClientBuilder { client } else { let url = self.url.as_deref().unwrap_or("ws://127.0.0.1:9944"); - RpcClient::try_from_url(url).await? + crate::rpc::build_ws_client(url).await? }; let mut rpc = Rpc::new(client); if self.accept_weak_inclusion { diff --git a/src/rpc.rs b/src/rpc.rs index 1543a6b228..4fc1a393cd 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -33,11 +33,18 @@ use core::{ marker::PhantomData, }; use frame_metadata::RuntimeMetadataPrefixed; -use jsonrpsee::{ - http_client::{ - HttpClient, - HttpClientBuilder, +pub use jsonrpsee::{ + client_transport::ws::{ + Uri, + WsTransportClientBuilder, + Sender as WsSender, + Receiver as WsReceiver, }, + core_client::{ + Client as RpcClient, + ClientBuilder as RpcClientBuilder, + }, + rpc_params, types::{ to_json_value, traits::{ @@ -49,10 +56,6 @@ use jsonrpsee::{ JsonValue, Subscription, }, - ws_client::{ - WsClient, - WsClientBuilder, - }, }; use serde::{ Deserialize, @@ -186,102 +189,6 @@ pub enum TransactionStatus { Invalid, } -/// Rpc client wrapper. -/// This is workaround because adding generic types causes the macros to fail. -#[derive(Clone)] -pub enum RpcClient { - /// JSONRPC client WebSocket transport. - WebSocket(Arc), - /// JSONRPC client HTTP transport. - // NOTE: Arc because `HttpClient` is not clone. - Http(Arc), -} - -impl RpcClient { - /// Create a new [`RpcClient`] from the given URL. - /// - /// Infers the protocol from the URL, supports: - /// - Websockets (`ws://`, `wss://`) - /// - Http (`http://`, `https://`) - pub async fn try_from_url(url: &str) -> Result { - if url.starts_with("ws://") || url.starts_with("wss://") { - let client = WsClientBuilder::default() - .max_notifs_per_subscription(4096) - .build(url) - .await?; - Ok(RpcClient::WebSocket(Arc::new(client))) - } else { - let client = HttpClientBuilder::default().build(&url)?; - Ok(RpcClient::Http(Arc::new(client))) - } - } - - /// Start a JSON-RPC request. - pub async fn request<'a, T: DeserializeOwned + std::fmt::Debug>( - &self, - method: &str, - params: &[JsonValue], - ) -> Result { - let params = Some(params.into()); - log::debug!("request {}: {:?}", method, params); - let data = match self { - Self::WebSocket(inner) => { - inner.request(method, params).await.map_err(Into::into) - } - Self::Http(inner) => inner.request(method, params).await.map_err(Into::into), - }; - data - } - - /// Start a JSON-RPC Subscription. - pub async fn subscribe<'a, T: DeserializeOwned>( - &self, - subscribe_method: &str, - params: &[JsonValue], - unsubscribe_method: &str, - ) -> Result, Error> { - let params = Some(params.into()); - match self { - Self::WebSocket(inner) => { - inner - .subscribe(subscribe_method, params, unsubscribe_method) - .await - .map_err(Into::into) - } - Self::Http(_) => { - Err(RpcError::Custom( - "Subscriptions not supported on HTTP transport".to_owned(), - ) - .into()) - } - } - } -} - -impl From for RpcClient { - fn from(client: WsClient) -> Self { - RpcClient::WebSocket(Arc::new(client)) - } -} - -impl From> for RpcClient { - fn from(client: Arc) -> Self { - RpcClient::WebSocket(client) - } -} - -impl From for RpcClient { - fn from(client: HttpClient) -> Self { - RpcClient::Http(Arc::new(client)) - } -} - -impl From> for RpcClient { - fn from(client: Arc) -> Self { - RpcClient::Http(client) - } -} - /// ReadProof struct returned by the RPC /// /// # Note @@ -300,7 +207,7 @@ pub struct ReadProof { /// Client for substrate rpc interfaces pub struct Rpc { /// Rpc client for sending requests. - pub client: RpcClient, + pub client: Arc, marker: PhantomData, accept_weak_inclusion: bool, } @@ -319,7 +226,7 @@ impl Rpc { /// Create a new [`Rpc`] pub fn new(client: RpcClient) -> Self { Self { - client, + client: Arc::new(client), marker: PhantomData, accept_weak_inclusion: false, } @@ -337,7 +244,7 @@ impl Rpc { key: &StorageKey, hash: Option, ) -> Result, Error> { - let params = &[to_json_value(key)?, to_json_value(hash)?]; + let params = rpc_params![key, hash]; let data = self.client.request("state_getStorage", params).await?; Ok(data) } @@ -353,12 +260,7 @@ impl Rpc { hash: Option, ) -> Result, Error> { let prefix = prefix.map(|p| p.to_storage_key()); - let params = &[ - to_json_value(prefix)?, - to_json_value(count)?, - to_json_value(start_key)?, - to_json_value(hash)?, - ]; + let params = rpc_params![prefix, count, start_key, hash]; let data = self.client.request("state_getKeysPaged", params).await?; Ok(data) } @@ -370,11 +272,7 @@ impl Rpc { from: T::Hash, to: Option, ) -> Result>, Error> { - let params = &[ - to_json_value(keys)?, - to_json_value(from)?, - to_json_value(to)?, - ]; + let params = rpc_params![keys, from, to]; self.client .request("state_queryStorage", params) .await @@ -387,7 +285,7 @@ impl Rpc { keys: &[StorageKey], at: Option, ) -> Result>, Error> { - let params = &[to_json_value(keys)?, to_json_value(at)?]; + let params = rpc_params![keys, at]; self.client .request("state_queryStorageAt", params) .await @@ -397,7 +295,7 @@ impl Rpc { /// Fetch the genesis hash pub async fn genesis_hash(&self) -> Result { let block_zero = Some(ListOrValue::Value(NumberOrHex::Number(0))); - let params = &[to_json_value(block_zero)?]; + let params = rpc_params![block_zero]; let list_or_value: ListOrValue> = self.client.request("chain_getBlockHash", params).await?; match list_or_value { @@ -410,7 +308,10 @@ impl Rpc { /// Fetch the metadata pub async fn metadata(&self) -> Result { - let bytes: Bytes = self.client.request("state_getMetadata", &[]).await?; + let bytes: Bytes = self + .client + .request("state_getMetadata", rpc_params![]) + .await?; let meta: RuntimeMetadataPrefixed = Decode::decode(&mut &bytes[..])?; let metadata: Metadata = meta.try_into()?; Ok(metadata) @@ -418,7 +319,10 @@ impl Rpc { /// Fetch system properties pub async fn system_properties(&self) -> Result { - Ok(self.client.request("system_properties", &[]).await?) + Ok(self + .client + .request("system_properties", rpc_params![]) + .await?) } /// Get a header @@ -426,7 +330,7 @@ impl Rpc { &self, hash: Option, ) -> Result, Error> { - let params = &[to_json_value(hash)?]; + let params = rpc_params![hash]; let header = self.client.request("chain_getHeader", params).await?; Ok(header) } @@ -437,7 +341,7 @@ impl Rpc { block_number: Option, ) -> Result, Error> { let block_number = block_number.map(ListOrValue::Value); - let params = &[to_json_value(block_number)?]; + let params = rpc_params![block_number]; let list_or_value = self.client.request("chain_getBlockHash", params).await?; match list_or_value { ListOrValue::Value(hash) => Ok(hash), @@ -447,7 +351,10 @@ impl Rpc { /// Get a block hash of the latest finalized block pub async fn finalized_head(&self) -> Result { - let hash = self.client.request("chain_getFinalizedHead", &[]).await?; + let hash = self + .client + .request("chain_getFinalizedHead", rpc_params![]) + .await?; Ok(hash) } @@ -456,7 +363,7 @@ impl Rpc { &self, hash: Option, ) -> Result>, Error> { - let params = &[to_json_value(hash)?]; + let params = rpc_params![hash]; let block = self.client.request("chain_getBlock", params).await?; Ok(block) } @@ -467,7 +374,7 @@ impl Rpc { keys: Vec, hash: Option, ) -> Result, Error> { - let params = &[to_json_value(keys)?, to_json_value(hash)?]; + let params = rpc_params![keys, hash]; let proof = self.client.request("state_getReadProof", params).await?; Ok(proof) } @@ -477,7 +384,7 @@ impl Rpc { &self, at: Option, ) -> Result { - let params = &[to_json_value(at)?]; + let params = rpc_params![at]; let version = self .client .request("state_getRuntimeVersion", params) @@ -491,7 +398,7 @@ impl Rpc { /// `subscribe_finalized_events` to ensure events are finalized. pub async fn subscribe_events(&self) -> Result, Error> { let keys = Some(vec![StorageKey::from(SystemEvents::new())]); - let params = &[to_json_value(keys)?]; + let params = rpc_params![keys]; let subscription = self .client @@ -516,7 +423,11 @@ impl Rpc { pub async fn subscribe_blocks(&self) -> Result, Error> { let subscription = self .client - .subscribe("chain_subscribeNewHeads", &[], "chain_unsubscribeNewHeads") + .subscribe( + "chain_subscribeNewHeads", + rpc_params![], + "chain_unsubscribeNewHeads", + ) .await?; Ok(subscription) @@ -530,7 +441,7 @@ impl Rpc { .client .subscribe( "chain_subscribeFinalizedHeads", - &[], + rpc_params![], "chain_unsubscribeFinalizedHeads", ) .await?; @@ -543,7 +454,7 @@ impl Rpc { extrinsic: E, ) -> Result { let bytes: Bytes = extrinsic.encode().into(); - let params = &[to_json_value(bytes)?]; + let params = rpc_params![bytes]; let xt_hash = self .client .request("author_submitExtrinsic", params) @@ -557,7 +468,7 @@ impl Rpc { extrinsic: E, ) -> Result>, Error> { let bytes: Bytes = extrinsic.encode().into(); - let params = &[to_json_value(bytes)?]; + let params = rpc_params![bytes]; let subscription = self .client .subscribe( @@ -669,18 +580,17 @@ impl Rpc { suri: String, public: Bytes, ) -> Result<(), Error> { - let params = &[ - to_json_value(key_type)?, - to_json_value(suri)?, - to_json_value(public)?, - ]; + let params = rpc_params![key_type, suri, public]; self.client.request("author_insertKey", params).await?; Ok(()) } /// Generate new session keys and returns the corresponding public keys. pub async fn rotate_keys(&self) -> Result { - Ok(self.client.request("author_rotateKeys", &[]).await?) + Ok(self + .client + .request("author_rotateKeys", rpc_params![]) + .await?) } /// Checks if the keystore has private keys for the given session public keys. @@ -689,7 +599,7 @@ impl Rpc { /// /// Returns `true` iff all private keys could be found. pub async fn has_session_keys(&self, session_keys: Bytes) -> Result { - let params = &[to_json_value(session_keys)?]; + let params = rpc_params![session_keys]; Ok(self.client.request("author_hasSessionKeys", params).await?) } @@ -701,7 +611,7 @@ impl Rpc { public_key: Bytes, key_type: String, ) -> Result { - let params = &[to_json_value(public_key)?, to_json_value(key_type)?]; + let params = rpc_params![public_key, key_type]; Ok(self.client.request("author_hasKey", params).await?) } } @@ -737,3 +647,27 @@ impl ExtrinsicSuccess { } } } + +/// Example to check that `From<(Sender, Receiver) for RpcClient` works. +pub async fn build_ws_client_default(url: &str) -> Result { + let client = ws_transport(url).await.into(); + Ok(client) +} + +/// Build WS RPC client from URL +pub async fn build_ws_client(url: &str) -> Result { + let (sender, receiver) = ws_transport(url).await; + Ok(RpcClientBuilder::default() + .max_notifs_per_subscription(4096) + .build(sender, receiver)) +} + +async fn ws_transport(url: &str) -> (WsSender, WsReceiver) { + // TODO: fix unwraps because I'm lazy. + + let url: Uri = url.parse().unwrap(); + WsTransportClientBuilder::default() + .build(url) + .await + .unwrap() +} diff --git a/test-runtime/build.rs b/test-runtime/build.rs index 0251219bc6..b6f50ba155 100644 --- a/test-runtime/build.rs +++ b/test-runtime/build.rs @@ -27,6 +27,7 @@ use std::{ thread, time, }; +use subxt::rpc::{Client as _, rpc_params, build_ws_client}; static SUBSTRATE_BIN_ENV_VAR: &str = "SUBSTRATE_NODE_PATH"; @@ -42,7 +43,7 @@ async fn main() { let cmd = Command::new(&substrate_bin) .arg("--dev") .arg("--tmp") - .arg(format!("--rpc-port={}", port)) + .arg(format!("--ws-port={}", port)) .spawn(); let mut cmd = match cmd { Ok(cmd) => cmd, @@ -61,10 +62,10 @@ async fn main() { panic!("Cannot connect to substrate node after {} retries", retries); } let res = - subxt::RpcClient::try_from_url(&format!("http://localhost:{}", port)) + build_ws_client(&format!("ws://localhost:{}", port)) .await .expect("should only error if malformed URL for an HTTP connection") - .request("state_getMetadata", &[]) + .request("state_getMetadata", rpc_params![]) .await; match res { Ok(res) => {