update jsonrpsee to 0.2.0-alpha.6 (#266)

* update jsonrpsee to 0.2.0-alpha.5

* downgrade subxt client

* cleanup

* make subxt-client compile again

* update jsonrpsee v0.2.0-alpha.6

* fix build again

* remove needless type hints

* cargo fmt

* address grumbles

* remove remaining type hints

* cargo fmt
This commit is contained in:
Niklas Adolfsson
2021-05-20 21:48:56 +02:00
committed by GitHub
parent 8bff0bebd4
commit 94db874942
7 changed files with 223 additions and 255 deletions
+164 -174
View File
@@ -40,27 +40,35 @@ use futures::{
};
use futures01::sync::mpsc as mpsc01;
use jsonrpsee_types::{
client::{
FrontToBack,
NotificationMessage,
RequestMessage,
Subscription,
SubscriptionMessage,
},
error::Error as JsonRpseeError,
jsonrpc::{
self,
Call,
DeserializeOwned,
Id,
MethodCall,
Notification,
Output,
Request,
SubscriptionId,
SubscriptionNotif,
Version,
v2::{
error::{
JsonRpcErrorAlloc,
JsonRpcErrorCode,
},
params::{
Id,
JsonRpcParams,
SubscriptionId,
TwoPointZero,
},
parse_request_id,
request::{
JsonRpcCallSer,
JsonRpcInvalidRequest,
JsonRpcNotificationSer,
},
response::{
JsonRpcNotifResponse,
JsonRpcResponse,
},
},
DeserializeOwned,
Error as JsonRpseeError,
FrontToBack,
JsonValue,
RequestMessage,
Subscription,
SubscriptionMessage,
};
use sc_network::config::TransportConfig;
pub use sc_service::{
@@ -87,6 +95,10 @@ use sc_service::{
use std::{
collections::HashMap,
marker::PhantomData,
sync::atomic::{
AtomicU64,
Ordering,
},
};
use thiserror::Error;
@@ -107,15 +119,15 @@ pub enum SubxtClientError {
#[derive(Clone)]
pub struct SubxtClient {
to_back: mpsc::Sender<FrontToBack>,
next_id: Arc<AtomicU64>,
}
impl SubxtClient {
/// Create a new client.
pub fn new(mut task_manager: TaskManager, rpc: RpcHandlers) -> Self {
let (to_back, from_front) = mpsc::channel(DEFAULT_CHANNEL_SIZE);
let request_id = Arc::new(RwLock::new(u64::MIN));
let subscriptions = Arc::new(RwLock::new(HashMap::<u64, String>::new()));
let subscriptions =
Arc::new(RwLock::new(HashMap::<SubscriptionId, (String, Id)>::new()));
task::spawn(
select(
@@ -124,118 +136,72 @@ impl SubxtClient {
let (to_front, from_back) = mpsc01::channel(DEFAULT_CHANNEL_SIZE);
let session = RpcSession::new(to_front.clone());
let request_id = request_id.clone();
let subscriptions = subscriptions.clone();
async move {
let request_id = {
let mut request_id = request_id.write().await;
*request_id = request_id.wrapping_add(1);
*request_id
};
match message {
FrontToBack::Notification(NotificationMessage {
method,
params,
}) => {
let request =
Request::Single(Call::Notification(Notification {
jsonrpc: Version::V2,
method,
params,
}));
if let Ok(message) = serde_json::to_string(&request) {
rpc.rpc_query(&session, &message).await;
}
FrontToBack::Notification(raw) => {
let _ = rpc.rpc_query(&session, &raw).await;
}
FrontToBack::StartRequest(RequestMessage {
method,
params,
FrontToBack::Request(RequestMessage {
raw,
id,
send_back,
}) => {
let request =
Request::Single(Call::MethodCall(MethodCall {
jsonrpc: Version::V2,
method: method.into(),
params: params.into(),
id: Id::Num(request_id),
}));
if let Ok(message) = serde_json::to_string(&request) {
if let Some(response) =
rpc.rpc_query(&session, &message).await
{
let result = match serde_json::from_str::<Output>(
&response,
)
.expect("failed to decode request response")
{
Output::Success(success) => {
Ok(success.result)
}
Output::Failure(failure) => {
Err(JsonRpseeError::Request(
failure.error,
))
}
};
let raw_response = rpc.rpc_query(&session, &raw).await;
let to_front = match read_jsonrpc_response(
raw_response,
Id::Number(id),
) {
Some(Err(e)) => Err(e),
Some(Ok(rp)) => Ok(rp),
None => return,
};
send_back.map(|tx| {
tx.send(result)
.expect("failed to send request response")
});
}
}
send_back
.expect("request should have send_back")
.send(to_front)
.expect("failed to send request response");
}
FrontToBack::Subscribe(SubscriptionMessage {
subscribe_method,
params,
raw,
subscribe_id,
unsubscribe_id,
unsubscribe_method,
send_back,
}) => {
{
let mut subscriptions = subscriptions.write().await;
subscriptions.insert(request_id, unsubscribe_method);
}
let request =
Request::Single(Call::MethodCall(MethodCall {
jsonrpc: Version::V2,
method: subscribe_method,
params,
id: Id::Num(request_id),
}));
let raw_response = rpc.rpc_query(&session, &raw).await;
let sub_id: SubscriptionId = match read_jsonrpc_response(
raw_response,
Id::Number(subscribe_id),
) {
Some(Ok(rp)) => {
serde_json::from_value(rp)
.expect("infalliable; qed")
}
Some(Err(e)) => {
send_back
.send(Err(e))
.expect("failed to send request response");
return
}
None => return,
};
let (mut send_front_sub, send_back_sub) =
mpsc::channel(DEFAULT_CHANNEL_SIZE);
if let Ok(message) = serde_json::to_string(&request) {
if let Some(response) =
rpc.rpc_query(&session, &message).await
{
let result = match serde_json::from_str::<Output>(
&response,
)
.expect("failed to decode subscription response")
{
Output::Success(_) => {
Ok((
send_back_sub,
SubscriptionId::Num(request_id),
))
}
Output::Failure(failure) => {
Err(JsonRpseeError::Request(
failure.error,
))
}
};
send_back.send(result).expect(
"failed to send subscription response",
);
}
send_back
.send(Ok((send_back_sub, sub_id.clone())))
.expect("failed to send request response");
{
let mut subscriptions = subscriptions.write().await;
subscriptions.insert(
sub_id.clone(),
(unsubscribe_method, Id::Number(unsubscribe_id)),
);
}
task::spawn(async move {
@@ -245,7 +211,7 @@ impl SubxtClient {
while let Some(Ok(response)) = from_back.next().await
{
let notif = serde_json::from_str::<
SubscriptionNotif,
JsonRpcNotifResponse<JsonValue>,
>(
&response
)
@@ -258,31 +224,24 @@ impl SubxtClient {
});
}
FrontToBack::SubscriptionClosed(subscription_id) => {
let sub_id =
if let SubscriptionId::Num(num) = subscription_id {
num
} else {
unreachable!("subscription id should be num")
};
let json_sub_id = jsonrpc::to_value(sub_id).unwrap();
FrontToBack::SubscriptionClosed(sub_id) => {
let params: &[JsonValue] = &[sub_id.clone().into()];
let subscriptions = subscriptions.read().await;
if let Some(unsubscribe) = subscriptions.get(&sub_id) {
let request =
Request::Single(Call::MethodCall(MethodCall {
jsonrpc: Version::V2,
method: unsubscribe.into(),
params: jsonrpc::Params::Array(vec![
json_sub_id,
]),
id: Id::Num(request_id),
}));
if let Ok(message) = serde_json::to_string(&request) {
rpc.rpc_query(&session, &message).await;
}
if let Some((unsub_method, unsub_id)) =
subscriptions.get(&sub_id)
{
let message =
serde_json::to_string(&JsonRpcCallSer::new(
unsub_id.clone(),
unsub_method,
params.into(),
))
.unwrap();
let _ = rpc.rpc_query(&session, &message).await;
}
}
FrontToBack::Batch(_) => (),
}
}
})),
@@ -293,7 +252,10 @@ impl SubxtClient {
.map(drop),
);
Self { to_back }
Self {
to_back,
next_id: Arc::new(AtomicU64::new(0)),
}
}
/// Creates a new client from a config.
@@ -307,43 +269,40 @@ impl SubxtClient {
}
/// Send a JSONRPC notification.
pub async fn notification<M, P>(
pub async fn notification<'a>(
&self,
method: M,
params: P,
) -> Result<(), JsonRpseeError>
where
M: Into<String> + Send,
P: Into<jsonrpc::Params> + Send,
{
method: &'a str,
params: JsonRpcParams<'a>,
) -> Result<(), JsonRpseeError> {
let msg = serde_json::to_string(&JsonRpcNotificationSer::new(method, params))
.map_err(JsonRpseeError::ParseError)?;
self.to_back
.clone()
.send(FrontToBack::Notification(NotificationMessage {
method: method.into(),
params: params.into(),
}))
.send(FrontToBack::Notification(msg))
.await
.map_err(|e| JsonRpseeError::TransportError(Box::new(e)))
}
/// Send a JSONRPC request.
pub async fn request<T, M, P>(
pub async fn request<'a, T>(
&self,
method: M,
params: P,
method: &'a str,
params: JsonRpcParams<'a>,
) -> Result<T, JsonRpseeError>
where
T: DeserializeOwned,
M: Into<String> + Send,
P: Into<jsonrpc::Params> + Send,
{
let (send_back_tx, send_back_rx) = oneshot::channel();
let id = self.next_id.fetch_add(1, Ordering::Relaxed);
let msg =
serde_json::to_string(&JsonRpcCallSer::new(Id::Number(id), method, params))
.map_err(JsonRpseeError::ParseError)?;
self.to_back
.clone()
.send(FrontToBack::StartRequest(RequestMessage {
method: method.into(),
params: params.into(),
.send(FrontToBack::Request(RequestMessage {
raw: msg,
id,
send_back: Some(send_back_tx),
}))
.await
@@ -354,33 +313,36 @@ impl SubxtClient {
Ok(Err(err)) => return Err(err),
Err(err) => return Err(JsonRpseeError::TransportError(Box::new(err))),
};
jsonrpc::from_value(json_value).map_err(JsonRpseeError::ParseError)
serde_json::from_value(json_value).map_err(JsonRpseeError::ParseError)
}
/// Send a subscription request to the server.
pub async fn subscribe<SM, UM, P, N>(
pub async fn subscribe<'a, N>(
&self,
subscribe_method: SM,
params: P,
unsubscribe_method: UM,
subscribe_method: &'a str,
params: JsonRpcParams<'a>,
unsubscribe_method: &'a str,
) -> Result<Subscription<N>, JsonRpseeError>
where
SM: Into<String> + Send,
UM: Into<String> + Send,
P: Into<jsonrpc::Params> + Send,
N: DeserializeOwned,
{
let subscribe_method = subscribe_method.into();
let unsubscribe_method = unsubscribe_method.into();
let params = params.into();
let sub_req_id = self.next_id.fetch_add(1, Ordering::Relaxed);
let unsub_req_id = self.next_id.fetch_add(1, Ordering::Relaxed);
let msg = serde_json::to_string(&JsonRpcCallSer::new(
Id::Number(sub_req_id),
subscribe_method,
params,
))
.map_err(JsonRpseeError::ParseError)?;
let (send_back_tx, send_back_rx) = oneshot::channel();
self.to_back
.clone()
.send(FrontToBack::Subscribe(SubscriptionMessage {
subscribe_method,
unsubscribe_method,
params,
raw: msg,
subscribe_id: sub_req_id,
unsubscribe_id: unsub_req_id,
unsubscribe_method: unsubscribe_method.to_owned(),
send_back: send_back_tx,
}))
.await
@@ -545,3 +507,31 @@ impl<C: ChainSpec + 'static> SubxtClientConfig<C> {
service_config
}
}
fn read_jsonrpc_response(
maybe_msg: Option<String>,
id: Id,
) -> Option<Result<JsonValue, JsonRpseeError>> {
let msg = maybe_msg?;
match serde_json::from_str::<JsonRpcResponse<JsonValue>>(&msg) {
Ok(rp) => {
match parse_request_id::<Id>(rp.id) {
Ok(rp_id) if rp_id == id => Some(Ok(rp.result)),
_ => Some(Err(JsonRpseeError::InvalidRequestId)),
}
}
Err(_) => {
match serde_json::from_str::<JsonRpcInvalidRequest<'_>>(&msg) {
Ok(err) => {
let err = JsonRpcErrorAlloc {
jsonrpc: TwoPointZero,
error: JsonRpcErrorCode::InvalidRequest.into(),
id: parse_request_id(err.id).ok()?,
};
Some(Err(JsonRpseeError::Request(err)))
}
Err(_) => None,
}
}
}
}