introduce jsonrpsee client abstraction + kill HTTP support. (#341)

* PoC async rpc client

* add client example should be removed from this repo

* fmt

* cargo fmt

* subxt client tests

* cargo fmt

* fix some nits

* try nightly for all CI jobs

* need wasm also for CI

* wasm for nightly run too

* client: add missing features

* update jsonrpsee

* hacky update jsonrpsee

* use jsonrpsee crates.io release

* ci: pin nightly 2021-12-15

* pin nightly to 2021-12-15

* fix build

* fmt

* compile please

* rewrite me

* fixes

* fixes

* pre-generate metadata

* fix nit

* get rid of needless deps

* remove embedded client

* Update Cargo.toml

* Update subxt/Cargo.toml

* Update subxt/Cargo.toml

* Update subxt/src/client.rs

* Update subxt/src/rpc.rs

* Update test-runtime/build.rs

* cargo fmt

Co-authored-by: James Wilson <james@jsdw.me>
This commit is contained in:
Niklas Adolfsson
2022-02-04 11:35:59 +01:00
committed by GitHub
parent 7f3f686cef
commit abd7a4145b
4 changed files with 88 additions and 150 deletions
+1 -2
View File
@@ -20,13 +20,12 @@ chameleon = "0.1.0"
scale-info = { version = "1.0.0", features = ["bit-vec"] } scale-info = { version = "1.0.0", features = ["bit-vec"] }
futures = "0.3.13" futures = "0.3.13"
hex = "0.4.3" hex = "0.4.3"
jsonrpsee = { version = "0.8.0", features = ["ws-client", "http-client"] } jsonrpsee = { version = "0.8.0", features = ["async-client", "client-ws-transport"] }
log = "0.4.14" log = "0.4.14"
num-traits = { version = "0.2.14", default-features = false } num-traits = { version = "0.2.14", default-features = false }
serde = { version = "1.0.124", features = ["derive"] } serde = { version = "1.0.124", features = ["derive"] }
serde_json = "1.0.64" serde_json = "1.0.64"
thiserror = "1.0.24" thiserror = "1.0.24"
url = "2.2.1"
subxt-macro = { version = "0.16.0", path = "../macro" } subxt-macro = { version = "0.16.0", path = "../macro" }
+1 -1
View File
@@ -86,7 +86,7 @@ impl ClientBuilder {
client client
} else { } else {
let url = self.url.as_deref().unwrap_or("ws://127.0.0.1:9944"); let url = self.url.as_deref().unwrap_or("ws://127.0.0.1:9944");
RpcClient::try_from_url(url).await? crate::rpc::ws_client(url).await?
}; };
let rpc = Rpc::new(client); let rpc = Rpc::new(client);
let (metadata, genesis_hash, runtime_version, properties) = future::join4( let (metadata, genesis_hash, runtime_version, properties) = future::join4(
+72 -140
View File
@@ -46,10 +46,18 @@ use core::{
marker::PhantomData, marker::PhantomData,
}; };
use frame_metadata::RuntimeMetadataPrefixed; use frame_metadata::RuntimeMetadataPrefixed;
use jsonrpsee::{ pub use jsonrpsee::{
client_transport::ws::{
InvalidUri,
Receiver as WsReceiver,
Sender as WsSender,
Uri,
WsTransportClientBuilder,
},
core::{ core::{
client::{ client::{
Client, Client as RpcClient,
ClientBuilder as RpcClientBuilder,
ClientT, ClientT,
Subscription, Subscription,
SubscriptionClientT, SubscriptionClientT,
@@ -59,11 +67,7 @@ use jsonrpsee::{
Error as RpcError, Error as RpcError,
JsonValue, JsonValue,
}, },
http_client::{ rpc_params,
HttpClient,
HttpClientBuilder,
},
ws_client::WsClientBuilder,
}; };
use serde::{ use serde::{
Deserialize, Deserialize,
@@ -193,99 +197,6 @@ pub struct RuntimeVersion {
pub other: HashMap<String, serde_json::Value>, pub other: HashMap<String, serde_json::Value>,
} }
/// Rpc client wrapper.
/// This is workaround because adding generic types causes the macros to fail.
#[derive(Clone)]
pub enum RpcClient {
/// JSONRPC client WebSocket transport.
WebSocket(Arc<Client>),
/// JSONRPC client HTTP transport.
// NOTE: Arc because `HttpClient` is not clone.
Http(Arc<HttpClient>),
}
impl RpcClient {
/// Create a new [`RpcClient`] from the given URL.
///
/// Infers the protocol from the URL, supports:
/// - Websockets (`ws://`, `wss://`)
/// - Http (`http://`, `https://`)
pub async fn try_from_url(url: &str) -> Result<Self, RpcError> {
if url.starts_with("ws://") || url.starts_with("wss://") {
let client = WsClientBuilder::default()
.max_notifs_per_subscription(4096)
.build(url)
.await?;
Ok(RpcClient::WebSocket(Arc::new(client)))
} else {
let client = HttpClientBuilder::default().build(&url)?;
Ok(RpcClient::Http(Arc::new(client)))
}
}
/// Start a JSON-RPC request.
pub async fn request<'a, T: DeserializeOwned + std::fmt::Debug>(
&self,
method: &str,
params: &[JsonValue],
) -> Result<T, RpcError> {
let params = Some(params.into());
log::debug!("request {}: {:?}", method, params);
let data = match self {
RpcClient::WebSocket(inner) => inner.request(method, params).await,
RpcClient::Http(inner) => inner.request(method, params).await,
};
log::debug!("response: {:?}", data);
data
}
/// Start a JSON-RPC Subscription.
pub async fn subscribe<'a, T: DeserializeOwned>(
&self,
subscribe_method: &str,
params: &[JsonValue],
unsubscribe_method: &str,
) -> Result<Subscription<T>, RpcError> {
let params = Some(params.into());
match self {
RpcClient::WebSocket(inner) => {
inner
.subscribe(subscribe_method, params, unsubscribe_method)
.await
}
RpcClient::Http(_) => {
Err(RpcError::Custom(
"Subscriptions not supported on HTTP transport".to_owned(),
))
}
}
}
}
impl From<Client> for RpcClient {
fn from(client: Client) -> Self {
RpcClient::WebSocket(Arc::new(client))
}
}
impl From<Arc<Client>> for RpcClient {
fn from(client: Arc<Client>) -> Self {
RpcClient::WebSocket(client)
}
}
impl From<HttpClient> for RpcClient {
fn from(client: HttpClient) -> Self {
RpcClient::Http(Arc::new(client))
}
}
impl From<Arc<HttpClient>> for RpcClient {
fn from(client: Arc<HttpClient>) -> Self {
RpcClient::Http(client)
}
}
/// ReadProof struct returned by the RPC /// ReadProof struct returned by the RPC
/// ///
/// # Note /// # Note
@@ -304,7 +215,7 @@ pub struct ReadProof<Hash> {
/// Client for substrate rpc interfaces /// Client for substrate rpc interfaces
pub struct Rpc<T: Config> { pub struct Rpc<T: Config> {
/// Rpc client for sending requests. /// Rpc client for sending requests.
pub client: RpcClient, pub client: Arc<RpcClient>,
marker: PhantomData<T>, marker: PhantomData<T>,
} }
@@ -321,7 +232,7 @@ impl<T: Config> Rpc<T> {
/// Create a new [`Rpc`] /// Create a new [`Rpc`]
pub fn new(client: RpcClient) -> Self { pub fn new(client: RpcClient) -> Self {
Self { Self {
client, client: Arc::new(client),
marker: PhantomData, marker: PhantomData,
} }
} }
@@ -332,7 +243,7 @@ impl<T: Config> Rpc<T> {
key: &StorageKey, key: &StorageKey,
hash: Option<T::Hash>, hash: Option<T::Hash>,
) -> Result<Option<StorageData>, BasicError> { ) -> Result<Option<StorageData>, BasicError> {
let params = &[to_json_value(key)?, to_json_value(hash)?]; let params = rpc_params![key, hash];
let data = self.client.request("state_getStorage", params).await?; let data = self.client.request("state_getStorage", params).await?;
Ok(data) Ok(data)
} }
@@ -348,12 +259,7 @@ impl<T: Config> Rpc<T> {
hash: Option<T::Hash>, hash: Option<T::Hash>,
) -> Result<Vec<StorageKey>, BasicError> { ) -> Result<Vec<StorageKey>, BasicError> {
let prefix = prefix.map(|p| p.to_storage_key()); let prefix = prefix.map(|p| p.to_storage_key());
let params = &[ let params = rpc_params![prefix, count, start_key, hash];
to_json_value(prefix)?,
to_json_value(count)?,
to_json_value(start_key)?,
to_json_value(hash)?,
];
let data = self.client.request("state_getKeysPaged", params).await?; let data = self.client.request("state_getKeysPaged", params).await?;
Ok(data) Ok(data)
} }
@@ -365,11 +271,7 @@ impl<T: Config> Rpc<T> {
from: T::Hash, from: T::Hash,
to: Option<T::Hash>, to: Option<T::Hash>,
) -> Result<Vec<StorageChangeSet<T::Hash>>, BasicError> { ) -> Result<Vec<StorageChangeSet<T::Hash>>, BasicError> {
let params = &[ let params = rpc_params![keys, from, to];
to_json_value(keys)?,
to_json_value(from)?,
to_json_value(to)?,
];
self.client self.client
.request("state_queryStorage", params) .request("state_queryStorage", params)
.await .await
@@ -382,7 +284,7 @@ impl<T: Config> Rpc<T> {
keys: &[StorageKey], keys: &[StorageKey],
at: Option<T::Hash>, at: Option<T::Hash>,
) -> Result<Vec<StorageChangeSet<T::Hash>>, BasicError> { ) -> Result<Vec<StorageChangeSet<T::Hash>>, BasicError> {
let params = &[to_json_value(keys)?, to_json_value(at)?]; let params = rpc_params![keys, at];
self.client self.client
.request("state_queryStorageAt", params) .request("state_queryStorageAt", params)
.await .await
@@ -392,7 +294,7 @@ impl<T: Config> Rpc<T> {
/// Fetch the genesis hash /// Fetch the genesis hash
pub async fn genesis_hash(&self) -> Result<T::Hash, BasicError> { pub async fn genesis_hash(&self) -> Result<T::Hash, BasicError> {
let block_zero = Some(ListOrValue::Value(NumberOrHex::Number(0))); let block_zero = Some(ListOrValue::Value(NumberOrHex::Number(0)));
let params = &[to_json_value(block_zero)?]; let params = rpc_params![block_zero];
let list_or_value: ListOrValue<Option<T::Hash>> = let list_or_value: ListOrValue<Option<T::Hash>> =
self.client.request("chain_getBlockHash", params).await?; self.client.request("chain_getBlockHash", params).await?;
match list_or_value { match list_or_value {
@@ -405,7 +307,10 @@ impl<T: Config> Rpc<T> {
/// Fetch the metadata /// Fetch the metadata
pub async fn metadata(&self) -> Result<Metadata, BasicError> { pub async fn metadata(&self) -> Result<Metadata, BasicError> {
let bytes: Bytes = self.client.request("state_getMetadata", &[]).await?; let bytes: Bytes = self
.client
.request("state_getMetadata", rpc_params![])
.await?;
let meta: RuntimeMetadataPrefixed = Decode::decode(&mut &bytes[..])?; let meta: RuntimeMetadataPrefixed = Decode::decode(&mut &bytes[..])?;
let metadata: Metadata = meta.try_into()?; let metadata: Metadata = meta.try_into()?;
Ok(metadata) Ok(metadata)
@@ -413,22 +318,25 @@ impl<T: Config> Rpc<T> {
/// Fetch system properties /// Fetch system properties
pub async fn system_properties(&self) -> Result<SystemProperties, BasicError> { pub async fn system_properties(&self) -> Result<SystemProperties, BasicError> {
Ok(self.client.request("system_properties", &[]).await?) Ok(self
.client
.request("system_properties", rpc_params![])
.await?)
} }
/// Fetch system chain /// Fetch system chain
pub async fn system_chain(&self) -> Result<String, BasicError> { pub async fn system_chain(&self) -> Result<String, BasicError> {
Ok(self.client.request("system_chain", &[]).await?) Ok(self.client.request("system_chain", rpc_params![]).await?)
} }
/// Fetch system name /// Fetch system name
pub async fn system_name(&self) -> Result<String, BasicError> { pub async fn system_name(&self) -> Result<String, BasicError> {
Ok(self.client.request("system_name", &[]).await?) Ok(self.client.request("system_name", rpc_params![]).await?)
} }
/// Fetch system version /// Fetch system version
pub async fn system_version(&self) -> Result<String, BasicError> { pub async fn system_version(&self) -> Result<String, BasicError> {
Ok(self.client.request("system_version", &[]).await?) Ok(self.client.request("system_version", rpc_params![]).await?)
} }
/// Get a header /// Get a header
@@ -436,7 +344,7 @@ impl<T: Config> Rpc<T> {
&self, &self,
hash: Option<T::Hash>, hash: Option<T::Hash>,
) -> Result<Option<T::Header>, BasicError> { ) -> Result<Option<T::Header>, BasicError> {
let params = &[to_json_value(hash)?]; let params = rpc_params![hash];
let header = self.client.request("chain_getHeader", params).await?; let header = self.client.request("chain_getHeader", params).await?;
Ok(header) Ok(header)
} }
@@ -447,7 +355,7 @@ impl<T: Config> Rpc<T> {
block_number: Option<BlockNumber>, block_number: Option<BlockNumber>,
) -> Result<Option<T::Hash>, BasicError> { ) -> Result<Option<T::Hash>, BasicError> {
let block_number = block_number.map(ListOrValue::Value); let block_number = block_number.map(ListOrValue::Value);
let params = &[to_json_value(block_number)?]; let params = rpc_params![block_number];
let list_or_value = self.client.request("chain_getBlockHash", params).await?; let list_or_value = self.client.request("chain_getBlockHash", params).await?;
match list_or_value { match list_or_value {
ListOrValue::Value(hash) => Ok(hash), ListOrValue::Value(hash) => Ok(hash),
@@ -457,7 +365,10 @@ impl<T: Config> Rpc<T> {
/// Get a block hash of the latest finalized block /// Get a block hash of the latest finalized block
pub async fn finalized_head(&self) -> Result<T::Hash, BasicError> { pub async fn finalized_head(&self) -> Result<T::Hash, BasicError> {
let hash = self.client.request("chain_getFinalizedHead", &[]).await?; let hash = self
.client
.request("chain_getFinalizedHead", rpc_params![])
.await?;
Ok(hash) Ok(hash)
} }
@@ -466,7 +377,7 @@ impl<T: Config> Rpc<T> {
&self, &self,
hash: Option<T::Hash>, hash: Option<T::Hash>,
) -> Result<Option<ChainBlock<T>>, BasicError> { ) -> Result<Option<ChainBlock<T>>, BasicError> {
let params = &[to_json_value(hash)?]; let params = rpc_params![hash];
let block = self.client.request("chain_getBlock", params).await?; let block = self.client.request("chain_getBlock", params).await?;
Ok(block) Ok(block)
} }
@@ -477,7 +388,7 @@ impl<T: Config> Rpc<T> {
keys: Vec<StorageKey>, keys: Vec<StorageKey>,
hash: Option<T::Hash>, hash: Option<T::Hash>,
) -> Result<ReadProof<T::Hash>, BasicError> { ) -> Result<ReadProof<T::Hash>, BasicError> {
let params = &[to_json_value(keys)?, to_json_value(hash)?]; let params = rpc_params![keys, hash];
let proof = self.client.request("state_getReadProof", params).await?; let proof = self.client.request("state_getReadProof", params).await?;
Ok(proof) Ok(proof)
} }
@@ -487,7 +398,7 @@ impl<T: Config> Rpc<T> {
&self, &self,
at: Option<T::Hash>, at: Option<T::Hash>,
) -> Result<RuntimeVersion, BasicError> { ) -> Result<RuntimeVersion, BasicError> {
let params = &[to_json_value(at)?]; let params = rpc_params![at];
let version = self let version = self
.client .client
.request("state_getRuntimeVersion", params) .request("state_getRuntimeVersion", params)
@@ -503,7 +414,7 @@ impl<T: Config> Rpc<T> {
&self, &self,
) -> Result<EventStorageSubscription<T>, BasicError> { ) -> Result<EventStorageSubscription<T>, BasicError> {
let keys = Some(vec![StorageKey::from(SystemEvents::new())]); let keys = Some(vec![StorageKey::from(SystemEvents::new())]);
let params = &[to_json_value(keys)?]; let params = rpc_params![keys];
let subscription = self let subscription = self
.client .client
@@ -528,7 +439,11 @@ impl<T: Config> Rpc<T> {
pub async fn subscribe_blocks(&self) -> Result<Subscription<T::Header>, BasicError> { pub async fn subscribe_blocks(&self) -> Result<Subscription<T::Header>, BasicError> {
let subscription = self let subscription = self
.client .client
.subscribe("chain_subscribeNewHeads", &[], "chain_unsubscribeNewHeads") .subscribe(
"chain_subscribeNewHeads",
rpc_params![],
"chain_unsubscribeNewHeads",
)
.await?; .await?;
Ok(subscription) Ok(subscription)
@@ -542,7 +457,7 @@ impl<T: Config> Rpc<T> {
.client .client
.subscribe( .subscribe(
"chain_subscribeFinalizedHeads", "chain_subscribeFinalizedHeads",
&[], rpc_params![],
"chain_unsubscribeFinalizedHeads", "chain_unsubscribeFinalizedHeads",
) )
.await?; .await?;
@@ -555,7 +470,7 @@ impl<T: Config> Rpc<T> {
extrinsic: X, extrinsic: X,
) -> Result<T::Hash, BasicError> { ) -> Result<T::Hash, BasicError> {
let bytes: Bytes = extrinsic.encode().into(); let bytes: Bytes = extrinsic.encode().into();
let params = &[to_json_value(bytes)?]; let params = rpc_params![bytes];
let xt_hash = self let xt_hash = self
.client .client
.request("author_submitExtrinsic", params) .request("author_submitExtrinsic", params)
@@ -570,7 +485,7 @@ impl<T: Config> Rpc<T> {
) -> Result<Subscription<SubstrateTransactionStatus<T::Hash, T::Hash>>, BasicError> ) -> Result<Subscription<SubstrateTransactionStatus<T::Hash, T::Hash>>, BasicError>
{ {
let bytes: Bytes = extrinsic.encode().into(); let bytes: Bytes = extrinsic.encode().into();
let params = &[to_json_value(bytes)?]; let params = rpc_params![bytes];
let subscription = self let subscription = self
.client .client
.subscribe( .subscribe(
@@ -589,18 +504,17 @@ impl<T: Config> Rpc<T> {
suri: String, suri: String,
public: Bytes, public: Bytes,
) -> Result<(), BasicError> { ) -> Result<(), BasicError> {
let params = &[ let params = rpc_params![key_type, suri, public];
to_json_value(key_type)?,
to_json_value(suri)?,
to_json_value(public)?,
];
self.client.request("author_insertKey", params).await?; self.client.request("author_insertKey", params).await?;
Ok(()) Ok(())
} }
/// Generate new session keys and returns the corresponding public keys. /// Generate new session keys and returns the corresponding public keys.
pub async fn rotate_keys(&self) -> Result<Bytes, BasicError> { pub async fn rotate_keys(&self) -> Result<Bytes, BasicError> {
Ok(self.client.request("author_rotateKeys", &[]).await?) Ok(self
.client
.request("author_rotateKeys", rpc_params![])
.await?)
} }
/// Checks if the keystore has private keys for the given session public keys. /// Checks if the keystore has private keys for the given session public keys.
@@ -612,7 +526,7 @@ impl<T: Config> Rpc<T> {
&self, &self,
session_keys: Bytes, session_keys: Bytes,
) -> Result<bool, BasicError> { ) -> Result<bool, BasicError> {
let params = &[to_json_value(session_keys)?]; let params = rpc_params![session_keys];
Ok(self.client.request("author_hasSessionKeys", params).await?) Ok(self.client.request("author_hasSessionKeys", params).await?)
} }
@@ -624,11 +538,29 @@ impl<T: Config> Rpc<T> {
public_key: Bytes, public_key: Bytes,
key_type: String, key_type: String,
) -> Result<bool, BasicError> { ) -> Result<bool, BasicError> {
let params = &[to_json_value(public_key)?, to_json_value(key_type)?]; let params = rpc_params![public_key, key_type];
Ok(self.client.request("author_hasKey", params).await?) Ok(self.client.request("author_hasKey", params).await?)
} }
} }
/// Build WS RPC client from URL
pub async fn ws_client(url: &str) -> Result<RpcClient, RpcError> {
let (sender, receiver) = ws_transport(url).await?;
Ok(RpcClientBuilder::default()
.max_notifs_per_subscription(4096)
.build(sender, receiver))
}
async fn ws_transport(url: &str) -> Result<(WsSender, WsReceiver), RpcError> {
let url: Uri = url
.parse()
.map_err(|e: InvalidUri| RpcError::Transport(e.into()))?;
WsTransportClientBuilder::default()
.build(url)
.await
.map_err(|e| RpcError::Transport(e.into()))
}
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use super::*; use super::*;
+14 -7
View File
@@ -31,6 +31,10 @@ use std::{
thread, thread,
time, time,
}; };
use subxt::rpc::{
self,
ClientT,
};
static SUBSTRATE_BIN_ENV_VAR: &str = "SUBSTRATE_NODE_PATH"; static SUBSTRATE_BIN_ENV_VAR: &str = "SUBSTRATE_NODE_PATH";
@@ -50,7 +54,7 @@ async fn run() {
let cmd = Command::new(&substrate_bin) let cmd = Command::new(&substrate_bin)
.arg("--dev") .arg("--dev")
.arg("--tmp") .arg("--tmp")
.arg(format!("--rpc-port={}", port)) .arg(format!("--ws-port={}", port))
.spawn(); .spawn();
let mut cmd = match cmd { let mut cmd = match cmd {
Ok(cmd) => KillOnDrop(cmd), Ok(cmd) => KillOnDrop(cmd),
@@ -63,16 +67,19 @@ async fn run() {
let metadata_bytes: sp_core::Bytes = { let metadata_bytes: sp_core::Bytes = {
const MAX_RETRIES: usize = 20; const MAX_RETRIES: usize = 20;
let mut retries = 0; let mut retries = 0;
loop { loop {
if retries >= MAX_RETRIES { if retries >= MAX_RETRIES {
panic!("Cannot connect to substrate node after {} retries", retries); panic!("Cannot connect to substrate node after {} retries", retries);
} }
let res =
subxt::RpcClient::try_from_url(&format!("http://localhost:{}", port)) // It might take a while for substrate node that spin up the RPC server.
.await // Thus, the connection might get rejected a few times.
.expect("should only error if malformed URL for an HTTP connection") let res = match rpc::ws_client(&format!("ws://localhost:{}", port)).await {
.request("state_getMetadata", &[]) Ok(c) => c.request("state_getMetadata", None).await,
.await; Err(e) => Err(e),
};
match res { match res {
Ok(res) => { Ok(res) => {
let _ = cmd.kill(); let _ = cmd.kill();