refactor: replace reconnecting-jsonrpsee-ws-client with subxt-reconnecting-rpc-client (#1705)

* feat: add native subxt rpc reconn client

* add jsonrpsee dep to reconnecting-client

* Update subxt/src/backend/rpc/reconnecting_rpc_client/tests.rs

* fix grumbles

* add simple wasm test for reconnecting client

* fix test build

* cargo fmt

* remove reconnect apis

* Update testing/wasm-rpc-tests/tests/wasm.rs

* Update subxt/src/backend/rpc/reconnecting_rpc_client/tests.rs

* Update subxt/src/backend/rpc/reconnecting_rpc_client/tests.rs
This commit is contained in:
Niklas Adolfsson
2024-08-27 15:18:06 +02:00
committed by GitHub
parent 193452e95f
commit 4bc27d4977
14 changed files with 1102 additions and 343 deletions
@@ -1,270 +0,0 @@
// Copyright 2019-2024 Parity Technologies (UK) Ltd.
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
// see LICENSE for license details.
use super::{RawRpcFuture, RawRpcSubscription, RpcClientT};
use crate::error::RpcError;
use futures::{Future, FutureExt, StreamExt, TryStreamExt};
use reconnecting_jsonrpsee_ws_client::{CallRetryPolicy, Client as InnerClient, SubscriptionId};
use serde_json::value::RawValue;
use std::time::Duration;
pub use reconnecting_jsonrpsee_ws_client::{
ExponentialBackoff, FibonacciBackoff, FixedInterval, IdKind,
};
#[cfg(feature = "native")]
use reconnecting_jsonrpsee_ws_client::{HeaderMap, PingConfig};
/// Builder for [`Client`].
#[derive(Debug, Clone)]
pub struct Builder<P> {
max_request_size: u32,
max_response_size: u32,
retry_policy: P,
max_redirections: u32,
id_kind: IdKind,
max_log_len: u32,
max_concurrent_requests: u32,
request_timeout: Duration,
connection_timeout: Duration,
#[cfg(feature = "native")]
ping_config: Option<PingConfig>,
#[cfg(feature = "native")]
headers: HeaderMap,
}
impl Default for Builder<ExponentialBackoff> {
fn default() -> Self {
Self {
max_request_size: 10 * 1024 * 1024,
max_response_size: 10 * 1024 * 1024,
retry_policy: ExponentialBackoff::from_millis(10).max_delay(Duration::from_secs(60)),
max_redirections: 5,
id_kind: IdKind::Number,
max_log_len: 1024,
max_concurrent_requests: 1024,
request_timeout: Duration::from_secs(60),
connection_timeout: Duration::from_secs(10),
#[cfg(feature = "native")]
ping_config: Some(PingConfig::new()),
#[cfg(feature = "native")]
headers: HeaderMap::new(),
}
}
}
impl Builder<ExponentialBackoff> {
/// Create a new builder.
pub fn new() -> Self {
Self::default()
}
}
impl<P> Builder<P>
where
P: Iterator<Item = Duration> + Send + Sync + 'static + Clone,
{
/// Configure the min response size a for websocket message.
///
/// Default: 10MB
pub fn max_request_size(mut self, max: u32) -> Self {
self.max_request_size = max;
self
}
/// Configure the max response size a for websocket message.
///
/// Default: 10MB
pub fn max_response_size(mut self, max: u32) -> Self {
self.max_response_size = max;
self
}
/// Set the max number of redirections to perform until a connection is regarded as failed.
///
/// Default: 5
pub fn max_redirections(mut self, redirect: u32) -> Self {
self.max_redirections = redirect;
self
}
/// Configure how many concurrent method calls are allowed.
///
/// Default: 1024
pub fn max_concurrent_requests(mut self, max: u32) -> Self {
self.max_concurrent_requests = max;
self
}
/// Configure how long until a method call is regarded as failed.
///
/// Default: 1 minute
pub fn request_timeout(mut self, timeout: Duration) -> Self {
self.request_timeout = timeout;
self
}
/// Set connection timeout for the WebSocket handshake
///
/// Default: 10 seconds
pub fn connection_timeout(mut self, timeout: Duration) -> Self {
self.connection_timeout = timeout;
self
}
/// Configure the data type of the request object ID
///
/// Default: number
pub fn id_format(mut self, kind: IdKind) -> Self {
self.id_kind = kind;
self
}
/// Set maximum length for logging calls and responses.
/// Logs bigger than this limit will be truncated.
///
/// Default: 1024
pub fn set_max_logging_length(mut self, max: u32) -> Self {
self.max_log_len = max;
self
}
/// Configure which retry policy to use.
///
/// Default: Exponential backoff 10ms
pub fn retry_policy<T: Iterator<Item = Duration> + Send + Sync + 'static + Clone>(
self,
retry_policy: T,
) -> Builder<T> {
Builder {
max_request_size: self.max_request_size,
max_response_size: self.max_response_size,
retry_policy,
max_redirections: self.max_redirections,
max_log_len: self.max_log_len,
id_kind: self.id_kind,
max_concurrent_requests: self.max_concurrent_requests,
request_timeout: self.request_timeout,
connection_timeout: self.connection_timeout,
#[cfg(feature = "native")]
ping_config: self.ping_config,
#[cfg(feature = "native")]
headers: self.headers,
}
}
#[cfg(feature = "native")]
#[cfg_attr(docsrs, doc(cfg(feature = "native")))]
/// Configure the WebSocket ping/pong interval.
///
/// Default: 30 seconds.
pub fn enable_ws_ping(mut self, ping_config: PingConfig) -> Self {
self.ping_config = Some(ping_config);
self
}
#[cfg(feature = "native")]
#[cfg_attr(docsrs, doc(cfg(feature = "native")))]
/// Disable WebSocket ping/pongs.
///
/// Default: 30 seconds.
pub fn disable_ws_ping(mut self) -> Self {
self.ping_config = None;
self
}
#[cfg(feature = "native")]
#[cfg_attr(docsrs, doc(cfg(native)))]
/// Configure custom headers to use in the WebSocket handshake.
pub fn set_headers(mut self, headers: HeaderMap) -> Self {
self.headers = headers;
self
}
/// Build and connect to the target.
pub async fn build(self, url: String) -> Result<Client, RpcError> {
let client = InnerClient::builder()
.retry_policy(self.retry_policy)
.build(url)
.await
.map_err(|e| RpcError::ClientError(Box::new(e)))?;
Ok(Client(client))
}
}
/// Reconnecting rpc client.
#[derive(Debug, Clone)]
pub struct Client(InnerClient);
impl Client {
/// Create a builder.
pub fn builder() -> Builder<ExponentialBackoff> {
Builder::new()
}
/// A future that resolves when the client has initiated a reconnection.
/// This method returns another future that resolves when the client has reconnected.
///
/// This may be called multiple times.
pub async fn reconnect_initiated(&self) -> impl Future<Output = ()> + '_ {
self.0.reconnect_started().await;
self.0.reconnected()
}
/// Get how many times the client has reconnected successfully.
pub fn reconnect_count(&self) -> usize {
self.0.reconnect_count()
}
}
impl RpcClientT for Client {
fn request_raw<'a>(
&'a self,
method: &'a str,
params: Option<Box<RawValue>>,
) -> RawRpcFuture<'a, Box<RawValue>> {
async {
self.0
.request_raw_with_policy(method.to_string(), params, CallRetryPolicy::Drop)
.await
.map_err(|e| RpcError::DisconnectedWillReconnect(e.to_string()))
}
.boxed()
}
fn subscribe_raw<'a>(
&'a self,
sub: &'a str,
params: Option<Box<RawValue>>,
unsub: &'a str,
) -> RawRpcFuture<'a, RawRpcSubscription> {
async {
let sub = self
.0
.subscribe_raw_with_policy(
sub.to_string(),
params,
unsub.to_string(),
CallRetryPolicy::Drop,
)
.await
.map_err(|e| RpcError::ClientError(Box::new(e)))?;
let id = match sub.id() {
SubscriptionId::Num(n) => n.to_string(),
SubscriptionId::Str(s) => s.to_string(),
};
let stream = sub
.map_err(|e| RpcError::DisconnectedWillReconnect(e.to_string()))
.boxed();
Ok(RawRpcSubscription {
stream,
id: Some(id),
})
}
.boxed()
}
}
@@ -0,0 +1,640 @@
// Copyright 2019-2024 Parity Technologies (UK) Ltd.
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
// see LICENSE for license details.
//! # reconnecting-jsonrpsee-ws-client
//!
//! A simple reconnecting JSON-RPC WebSocket client for subxt which
//! automatically reconnects when the connection is lost but
//! it doesn't retain subscriptions and pending method calls when it reconnects.
//!
//! The logic which action to take for individual calls and subscriptions are
//! handled by the subxt backend implementations.
//!
//! # Example
//!
//! ```no_run
//! use std::time::Duration;
//! use futures::StreamExt;
//! use subxt::backend::rpc::reconnecting_rpc_client::{RpcClient, ExponentialBackoff};
//! use subxt::{OnlineClient, PolkadotConfig};
//!
//! #[tokio::main]
//! async fn main() {
//! let rpc = RpcClient::builder()
//! .retry_policy(ExponentialBackoff::from_millis(100).max_delay(Duration::from_secs(10)))
//! .build("ws://localhost:9944".to_string())
//! .await
//! .unwrap();
//!
//! let subxt_client: OnlineClient<PolkadotConfig> = OnlineClient::from_rpc_client(rpc.clone()).await.unwrap();
//! let mut blocks_sub = subxt_client.blocks().subscribe_finalized().await.unwrap();
//!
//! while let Some(block) = blocks_sub.next().await {
//! let block = match block {
//! Ok(b) => b,
//! Err(e) => {
//! if e.is_disconnected_will_reconnect() {
//! println!("The RPC connection was lost and we may have missed a few blocks");
//! continue;
//! } else {
//! panic!("Error: {}", e);
//! }
//! }
//! };
//! println!("Block #{} ({})", block.number(), block.hash());
//! }
//! }
//! ```
mod platform;
#[cfg(test)]
mod tests;
mod utils;
use std::{
pin::Pin,
sync::Arc,
task::{self, Poll},
time::Duration,
};
use super::{RawRpcFuture, RawRpcSubscription, RpcClientT};
use crate::error::RpcError as SubxtRpcError;
use finito::Retry;
use futures::{FutureExt, Stream, StreamExt, TryStreamExt};
use jsonrpsee::core::{
client::{
Client as WsClient, ClientT, Subscription as RpcSubscription, SubscriptionClientT,
SubscriptionKind,
},
traits::ToRpcParams,
};
use platform::spawn;
use serde_json::value::RawValue;
use tokio::sync::{
mpsc::{self, UnboundedReceiver, UnboundedSender},
oneshot, Notify,
};
use utils::display_close_reason;
// re-exports
pub use finito::{ExponentialBackoff, FibonacciBackoff, FixedInterval};
pub use jsonrpsee::core::client::IdKind;
pub use jsonrpsee::{core::client::error::Error as RpcError, rpc_params, types::SubscriptionId};
#[cfg(feature = "native")]
pub use jsonrpsee::ws_client::{HeaderMap, PingConfig};
const LOG_TARGET: &str = "subxt-reconnecting-rpc-client";
/// Method result.
pub type MethodResult = Result<Box<RawValue>, Error>;
/// Subscription result.
pub type SubscriptionResult = Result<Box<RawValue>, DisconnectedWillReconnect>;
/// The connection was closed, reconnect initiated and the subscription was dropped.
#[derive(Debug, thiserror::Error)]
#[error("The connection was closed because of `{0:?}` and reconnect initiated")]
pub struct DisconnectedWillReconnect(String);
/// New-type pattern which implements [`ToRpcParams`] that is required by jsonrpsee.
#[derive(Debug, Clone)]
struct RpcParams(Option<Box<RawValue>>);
impl ToRpcParams for RpcParams {
fn to_rpc_params(self) -> Result<Option<Box<RawValue>>, serde_json::Error> {
Ok(self.0)
}
}
#[derive(Debug)]
enum Op {
Call {
method: String,
params: RpcParams,
send_back: oneshot::Sender<MethodResult>,
},
Subscription {
subscribe_method: String,
params: RpcParams,
unsubscribe_method: String,
send_back: oneshot::Sender<Result<Subscription, Error>>,
},
}
/// Error that can occur when for a RPC call or subscription.
#[derive(Debug, thiserror::Error)]
pub enum Error {
/// The client was dropped by the user.
#[error("The client was dropped")]
Dropped,
/// The connection was closed and reconnect initiated.
#[error(transparent)]
DisconnectedWillReconnect(#[from] DisconnectedWillReconnect),
/// Other rpc error.
#[error("{0}")]
RpcError(RpcError),
}
/// Represent a single subscription.
pub struct Subscription {
id: SubscriptionId<'static>,
stream: mpsc::UnboundedReceiver<SubscriptionResult>,
}
impl Subscription {
/// Returns the next notification from the stream.
/// This may return `None` if the subscription has been terminated,
/// which may happen if the channel becomes full or is dropped.
///
/// **Note:** This has an identical signature to the [`StreamExt::next`]
/// method (and delegates to that). Import [`StreamExt`] if you'd like
/// access to other stream combinator methods.
#[allow(clippy::should_implement_trait)]
pub async fn next(&mut self) -> Option<SubscriptionResult> {
StreamExt::next(self).await
}
/// Get the subscription ID.
pub fn id(&self) -> SubscriptionId<'static> {
self.id.clone()
}
}
impl Stream for Subscription {
type Item = SubscriptionResult;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
) -> task::Poll<Option<Self::Item>> {
match self.stream.poll_recv(cx) {
Poll::Ready(Some(msg)) => Poll::Ready(Some(msg)),
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
}
}
}
impl std::fmt::Debug for Subscription {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Subscription")
.field("id", &self.id)
.finish()
}
}
/// JSON-RPC client that reconnects automatically and may loose
/// subscription notifications when it reconnects.
#[derive(Clone, Debug)]
pub struct RpcClient {
tx: mpsc::UnboundedSender<Op>,
}
/// Builder for [`Client`].
#[derive(Clone, Debug)]
pub struct RpcClientBuilder<P> {
max_request_size: u32,
max_response_size: u32,
retry_policy: P,
#[cfg(feature = "native")]
ping_config: Option<PingConfig>,
#[cfg(feature = "native")]
// web doesn't support custom headers
// https://stackoverflow.com/a/4361358/6394734
headers: HeaderMap,
max_redirections: u32,
id_kind: IdKind,
max_log_len: u32,
max_concurrent_requests: u32,
request_timeout: Duration,
connection_timeout: Duration,
}
impl Default for RpcClientBuilder<ExponentialBackoff> {
fn default() -> Self {
Self {
max_request_size: 10 * 1024 * 1024,
max_response_size: 10 * 1024 * 1024,
retry_policy: ExponentialBackoff::from_millis(10).max_delay(Duration::from_secs(60)),
#[cfg(feature = "native")]
ping_config: Some(PingConfig::new()),
#[cfg(feature = "native")]
headers: HeaderMap::new(),
max_redirections: 5,
id_kind: IdKind::Number,
max_log_len: 1024,
max_concurrent_requests: 1024,
request_timeout: Duration::from_secs(60),
connection_timeout: Duration::from_secs(10),
}
}
}
impl RpcClientBuilder<ExponentialBackoff> {
/// Create a new builder.
pub fn new() -> Self {
Self::default()
}
}
impl<P> RpcClientBuilder<P>
where
P: Iterator<Item = Duration> + Send + Sync + 'static + Clone,
{
/// Configure the min response size a for websocket message.
///
/// Default: 10MB
pub fn max_request_size(mut self, max: u32) -> Self {
self.max_request_size = max;
self
}
/// Configure the max response size a for websocket message.
///
/// Default: 10MB
pub fn max_response_size(mut self, max: u32) -> Self {
self.max_response_size = max;
self
}
/// Set the max number of redirections to perform until a connection is regarded as failed.
///
/// Default: 5
pub fn max_redirections(mut self, redirect: u32) -> Self {
self.max_redirections = redirect;
self
}
/// Configure how many concurrent method calls are allowed.
///
/// Default: 1024
pub fn max_concurrent_requests(mut self, max: u32) -> Self {
self.max_concurrent_requests = max;
self
}
/// Configure how long until a method call is regarded as failed.
///
/// Default: 1 minute
pub fn request_timeout(mut self, timeout: Duration) -> Self {
self.request_timeout = timeout;
self
}
/// Set connection timeout for the WebSocket handshake
///
/// Default: 10 seconds
pub fn connection_timeout(mut self, timeout: Duration) -> Self {
self.connection_timeout = timeout;
self
}
/// Configure the data type of the request object ID
///
/// Default: number
pub fn id_format(mut self, kind: IdKind) -> Self {
self.id_kind = kind;
self
}
/// Set maximum length for logging calls and responses.
/// Logs bigger than this limit will be truncated.
///
/// Default: 1024
pub fn set_max_logging_length(mut self, max: u32) -> Self {
self.max_log_len = max;
self
}
#[cfg(feature = "native")]
#[cfg_attr(docsrs, doc(cfg(feature = "native")))]
/// Configure custom headers to use in the WebSocket handshake.
pub fn set_headers(mut self, headers: HeaderMap) -> Self {
self.headers = headers;
self
}
/// Configure which retry policy to use when a connection is lost.
///
/// Default: Exponential backoff 10ms
pub fn retry_policy<T>(self, retry_policy: T) -> RpcClientBuilder<T> {
RpcClientBuilder {
max_request_size: self.max_request_size,
max_response_size: self.max_response_size,
retry_policy,
#[cfg(feature = "native")]
ping_config: self.ping_config,
#[cfg(feature = "native")]
headers: self.headers,
max_redirections: self.max_redirections,
max_log_len: self.max_log_len,
id_kind: self.id_kind,
max_concurrent_requests: self.max_concurrent_requests,
request_timeout: self.request_timeout,
connection_timeout: self.connection_timeout,
}
}
#[cfg(feature = "native")]
#[cfg_attr(docsrs, doc(cfg(feature = "native")))]
/// Configure the WebSocket ping/pong interval.
///
/// Default: 30 seconds.
pub fn enable_ws_ping(mut self, ping_config: PingConfig) -> Self {
self.ping_config = Some(ping_config);
self
}
#[cfg(feature = "native")]
#[cfg_attr(docsrs, doc(cfg(feature = "native")))]
/// Disable WebSocket ping/pongs.
///
/// Default: 30 seconds.
pub fn disable_ws_ping(mut self) -> Self {
self.ping_config = None;
self
}
/// Build and connect to the target.
pub async fn build(self, url: String) -> Result<RpcClient, RpcError> {
let (tx, rx) = mpsc::unbounded_channel();
let client = Retry::new(self.retry_policy.clone(), || {
platform::ws_client(url.as_ref(), &self)
})
.await?;
platform::spawn(background_task(client, rx, url, self));
Ok(RpcClient { tx })
}
}
impl RpcClient {
/// Create a builder.
pub fn builder() -> RpcClientBuilder<ExponentialBackoff> {
RpcClientBuilder::new()
}
/// Perform a JSON-RPC method call.
pub async fn request(
&self,
method: String,
params: Option<Box<RawValue>>,
) -> Result<Box<RawValue>, Error> {
let (tx, rx) = oneshot::channel();
self.tx
.send(Op::Call {
method,
params: RpcParams(params),
send_back: tx,
})
.map_err(|_| Error::Dropped)?;
rx.await.map_err(|_| Error::Dropped)?
}
/// Perform a JSON-RPC subscription.
pub async fn subscribe(
&self,
subscribe_method: String,
params: Option<Box<RawValue>>,
unsubscribe_method: String,
) -> Result<Subscription, Error> {
let (tx, rx) = oneshot::channel();
self.tx
.send(Op::Subscription {
subscribe_method,
params: RpcParams(params),
unsubscribe_method,
send_back: tx,
})
.map_err(|_| Error::Dropped)?;
rx.await.map_err(|_| Error::Dropped)?
}
}
impl RpcClientT for RpcClient {
fn request_raw<'a>(
&'a self,
method: &'a str,
params: Option<Box<RawValue>>,
) -> RawRpcFuture<'a, Box<RawValue>> {
async {
self.request(method.to_string(), params)
.await
.map_err(|e| SubxtRpcError::DisconnectedWillReconnect(e.to_string()))
}
.boxed()
}
fn subscribe_raw<'a>(
&'a self,
sub: &'a str,
params: Option<Box<RawValue>>,
unsub: &'a str,
) -> RawRpcFuture<'a, RawRpcSubscription> {
async {
let sub = self
.subscribe(sub.to_string(), params, unsub.to_string())
.await
.map_err(|e| SubxtRpcError::ClientError(Box::new(e)))?;
let id = match sub.id() {
SubscriptionId::Num(n) => n.to_string(),
SubscriptionId::Str(s) => s.to_string(),
};
let stream = sub
.map_err(|e| SubxtRpcError::DisconnectedWillReconnect(e.to_string()))
.boxed();
Ok(RawRpcSubscription {
stream,
id: Some(id),
})
}
.boxed()
}
}
async fn background_task<P>(
mut client: Arc<WsClient>,
mut rx: UnboundedReceiver<Op>,
url: String,
client_builder: RpcClientBuilder<P>,
) where
P: Iterator<Item = Duration> + Send + 'static + Clone,
{
let disconnect = Arc::new(tokio::sync::Notify::new());
loop {
tokio::select! {
// An incoming JSON-RPC call to dispatch.
next_message = rx.recv() => {
match next_message {
None => break,
Some(op) => {
spawn(dispatch_call(client.clone(), op, disconnect.clone()));
}
};
}
// The connection was terminated and try to reconnect.
_ = client.on_disconnect() => {
let params = ReconnectParams {
url: &url,
client_builder: &client_builder,
close_reason: client.disconnect_reason().await,
};
client = match reconnect(params).await {
Ok(client) => client,
Err(e) => {
tracing::debug!(target: LOG_TARGET, "Failed to reconnect: {e}; terminating the connection");
break;
}
};
}
}
}
disconnect.notify_waiters();
}
async fn dispatch_call(client: Arc<WsClient>, op: Op, on_disconnect: Arc<tokio::sync::Notify>) {
match op {
Op::Call {
method,
params,
send_back,
} => {
match client.request::<Box<RawValue>, _>(&method, params).await {
Ok(rp) => {
// Fails only if the request is dropped by the client.
let _ = send_back.send(Ok(rp));
}
Err(RpcError::RestartNeeded(e)) => {
// Fails only if the request is dropped by the client.
let _ = send_back.send(Err(DisconnectedWillReconnect(e.to_string()).into()));
}
Err(e) => {
// Fails only if the request is dropped by the client.
let _ = send_back.send(Err(Error::RpcError(e)));
}
}
}
Op::Subscription {
subscribe_method,
params,
unsubscribe_method,
send_back,
} => {
match client
.subscribe::<Box<RawValue>, _>(
&subscribe_method,
params.clone(),
&unsubscribe_method,
)
.await
{
Ok(sub) => {
let (tx, rx) = mpsc::unbounded_channel();
let sub_id = match sub.kind() {
SubscriptionKind::Subscription(id) => id.clone().into_owned(),
_ => unreachable!("No method subscriptions possible in this crate; qed"),
};
platform::spawn(subscription_handler(
tx.clone(),
sub,
on_disconnect.clone(),
client.clone(),
));
let stream = Subscription {
id: sub_id,
stream: rx,
};
// Fails only if the request is dropped by the client.
let _ = send_back.send(Ok(stream));
}
Err(RpcError::RestartNeeded(e)) => {
// Fails only if the request is dropped by the client.
let _ = send_back.send(Err(DisconnectedWillReconnect(e.to_string()).into()));
}
Err(e) => {
// Fails only if the request is dropped.
let _ = send_back.send(Err(Error::RpcError(e)));
}
}
}
}
}
/// Handler for each individual subscription.
async fn subscription_handler(
sub_tx: UnboundedSender<SubscriptionResult>,
mut rpc_sub: RpcSubscription<Box<RawValue>>,
client_closed: Arc<Notify>,
client: Arc<WsClient>,
) {
loop {
tokio::select! {
next_msg = rpc_sub.next() => {
let Some(notif) = next_msg else {
let close = client.disconnect_reason().await;
_ = sub_tx.send(Err(DisconnectedWillReconnect(close.to_string())));
break;
};
let msg = notif.expect("RawValue is valid JSON; qed");
// Fails only if subscription was closed by the user.
if sub_tx.send(Ok(msg)).is_err() {
break;
}
}
// This channel indices whether the subscription was closed by user.
_ = sub_tx.closed() => {
break;
}
// This channel indicates whether the main task has been closed.
// at this point no further messages are processed.
_ = client_closed.notified() => {
break;
}
}
}
}
struct ReconnectParams<'a, P> {
url: &'a str,
client_builder: &'a RpcClientBuilder<P>,
close_reason: RpcError,
}
async fn reconnect<P>(params: ReconnectParams<'_, P>) -> Result<Arc<WsClient>, RpcError>
where
P: Iterator<Item = Duration> + Send + 'static + Clone,
{
let ReconnectParams {
url,
client_builder,
close_reason,
} = params;
let retry_policy = client_builder.retry_policy.clone();
tracing::debug!(target: LOG_TARGET, "Connection to {url} was closed: `{}`; starting to reconnect", display_close_reason(&close_reason));
let client = Retry::new(retry_policy.clone(), || {
platform::ws_client(url, client_builder)
})
.await?;
tracing::debug!(target: LOG_TARGET, "Connection to {url} was successfully re-established");
Ok(client)
}
@@ -0,0 +1,83 @@
// Copyright 2019-2024 Parity Technologies (UK) Ltd.
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
// see LICENSE for license details.
use crate::backend::rpc::reconnecting_rpc_client::{RpcClientBuilder, RpcError};
use jsonrpsee::core::client::Client;
use std::sync::Arc;
#[cfg(feature = "native")]
pub use tokio::spawn;
#[cfg(feature = "web")]
pub use wasm_bindgen_futures::spawn_local as spawn;
#[cfg(feature = "native")]
pub async fn ws_client<P>(
url: &str,
builder: &RpcClientBuilder<P>,
) -> Result<Arc<Client>, RpcError> {
use jsonrpsee::ws_client::WsClientBuilder;
let RpcClientBuilder {
max_request_size,
max_response_size,
ping_config,
headers,
max_redirections,
id_kind,
max_concurrent_requests,
max_log_len,
request_timeout,
connection_timeout,
..
} = builder;
let mut ws_client_builder = WsClientBuilder::new()
.max_request_size(*max_request_size)
.max_response_size(*max_response_size)
.set_headers(headers.clone())
.max_redirections(*max_redirections as usize)
.max_buffer_capacity_per_subscription(tokio::sync::Semaphore::MAX_PERMITS)
.max_concurrent_requests(*max_concurrent_requests as usize)
.set_max_logging_length(*max_log_len)
.set_tcp_no_delay(true)
.request_timeout(*request_timeout)
.connection_timeout(*connection_timeout)
.id_format(*id_kind);
if let Some(ping) = ping_config {
ws_client_builder = ws_client_builder.enable_ws_ping(*ping);
}
let client = ws_client_builder.build(url).await?;
Ok(Arc::new(client))
}
#[cfg(feature = "web")]
pub async fn ws_client<P>(
url: &str,
builder: &RpcClientBuilder<P>,
) -> Result<Arc<Client>, RpcError> {
use jsonrpsee::wasm_client::WasmClientBuilder;
let RpcClientBuilder {
id_kind,
max_concurrent_requests,
max_log_len,
request_timeout,
..
} = builder;
let ws_client_builder = WasmClientBuilder::new()
.max_buffer_capacity_per_subscription(tokio::sync::Semaphore::MAX_PERMITS)
.max_concurrent_requests(*max_concurrent_requests as usize)
.set_max_logging_length(*max_log_len)
.request_timeout(*request_timeout)
.id_format(*id_kind);
let client = ws_client_builder.build(url).await?;
Ok(Arc::new(client))
}
@@ -0,0 +1,274 @@
// Copyright 2019-2024 Parity Technologies (UK) Ltd.
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
// see LICENSE for license details.
use super::*;
use futures::{future::Either, FutureExt};
use jsonrpsee::core::BoxError;
use jsonrpsee::server::{
http, stop_channel, ws, ConnectionGuard, ConnectionState, HttpRequest, HttpResponse, RpcModule,
RpcServiceBuilder, ServerConfig, SubscriptionMessage,
};
#[tokio::test]
async fn call_works() {
tracing_subscriber::fmt::init();
let (_handle, addr) = run_server().await.unwrap();
let client = RpcClient::builder().build(addr).await.unwrap();
assert!(client.request("say_hello".to_string(), None).await.is_ok(),)
}
#[tokio::test]
async fn sub_works() {
tracing_subscriber::fmt::init();
let (_handle, addr) = run_server().await.unwrap();
let client = RpcClient::builder()
.retry_policy(ExponentialBackoff::from_millis(50))
.build(addr)
.await
.unwrap();
let mut sub = client
.subscribe(
"subscribe_lo".to_string(),
None,
"unsubscribe_lo".to_string(),
)
.await
.unwrap();
assert!(sub.next().await.is_some());
}
#[tokio::test]
async fn sub_with_reconnect() {
tracing_subscriber::fmt::init();
let (handle, addr) = run_server().await.unwrap();
let client = RpcClient::builder().build(addr.clone()).await.unwrap();
let mut sub = client
.subscribe(
"subscribe_lo".to_string(),
None,
"unsubscribe_lo".to_string(),
)
.await
.unwrap();
let _ = handle.send(());
// Hack to wait for the server to restart.
tokio::time::sleep(Duration::from_millis(100)).await;
assert!(matches!(sub.next().await, Some(Ok(_))));
assert!(matches!(
sub.next().await,
Some(Err(DisconnectedWillReconnect(_)))
));
// Restart the server.
let (_handle, _) = run_server_with_settings(Some(&addr), false).await.unwrap();
// Hack to wait for the server to restart.
tokio::time::sleep(Duration::from_millis(100)).await;
// Subscription should work after reconnect.
let mut sub = client
.subscribe(
"subscribe_lo".to_string(),
None,
"unsubscribe_lo".to_string(),
)
.await
.unwrap();
assert!(matches!(sub.next().await, Some(Ok(_))));
}
#[tokio::test]
async fn call_with_reconnect() {
tracing_subscriber::fmt::init();
let (handle, addr) = run_server_with_settings(None, true).await.unwrap();
let client = Arc::new(RpcClient::builder().build(addr.clone()).await.unwrap());
let req_fut = client.request("say_hello".to_string(), None).boxed();
let timeout_fut = tokio::time::sleep(Duration::from_secs(5));
// If the call isn't replied in 5 secs then it's regarded as it's still pending.
let req_fut = match futures::future::select(Box::pin(timeout_fut), req_fut).await {
Either::Left((_, f)) => f,
Either::Right(_) => panic!("RPC call finished"),
};
// Close the connection with a pending call.
let _ = handle.send(());
// Restart the server
let (_handle, _) = run_server_with_settings(Some(&addr), false).await.unwrap();
// Hack to wait for the server to restart.
tokio::time::sleep(Duration::from_millis(100)).await;
// This call should fail because reconnect.
assert!(req_fut.await.is_err());
// Future call should work after reconnect.
assert!(client.request("say_hello".to_string(), None).await.is_ok());
}
async fn run_server() -> Result<(tokio::sync::broadcast::Sender<()>, String), BoxError> {
run_server_with_settings(None, false).await
}
async fn run_server_with_settings(
url: Option<&str>,
dont_respond_to_method_calls: bool,
) -> Result<(tokio::sync::broadcast::Sender<()>, String), BoxError> {
use jsonrpsee::server::HttpRequest;
let sockaddr = match url {
Some(url) => url.strip_prefix("ws://").unwrap(),
None => "127.0.0.1:0",
};
let mut i = 0;
let listener = loop {
if let Ok(l) = tokio::net::TcpListener::bind(sockaddr).await {
break l;
}
tokio::time::sleep(Duration::from_millis(100)).await;
if i >= 10 {
panic!("Addr already in use");
}
i += 1;
};
let mut module = RpcModule::new(());
if dont_respond_to_method_calls {
module.register_async_method("say_hello", |_, _, _| async {
futures::future::pending::<()>().await;
"timeout"
})?;
} else {
module.register_async_method("say_hello", |_, _, _| async { "lo" })?;
}
module.register_subscription(
"subscribe_lo",
"subscribe_lo",
"unsubscribe_lo",
|_params, pending, _ctx, _| async move {
let sink = pending.accept().await.unwrap();
let i = 0;
loop {
if sink
.send(SubscriptionMessage::from_json(&i).unwrap())
.await
.is_err()
{
break;
}
tokio::time::sleep(std::time::Duration::from_secs(6)).await;
}
},
)?;
let (tx, mut rx) = tokio::sync::broadcast::channel(4);
let tx2 = tx.clone();
let (stop_handle, server_handle) = stop_channel();
let addr = listener.local_addr().expect("Could not find local addr");
tokio::spawn(async move {
loop {
let sock = tokio::select! {
res = listener.accept() => {
match res {
Ok((stream, _remote_addr)) => stream,
Err(e) => {
tracing::error!("Failed to accept connection: {:?}", e);
continue;
}
}
}
_ = rx.recv() => {
break
}
};
let module = module.clone();
let rx2 = tx2.subscribe();
let tx2 = tx2.clone();
let stop_handle2 = stop_handle.clone();
let svc = tower::service_fn(move |req: HttpRequest<hyper::body::Incoming>| {
let module = module.clone();
let tx = tx2.clone();
let stop_handle = stop_handle2.clone();
let conn_permit = ConnectionGuard::new(1).try_acquire().unwrap();
if ws::is_upgrade_request(&req) {
let rpc_service = RpcServiceBuilder::new();
let conn = ConnectionState::new(stop_handle, 1, conn_permit);
async move {
let mut rx = tx.subscribe();
let (rp, conn_fut) =
ws::connect(req, ServerConfig::default(), module, conn, rpc_service)
.await
.unwrap();
tokio::spawn(async move {
tokio::select! {
_ = conn_fut => (),
_ = rx.recv() => {},
}
});
Ok::<_, BoxError>(rp)
}
.boxed()
} else {
async { Ok(http::response::denied()) }.boxed()
}
});
tokio::spawn(serve_with_graceful_shutdown(sock, svc, rx2));
}
drop(server_handle);
});
Ok((tx, format!("ws://{}", addr)))
}
async fn serve_with_graceful_shutdown<S, B, I>(
io: I,
service: S,
mut rx: tokio::sync::broadcast::Receiver<()>,
) where
S: tower::Service<HttpRequest<hyper::body::Incoming>, Response = HttpResponse<B>>
+ Clone
+ Send
+ 'static,
S::Future: Send,
S::Response: Send,
S::Error: Into<BoxError>,
B: http_body::Body<Data = hyper::body::Bytes> + Send + 'static,
B::Error: Into<BoxError>,
I: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + Unpin + 'static,
{
if let Err(e) =
jsonrpsee::server::serve_with_graceful_shutdown(io, service, rx.recv().map(|_| ())).await
{
tracing::error!("Error while serving: {:?}", e);
}
}
@@ -0,0 +1,14 @@
// Copyright 2019-2024 Parity Technologies (UK) Ltd.
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
// see LICENSE for license details.
//! Utils.
use crate::backend::rpc::reconnecting_rpc_client::RpcError;
pub fn display_close_reason(err: &RpcError) -> String {
match err {
RpcError::RestartNeeded(e) => e.to_string(),
other => other.to_string(),
}
}
+1 -1
View File
@@ -56,7 +56,7 @@ macro_rules! cfg_jsonrpsee_web {
macro_rules! cfg_reconnecting_rpc_client {
($($item:item)*) => {
$(
#[cfg(feature = "unstable-reconnecting-rpc-client")]
#[cfg(all(feature = "unstable-reconnecting-rpc-client", any(feature = "native", feature = "web")))]
#[cfg_attr(docsrs, doc(cfg(feature = "unstable-reconnecting-rpc-client")))]
$item
)*