diff --git a/Cargo.lock b/Cargo.lock index 584c330a32..7c17152bc6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2302,10 +2302,21 @@ version = "0.21.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9579d0ca9fb30da026bac2f0f7d9576ec93489aeb7cd4971dd5b4617d82c79b2" dependencies = [ - "jsonrpsee-client-transport", - "jsonrpsee-core", + "jsonrpsee-client-transport 0.21.0", + "jsonrpsee-core 0.21.0", "jsonrpsee-http-client", - "jsonrpsee-types", + "jsonrpsee-types 0.21.0", +] + +[[package]] +name = "jsonrpsee" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4a95f7cc23d5fab0cdeeaf6bad8c8f5e7a3aa7f0d211957ea78232b327ab27b0" +dependencies = [ + "jsonrpsee-core 0.22.0", + "jsonrpsee-types 0.22.0", + "jsonrpsee-ws-client", ] [[package]] @@ -2318,7 +2329,28 @@ dependencies = [ "futures-util", "gloo-net", "http", - "jsonrpsee-core", + "jsonrpsee-core 0.21.0", + "pin-project", + "rustls-native-certs 0.7.0", + "rustls-pki-types", + "soketto", + "thiserror", + "tokio", + "tokio-rustls 0.25.0", + "tokio-util", + "tracing", + "url", +] + +[[package]] +name = "jsonrpsee-client-transport" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6b1736cfa3845fd9f8f43751f2b8e0e83f7b6081e754502f7d63b6587692cc83" +dependencies = [ + "futures-util", + "http", + "jsonrpsee-core 0.22.0", "pin-project", "rustls-native-certs 0.7.0", "rustls-pki-types", @@ -2344,7 +2376,7 @@ dependencies = [ "futures-timer", "futures-util", "hyper", - "jsonrpsee-types", + "jsonrpsee-types 0.21.0", "pin-project", "rustc-hash", "serde", @@ -2356,6 +2388,29 @@ dependencies = [ "wasm-bindgen-futures", ] +[[package]] +name = "jsonrpsee-core" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "82030d038658974732103e623ba2e0abec03bbbe175b39c0a2fafbada60c5868" +dependencies = [ + "anyhow", + "async-lock 3.3.0", + "async-trait", + "beef", + "futures-timer", + "futures-util", + "jsonrpsee-types 0.22.0", + "pin-project", + "rustc-hash", + "serde", + "serde_json", + "thiserror", + "tokio", + "tokio-stream", + "tracing", +] + [[package]] name = "jsonrpsee-http-client" version = "0.21.0" @@ -2365,8 +2420,8 @@ dependencies = [ "async-trait", "hyper", "hyper-rustls", - "jsonrpsee-core", - "jsonrpsee-types", + "jsonrpsee-core 0.21.0", + "jsonrpsee-types 0.21.0", "serde", "serde_json", "thiserror", @@ -2389,6 +2444,32 @@ dependencies = [ "thiserror", ] +[[package]] +name = "jsonrpsee-types" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a48fdc1202eafc51c63e00406575e59493284ace8b8b61aa16f3a6db5d64f1a" +dependencies = [ + "anyhow", + "beef", + "serde", + "serde_json", + "thiserror", +] + +[[package]] +name = "jsonrpsee-ws-client" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c5ce25d70a8e4d3cc574bbc3cad0137c326ad64b194793d5e7bbdd3fa4504181" +dependencies = [ + "http", + "jsonrpsee-client-transport 0.22.0", + "jsonrpsee-core 0.22.0", + "jsonrpsee-types 0.22.0", + "url", +] + [[package]] name = "keccak" version = "0.1.5" @@ -3145,6 +3226,22 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "reconnecting-jsonrpsee-ws-client" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ea5cf7b021db88f1af45a9b2ecdbe5bc1c5cbebc146632269d572cdd435f5cf" +dependencies = [ + "futures", + "jsonrpsee 0.22.0", + "serde_json", + "thiserror", + "tokio", + "tokio-retry", + "tokio-stream", + "tracing", +] + [[package]] name = "redox_syscall" version = "0.4.1" @@ -4453,9 +4550,10 @@ dependencies = [ "hex", "impl-serde", "instant", - "jsonrpsee", + "jsonrpsee 0.21.0", "parity-scale-codec", "primitive-types", + "reconnecting-jsonrpsee-ws-client", "scale-bits", "scale-decode", "scale-encode", @@ -4490,7 +4588,7 @@ dependencies = [ "heck", "hex", "indoc", - "jsonrpsee", + "jsonrpsee 0.21.0", "parity-scale-codec", "pretty_assertions", "quote", @@ -4517,7 +4615,7 @@ dependencies = [ "getrandom", "heck", "hex", - "jsonrpsee", + "jsonrpsee 0.21.0", "parity-scale-codec", "proc-macro2", "quote", @@ -4654,7 +4752,7 @@ version = "0.34.0" dependencies = [ "hex", "impl-serde", - "jsonrpsee", + "jsonrpsee 0.21.0", "parity-scale-codec", "serde", "substrate-runner", @@ -4753,6 +4851,17 @@ dependencies = [ "syn 2.0.48", ] +[[package]] +name = "tokio-retry" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f57eb36ecbe0fc510036adff84824dd3c24bb781e21bfa67b69d556aa85214f" +dependencies = [ + "pin-project", + "rand", + "tokio", +] + [[package]] name = "tokio-rustls" version = "0.24.1" diff --git a/subxt/Cargo.toml b/subxt/Cargo.toml index 0686a8c897..5ff125d191 100644 --- a/subxt/Cargo.toml +++ b/subxt/Cargo.toml @@ -42,6 +42,9 @@ web = [ "instant/wasm-bindgen" ] +# Enable this to use the reconnecting rpc client +unstable-reconnecting-rpc-client = ["dep:reconnecting-jsonrpsee-ws-client"] + # Enable this to use jsonrpsee (allowing for example `OnlineClient::from_url`). jsonrpsee = ["dep:jsonrpsee"] @@ -103,6 +106,9 @@ subxt-lightclient = { workspace = true, optional = true, default-features = fals # Light client support: tokio-stream = { workspace = true, optional = true } +# Reconnecting jsonrpc ws client +reconnecting-jsonrpsee-ws-client = { version = "0.3", optional = true } + # For parsing urls to disallow insecure schemes url = { workspace = true } @@ -138,6 +144,11 @@ name = "light_client_parachains" path = "examples/light_client_parachains.rs" required-features = ["unstable-light-client", "jsonrpsee", "native"] +[[example]] +name = "reconnecting_rpc_client" +path = "examples/reconnecting_rpc_client.rs" +required-features = ["unstable-reconnecting-rpc-client"] + [package.metadata.docs.rs] features = ["default", "substrate-compat", "unstable-light-client"] rustdoc-args = ["--cfg", "docsrs"] diff --git a/subxt/examples/reconnecting_rpc_client.rs b/subxt/examples/reconnecting_rpc_client.rs new file mode 100644 index 0000000000..b21be899f6 --- /dev/null +++ b/subxt/examples/reconnecting_rpc_client.rs @@ -0,0 +1,73 @@ +//! Example to utilize the `reconnecting rpc client` in subxt +//! which hidden behind behind `--feature unstable-reconnecting-rpc-client` +//! +//! To utilize full logs from the RPC client use: +//! `RUST_LOG="jsonrpsee=trace,reconnecting_jsonrpsee_ws_client=trace"` + +#![allow(missing_docs)] + +use std::time::Duration; + +use subxt::backend::rpc::reconnecting_rpc_client::{Client, ExponentialBackoff, PingConfig}; +use subxt::backend::rpc::RpcClient; +use subxt::error::{Error, RpcError}; +use subxt::{OnlineClient, PolkadotConfig}; + +// Generate an interface that we can use from the node's metadata. +#[subxt::subxt(runtime_metadata_path = "../artifacts/polkadot_metadata_small.scale")] +pub mod polkadot {} + +#[tokio::main] +async fn main() -> Result<(), Box> { + tracing_subscriber::fmt::init(); + + // Create a new client with with a reconnecting RPC client. + let rpc = Client::builder() + // Reconnect with exponential backoff + // + // This API is "iterator-like" so one could limit it to only + // reconnect x times and then quit. + .retry_policy(ExponentialBackoff::from_millis(100).max_delay(Duration::from_secs(10))) + // Send period WebSocket pings/pongs every 6th second and if it's not ACK:ed in 30 seconds + // then disconnect. + // + // This is just a way to ensure that the connection isn't idle if no message is sent that often + .enable_ws_ping( + PingConfig::new() + .ping_interval(Duration::from_secs(6)) + .inactive_limit(Duration::from_secs(30)), + ) + // There are other configurations as well that can be found here: + // + .build("ws://localhost:9944".to_string()) + .await?; + + let api: OnlineClient = + OnlineClient::from_rpc_client(RpcClient::new(rpc.clone())).await?; + + // Subscribe to all finalized blocks: + let mut blocks_sub = api.blocks().subscribe_finalized().await?; + + // For each block, print a bunch of information about it: + while let Some(block) = blocks_sub.next().await { + let block = match block { + Ok(b) => b, + Err(Error::Rpc(RpcError::DisconnectedWillReconnect(err))) => { + println!("{err}"); + continue; + } + Err(e) => { + return Err(e.into()); + } + }; + + let block_number = block.header().number; + let block_hash = block.hash(); + + println!("Block #{block_number} ({block_hash})"); + } + + println!("RPC client reconnected `{}` times", rpc.reconnect_count()); + + Ok(()) +} diff --git a/subxt/src/backend/rpc/mod.rs b/subxt/src/backend/rpc/mod.rs index 12910939e7..19101dd5df 100644 --- a/subxt/src/backend/rpc/mod.rs +++ b/subxt/src/backend/rpc/mod.rs @@ -60,9 +60,13 @@ crate::macros::cfg_jsonrpsee! { mod jsonrpsee_impl; } +crate::macros::cfg_reconnecting_rpc_client! { + mod reconnecting_jsonrpsee_impl; + pub use reconnecting_jsonrpsee_ws_client as reconnecting_rpc_client; +} + mod rpc_client; mod rpc_client_t; -pub use rpc_client_t::{RawRpcFuture, RawRpcSubscription, RawValue, RpcClientT}; - pub use rpc_client::{rpc_params, RpcClient, RpcParams, RpcSubscription}; +pub use rpc_client_t::{RawRpcFuture, RawRpcSubscription, RawValue, RpcClientT}; diff --git a/subxt/src/backend/rpc/reconnecting_jsonrpsee_impl.rs b/subxt/src/backend/rpc/reconnecting_jsonrpsee_impl.rs new file mode 100644 index 0000000000..da37b267e0 --- /dev/null +++ b/subxt/src/backend/rpc/reconnecting_jsonrpsee_impl.rs @@ -0,0 +1,52 @@ +// Copyright 2019-2023 Parity Technologies (UK) Ltd. +// This file is dual-licensed as Apache-2.0 or GPL-3.0. +// see LICENSE for license details. + +use super::{RawRpcFuture, RawRpcSubscription, RpcClientT}; +use crate::error::RpcError; +use futures::{FutureExt, StreamExt, TryStreamExt}; +use reconnecting_jsonrpsee_ws_client::SubscriptionId; +use serde_json::value::RawValue; + +impl RpcClientT for reconnecting_jsonrpsee_ws_client::Client { + fn request_raw<'a>( + &'a self, + method: &'a str, + params: Option>, + ) -> RawRpcFuture<'a, Box> { + async { + self.request_raw(method.to_string(), params) + .await + .map_err(|e| RpcError::ClientError(Box::new(e))) + } + .boxed() + } + + fn subscribe_raw<'a>( + &'a self, + sub: &'a str, + params: Option>, + unsub: &'a str, + ) -> RawRpcFuture<'a, RawRpcSubscription> { + async { + let sub = self + .subscribe_raw(sub.to_string(), params, unsub.to_string()) + .await + .map_err(|e| RpcError::ClientError(Box::new(e)))?; + + let id = match sub.id() { + SubscriptionId::Num(n) => n.to_string(), + SubscriptionId::Str(s) => s.to_string(), + }; + let stream = sub + .map_err(|e| RpcError::DisconnectedWillReconnect(e.to_string())) + .boxed(); + + Ok(RawRpcSubscription { + stream, + id: Some(id), + }) + } + .boxed() + } +} diff --git a/subxt/src/error/mod.rs b/subxt/src/error/mod.rs index 78c5528012..41dbc11d31 100644 --- a/subxt/src/error/mod.rs +++ b/subxt/src/error/mod.rs @@ -6,8 +6,6 @@ mod dispatch_error; -use core::fmt::Debug; - crate::macros::cfg_unstable_light_client! { pub use crate::client::LightClientError; } @@ -100,6 +98,13 @@ impl From for Error { } } +impl Error { + /// Checks whether the error was caused by a RPC re-connection. + pub fn is_disconnected_will_reconnect(&self) -> bool { + matches!(self, Error::Rpc(RpcError::DisconnectedWillReconnect(_))) + } +} + /// An RPC error. Since we are generic over the RPC client that is used, /// the error is boxed and could be casted. #[derive(Debug, thiserror::Error)] @@ -120,6 +125,9 @@ pub enum RpcError { /// The requested URL is insecure. #[error("RPC error: insecure URL: {0}")] InsecureUrl(String), + /// The connection was lost and automatically reconnected. + #[error("RPC error: the connection was lost `{0}`; reconnect automatically initiated")] + DisconnectedWillReconnect(String), } impl RpcError { diff --git a/subxt/src/macros.rs b/subxt/src/macros.rs index a8d86ada69..47362f18ca 100644 --- a/subxt/src/macros.rs +++ b/subxt/src/macros.rs @@ -52,7 +52,21 @@ macro_rules! cfg_jsonrpsee_web { } } -pub(crate) use {cfg_feature, cfg_jsonrpsee, cfg_substrate_compat, cfg_unstable_light_client}; +#[allow(unused)] +macro_rules! cfg_reconnecting_rpc_client { + ($($item:item)*) => { + $( + #[cfg(all(feature = "unstable-reconnecting-rpc-client"))] + #[cfg_attr(docsrs, doc(cfg(feature = "unstable-reconnecting-rpc-client")))] + $item + )* + } +} + +pub(crate) use { + cfg_feature, cfg_jsonrpsee, cfg_reconnecting_rpc_client, cfg_substrate_compat, + cfg_unstable_light_client, +}; // Only used by light-client. #[allow(unused)]