feat: add reconnecting-rpc-client (#1396)

* initial commit

* update to reconnecting-ws-client v0.2

* re-export: reconnecting_rpc_client behind feature

* add helper function for reconnect

* fix nit in example

* simplify code without weird error fmt

* address grumbles

* address grumbles

* update reconnecting-ws-client 0.3

* cleanup error message
This commit is contained in:
Niklas Adolfsson
2024-02-08 13:19:06 +01:00
committed by GitHub
parent 61ab6b915e
commit cb67f94455
7 changed files with 287 additions and 16 deletions
Generated
+120 -11
View File
@@ -2302,10 +2302,21 @@ version = "0.21.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9579d0ca9fb30da026bac2f0f7d9576ec93489aeb7cd4971dd5b4617d82c79b2"
dependencies = [
"jsonrpsee-client-transport",
"jsonrpsee-core",
"jsonrpsee-client-transport 0.21.0",
"jsonrpsee-core 0.21.0",
"jsonrpsee-http-client",
"jsonrpsee-types",
"jsonrpsee-types 0.21.0",
]
[[package]]
name = "jsonrpsee"
version = "0.22.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4a95f7cc23d5fab0cdeeaf6bad8c8f5e7a3aa7f0d211957ea78232b327ab27b0"
dependencies = [
"jsonrpsee-core 0.22.0",
"jsonrpsee-types 0.22.0",
"jsonrpsee-ws-client",
]
[[package]]
@@ -2318,7 +2329,28 @@ dependencies = [
"futures-util",
"gloo-net",
"http",
"jsonrpsee-core",
"jsonrpsee-core 0.21.0",
"pin-project",
"rustls-native-certs 0.7.0",
"rustls-pki-types",
"soketto",
"thiserror",
"tokio",
"tokio-rustls 0.25.0",
"tokio-util",
"tracing",
"url",
]
[[package]]
name = "jsonrpsee-client-transport"
version = "0.22.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6b1736cfa3845fd9f8f43751f2b8e0e83f7b6081e754502f7d63b6587692cc83"
dependencies = [
"futures-util",
"http",
"jsonrpsee-core 0.22.0",
"pin-project",
"rustls-native-certs 0.7.0",
"rustls-pki-types",
@@ -2344,7 +2376,7 @@ dependencies = [
"futures-timer",
"futures-util",
"hyper",
"jsonrpsee-types",
"jsonrpsee-types 0.21.0",
"pin-project",
"rustc-hash",
"serde",
@@ -2356,6 +2388,29 @@ dependencies = [
"wasm-bindgen-futures",
]
[[package]]
name = "jsonrpsee-core"
version = "0.22.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "82030d038658974732103e623ba2e0abec03bbbe175b39c0a2fafbada60c5868"
dependencies = [
"anyhow",
"async-lock 3.3.0",
"async-trait",
"beef",
"futures-timer",
"futures-util",
"jsonrpsee-types 0.22.0",
"pin-project",
"rustc-hash",
"serde",
"serde_json",
"thiserror",
"tokio",
"tokio-stream",
"tracing",
]
[[package]]
name = "jsonrpsee-http-client"
version = "0.21.0"
@@ -2365,8 +2420,8 @@ dependencies = [
"async-trait",
"hyper",
"hyper-rustls",
"jsonrpsee-core",
"jsonrpsee-types",
"jsonrpsee-core 0.21.0",
"jsonrpsee-types 0.21.0",
"serde",
"serde_json",
"thiserror",
@@ -2389,6 +2444,32 @@ dependencies = [
"thiserror",
]
[[package]]
name = "jsonrpsee-types"
version = "0.22.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a48fdc1202eafc51c63e00406575e59493284ace8b8b61aa16f3a6db5d64f1a"
dependencies = [
"anyhow",
"beef",
"serde",
"serde_json",
"thiserror",
]
[[package]]
name = "jsonrpsee-ws-client"
version = "0.22.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c5ce25d70a8e4d3cc574bbc3cad0137c326ad64b194793d5e7bbdd3fa4504181"
dependencies = [
"http",
"jsonrpsee-client-transport 0.22.0",
"jsonrpsee-core 0.22.0",
"jsonrpsee-types 0.22.0",
"url",
]
[[package]]
name = "keccak"
version = "0.1.5"
@@ -3145,6 +3226,22 @@ dependencies = [
"crossbeam-utils",
]
[[package]]
name = "reconnecting-jsonrpsee-ws-client"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ea5cf7b021db88f1af45a9b2ecdbe5bc1c5cbebc146632269d572cdd435f5cf"
dependencies = [
"futures",
"jsonrpsee 0.22.0",
"serde_json",
"thiserror",
"tokio",
"tokio-retry",
"tokio-stream",
"tracing",
]
[[package]]
name = "redox_syscall"
version = "0.4.1"
@@ -4453,9 +4550,10 @@ dependencies = [
"hex",
"impl-serde",
"instant",
"jsonrpsee",
"jsonrpsee 0.21.0",
"parity-scale-codec",
"primitive-types",
"reconnecting-jsonrpsee-ws-client",
"scale-bits",
"scale-decode",
"scale-encode",
@@ -4490,7 +4588,7 @@ dependencies = [
"heck",
"hex",
"indoc",
"jsonrpsee",
"jsonrpsee 0.21.0",
"parity-scale-codec",
"pretty_assertions",
"quote",
@@ -4517,7 +4615,7 @@ dependencies = [
"getrandom",
"heck",
"hex",
"jsonrpsee",
"jsonrpsee 0.21.0",
"parity-scale-codec",
"proc-macro2",
"quote",
@@ -4654,7 +4752,7 @@ version = "0.34.0"
dependencies = [
"hex",
"impl-serde",
"jsonrpsee",
"jsonrpsee 0.21.0",
"parity-scale-codec",
"serde",
"substrate-runner",
@@ -4753,6 +4851,17 @@ dependencies = [
"syn 2.0.48",
]
[[package]]
name = "tokio-retry"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f57eb36ecbe0fc510036adff84824dd3c24bb781e21bfa67b69d556aa85214f"
dependencies = [
"pin-project",
"rand",
"tokio",
]
[[package]]
name = "tokio-rustls"
version = "0.24.1"
+11
View File
@@ -42,6 +42,9 @@ web = [
"instant/wasm-bindgen"
]
# Enable this to use the reconnecting rpc client
unstable-reconnecting-rpc-client = ["dep:reconnecting-jsonrpsee-ws-client"]
# Enable this to use jsonrpsee (allowing for example `OnlineClient::from_url`).
jsonrpsee = ["dep:jsonrpsee"]
@@ -103,6 +106,9 @@ subxt-lightclient = { workspace = true, optional = true, default-features = fals
# Light client support:
tokio-stream = { workspace = true, optional = true }
# Reconnecting jsonrpc ws client
reconnecting-jsonrpsee-ws-client = { version = "0.3", optional = true }
# For parsing urls to disallow insecure schemes
url = { workspace = true }
@@ -138,6 +144,11 @@ name = "light_client_parachains"
path = "examples/light_client_parachains.rs"
required-features = ["unstable-light-client", "jsonrpsee", "native"]
[[example]]
name = "reconnecting_rpc_client"
path = "examples/reconnecting_rpc_client.rs"
required-features = ["unstable-reconnecting-rpc-client"]
[package.metadata.docs.rs]
features = ["default", "substrate-compat", "unstable-light-client"]
rustdoc-args = ["--cfg", "docsrs"]
+73
View File
@@ -0,0 +1,73 @@
//! Example to utilize the `reconnecting rpc client` in subxt
//! which hidden behind behind `--feature unstable-reconnecting-rpc-client`
//!
//! To utilize full logs from the RPC client use:
//! `RUST_LOG="jsonrpsee=trace,reconnecting_jsonrpsee_ws_client=trace"`
#![allow(missing_docs)]
use std::time::Duration;
use subxt::backend::rpc::reconnecting_rpc_client::{Client, ExponentialBackoff, PingConfig};
use subxt::backend::rpc::RpcClient;
use subxt::error::{Error, RpcError};
use subxt::{OnlineClient, PolkadotConfig};
// Generate an interface that we can use from the node's metadata.
#[subxt::subxt(runtime_metadata_path = "../artifacts/polkadot_metadata_small.scale")]
pub mod polkadot {}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt::init();
// Create a new client with with a reconnecting RPC client.
let rpc = Client::builder()
// Reconnect with exponential backoff
//
// This API is "iterator-like" so one could limit it to only
// reconnect x times and then quit.
.retry_policy(ExponentialBackoff::from_millis(100).max_delay(Duration::from_secs(10)))
// Send period WebSocket pings/pongs every 6th second and if it's not ACK:ed in 30 seconds
// then disconnect.
//
// This is just a way to ensure that the connection isn't idle if no message is sent that often
.enable_ws_ping(
PingConfig::new()
.ping_interval(Duration::from_secs(6))
.inactive_limit(Duration::from_secs(30)),
)
// There are other configurations as well that can be found here:
// <https://docs.rs/reconnecting-jsonrpsee-ws-client/latest/reconnecting_jsonrpsee_ws_client/struct.ClientBuilder.html>
.build("ws://localhost:9944".to_string())
.await?;
let api: OnlineClient<PolkadotConfig> =
OnlineClient::from_rpc_client(RpcClient::new(rpc.clone())).await?;
// Subscribe to all finalized blocks:
let mut blocks_sub = api.blocks().subscribe_finalized().await?;
// For each block, print a bunch of information about it:
while let Some(block) = blocks_sub.next().await {
let block = match block {
Ok(b) => b,
Err(Error::Rpc(RpcError::DisconnectedWillReconnect(err))) => {
println!("{err}");
continue;
}
Err(e) => {
return Err(e.into());
}
};
let block_number = block.header().number;
let block_hash = block.hash();
println!("Block #{block_number} ({block_hash})");
}
println!("RPC client reconnected `{}` times", rpc.reconnect_count());
Ok(())
}
+6 -2
View File
@@ -60,9 +60,13 @@ crate::macros::cfg_jsonrpsee! {
mod jsonrpsee_impl;
}
crate::macros::cfg_reconnecting_rpc_client! {
mod reconnecting_jsonrpsee_impl;
pub use reconnecting_jsonrpsee_ws_client as reconnecting_rpc_client;
}
mod rpc_client;
mod rpc_client_t;
pub use rpc_client_t::{RawRpcFuture, RawRpcSubscription, RawValue, RpcClientT};
pub use rpc_client::{rpc_params, RpcClient, RpcParams, RpcSubscription};
pub use rpc_client_t::{RawRpcFuture, RawRpcSubscription, RawValue, RpcClientT};
@@ -0,0 +1,52 @@
// Copyright 2019-2023 Parity Technologies (UK) Ltd.
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
// see LICENSE for license details.
use super::{RawRpcFuture, RawRpcSubscription, RpcClientT};
use crate::error::RpcError;
use futures::{FutureExt, StreamExt, TryStreamExt};
use reconnecting_jsonrpsee_ws_client::SubscriptionId;
use serde_json::value::RawValue;
impl RpcClientT for reconnecting_jsonrpsee_ws_client::Client {
fn request_raw<'a>(
&'a self,
method: &'a str,
params: Option<Box<RawValue>>,
) -> RawRpcFuture<'a, Box<RawValue>> {
async {
self.request_raw(method.to_string(), params)
.await
.map_err(|e| RpcError::ClientError(Box::new(e)))
}
.boxed()
}
fn subscribe_raw<'a>(
&'a self,
sub: &'a str,
params: Option<Box<RawValue>>,
unsub: &'a str,
) -> RawRpcFuture<'a, RawRpcSubscription> {
async {
let sub = self
.subscribe_raw(sub.to_string(), params, unsub.to_string())
.await
.map_err(|e| RpcError::ClientError(Box::new(e)))?;
let id = match sub.id() {
SubscriptionId::Num(n) => n.to_string(),
SubscriptionId::Str(s) => s.to_string(),
};
let stream = sub
.map_err(|e| RpcError::DisconnectedWillReconnect(e.to_string()))
.boxed();
Ok(RawRpcSubscription {
stream,
id: Some(id),
})
}
.boxed()
}
}
+10 -2
View File
@@ -6,8 +6,6 @@
mod dispatch_error;
use core::fmt::Debug;
crate::macros::cfg_unstable_light_client! {
pub use crate::client::LightClientError;
}
@@ -100,6 +98,13 @@ impl From<std::convert::Infallible> for Error {
}
}
impl Error {
/// Checks whether the error was caused by a RPC re-connection.
pub fn is_disconnected_will_reconnect(&self) -> bool {
matches!(self, Error::Rpc(RpcError::DisconnectedWillReconnect(_)))
}
}
/// An RPC error. Since we are generic over the RPC client that is used,
/// the error is boxed and could be casted.
#[derive(Debug, thiserror::Error)]
@@ -120,6 +125,9 @@ pub enum RpcError {
/// The requested URL is insecure.
#[error("RPC error: insecure URL: {0}")]
InsecureUrl(String),
/// The connection was lost and automatically reconnected.
#[error("RPC error: the connection was lost `{0}`; reconnect automatically initiated")]
DisconnectedWillReconnect(String),
}
impl RpcError {
+15 -1
View File
@@ -52,7 +52,21 @@ macro_rules! cfg_jsonrpsee_web {
}
}
pub(crate) use {cfg_feature, cfg_jsonrpsee, cfg_substrate_compat, cfg_unstable_light_client};
#[allow(unused)]
macro_rules! cfg_reconnecting_rpc_client {
($($item:item)*) => {
$(
#[cfg(all(feature = "unstable-reconnecting-rpc-client"))]
#[cfg_attr(docsrs, doc(cfg(feature = "unstable-reconnecting-rpc-client")))]
$item
)*
}
}
pub(crate) use {
cfg_feature, cfg_jsonrpsee, cfg_reconnecting_rpc_client, cfg_substrate_compat,
cfg_unstable_light_client,
};
// Only used by light-client.
#[allow(unused)]