PoC async rpc client

This commit is contained in:
Niklas Adolfsson
2021-11-30 17:04:29 +01:00
parent 4b9ee133ac
commit c8b090751b
4 changed files with 81 additions and 146 deletions
+1 -1
View File
@@ -24,7 +24,7 @@ chameleon = "0.1.0"
scale-info = { version = "1.0.0", features = ["bit-vec"] }
futures = "0.3.13"
hex = "0.4.3"
jsonrpsee = { version = "0.5.1", features = ["macros", "ws-client", "http-client"] }
jsonrpsee = { git = "https://github.com/paritytech/jsonrpsee/", branch = "extract-async-client", features = ["macros", "client"] }
log = "0.4.14"
num-traits = { version = "0.2.14", default-features = false }
serde = { version = "1.0.124", features = ["derive"] }
+1 -1
View File
@@ -91,7 +91,7 @@ impl ClientBuilder {
client
} else {
let url = self.url.as_deref().unwrap_or("ws://127.0.0.1:9944");
RpcClient::try_from_url(url).await?
crate::rpc::build_ws_client(url).await?
};
let mut rpc = Rpc::new(client);
if self.accept_weak_inclusion {
+75 -141
View File
@@ -33,11 +33,18 @@ use core::{
marker::PhantomData,
};
use frame_metadata::RuntimeMetadataPrefixed;
use jsonrpsee::{
http_client::{
HttpClient,
HttpClientBuilder,
pub use jsonrpsee::{
client_transport::ws::{
Uri,
WsTransportClientBuilder,
Sender as WsSender,
Receiver as WsReceiver,
},
core_client::{
Client as RpcClient,
ClientBuilder as RpcClientBuilder,
},
rpc_params,
types::{
to_json_value,
traits::{
@@ -49,10 +56,6 @@ use jsonrpsee::{
JsonValue,
Subscription,
},
ws_client::{
WsClient,
WsClientBuilder,
},
};
use serde::{
Deserialize,
@@ -186,102 +189,6 @@ pub enum TransactionStatus<Hash, BlockHash> {
Invalid,
}
/// Rpc client wrapper.
/// This is workaround because adding generic types causes the macros to fail.
#[derive(Clone)]
pub enum RpcClient {
/// JSONRPC client WebSocket transport.
WebSocket(Arc<WsClient>),
/// JSONRPC client HTTP transport.
// NOTE: Arc because `HttpClient` is not clone.
Http(Arc<HttpClient>),
}
impl RpcClient {
/// Create a new [`RpcClient`] from the given URL.
///
/// Infers the protocol from the URL, supports:
/// - Websockets (`ws://`, `wss://`)
/// - Http (`http://`, `https://`)
pub async fn try_from_url(url: &str) -> Result<Self, Error> {
if url.starts_with("ws://") || url.starts_with("wss://") {
let client = WsClientBuilder::default()
.max_notifs_per_subscription(4096)
.build(url)
.await?;
Ok(RpcClient::WebSocket(Arc::new(client)))
} else {
let client = HttpClientBuilder::default().build(&url)?;
Ok(RpcClient::Http(Arc::new(client)))
}
}
/// Start a JSON-RPC request.
pub async fn request<'a, T: DeserializeOwned + std::fmt::Debug>(
&self,
method: &str,
params: &[JsonValue],
) -> Result<T, Error> {
let params = Some(params.into());
log::debug!("request {}: {:?}", method, params);
let data = match self {
Self::WebSocket(inner) => {
inner.request(method, params).await.map_err(Into::into)
}
Self::Http(inner) => inner.request(method, params).await.map_err(Into::into),
};
data
}
/// Start a JSON-RPC Subscription.
pub async fn subscribe<'a, T: DeserializeOwned>(
&self,
subscribe_method: &str,
params: &[JsonValue],
unsubscribe_method: &str,
) -> Result<Subscription<T>, Error> {
let params = Some(params.into());
match self {
Self::WebSocket(inner) => {
inner
.subscribe(subscribe_method, params, unsubscribe_method)
.await
.map_err(Into::into)
}
Self::Http(_) => {
Err(RpcError::Custom(
"Subscriptions not supported on HTTP transport".to_owned(),
)
.into())
}
}
}
}
impl From<WsClient> for RpcClient {
fn from(client: WsClient) -> Self {
RpcClient::WebSocket(Arc::new(client))
}
}
impl From<Arc<WsClient>> for RpcClient {
fn from(client: Arc<WsClient>) -> Self {
RpcClient::WebSocket(client)
}
}
impl From<HttpClient> for RpcClient {
fn from(client: HttpClient) -> Self {
RpcClient::Http(Arc::new(client))
}
}
impl From<Arc<HttpClient>> for RpcClient {
fn from(client: Arc<HttpClient>) -> Self {
RpcClient::Http(client)
}
}
/// ReadProof struct returned by the RPC
///
/// # Note
@@ -300,7 +207,7 @@ pub struct ReadProof<Hash> {
/// Client for substrate rpc interfaces
pub struct Rpc<T: Config> {
/// Rpc client for sending requests.
pub client: RpcClient,
pub client: Arc<RpcClient>,
marker: PhantomData<T>,
accept_weak_inclusion: bool,
}
@@ -319,7 +226,7 @@ impl<T: Config> Rpc<T> {
/// Create a new [`Rpc`]
pub fn new(client: RpcClient) -> Self {
Self {
client,
client: Arc::new(client),
marker: PhantomData,
accept_weak_inclusion: false,
}
@@ -337,7 +244,7 @@ impl<T: Config> Rpc<T> {
key: &StorageKey,
hash: Option<T::Hash>,
) -> Result<Option<StorageData>, Error> {
let params = &[to_json_value(key)?, to_json_value(hash)?];
let params = rpc_params![key, hash];
let data = self.client.request("state_getStorage", params).await?;
Ok(data)
}
@@ -353,12 +260,7 @@ impl<T: Config> Rpc<T> {
hash: Option<T::Hash>,
) -> Result<Vec<StorageKey>, Error> {
let prefix = prefix.map(|p| p.to_storage_key());
let params = &[
to_json_value(prefix)?,
to_json_value(count)?,
to_json_value(start_key)?,
to_json_value(hash)?,
];
let params = rpc_params![prefix, count, start_key, hash];
let data = self.client.request("state_getKeysPaged", params).await?;
Ok(data)
}
@@ -370,11 +272,7 @@ impl<T: Config> Rpc<T> {
from: T::Hash,
to: Option<T::Hash>,
) -> Result<Vec<StorageChangeSet<T::Hash>>, Error> {
let params = &[
to_json_value(keys)?,
to_json_value(from)?,
to_json_value(to)?,
];
let params = rpc_params![keys, from, to];
self.client
.request("state_queryStorage", params)
.await
@@ -387,7 +285,7 @@ impl<T: Config> Rpc<T> {
keys: &[StorageKey],
at: Option<T::Hash>,
) -> Result<Vec<StorageChangeSet<T::Hash>>, Error> {
let params = &[to_json_value(keys)?, to_json_value(at)?];
let params = rpc_params![keys, at];
self.client
.request("state_queryStorageAt", params)
.await
@@ -397,7 +295,7 @@ impl<T: Config> Rpc<T> {
/// Fetch the genesis hash
pub async fn genesis_hash(&self) -> Result<T::Hash, Error> {
let block_zero = Some(ListOrValue::Value(NumberOrHex::Number(0)));
let params = &[to_json_value(block_zero)?];
let params = rpc_params![block_zero];
let list_or_value: ListOrValue<Option<T::Hash>> =
self.client.request("chain_getBlockHash", params).await?;
match list_or_value {
@@ -410,7 +308,10 @@ impl<T: Config> Rpc<T> {
/// Fetch the metadata
pub async fn metadata(&self) -> Result<Metadata, Error> {
let bytes: Bytes = self.client.request("state_getMetadata", &[]).await?;
let bytes: Bytes = self
.client
.request("state_getMetadata", rpc_params![])
.await?;
let meta: RuntimeMetadataPrefixed = Decode::decode(&mut &bytes[..])?;
let metadata: Metadata = meta.try_into()?;
Ok(metadata)
@@ -418,7 +319,10 @@ impl<T: Config> Rpc<T> {
/// Fetch system properties
pub async fn system_properties(&self) -> Result<SystemProperties, Error> {
Ok(self.client.request("system_properties", &[]).await?)
Ok(self
.client
.request("system_properties", rpc_params![])
.await?)
}
/// Get a header
@@ -426,7 +330,7 @@ impl<T: Config> Rpc<T> {
&self,
hash: Option<T::Hash>,
) -> Result<Option<T::Header>, Error> {
let params = &[to_json_value(hash)?];
let params = rpc_params![hash];
let header = self.client.request("chain_getHeader", params).await?;
Ok(header)
}
@@ -437,7 +341,7 @@ impl<T: Config> Rpc<T> {
block_number: Option<BlockNumber>,
) -> Result<Option<T::Hash>, Error> {
let block_number = block_number.map(ListOrValue::Value);
let params = &[to_json_value(block_number)?];
let params = rpc_params![block_number];
let list_or_value = self.client.request("chain_getBlockHash", params).await?;
match list_or_value {
ListOrValue::Value(hash) => Ok(hash),
@@ -447,7 +351,10 @@ impl<T: Config> Rpc<T> {
/// Get a block hash of the latest finalized block
pub async fn finalized_head(&self) -> Result<T::Hash, Error> {
let hash = self.client.request("chain_getFinalizedHead", &[]).await?;
let hash = self
.client
.request("chain_getFinalizedHead", rpc_params![])
.await?;
Ok(hash)
}
@@ -456,7 +363,7 @@ impl<T: Config> Rpc<T> {
&self,
hash: Option<T::Hash>,
) -> Result<Option<ChainBlock<T>>, Error> {
let params = &[to_json_value(hash)?];
let params = rpc_params![hash];
let block = self.client.request("chain_getBlock", params).await?;
Ok(block)
}
@@ -467,7 +374,7 @@ impl<T: Config> Rpc<T> {
keys: Vec<StorageKey>,
hash: Option<T::Hash>,
) -> Result<ReadProof<T::Hash>, Error> {
let params = &[to_json_value(keys)?, to_json_value(hash)?];
let params = rpc_params![keys, hash];
let proof = self.client.request("state_getReadProof", params).await?;
Ok(proof)
}
@@ -477,7 +384,7 @@ impl<T: Config> Rpc<T> {
&self,
at: Option<T::Hash>,
) -> Result<RuntimeVersion, Error> {
let params = &[to_json_value(at)?];
let params = rpc_params![at];
let version = self
.client
.request("state_getRuntimeVersion", params)
@@ -491,7 +398,7 @@ impl<T: Config> Rpc<T> {
/// `subscribe_finalized_events` to ensure events are finalized.
pub async fn subscribe_events(&self) -> Result<EventStorageSubscription<T>, Error> {
let keys = Some(vec![StorageKey::from(SystemEvents::new())]);
let params = &[to_json_value(keys)?];
let params = rpc_params![keys];
let subscription = self
.client
@@ -516,7 +423,11 @@ impl<T: Config> Rpc<T> {
pub async fn subscribe_blocks(&self) -> Result<Subscription<T::Header>, Error> {
let subscription = self
.client
.subscribe("chain_subscribeNewHeads", &[], "chain_unsubscribeNewHeads")
.subscribe(
"chain_subscribeNewHeads",
rpc_params![],
"chain_unsubscribeNewHeads",
)
.await?;
Ok(subscription)
@@ -530,7 +441,7 @@ impl<T: Config> Rpc<T> {
.client
.subscribe(
"chain_subscribeFinalizedHeads",
&[],
rpc_params![],
"chain_unsubscribeFinalizedHeads",
)
.await?;
@@ -543,7 +454,7 @@ impl<T: Config> Rpc<T> {
extrinsic: E,
) -> Result<T::Hash, Error> {
let bytes: Bytes = extrinsic.encode().into();
let params = &[to_json_value(bytes)?];
let params = rpc_params![bytes];
let xt_hash = self
.client
.request("author_submitExtrinsic", params)
@@ -557,7 +468,7 @@ impl<T: Config> Rpc<T> {
extrinsic: E,
) -> Result<Subscription<TransactionStatus<T::Hash, T::Hash>>, Error> {
let bytes: Bytes = extrinsic.encode().into();
let params = &[to_json_value(bytes)?];
let params = rpc_params![bytes];
let subscription = self
.client
.subscribe(
@@ -669,18 +580,17 @@ impl<T: Config> Rpc<T> {
suri: String,
public: Bytes,
) -> Result<(), Error> {
let params = &[
to_json_value(key_type)?,
to_json_value(suri)?,
to_json_value(public)?,
];
let params = rpc_params![key_type, suri, public];
self.client.request("author_insertKey", params).await?;
Ok(())
}
/// Generate new session keys and returns the corresponding public keys.
pub async fn rotate_keys(&self) -> Result<Bytes, Error> {
Ok(self.client.request("author_rotateKeys", &[]).await?)
Ok(self
.client
.request("author_rotateKeys", rpc_params![])
.await?)
}
/// Checks if the keystore has private keys for the given session public keys.
@@ -689,7 +599,7 @@ impl<T: Config> Rpc<T> {
///
/// Returns `true` iff all private keys could be found.
pub async fn has_session_keys(&self, session_keys: Bytes) -> Result<bool, Error> {
let params = &[to_json_value(session_keys)?];
let params = rpc_params![session_keys];
Ok(self.client.request("author_hasSessionKeys", params).await?)
}
@@ -701,7 +611,7 @@ impl<T: Config> Rpc<T> {
public_key: Bytes,
key_type: String,
) -> Result<bool, Error> {
let params = &[to_json_value(public_key)?, to_json_value(key_type)?];
let params = rpc_params![public_key, key_type];
Ok(self.client.request("author_hasKey", params).await?)
}
}
@@ -737,3 +647,27 @@ impl<T: Config> ExtrinsicSuccess<T> {
}
}
}
/// Example to check that `From<(Sender, Receiver) for RpcClient` works.
pub async fn build_ws_client_default(url: &str) -> Result<RpcClient, RpcError> {
let client = ws_transport(url).await.into();
Ok(client)
}
/// Build WS RPC client from URL
pub async fn build_ws_client(url: &str) -> Result<RpcClient, RpcError> {
let (sender, receiver) = ws_transport(url).await;
Ok(RpcClientBuilder::default()
.max_notifs_per_subscription(4096)
.build(sender, receiver))
}
async fn ws_transport(url: &str) -> (WsSender, WsReceiver) {
// TODO: fix unwraps because I'm lazy.
let url: Uri = url.parse().unwrap();
WsTransportClientBuilder::default()
.build(url)
.await
.unwrap()
}
+4 -3
View File
@@ -27,6 +27,7 @@ use std::{
thread,
time,
};
use subxt::rpc::{Client as _, rpc_params, build_ws_client};
static SUBSTRATE_BIN_ENV_VAR: &str = "SUBSTRATE_NODE_PATH";
@@ -42,7 +43,7 @@ async fn main() {
let cmd = Command::new(&substrate_bin)
.arg("--dev")
.arg("--tmp")
.arg(format!("--rpc-port={}", port))
.arg(format!("--ws-port={}", port))
.spawn();
let mut cmd = match cmd {
Ok(cmd) => cmd,
@@ -61,10 +62,10 @@ async fn main() {
panic!("Cannot connect to substrate node after {} retries", retries);
}
let res =
subxt::RpcClient::try_from_url(&format!("http://localhost:{}", port))
build_ws_client(&format!("ws://localhost:{}", port))
.await
.expect("should only error if malformed URL for an HTTP connection")
.request("state_getMetadata", &[])
.request("state_getMetadata", rpc_params![])
.await;
match res {
Ok(res) => {