add simple reconnecting rpc client

This commit is contained in:
Niklas Adolfsson
2024-03-21 15:57:56 +01:00
parent 9810406db2
commit 476afd9526
6 changed files with 383 additions and 109 deletions
Generated
+80 -33
View File
@@ -46,6 +46,16 @@ dependencies = [
"generic-array",
]
[[package]]
name = "again"
version = "0.1.2"
source = "git+https://github.com/polytope-labs/again?branch=develop#39dc8f18462ff6f2da6205076bd34a967ee1f010"
dependencies = [
"fluvio-wasm-timer",
"log",
"rand",
]
[[package]]
name = "ahash"
version = "0.7.8"
@@ -1597,6 +1607,21 @@ dependencies = [
"static_assertions",
]
[[package]]
name = "fluvio-wasm-timer"
version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b768c170dc045fa587a8f948c91f9bcfb87f774930477c6215addf54317f137f"
dependencies = [
"futures",
"js-sys",
"parking_lot 0.11.2",
"pin-utils",
"wasm-bindgen",
"wasm-bindgen-futures",
"web-sys",
]
[[package]]
name = "fnv"
version = "1.0.7"
@@ -2320,9 +2345,9 @@ dependencies = [
[[package]]
name = "jsonrpsee"
version = "0.22.2"
version = "0.22.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "87f3ae45a64cfc0882934f963be9431b2a165d667f53140358181f262aca0702"
checksum = "3cdbb7cb6f3ba28f5b212dd250ab4483105efc3e381f5c8bb90340f14f0a2cc3"
dependencies = [
"jsonrpsee-client-transport",
"jsonrpsee-core",
@@ -2333,9 +2358,9 @@ dependencies = [
[[package]]
name = "jsonrpsee-client-transport"
version = "0.22.2"
version = "0.22.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "455fc882e56f58228df2aee36b88a1340eafd707c76af2fa68cf94b37d461131"
checksum = "9ab2e14e727d2faf388c99d9ca5210566ed3b044f07d92c29c3611718d178380"
dependencies = [
"futures-channel",
"futures-util",
@@ -2356,9 +2381,9 @@ dependencies = [
[[package]]
name = "jsonrpsee-core"
version = "0.22.2"
version = "0.22.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b75568f4f9696e3a47426e1985b548e1a9fcb13372a5e320372acaf04aca30d1"
checksum = "71962a1c49af43adf81d337e4ebc93f3c915faf6eccaa14d74e255107dfd7723"
dependencies = [
"anyhow",
"async-lock 3.3.0",
@@ -2381,9 +2406,9 @@ dependencies = [
[[package]]
name = "jsonrpsee-http-client"
version = "0.22.2"
version = "0.22.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e7a95e346f55df84fb167b7e06470e196e7d5b9488a21d69c5d9732043ba7ba"
checksum = "8c13987da51270bda2c1c9b40c19be0fe9b225c7a0553963d8f17e683a50ce84"
dependencies = [
"async-trait",
"hyper",
@@ -2401,9 +2426,9 @@ dependencies = [
[[package]]
name = "jsonrpsee-types"
version = "0.22.2"
version = "0.22.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3467fd35feeee179f71ab294516bdf3a81139e7aeebdd860e46897c12e1a3368"
checksum = "1e53c72de6cd2ad6ac1aa6e848206ef8b736f92ed02354959130373dfa5b3cbd"
dependencies = [
"anyhow",
"beef",
@@ -2414,9 +2439,9 @@ dependencies = [
[[package]]
name = "jsonrpsee-ws-client"
version = "0.22.2"
version = "0.22.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "68ca71e74983f624c0cb67828e480a981586074da8ad3a2f214c6a3f884edab9"
checksum = "c8a07ab8da9a283b906f6735ddd17d3680158bb72259e853441d1dd0167079ec"
dependencies = [
"http",
"jsonrpsee-client-transport",
@@ -2848,6 +2873,17 @@ version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bb813b8af86854136c6922af0598d719255ecb2179515e6e7730d468f05c9cae"
[[package]]
name = "parking_lot"
version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99"
dependencies = [
"instant",
"lock_api",
"parking_lot_core 0.8.6",
]
[[package]]
name = "parking_lot"
version = "0.12.1"
@@ -2855,7 +2891,21 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f"
dependencies = [
"lock_api",
"parking_lot_core",
"parking_lot_core 0.9.9",
]
[[package]]
name = "parking_lot_core"
version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "60a2cfe6f0ad2bfc16aefa463b497d5c7a5ecd44a23efa72aa342d90177356dc"
dependencies = [
"cfg-if",
"instant",
"libc",
"redox_syscall 0.2.16",
"smallvec",
"winapi",
]
[[package]]
@@ -2866,7 +2916,7 @@ checksum = "4c42a9226546d68acdd9c0a280d17ce19bfe27a46bf68784e4066115788d008e"
dependencies = [
"cfg-if",
"libc",
"redox_syscall",
"redox_syscall 0.4.1",
"smallvec",
"windows-targets 0.48.5",
]
@@ -3269,19 +3319,27 @@ dependencies = [
[[package]]
name = "reconnecting-jsonrpsee-ws-client"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ea5cf7b021db88f1af45a9b2ecdbe5bc1c5cbebc146632269d572cdd435f5cf"
source = "git+https://github.com/niklasad1/reconnecting-jsonrpsee-ws-client?branch=na-experimental-branch#919ff13a17e506013054ef6563d445ccd3575afe"
dependencies = [
"again",
"futures",
"jsonrpsee",
"serde_json",
"thiserror",
"tokio",
"tokio-retry",
"tokio-stream",
"tracing",
]
[[package]]
name = "redox_syscall"
version = "0.2.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fb5a58c1855b4b6819d59012155603f0b22ad30cad752600aadfcb695265519a"
dependencies = [
"bitflags 1.3.2",
]
[[package]]
name = "redox_syscall"
version = "0.4.1"
@@ -4111,7 +4169,7 @@ dependencies = [
"log",
"lru",
"no-std-net",
"parking_lot",
"parking_lot 0.12.1",
"pin-project",
"rand",
"rand_chacha",
@@ -4202,7 +4260,7 @@ dependencies = [
"merlin",
"parity-bip39",
"parity-scale-codec",
"parking_lot",
"parking_lot 0.12.1",
"paste",
"primitive-types",
"rand",
@@ -4307,7 +4365,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bdbab8b61bd61d5f8625a0c75753b5d5a23be55d3445419acd42caf59cf6236b"
dependencies = [
"parity-scale-codec",
"parking_lot",
"parking_lot 0.12.1",
"sp-core",
"sp-externalities",
]
@@ -4391,7 +4449,7 @@ dependencies = [
"hash-db",
"log",
"parity-scale-codec",
"parking_lot",
"parking_lot 0.12.1",
"rand",
"smallvec",
"sp-core",
@@ -4449,7 +4507,7 @@ dependencies = [
"memory-db",
"nohash-hasher",
"parity-scale-codec",
"parking_lot",
"parking_lot 0.12.1",
"rand",
"scale-info",
"schnellru",
@@ -4919,17 +4977,6 @@ dependencies = [
"syn 2.0.53",
]
[[package]]
name = "tokio-retry"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f57eb36ecbe0fc510036adff84824dd3c24bb781e21bfa67b69d556aa85214f"
dependencies = [
"pin-project",
"rand",
"tokio",
]
[[package]]
name = "tokio-rustls"
version = "0.24.1"
+2 -2
View File
@@ -106,7 +106,7 @@ subxt-metadata = { workspace = true, features = ["std"] }
subxt-lightclient = { workspace = true, optional = true, default-features = false }
# Reconnecting jsonrpc ws client
reconnecting-jsonrpsee-ws-client = { version = "0.3", optional = true }
reconnecting-jsonrpsee-ws-client = { git = "https://github.com/niklasad1/reconnecting-jsonrpsee-ws-client", branch = "na-experimental-branch", optional = true }
# For parsing urls to disallow insecure schemes
url = { workspace = true }
@@ -153,4 +153,4 @@ features = ["default", "substrate-compat", "unstable-light-client"]
rustdoc-args = ["--cfg", "docsrs"]
[package.metadata.playground]
features = ["default", "substrate-compat", "unstable-light-client"]
features = ["default", "substrate-compat", "unstable-light-client"]
+15 -20
View File
@@ -6,9 +6,10 @@
#![allow(missing_docs)]
use std::sync::Arc;
use std::time::Duration;
use subxt::backend::rpc::reconnecting_rpc_client::{Client, ExponentialBackoff, PingConfig};
use subxt::backend::rpc::reconnecting_rpc_client::{CallRetryPolicy, Client, RetryPolicy};
use subxt::backend::rpc::RpcClient;
use subxt::error::{Error, RpcError};
use subxt::{OnlineClient, PolkadotConfig};
@@ -22,25 +23,19 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt::init();
// Create a new client with with a reconnecting RPC client.
let rpc = Client::builder()
// Reconnect with exponential backoff
//
// This API is "iterator-like" so one could limit it to only
// reconnect x times and then quit.
.retry_policy(ExponentialBackoff::from_millis(100).max_delay(Duration::from_secs(10)))
// Send period WebSocket pings/pongs every 6th second and if it's not ACK:ed in 30 seconds
// then disconnect.
//
// This is just a way to ensure that the connection isn't idle if no message is sent that often
.enable_ws_ping(
PingConfig::new()
.ping_interval(Duration::from_secs(6))
.inactive_limit(Duration::from_secs(30)),
)
// There are other configurations as well that can be found here:
// <https://docs.rs/reconnecting-jsonrpsee-ws-client/latest/reconnecting_jsonrpsee_ws_client/struct.ClientBuilder.html>
.build("ws://localhost:9944".to_string())
.await?;
let rpc = Arc::new(
Client::builder()
// Reconnect with exponential backoff
.retry_policy_for_reconnect(
RetryPolicy::exponential(Duration::from_millis(100))
.with_max_delay(Duration::from_secs(10))
.with_max_retries(usize::MAX),
)
// Just an example how to override the default retry policy for individual RPC calls.
.retry_policy_for_method("foo", CallRetryPolicy::Retry)
.build("ws://localhost:9944".to_string())
.await?,
);
let api: OnlineClient<PolkadotConfig> =
OnlineClient::from_rpc_client(RpcClient::new(rpc.clone())).await?;
+2 -2
View File
@@ -65,8 +65,8 @@ crate::macros::cfg_unstable_light_client! {
}
crate::macros::cfg_reconnecting_rpc_client! {
mod reconnecting_jsonrpsee_impl;
pub use reconnecting_jsonrpsee_ws_client as reconnecting_rpc_client;
/// reconnecting rpc client.
pub mod reconnecting_rpc_client;
}
mod rpc_client;
@@ -1,52 +0,0 @@
// Copyright 2019-2023 Parity Technologies (UK) Ltd.
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
// see LICENSE for license details.
use super::{RawRpcFuture, RawRpcSubscription, RpcClientT};
use crate::error::RpcError;
use futures::{FutureExt, StreamExt, TryStreamExt};
use reconnecting_jsonrpsee_ws_client::SubscriptionId;
use serde_json::value::RawValue;
impl RpcClientT for reconnecting_jsonrpsee_ws_client::Client {
fn request_raw<'a>(
&'a self,
method: &'a str,
params: Option<Box<RawValue>>,
) -> RawRpcFuture<'a, Box<RawValue>> {
async {
self.request_raw(method.to_string(), params)
.await
.map_err(|e| RpcError::ClientError(Box::new(e)))
}
.boxed()
}
fn subscribe_raw<'a>(
&'a self,
sub: &'a str,
params: Option<Box<RawValue>>,
unsub: &'a str,
) -> RawRpcFuture<'a, RawRpcSubscription> {
async {
let sub = self
.subscribe_raw(sub.to_string(), params, unsub.to_string())
.await
.map_err(|e| RpcError::ClientError(Box::new(e)))?;
let id = match sub.id() {
SubscriptionId::Num(n) => n.to_string(),
SubscriptionId::Str(s) => s.to_string(),
};
let stream = sub
.map_err(|e| RpcError::DisconnectedWillReconnect(e.to_string()))
.boxed();
Ok(RawRpcSubscription {
stream,
id: Some(id),
})
}
.boxed()
}
}
@@ -0,0 +1,284 @@
// Copyright 2019-2023 Parity Technologies (UK) Ltd.
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
// see LICENSE for license details.
pub use reconnecting_jsonrpsee_ws_client::{CallRetryPolicy, RetryPolicy};
use super::{RawRpcFuture, RawRpcSubscription, RpcClientT};
use crate::error::RpcError;
use futures::{FutureExt, StreamExt, TryStreamExt};
use reconnecting_jsonrpsee_ws_client::{Client as InnerClient, SubscriptionId};
use serde_json::value::RawValue;
use std::collections::HashMap;
use std::time::Duration;
const NO_RETRY_ON_RECONNECT: [&str; 12] = [
// Subscription with side-effects.
"author_submitAndWatchExtrinsic",
// chainHead doesn't work without the correct followID.
"chainHead_unstable_body",
"chainHead_unstable_call",
"chainHead_unstable_continue",
"chainHead_unstable_follow",
"chainHead_unstable_header",
"chainHead_unstable_stopOperation",
"chainHead_unstable_storage",
"chainHead_unstable_unfollow",
"chainHead_unstable_unpin",
// Subscription with side-effects.
"transactionWatch_unstable_submitAndWatch",
"transactionWatch_unstable_unwatch",
];
const RETRY_ON_RECONNECT: [&str; 111] = [
"account_nextIndex",
"author_hasKey",
"author_hasSessionKeys",
"author_insertKey",
"author_pendingExtrinsics",
"author_removeExtrinsic",
"author_rotateKeys",
"author_submitExtrinsic",
"author_unwatchExtrinsic",
"babe_epochAuthorship",
"beefy_getFinalizedHead",
"beefy_subscribeJustifications",
"beefy_unsubscribeJustifications",
"chainSpec_v1_chainName",
"chainSpec_v1_genesisHash",
"chainSpec_v1_properties",
"chain_getBlock",
"chain_getBlockHash",
"chain_getFinalisedHead",
"chain_getFinalizedHead",
"chain_getHead",
"chain_getHeader",
"chain_getRuntimeVersion",
"chain_subscribeAllHeads",
"chain_subscribeFinalisedHeads",
"chain_subscribeFinalizedHeads",
"chain_subscribeNewHead",
"chain_subscribeNewHeads",
"chain_subscribeRuntimeVersion",
"chain_unsubscribeAllHeads",
"chain_unsubscribeFinalisedHeads",
"chain_unsubscribeFinalizedHeads",
"chain_unsubscribeNewHead",
"chain_unsubscribeNewHeads",
"chain_unsubscribeRuntimeVersion",
"childstate_getKeys",
"childstate_getKeysPaged",
"childstate_getKeysPagedAt",
"childstate_getStorage",
"childstate_getStorageEntries",
"childstate_getStorageHash",
"childstate_getStorageSize",
"dev_getBlockStats",
"grandpa_proveFinality",
"grandpa_roundState",
"grandpa_subscribeJustifications",
"grandpa_unsubscribeJustifications",
"mmr_generateProof",
"mmr_root",
"mmr_verifyProof",
"mmr_verifyProofStateless",
"offchain_localStorageGet",
"offchain_localStorageSet",
"payment_queryFeeDetails",
"payment_queryInfo",
"rpc_methods",
"state_call",
"state_callAt",
"state_getChildReadProof",
"state_getKeys",
"state_getKeysPaged",
"state_getKeysPagedAt",
"state_getMetadata",
"state_getPairs",
"state_getReadProof",
"state_getRuntimeVersion",
"state_getStorage",
"state_getStorageAt",
"state_getStorageHash",
"state_getStorageHashAt",
"state_getStorageSize",
"state_getStorageSizeAt",
"state_queryStorage",
"state_queryStorageAt",
"state_subscribeRuntimeVersion",
"state_subscribeStorage",
"state_traceBlock",
"state_trieMigrationStatus",
"state_unsubscribeRuntimeVersion",
"state_unsubscribeStorage",
"statement_broadcasts",
"statement_dump",
"statement_posted",
"statement_postedClear",
"statement_remove",
"statement_submit",
"subscribe_newHead",
"sync_state_genSyncSpec",
"system_accountNextIndex",
"system_addLogFilter",
"system_addReservedPeer",
"system_chain",
"system_chainType",
"system_dryRun",
"system_dryRunAt",
"system_health",
"system_localListenAddresses",
"system_localPeerId",
"system_name",
"system_nodeRoles",
"system_peers",
"system_properties",
"system_removeReservedPeer",
"system_reservedPeers",
"system_resetLogFilter",
"system_syncState",
"system_unstable_networkState",
"system_version",
"transaction_v1_broadcast",
"transaction_v1_stop",
"unsubscribe_newHead",
];
/// Reconnecting rpc client builder.
pub struct Builder {
retry_policy: RetryPolicy,
methods: HashMap<&'static str, CallRetryPolicy>,
}
impl Builder {
/// Create a new builder.
pub fn new() -> Self {
let mut methods = HashMap::new();
for method in NO_RETRY_ON_RECONNECT.into_iter() {
methods.insert(method, CallRetryPolicy::Drop);
}
for method in RETRY_ON_RECONNECT.into_iter() {
methods.insert(method, CallRetryPolicy::Retry);
}
Self {
retry_policy: RetryPolicy::exponential(Duration::from_millis(10))
.with_max_delay(Duration::from_secs(30)),
methods,
}
}
/// Configure custom retry policy for a specific rpc call/subscription.
pub fn retry_policy_for_method(
mut self,
method: &'static str,
policy: CallRetryPolicy,
) -> Self {
self.methods.insert(method, policy);
self
}
/// Set retry policy when reconnecting.
pub fn retry_policy_for_reconnect(self, retry_policy: RetryPolicy) -> Self {
Self {
retry_policy,
methods: self.methods,
}
}
/// Build a new rpc client i.e, connect.
pub async fn build(self, url: String) -> Result<Client, RpcError> {
let client = InnerClient::builder()
.retry_policy(self.retry_policy)
.build(url)
.await
.map_err(|e| RpcError::ClientError(Box::new(e)))?;
Ok(Client {
inner: client,
methods: self.methods,
})
}
}
/// Reconnecting rpc client.
pub struct Client {
inner: InnerClient,
methods: HashMap<&'static str, CallRetryPolicy>,
}
impl Client {
/// Create a builder.
pub fn builder() -> Builder {
Builder::new()
}
/// Future that returns when the reconnection has started.
pub async fn on_reconnect(&self) {
self.inner.on_reconnect().await
}
/// Counter to determine how many times the client has reconnected.
pub fn reconnect_count(&self) -> usize {
self.inner.reconnect_count()
}
fn get_method_retry_policy(&self, method: &str) -> CallRetryPolicy {
if let Some(policy) = self.methods.get(method) {
*policy
} else {
tracing::warn!("unknown method `{method}`; setting retry policy to drop on reconnect");
CallRetryPolicy::Drop
}
}
}
impl RpcClientT for Client {
fn request_raw<'a>(
&'a self,
method: &'a str,
params: Option<Box<RawValue>>,
) -> RawRpcFuture<'a, Box<RawValue>> {
async {
let retry_policy = self.get_method_retry_policy(method);
self.inner
.request_raw_with_policy(method.to_string(), params, retry_policy)
.await
.map_err(|e| RpcError::ClientError(Box::new(e)))
}
.boxed()
}
fn subscribe_raw<'a>(
&'a self,
sub: &'a str,
params: Option<Box<RawValue>>,
unsub: &'a str,
) -> RawRpcFuture<'a, RawRpcSubscription> {
async {
let retry_policy = self.get_method_retry_policy(sub);
let sub = self
.inner
.subscribe_raw_with_policy(sub.to_string(), params, unsub.to_string(), retry_policy)
.await
.map_err(|e| RpcError::ClientError(Box::new(e)))?;
let id = match sub.id() {
SubscriptionId::Num(n) => n.to_string(),
SubscriptionId::Str(s) => s.to_string(),
};
let stream = sub
.map_err(|e| RpcError::DisconnectedWillReconnect(e.to_string()))
.boxed();
Ok(RawRpcSubscription {
stream,
id: Some(id),
})
}
.boxed()
}
}