implement session handling for unsubscribe in subxt-client (#242)

* implement session handling for unsubscribe in subxt-client

Signed-off-by: Gregory Hill <gregorydhill@outlook.com>

* update jsonrpsee to `v0.2.0-alpha.2`

Closes #241

* use new jsonrpsee request message types in subxt client

Signed-off-by: Gregory Hill <gregorydhill@outlook.com>

Co-authored-by: Niklas Adolfsson <niklasadolfsson1@gmail.com>
This commit is contained in:
Greg Hill
2021-03-09 21:29:21 +00:00
committed by GitHub
parent 9959f0d299
commit c1d4804ccd
4 changed files with 89 additions and 35 deletions
+82 -29
View File
@@ -18,7 +18,13 @@
#![deny(missing_docs)]
use async_std::task;
use async_std::{
sync::{
Arc,
RwLock,
},
task,
};
use futures::{
channel::{
mpsc,
@@ -36,7 +42,10 @@ use futures01::sync::mpsc as mpsc01;
use jsonrpsee_types::{
client::{
FrontToBack,
NotificationMessage,
RequestMessage,
Subscription,
SubscriptionMessage,
},
error::Error as JsonRpseeError,
jsonrpc::{
@@ -75,7 +84,10 @@ use sc_service::{
RpcSession,
TaskManager,
};
use std::marker::PhantomData;
use std::{
collections::HashMap,
marker::PhantomData,
};
use thiserror::Error;
const DEFAULT_CHANNEL_SIZE: usize = 16;
@@ -102,15 +114,31 @@ impl SubxtClient {
pub fn new(mut task_manager: TaskManager, rpc: RpcHandlers) -> Self {
let (to_back, from_front) = mpsc::channel(DEFAULT_CHANNEL_SIZE);
let request_id = Arc::new(RwLock::new(u64::MIN));
let subscriptions = Arc::new(RwLock::new(HashMap::<u64, String>::new()));
task::spawn(
select(
Box::pin(from_front.for_each(move |message: FrontToBack| {
let rpc = rpc.clone();
let (to_front, from_back) = mpsc01::channel(DEFAULT_CHANNEL_SIZE);
let session = RpcSession::new(to_front.clone());
let request_id = request_id.clone();
let subscriptions = subscriptions.clone();
async move {
let request_id = {
let mut request_id = request_id.write().await;
*request_id = request_id.wrapping_add(1);
*request_id
};
match message {
FrontToBack::Notification { method, params } => {
FrontToBack::Notification(NotificationMessage {
method,
params,
}) => {
let request =
Request::Single(Call::Notification(Notification {
jsonrpc: Version::V2,
@@ -122,17 +150,17 @@ impl SubxtClient {
}
}
FrontToBack::StartRequest {
FrontToBack::StartRequest(RequestMessage {
method,
params,
send_back,
} => {
}) => {
let request =
Request::Single(Call::MethodCall(MethodCall {
jsonrpc: Version::V2,
method: method.into(),
params: params.into(),
id: Id::Num(0),
id: Id::Num(request_id),
}));
if let Ok(message) = serde_json::to_string(&request) {
if let Some(response) =
@@ -153,25 +181,31 @@ impl SubxtClient {
}
};
send_back
.send(result)
.expect("failed to send request response");
send_back.map(|tx| {
tx.send(result)
.expect("failed to send request response")
});
}
}
}
FrontToBack::Subscribe {
FrontToBack::Subscribe(SubscriptionMessage {
subscribe_method,
params,
unsubscribe_method: _,
unsubscribe_method,
send_back,
} => {
}) => {
{
let mut subscriptions = subscriptions.write().await;
subscriptions.insert(request_id, unsubscribe_method);
}
let request =
Request::Single(Call::MethodCall(MethodCall {
jsonrpc: Version::V2,
method: subscribe_method,
params,
id: Id::Num(0),
id: Id::Num(request_id),
}));
let (mut send_front_sub, send_back_sub) =
@@ -188,10 +222,7 @@ impl SubxtClient {
Output::Success(_) => {
Ok((
send_back_sub,
// NOTE: The ID is used to unsubscribe to specific subscription
// which the `SubxtClient` doesn't support so hardcoding it to `0`
// is fine.
SubscriptionId::Num(0),
SubscriptionId::Num(request_id),
))
}
Output::Failure(failure) => {
@@ -219,16 +250,38 @@ impl SubxtClient {
&response
)
.expect("failed to decode subscription notif");
send_front_sub
// ignore send error since the channel is probably closed
let _ = send_front_sub
.send(notif.params.result)
.await
.expect("failed to send subscription notif")
.await;
}
});
}
FrontToBack::SubscriptionClosed(_) => {
// NOTE: unsubscriptions are not supported by SubxtClient.
FrontToBack::SubscriptionClosed(subscription_id) => {
let sub_id =
if let SubscriptionId::Num(num) = subscription_id {
num
} else {
unreachable!("subscription id should be num")
};
let json_sub_id = jsonrpc::to_value(sub_id).unwrap();
let subscriptions = subscriptions.read().await;
if let Some(unsubscribe) = subscriptions.get(&sub_id) {
let request =
Request::Single(Call::MethodCall(MethodCall {
jsonrpc: Version::V2,
method: unsubscribe.into(),
params: jsonrpc::Params::Array(vec![
json_sub_id,
]),
id: Id::Num(request_id),
}));
if let Ok(message) = serde_json::to_string(&request) {
rpc.rpc_query(&session, &message).await;
}
}
}
}
}
@@ -265,10 +318,10 @@ impl SubxtClient {
{
self.to_back
.clone()
.send(FrontToBack::Notification {
.send(FrontToBack::Notification(NotificationMessage {
method: method.into(),
params: params.into(),
})
}))
.await
.map_err(|e| JsonRpseeError::TransportError(Box::new(e)))
}
@@ -288,11 +341,11 @@ impl SubxtClient {
self.to_back
.clone()
.send(FrontToBack::StartRequest {
.send(FrontToBack::StartRequest(RequestMessage {
method: method.into(),
params: params.into(),
send_back: send_back_tx,
})
send_back: Some(send_back_tx),
}))
.await
.map_err(|e| JsonRpseeError::TransportError(Box::new(e)))?;
@@ -324,12 +377,12 @@ impl SubxtClient {
let (send_back_tx, send_back_rx) = oneshot::channel();
self.to_back
.clone()
.send(FrontToBack::Subscribe {
.send(FrontToBack::Subscribe(SubscriptionMessage {
subscribe_method,
unsubscribe_method,
params,
send_back: send_back_tx,
})
}))
.await
.map_err(JsonRpseeError::Internal)?;