diff --git a/subxt/Cargo.toml b/subxt/Cargo.toml index 2c0400db1c..a8ef23d64a 100644 --- a/subxt/Cargo.toml +++ b/subxt/Cargo.toml @@ -20,13 +20,12 @@ chameleon = "0.1.0" scale-info = { version = "1.0.0", features = ["bit-vec"] } futures = "0.3.13" hex = "0.4.3" -jsonrpsee = { version = "0.8.0", features = ["ws-client", "http-client"] } +jsonrpsee = { version = "0.8.0", features = ["async-client", "client-ws-transport"] } log = "0.4.14" num-traits = { version = "0.2.14", default-features = false } serde = { version = "1.0.124", features = ["derive"] } serde_json = "1.0.64" thiserror = "1.0.24" -url = "2.2.1" subxt-macro = { version = "0.16.0", path = "../macro" } diff --git a/subxt/src/client.rs b/subxt/src/client.rs index de074214b1..bd1d6298e6 100644 --- a/subxt/src/client.rs +++ b/subxt/src/client.rs @@ -86,7 +86,7 @@ impl ClientBuilder { client } else { let url = self.url.as_deref().unwrap_or("ws://127.0.0.1:9944"); - RpcClient::try_from_url(url).await? + crate::rpc::ws_client(url).await? }; let rpc = Rpc::new(client); let (metadata, genesis_hash, runtime_version, properties) = future::join4( diff --git a/subxt/src/rpc.rs b/subxt/src/rpc.rs index e5e2d20620..0cec6a634b 100644 --- a/subxt/src/rpc.rs +++ b/subxt/src/rpc.rs @@ -46,10 +46,18 @@ use core::{ marker::PhantomData, }; use frame_metadata::RuntimeMetadataPrefixed; -use jsonrpsee::{ +pub use jsonrpsee::{ + client_transport::ws::{ + InvalidUri, + Receiver as WsReceiver, + Sender as WsSender, + Uri, + WsTransportClientBuilder, + }, core::{ client::{ - Client, + Client as RpcClient, + ClientBuilder as RpcClientBuilder, ClientT, Subscription, SubscriptionClientT, @@ -59,11 +67,7 @@ use jsonrpsee::{ Error as RpcError, JsonValue, }, - http_client::{ - HttpClient, - HttpClientBuilder, - }, - ws_client::WsClientBuilder, + rpc_params, }; use serde::{ Deserialize, @@ -193,99 +197,6 @@ pub struct RuntimeVersion { pub other: HashMap, } -/// Rpc client wrapper. -/// This is workaround because adding generic types causes the macros to fail. -#[derive(Clone)] -pub enum RpcClient { - /// JSONRPC client WebSocket transport. - WebSocket(Arc), - /// JSONRPC client HTTP transport. - // NOTE: Arc because `HttpClient` is not clone. - Http(Arc), -} - -impl RpcClient { - /// Create a new [`RpcClient`] from the given URL. - /// - /// Infers the protocol from the URL, supports: - /// - Websockets (`ws://`, `wss://`) - /// - Http (`http://`, `https://`) - pub async fn try_from_url(url: &str) -> Result { - if url.starts_with("ws://") || url.starts_with("wss://") { - let client = WsClientBuilder::default() - .max_notifs_per_subscription(4096) - .build(url) - .await?; - Ok(RpcClient::WebSocket(Arc::new(client))) - } else { - let client = HttpClientBuilder::default().build(&url)?; - Ok(RpcClient::Http(Arc::new(client))) - } - } - - /// Start a JSON-RPC request. - pub async fn request<'a, T: DeserializeOwned + std::fmt::Debug>( - &self, - method: &str, - params: &[JsonValue], - ) -> Result { - let params = Some(params.into()); - log::debug!("request {}: {:?}", method, params); - let data = match self { - RpcClient::WebSocket(inner) => inner.request(method, params).await, - RpcClient::Http(inner) => inner.request(method, params).await, - }; - log::debug!("response: {:?}", data); - data - } - - /// Start a JSON-RPC Subscription. - pub async fn subscribe<'a, T: DeserializeOwned>( - &self, - subscribe_method: &str, - params: &[JsonValue], - unsubscribe_method: &str, - ) -> Result, RpcError> { - let params = Some(params.into()); - match self { - RpcClient::WebSocket(inner) => { - inner - .subscribe(subscribe_method, params, unsubscribe_method) - .await - } - RpcClient::Http(_) => { - Err(RpcError::Custom( - "Subscriptions not supported on HTTP transport".to_owned(), - )) - } - } - } -} - -impl From for RpcClient { - fn from(client: Client) -> Self { - RpcClient::WebSocket(Arc::new(client)) - } -} - -impl From> for RpcClient { - fn from(client: Arc) -> Self { - RpcClient::WebSocket(client) - } -} - -impl From for RpcClient { - fn from(client: HttpClient) -> Self { - RpcClient::Http(Arc::new(client)) - } -} - -impl From> for RpcClient { - fn from(client: Arc) -> Self { - RpcClient::Http(client) - } -} - /// ReadProof struct returned by the RPC /// /// # Note @@ -304,7 +215,7 @@ pub struct ReadProof { /// Client for substrate rpc interfaces pub struct Rpc { /// Rpc client for sending requests. - pub client: RpcClient, + pub client: Arc, marker: PhantomData, } @@ -321,7 +232,7 @@ impl Rpc { /// Create a new [`Rpc`] pub fn new(client: RpcClient) -> Self { Self { - client, + client: Arc::new(client), marker: PhantomData, } } @@ -332,7 +243,7 @@ impl Rpc { key: &StorageKey, hash: Option, ) -> Result, BasicError> { - let params = &[to_json_value(key)?, to_json_value(hash)?]; + let params = rpc_params![key, hash]; let data = self.client.request("state_getStorage", params).await?; Ok(data) } @@ -348,12 +259,7 @@ impl Rpc { hash: Option, ) -> Result, BasicError> { let prefix = prefix.map(|p| p.to_storage_key()); - let params = &[ - to_json_value(prefix)?, - to_json_value(count)?, - to_json_value(start_key)?, - to_json_value(hash)?, - ]; + let params = rpc_params![prefix, count, start_key, hash]; let data = self.client.request("state_getKeysPaged", params).await?; Ok(data) } @@ -365,11 +271,7 @@ impl Rpc { from: T::Hash, to: Option, ) -> Result>, BasicError> { - let params = &[ - to_json_value(keys)?, - to_json_value(from)?, - to_json_value(to)?, - ]; + let params = rpc_params![keys, from, to]; self.client .request("state_queryStorage", params) .await @@ -382,7 +284,7 @@ impl Rpc { keys: &[StorageKey], at: Option, ) -> Result>, BasicError> { - let params = &[to_json_value(keys)?, to_json_value(at)?]; + let params = rpc_params![keys, at]; self.client .request("state_queryStorageAt", params) .await @@ -392,7 +294,7 @@ impl Rpc { /// Fetch the genesis hash pub async fn genesis_hash(&self) -> Result { let block_zero = Some(ListOrValue::Value(NumberOrHex::Number(0))); - let params = &[to_json_value(block_zero)?]; + let params = rpc_params![block_zero]; let list_or_value: ListOrValue> = self.client.request("chain_getBlockHash", params).await?; match list_or_value { @@ -405,7 +307,10 @@ impl Rpc { /// Fetch the metadata pub async fn metadata(&self) -> Result { - let bytes: Bytes = self.client.request("state_getMetadata", &[]).await?; + let bytes: Bytes = self + .client + .request("state_getMetadata", rpc_params![]) + .await?; let meta: RuntimeMetadataPrefixed = Decode::decode(&mut &bytes[..])?; let metadata: Metadata = meta.try_into()?; Ok(metadata) @@ -413,22 +318,25 @@ impl Rpc { /// Fetch system properties pub async fn system_properties(&self) -> Result { - Ok(self.client.request("system_properties", &[]).await?) + Ok(self + .client + .request("system_properties", rpc_params![]) + .await?) } /// Fetch system chain pub async fn system_chain(&self) -> Result { - Ok(self.client.request("system_chain", &[]).await?) + Ok(self.client.request("system_chain", rpc_params![]).await?) } /// Fetch system name pub async fn system_name(&self) -> Result { - Ok(self.client.request("system_name", &[]).await?) + Ok(self.client.request("system_name", rpc_params![]).await?) } /// Fetch system version pub async fn system_version(&self) -> Result { - Ok(self.client.request("system_version", &[]).await?) + Ok(self.client.request("system_version", rpc_params![]).await?) } /// Get a header @@ -436,7 +344,7 @@ impl Rpc { &self, hash: Option, ) -> Result, BasicError> { - let params = &[to_json_value(hash)?]; + let params = rpc_params![hash]; let header = self.client.request("chain_getHeader", params).await?; Ok(header) } @@ -447,7 +355,7 @@ impl Rpc { block_number: Option, ) -> Result, BasicError> { let block_number = block_number.map(ListOrValue::Value); - let params = &[to_json_value(block_number)?]; + let params = rpc_params![block_number]; let list_or_value = self.client.request("chain_getBlockHash", params).await?; match list_or_value { ListOrValue::Value(hash) => Ok(hash), @@ -457,7 +365,10 @@ impl Rpc { /// Get a block hash of the latest finalized block pub async fn finalized_head(&self) -> Result { - let hash = self.client.request("chain_getFinalizedHead", &[]).await?; + let hash = self + .client + .request("chain_getFinalizedHead", rpc_params![]) + .await?; Ok(hash) } @@ -466,7 +377,7 @@ impl Rpc { &self, hash: Option, ) -> Result>, BasicError> { - let params = &[to_json_value(hash)?]; + let params = rpc_params![hash]; let block = self.client.request("chain_getBlock", params).await?; Ok(block) } @@ -477,7 +388,7 @@ impl Rpc { keys: Vec, hash: Option, ) -> Result, BasicError> { - let params = &[to_json_value(keys)?, to_json_value(hash)?]; + let params = rpc_params![keys, hash]; let proof = self.client.request("state_getReadProof", params).await?; Ok(proof) } @@ -487,7 +398,7 @@ impl Rpc { &self, at: Option, ) -> Result { - let params = &[to_json_value(at)?]; + let params = rpc_params![at]; let version = self .client .request("state_getRuntimeVersion", params) @@ -503,7 +414,7 @@ impl Rpc { &self, ) -> Result, BasicError> { let keys = Some(vec![StorageKey::from(SystemEvents::new())]); - let params = &[to_json_value(keys)?]; + let params = rpc_params![keys]; let subscription = self .client @@ -528,7 +439,11 @@ impl Rpc { pub async fn subscribe_blocks(&self) -> Result, BasicError> { let subscription = self .client - .subscribe("chain_subscribeNewHeads", &[], "chain_unsubscribeNewHeads") + .subscribe( + "chain_subscribeNewHeads", + rpc_params![], + "chain_unsubscribeNewHeads", + ) .await?; Ok(subscription) @@ -542,7 +457,7 @@ impl Rpc { .client .subscribe( "chain_subscribeFinalizedHeads", - &[], + rpc_params![], "chain_unsubscribeFinalizedHeads", ) .await?; @@ -555,7 +470,7 @@ impl Rpc { extrinsic: X, ) -> Result { let bytes: Bytes = extrinsic.encode().into(); - let params = &[to_json_value(bytes)?]; + let params = rpc_params![bytes]; let xt_hash = self .client .request("author_submitExtrinsic", params) @@ -570,7 +485,7 @@ impl Rpc { ) -> Result>, BasicError> { let bytes: Bytes = extrinsic.encode().into(); - let params = &[to_json_value(bytes)?]; + let params = rpc_params![bytes]; let subscription = self .client .subscribe( @@ -589,18 +504,17 @@ impl Rpc { suri: String, public: Bytes, ) -> Result<(), BasicError> { - let params = &[ - to_json_value(key_type)?, - to_json_value(suri)?, - to_json_value(public)?, - ]; + let params = rpc_params![key_type, suri, public]; self.client.request("author_insertKey", params).await?; Ok(()) } /// Generate new session keys and returns the corresponding public keys. pub async fn rotate_keys(&self) -> Result { - Ok(self.client.request("author_rotateKeys", &[]).await?) + Ok(self + .client + .request("author_rotateKeys", rpc_params![]) + .await?) } /// Checks if the keystore has private keys for the given session public keys. @@ -612,7 +526,7 @@ impl Rpc { &self, session_keys: Bytes, ) -> Result { - let params = &[to_json_value(session_keys)?]; + let params = rpc_params![session_keys]; Ok(self.client.request("author_hasSessionKeys", params).await?) } @@ -624,11 +538,29 @@ impl Rpc { public_key: Bytes, key_type: String, ) -> Result { - let params = &[to_json_value(public_key)?, to_json_value(key_type)?]; + let params = rpc_params![public_key, key_type]; Ok(self.client.request("author_hasKey", params).await?) } } +/// Build WS RPC client from URL +pub async fn ws_client(url: &str) -> Result { + let (sender, receiver) = ws_transport(url).await?; + Ok(RpcClientBuilder::default() + .max_notifs_per_subscription(4096) + .build(sender, receiver)) +} + +async fn ws_transport(url: &str) -> Result<(WsSender, WsReceiver), RpcError> { + let url: Uri = url + .parse() + .map_err(|e: InvalidUri| RpcError::Transport(e.into()))?; + WsTransportClientBuilder::default() + .build(url) + .await + .map_err(|e| RpcError::Transport(e.into())) +} + #[cfg(test)] mod test { use super::*; diff --git a/test-runtime/build.rs b/test-runtime/build.rs index 71a075a37a..e82f8863ec 100644 --- a/test-runtime/build.rs +++ b/test-runtime/build.rs @@ -31,6 +31,10 @@ use std::{ thread, time, }; +use subxt::rpc::{ + self, + ClientT, +}; static SUBSTRATE_BIN_ENV_VAR: &str = "SUBSTRATE_NODE_PATH"; @@ -50,7 +54,7 @@ async fn run() { let cmd = Command::new(&substrate_bin) .arg("--dev") .arg("--tmp") - .arg(format!("--rpc-port={}", port)) + .arg(format!("--ws-port={}", port)) .spawn(); let mut cmd = match cmd { Ok(cmd) => KillOnDrop(cmd), @@ -63,16 +67,19 @@ async fn run() { let metadata_bytes: sp_core::Bytes = { const MAX_RETRIES: usize = 20; let mut retries = 0; + loop { if retries >= MAX_RETRIES { panic!("Cannot connect to substrate node after {} retries", retries); } - let res = - subxt::RpcClient::try_from_url(&format!("http://localhost:{}", port)) - .await - .expect("should only error if malformed URL for an HTTP connection") - .request("state_getMetadata", &[]) - .await; + + // It might take a while for substrate node that spin up the RPC server. + // Thus, the connection might get rejected a few times. + let res = match rpc::ws_client(&format!("ws://localhost:{}", port)).await { + Ok(c) => c.request("state_getMetadata", None).await, + Err(e) => Err(e), + }; + match res { Ok(res) => { let _ = cmd.kill();