mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-11 15:21:08 +00:00
rpc: Expose the subscription ID for RpcClientT (#733)
* rpc: Extend `RpcClientT` to return the subscription ID Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io> * rpc: Return `RpcSubscriptionId` for jsonrpsee clients Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io> * rpc: Expose subscription ID via subxt subscription Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io> * examples: Adjust example to return subscription ID Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io> * rpc: Add structure for subscription stream and subscription id Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io> Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
This commit is contained in:
@@ -68,7 +68,8 @@ impl RpcClientT for MyLoggingClient {
|
||||
let res = RawValue::from_string("[]".to_string()).unwrap();
|
||||
let stream = futures::stream::once(async move { Ok(res) });
|
||||
let stream: Pin<Box<dyn futures::Stream<Item = _> + Send>> = Box::pin(stream);
|
||||
Box::pin(std::future::ready(Ok(stream)))
|
||||
// This subscription does not provide an ID.
|
||||
Box::pin(std::future::ready(Ok(RpcSubscription { stream, id: None })))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -12,14 +12,18 @@ use futures::stream::{
|
||||
StreamExt,
|
||||
TryStreamExt,
|
||||
};
|
||||
use jsonrpsee::core::{
|
||||
client::{
|
||||
Client,
|
||||
ClientT,
|
||||
SubscriptionClientT,
|
||||
use jsonrpsee::{
|
||||
core::{
|
||||
client::{
|
||||
Client,
|
||||
ClientT,
|
||||
SubscriptionClientT,
|
||||
SubscriptionKind,
|
||||
},
|
||||
traits::ToRpcParams,
|
||||
Error as JsonRpseeError,
|
||||
},
|
||||
traits::ToRpcParams,
|
||||
Error as JsonRpseeError,
|
||||
types::SubscriptionId,
|
||||
};
|
||||
use serde_json::value::RawValue;
|
||||
|
||||
@@ -52,17 +56,26 @@ impl RpcClientT for Client {
|
||||
unsub: &'a str,
|
||||
) -> RpcFuture<'a, RpcSubscription> {
|
||||
Box::pin(async move {
|
||||
let sub = SubscriptionClientT::subscribe::<Box<RawValue>, _>(
|
||||
let stream = SubscriptionClientT::subscribe::<Box<RawValue>, _>(
|
||||
self,
|
||||
sub,
|
||||
Params(params),
|
||||
unsub,
|
||||
)
|
||||
.await
|
||||
.map_err(|e| RpcError::ClientError(Box::new(e)))?
|
||||
.map_err(|e| RpcError::ClientError(Box::new(e)))
|
||||
.boxed();
|
||||
Ok(sub)
|
||||
.map_err(|e| RpcError::ClientError(Box::new(e)))?;
|
||||
|
||||
let id = match stream.kind() {
|
||||
SubscriptionKind::Subscription(SubscriptionId::Str(id)) => {
|
||||
Some(id.clone().into_owned())
|
||||
}
|
||||
_ => None,
|
||||
};
|
||||
|
||||
let stream = stream
|
||||
.map_err(|e| RpcError::ClientError(Box::new(e)))
|
||||
.boxed();
|
||||
Ok(RpcSubscription { stream, id })
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -67,6 +67,8 @@ pub use rpc_client_t::{
|
||||
RpcClientT,
|
||||
RpcFuture,
|
||||
RpcSubscription,
|
||||
RpcSubscriptionId,
|
||||
RpcSubscriptionStream,
|
||||
};
|
||||
|
||||
pub use rpc_client::{
|
||||
|
||||
@@ -5,6 +5,7 @@
|
||||
use super::{
|
||||
RpcClientT,
|
||||
RpcSubscription,
|
||||
RpcSubscriptionId,
|
||||
};
|
||||
use crate::error::Error;
|
||||
use futures::{
|
||||
@@ -185,6 +186,11 @@ impl<Res> Subscription<Res> {
|
||||
_marker: std::marker::PhantomData,
|
||||
}
|
||||
}
|
||||
|
||||
/// Obtain the ID associated with this subscription.
|
||||
pub fn subscription_id(&self) -> Option<&RpcSubscriptionId> {
|
||||
self.inner.id.as_ref()
|
||||
}
|
||||
}
|
||||
|
||||
impl<Res: DeserializeOwned> Subscription<Res> {
|
||||
@@ -203,7 +209,7 @@ impl<Res: DeserializeOwned> Stream for Subscription<Res> {
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
) -> Poll<Option<Self::Item>> {
|
||||
let res = futures::ready!(self.inner.poll_next_unpin(cx));
|
||||
let res = futures::ready!(self.inner.stream.poll_next_unpin(cx));
|
||||
|
||||
// Decode the inner RawValue to the type we're expecting and map
|
||||
// any errors to the right shape:
|
||||
|
||||
@@ -59,6 +59,17 @@ pub trait RpcClientT: Send + Sync + 'static {
|
||||
pub type RpcFuture<'a, T> =
|
||||
Pin<Box<dyn Future<Output = Result<T, RpcError>> + Send + 'a>>;
|
||||
|
||||
/// The RPC subscription returned from [`RpcClientT`]'s `subscription` method.
|
||||
pub struct RpcSubscription {
|
||||
/// The subscription stream.
|
||||
pub stream: RpcSubscriptionStream,
|
||||
/// The ID associated with the subscription.
|
||||
pub id: Option<RpcSubscriptionId>,
|
||||
}
|
||||
|
||||
/// The inner subscription stream returned from our [`RpcClientT`]'s `subscription` method.
|
||||
pub type RpcSubscription =
|
||||
pub type RpcSubscriptionStream =
|
||||
Pin<Box<dyn Stream<Item = Result<Box<RawValue>, RpcError>> + Send + 'static>>;
|
||||
|
||||
/// The ID associated with the [`RpcClientT`]'s `subscription`.
|
||||
pub type RpcSubscriptionId = String;
|
||||
|
||||
Reference in New Issue
Block a user