mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-13 11:41:04 +00:00
integration with jsonrpsee v2 (#214)
* hacky integration with jsonrpsee v2 * stray todos * fmt * add http support * make test build compile * Update src/rpc.rs * bring back set_client * use crates.io version jsonrpsee * WIP: workaround for embedded subxt client (#236) * workaround for embedded subxt client Signed-off-by: Gregory Hill <gregorydhill@outlook.com> * increase default channel size on subxt client Signed-off-by: Gregory Hill <gregorydhill@outlook.com> * remove client tests due to inference problem on From Signed-off-by: Gregory Hill <gregorydhill@outlook.com> * add comments for missing impls * more verbose errors * make subscription notifs buffer bigger * fmt Co-authored-by: Greg Hill <gregorydhill@outlook.com>
This commit is contained in:
+242
-166
@@ -20,12 +20,11 @@
|
||||
|
||||
use async_std::task;
|
||||
use futures::{
|
||||
channel::mpsc,
|
||||
compat::{
|
||||
Compat01As03,
|
||||
Sink01CompatExt,
|
||||
Stream01CompatExt,
|
||||
channel::{
|
||||
mpsc,
|
||||
oneshot,
|
||||
},
|
||||
compat::Stream01CompatExt,
|
||||
future::{
|
||||
select,
|
||||
FutureExt,
|
||||
@@ -34,12 +33,25 @@ use futures::{
|
||||
stream::StreamExt,
|
||||
};
|
||||
use futures01::sync::mpsc as mpsc01;
|
||||
use jsonrpsee::{
|
||||
common::{
|
||||
Request,
|
||||
Response,
|
||||
use jsonrpsee_types::{
|
||||
client::{
|
||||
FrontToBack,
|
||||
Subscription,
|
||||
},
|
||||
error::Error as JsonRpseeError,
|
||||
jsonrpc::{
|
||||
self,
|
||||
Call,
|
||||
DeserializeOwned,
|
||||
Id,
|
||||
MethodCall,
|
||||
Notification,
|
||||
Output,
|
||||
Request,
|
||||
SubscriptionId,
|
||||
SubscriptionNotif,
|
||||
Version,
|
||||
},
|
||||
transport::TransportClient,
|
||||
};
|
||||
use sc_network::config::TransportConfig;
|
||||
pub use sc_service::{
|
||||
@@ -63,12 +75,11 @@ use sc_service::{
|
||||
RpcSession,
|
||||
TaskManager,
|
||||
};
|
||||
use std::{
|
||||
future::Future,
|
||||
pin::Pin,
|
||||
};
|
||||
use std::marker::PhantomData;
|
||||
use thiserror::Error;
|
||||
|
||||
const DEFAULT_CHANNEL_SIZE: usize = 16;
|
||||
|
||||
/// Error thrown by the client.
|
||||
#[derive(Debug, Error)]
|
||||
pub enum SubxtClientError {
|
||||
@@ -81,28 +92,144 @@ pub enum SubxtClientError {
|
||||
}
|
||||
|
||||
/// Client for an embedded substrate node.
|
||||
#[derive(Clone)]
|
||||
pub struct SubxtClient {
|
||||
to_back: mpsc::Sender<String>,
|
||||
from_back: Compat01As03<mpsc01::Receiver<String>>,
|
||||
to_back: mpsc::Sender<FrontToBack>,
|
||||
}
|
||||
|
||||
impl SubxtClient {
|
||||
/// Create a new client.
|
||||
pub fn new(mut task_manager: TaskManager, rpc: RpcHandlers) -> Self {
|
||||
let (to_back, from_front) = mpsc::channel(4);
|
||||
let (to_front, from_back) = mpsc01::channel(4);
|
||||
let (to_back, from_front) = mpsc::channel(DEFAULT_CHANNEL_SIZE);
|
||||
|
||||
let session = RpcSession::new(to_front.clone());
|
||||
task::spawn(
|
||||
select(
|
||||
Box::pin(from_front.for_each(move |message: String| {
|
||||
Box::pin(from_front.for_each(move |message: FrontToBack| {
|
||||
let rpc = rpc.clone();
|
||||
let session = session.clone();
|
||||
let mut to_front = to_front.clone().sink_compat();
|
||||
let (to_front, from_back) = mpsc01::channel(DEFAULT_CHANNEL_SIZE);
|
||||
let session = RpcSession::new(to_front.clone());
|
||||
async move {
|
||||
let response = rpc.rpc_query(&session, &message).await;
|
||||
if let Some(response) = response {
|
||||
to_front.send(response).await.ok();
|
||||
match message {
|
||||
FrontToBack::Notification { method, params } => {
|
||||
let request =
|
||||
Request::Single(Call::Notification(Notification {
|
||||
jsonrpc: Version::V2,
|
||||
method,
|
||||
params,
|
||||
}));
|
||||
if let Ok(message) = serde_json::to_string(&request) {
|
||||
rpc.rpc_query(&session, &message).await;
|
||||
}
|
||||
}
|
||||
|
||||
FrontToBack::StartRequest {
|
||||
method,
|
||||
params,
|
||||
send_back,
|
||||
} => {
|
||||
let request =
|
||||
Request::Single(Call::MethodCall(MethodCall {
|
||||
jsonrpc: Version::V2,
|
||||
method: method.into(),
|
||||
params: params.into(),
|
||||
id: Id::Num(0),
|
||||
}));
|
||||
if let Ok(message) = serde_json::to_string(&request) {
|
||||
if let Some(response) =
|
||||
rpc.rpc_query(&session, &message).await
|
||||
{
|
||||
let result = match serde_json::from_str::<Output>(
|
||||
&response,
|
||||
)
|
||||
.expect("failed to decode request response")
|
||||
{
|
||||
Output::Success(success) => {
|
||||
Ok(success.result)
|
||||
}
|
||||
Output::Failure(failure) => {
|
||||
Err(JsonRpseeError::Request(
|
||||
failure.error,
|
||||
))
|
||||
}
|
||||
};
|
||||
|
||||
send_back
|
||||
.send(result)
|
||||
.expect("failed to send request response");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
FrontToBack::Subscribe {
|
||||
subscribe_method,
|
||||
params,
|
||||
unsubscribe_method: _,
|
||||
send_back,
|
||||
} => {
|
||||
let request =
|
||||
Request::Single(Call::MethodCall(MethodCall {
|
||||
jsonrpc: Version::V2,
|
||||
method: subscribe_method,
|
||||
params,
|
||||
id: Id::Num(0),
|
||||
}));
|
||||
|
||||
let (mut send_front_sub, send_back_sub) =
|
||||
mpsc::channel(DEFAULT_CHANNEL_SIZE);
|
||||
if let Ok(message) = serde_json::to_string(&request) {
|
||||
if let Some(response) =
|
||||
rpc.rpc_query(&session, &message).await
|
||||
{
|
||||
let result = match serde_json::from_str::<Output>(
|
||||
&response,
|
||||
)
|
||||
.expect("failed to decode subscription response")
|
||||
{
|
||||
Output::Success(_) => {
|
||||
Ok((
|
||||
send_back_sub,
|
||||
// NOTE: The ID is used to unsubscribe to specific subscription
|
||||
// which the `SubxtClient` doesn't support so hardcoding it to `0`
|
||||
// is fine.
|
||||
SubscriptionId::Num(0),
|
||||
))
|
||||
}
|
||||
Output::Failure(failure) => {
|
||||
Err(JsonRpseeError::Request(
|
||||
failure.error,
|
||||
))
|
||||
}
|
||||
};
|
||||
|
||||
send_back.send(result).expect(
|
||||
"failed to send subscription response",
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
task::spawn(async move {
|
||||
let mut from_back = from_back.compat();
|
||||
let _session = session.clone();
|
||||
|
||||
while let Some(Ok(response)) = from_back.next().await
|
||||
{
|
||||
let notif = serde_json::from_str::<
|
||||
SubscriptionNotif,
|
||||
>(
|
||||
&response
|
||||
)
|
||||
.expect("failed to decode subscription notif");
|
||||
send_front_sub
|
||||
.send(notif.params.result)
|
||||
.await
|
||||
.expect("failed to send subscription notif")
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
FrontToBack::SubscriptionClosed(_) => {
|
||||
// NOTE: unsubscriptions are not supported by SubxtClient.
|
||||
}
|
||||
}
|
||||
}
|
||||
})),
|
||||
@@ -113,10 +240,7 @@ impl SubxtClient {
|
||||
.map(drop),
|
||||
);
|
||||
|
||||
Self {
|
||||
to_back,
|
||||
from_back: from_back.compat(),
|
||||
}
|
||||
Self { to_back }
|
||||
}
|
||||
|
||||
/// Creates a new client from a config.
|
||||
@@ -128,41 +252,98 @@ impl SubxtClient {
|
||||
let (task_manager, rpc_handlers) = (builder)(config)?;
|
||||
Ok(Self::new(task_manager, rpc_handlers))
|
||||
}
|
||||
}
|
||||
|
||||
impl TransportClient for SubxtClient {
|
||||
type Error = SubxtClientError;
|
||||
|
||||
fn send_request<'a>(
|
||||
&'a mut self,
|
||||
request: Request,
|
||||
) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send + 'a>> {
|
||||
Box::pin(async move {
|
||||
let request = serde_json::to_string(&request)?;
|
||||
self.to_back.send(request).await?;
|
||||
Ok(())
|
||||
})
|
||||
/// Send a JSONRPC notification.
|
||||
pub async fn notification<M, P>(
|
||||
&self,
|
||||
method: M,
|
||||
params: P,
|
||||
) -> Result<(), JsonRpseeError>
|
||||
where
|
||||
M: Into<String> + Send,
|
||||
P: Into<jsonrpc::Params> + Send,
|
||||
{
|
||||
self.to_back
|
||||
.clone()
|
||||
.send(FrontToBack::Notification {
|
||||
method: method.into(),
|
||||
params: params.into(),
|
||||
})
|
||||
.await
|
||||
.map_err(|e| JsonRpseeError::TransportError(Box::new(e)))
|
||||
}
|
||||
|
||||
fn next_response<'a>(
|
||||
&'a mut self,
|
||||
) -> Pin<Box<dyn Future<Output = Result<Response, Self::Error>> + Send + 'a>> {
|
||||
Box::pin(async move {
|
||||
let response = self
|
||||
.from_back
|
||||
.next()
|
||||
.await
|
||||
.expect("channel shouldn't close")
|
||||
.unwrap();
|
||||
Ok(serde_json::from_str(&response)?)
|
||||
})
|
||||
}
|
||||
}
|
||||
/// Send a JSONRPC request.
|
||||
pub async fn request<T, M, P>(
|
||||
&self,
|
||||
method: M,
|
||||
params: P,
|
||||
) -> Result<T, JsonRpseeError>
|
||||
where
|
||||
T: DeserializeOwned,
|
||||
M: Into<String> + Send,
|
||||
P: Into<jsonrpc::Params> + Send,
|
||||
{
|
||||
let (send_back_tx, send_back_rx) = oneshot::channel();
|
||||
|
||||
impl From<SubxtClient> for jsonrpsee::Client {
|
||||
fn from(client: SubxtClient) -> Self {
|
||||
let client = jsonrpsee::raw::RawClient::new(client);
|
||||
jsonrpsee::Client::new(client)
|
||||
self.to_back
|
||||
.clone()
|
||||
.send(FrontToBack::StartRequest {
|
||||
method: method.into(),
|
||||
params: params.into(),
|
||||
send_back: send_back_tx,
|
||||
})
|
||||
.await
|
||||
.map_err(|e| JsonRpseeError::TransportError(Box::new(e)))?;
|
||||
|
||||
let json_value = match send_back_rx.await {
|
||||
Ok(Ok(v)) => v,
|
||||
Ok(Err(err)) => return Err(err),
|
||||
Err(err) => return Err(JsonRpseeError::TransportError(Box::new(err))),
|
||||
};
|
||||
jsonrpc::from_value(json_value).map_err(JsonRpseeError::ParseError)
|
||||
}
|
||||
|
||||
/// Send a subscription request to the server.
|
||||
pub async fn subscribe<SM, UM, P, N>(
|
||||
&self,
|
||||
subscribe_method: SM,
|
||||
params: P,
|
||||
unsubscribe_method: UM,
|
||||
) -> Result<Subscription<N>, JsonRpseeError>
|
||||
where
|
||||
SM: Into<String> + Send,
|
||||
UM: Into<String> + Send,
|
||||
P: Into<jsonrpc::Params> + Send,
|
||||
N: DeserializeOwned,
|
||||
{
|
||||
let subscribe_method = subscribe_method.into();
|
||||
let unsubscribe_method = unsubscribe_method.into();
|
||||
let params = params.into();
|
||||
|
||||
let (send_back_tx, send_back_rx) = oneshot::channel();
|
||||
self.to_back
|
||||
.clone()
|
||||
.send(FrontToBack::Subscribe {
|
||||
subscribe_method,
|
||||
unsubscribe_method,
|
||||
params,
|
||||
send_back: send_back_tx,
|
||||
})
|
||||
.await
|
||||
.map_err(JsonRpseeError::Internal)?;
|
||||
|
||||
let (notifs_rx, id) = match send_back_rx.await {
|
||||
Ok(Ok(val)) => val,
|
||||
Ok(Err(err)) => return Err(err),
|
||||
Err(err) => return Err(JsonRpseeError::TransportError(Box::new(err))),
|
||||
};
|
||||
Ok(Subscription {
|
||||
to_back: self.to_back.clone(),
|
||||
notifs_rx,
|
||||
marker: PhantomData,
|
||||
id,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -171,7 +352,7 @@ impl From<SubxtClient> for jsonrpsee::Client {
|
||||
pub enum Role {
|
||||
/// Light client.
|
||||
Light,
|
||||
/// A full node (maninly used for testing purposes).
|
||||
/// A full node (mainly used for testing purposes).
|
||||
Authority(sp_keyring::AccountKeyring),
|
||||
}
|
||||
|
||||
@@ -311,108 +492,3 @@ impl<C: ChainSpec + 'static> SubxtClientConfig<C> {
|
||||
service_config
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use async_std::path::Path;
|
||||
use sp_keyring::AccountKeyring;
|
||||
use substrate_subxt::{
|
||||
balances::TransferCallExt,
|
||||
ClientBuilder,
|
||||
NodeTemplateRuntime,
|
||||
PairSigner,
|
||||
};
|
||||
use tempdir::TempDir;
|
||||
|
||||
#[async_std::test]
|
||||
#[ignore]
|
||||
async fn test_client() {
|
||||
env_logger::try_init().ok();
|
||||
let client = ClientBuilder::<NodeTemplateRuntime>::new()
|
||||
.build()
|
||||
.await
|
||||
.unwrap();
|
||||
let signer = PairSigner::new(AccountKeyring::Alice.pair());
|
||||
let to = AccountKeyring::Bob.to_account_id().into();
|
||||
client
|
||||
.transfer_and_watch(&signer, &to, 10_000)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
#[async_std::test]
|
||||
#[ignore]
|
||||
async fn test_light_client() {
|
||||
env_logger::try_init().ok();
|
||||
let chain_spec_path =
|
||||
Path::new(env!("CARGO_MANIFEST_DIR")).join("dev-chain.json");
|
||||
let bytes = async_std::fs::read(chain_spec_path).await.unwrap();
|
||||
let chain_spec =
|
||||
test_node::chain_spec::ChainSpec::from_json_bytes(bytes).unwrap();
|
||||
let tmp = TempDir::new("subxt-").expect("failed to create tempdir");
|
||||
let config = SubxtClientConfig {
|
||||
// base_path:
|
||||
impl_name: "substrate-subxt-light-client",
|
||||
impl_version: "0.0.1",
|
||||
author: "David Craven",
|
||||
copyright_start_year: 2020,
|
||||
db: DatabaseConfig::RocksDb {
|
||||
path: tmp.path().into(),
|
||||
cache_size: 64,
|
||||
},
|
||||
keystore: KeystoreConfig::InMemory,
|
||||
chain_spec,
|
||||
role: Role::Light,
|
||||
telemetry: None,
|
||||
wasm_method: Default::default(),
|
||||
};
|
||||
let client = ClientBuilder::<NodeTemplateRuntime>::new()
|
||||
.set_client(
|
||||
SubxtClient::from_config(config, test_node::service::new_light).unwrap(),
|
||||
)
|
||||
.build()
|
||||
.await
|
||||
.unwrap();
|
||||
let signer = PairSigner::new(AccountKeyring::Alice.pair());
|
||||
let to = AccountKeyring::Bob.to_account_id().into();
|
||||
client
|
||||
.transfer_and_watch(&signer, &to, 10_000)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
#[async_std::test]
|
||||
async fn test_full_client() {
|
||||
env_logger::try_init().ok();
|
||||
let tmp = TempDir::new("subxt-").expect("failed to create tempdir");
|
||||
let config = SubxtClientConfig {
|
||||
impl_name: "substrate-subxt-full-client",
|
||||
impl_version: "0.0.1",
|
||||
author: "David Craven",
|
||||
copyright_start_year: 2020,
|
||||
db: DatabaseConfig::RocksDb {
|
||||
path: tmp.path().into(),
|
||||
cache_size: 128,
|
||||
},
|
||||
keystore: KeystoreConfig::InMemory,
|
||||
chain_spec: test_node::chain_spec::development_config().unwrap(),
|
||||
role: Role::Authority(AccountKeyring::Alice),
|
||||
telemetry: None,
|
||||
wasm_method: Default::default(),
|
||||
};
|
||||
let client = ClientBuilder::<NodeTemplateRuntime>::new()
|
||||
.set_client(
|
||||
SubxtClient::from_config(config, test_node::service::new_full).unwrap(),
|
||||
)
|
||||
.build()
|
||||
.await
|
||||
.unwrap();
|
||||
let signer = PairSigner::new(AccountKeyring::Alice.pair());
|
||||
let to = AccountKeyring::Bob.to_account_id().into();
|
||||
client
|
||||
.transfer_and_watch(&signer, &to, 10_000)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user