diff --git a/Cargo.lock b/Cargo.lock index 27beed5c03..9b4f0acda4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -46,6 +46,16 @@ dependencies = [ "generic-array", ] +[[package]] +name = "again" +version = "0.1.2" +source = "git+https://github.com/polytope-labs/again?branch=develop#39dc8f18462ff6f2da6205076bd34a967ee1f010" +dependencies = [ + "fluvio-wasm-timer", + "log", + "rand", +] + [[package]] name = "ahash" version = "0.7.8" @@ -1597,6 +1607,21 @@ dependencies = [ "static_assertions", ] +[[package]] +name = "fluvio-wasm-timer" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b768c170dc045fa587a8f948c91f9bcfb87f774930477c6215addf54317f137f" +dependencies = [ + "futures", + "js-sys", + "parking_lot 0.11.2", + "pin-utils", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + [[package]] name = "fnv" version = "1.0.7" @@ -2320,9 +2345,9 @@ dependencies = [ [[package]] name = "jsonrpsee" -version = "0.22.2" +version = "0.22.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87f3ae45a64cfc0882934f963be9431b2a165d667f53140358181f262aca0702" +checksum = "3cdbb7cb6f3ba28f5b212dd250ab4483105efc3e381f5c8bb90340f14f0a2cc3" dependencies = [ "jsonrpsee-client-transport", "jsonrpsee-core", @@ -2333,9 +2358,9 @@ dependencies = [ [[package]] name = "jsonrpsee-client-transport" -version = "0.22.2" +version = "0.22.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "455fc882e56f58228df2aee36b88a1340eafd707c76af2fa68cf94b37d461131" +checksum = "9ab2e14e727d2faf388c99d9ca5210566ed3b044f07d92c29c3611718d178380" dependencies = [ "futures-channel", "futures-util", @@ -2356,9 +2381,9 @@ dependencies = [ [[package]] name = "jsonrpsee-core" -version = "0.22.2" +version = "0.22.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b75568f4f9696e3a47426e1985b548e1a9fcb13372a5e320372acaf04aca30d1" +checksum = "71962a1c49af43adf81d337e4ebc93f3c915faf6eccaa14d74e255107dfd7723" dependencies = [ "anyhow", "async-lock 3.3.0", @@ -2381,9 +2406,9 @@ dependencies = [ [[package]] name = "jsonrpsee-http-client" -version = "0.22.2" +version = "0.22.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9e7a95e346f55df84fb167b7e06470e196e7d5b9488a21d69c5d9732043ba7ba" +checksum = "8c13987da51270bda2c1c9b40c19be0fe9b225c7a0553963d8f17e683a50ce84" dependencies = [ "async-trait", "hyper", @@ -2401,9 +2426,9 @@ dependencies = [ [[package]] name = "jsonrpsee-types" -version = "0.22.2" +version = "0.22.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3467fd35feeee179f71ab294516bdf3a81139e7aeebdd860e46897c12e1a3368" +checksum = "1e53c72de6cd2ad6ac1aa6e848206ef8b736f92ed02354959130373dfa5b3cbd" dependencies = [ "anyhow", "beef", @@ -2414,9 +2439,9 @@ dependencies = [ [[package]] name = "jsonrpsee-ws-client" -version = "0.22.2" +version = "0.22.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "68ca71e74983f624c0cb67828e480a981586074da8ad3a2f214c6a3f884edab9" +checksum = "c8a07ab8da9a283b906f6735ddd17d3680158bb72259e853441d1dd0167079ec" dependencies = [ "http", "jsonrpsee-client-transport", @@ -2848,6 +2873,17 @@ version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bb813b8af86854136c6922af0598d719255ecb2179515e6e7730d468f05c9cae" +[[package]] +name = "parking_lot" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99" +dependencies = [ + "instant", + "lock_api", + "parking_lot_core 0.8.6", +] + [[package]] name = "parking_lot" version = "0.12.1" @@ -2855,7 +2891,21 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f" dependencies = [ "lock_api", - "parking_lot_core", + "parking_lot_core 0.9.9", +] + +[[package]] +name = "parking_lot_core" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "60a2cfe6f0ad2bfc16aefa463b497d5c7a5ecd44a23efa72aa342d90177356dc" +dependencies = [ + "cfg-if", + "instant", + "libc", + "redox_syscall 0.2.16", + "smallvec", + "winapi", ] [[package]] @@ -2866,7 +2916,7 @@ checksum = "4c42a9226546d68acdd9c0a280d17ce19bfe27a46bf68784e4066115788d008e" dependencies = [ "cfg-if", "libc", - "redox_syscall", + "redox_syscall 0.4.1", "smallvec", "windows-targets 0.48.5", ] @@ -3269,19 +3319,27 @@ dependencies = [ [[package]] name = "reconnecting-jsonrpsee-ws-client" version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ea5cf7b021db88f1af45a9b2ecdbe5bc1c5cbebc146632269d572cdd435f5cf" +source = "git+https://github.com/niklasad1/reconnecting-jsonrpsee-ws-client?branch=na-experimental-branch#919ff13a17e506013054ef6563d445ccd3575afe" dependencies = [ + "again", "futures", "jsonrpsee", "serde_json", "thiserror", "tokio", - "tokio-retry", "tokio-stream", "tracing", ] +[[package]] +name = "redox_syscall" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb5a58c1855b4b6819d59012155603f0b22ad30cad752600aadfcb695265519a" +dependencies = [ + "bitflags 1.3.2", +] + [[package]] name = "redox_syscall" version = "0.4.1" @@ -4111,7 +4169,7 @@ dependencies = [ "log", "lru", "no-std-net", - "parking_lot", + "parking_lot 0.12.1", "pin-project", "rand", "rand_chacha", @@ -4202,7 +4260,7 @@ dependencies = [ "merlin", "parity-bip39", "parity-scale-codec", - "parking_lot", + "parking_lot 0.12.1", "paste", "primitive-types", "rand", @@ -4307,7 +4365,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bdbab8b61bd61d5f8625a0c75753b5d5a23be55d3445419acd42caf59cf6236b" dependencies = [ "parity-scale-codec", - "parking_lot", + "parking_lot 0.12.1", "sp-core", "sp-externalities", ] @@ -4391,7 +4449,7 @@ dependencies = [ "hash-db", "log", "parity-scale-codec", - "parking_lot", + "parking_lot 0.12.1", "rand", "smallvec", "sp-core", @@ -4449,7 +4507,7 @@ dependencies = [ "memory-db", "nohash-hasher", "parity-scale-codec", - "parking_lot", + "parking_lot 0.12.1", "rand", "scale-info", "schnellru", @@ -4919,17 +4977,6 @@ dependencies = [ "syn 2.0.53", ] -[[package]] -name = "tokio-retry" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f57eb36ecbe0fc510036adff84824dd3c24bb781e21bfa67b69d556aa85214f" -dependencies = [ - "pin-project", - "rand", - "tokio", -] - [[package]] name = "tokio-rustls" version = "0.24.1" diff --git a/subxt/Cargo.toml b/subxt/Cargo.toml index f91ec708a0..cdee8a3c2d 100644 --- a/subxt/Cargo.toml +++ b/subxt/Cargo.toml @@ -106,7 +106,7 @@ subxt-metadata = { workspace = true, features = ["std"] } subxt-lightclient = { workspace = true, optional = true, default-features = false } # Reconnecting jsonrpc ws client -reconnecting-jsonrpsee-ws-client = { version = "0.3", optional = true } +reconnecting-jsonrpsee-ws-client = { git = "https://github.com/niklasad1/reconnecting-jsonrpsee-ws-client", branch = "na-experimental-branch", optional = true } # For parsing urls to disallow insecure schemes url = { workspace = true } @@ -153,4 +153,4 @@ features = ["default", "substrate-compat", "unstable-light-client"] rustdoc-args = ["--cfg", "docsrs"] [package.metadata.playground] -features = ["default", "substrate-compat", "unstable-light-client"] \ No newline at end of file +features = ["default", "substrate-compat", "unstable-light-client"] diff --git a/subxt/examples/reconnecting_rpc_client.rs b/subxt/examples/reconnecting_rpc_client.rs index b21be899f6..5f10ad38be 100644 --- a/subxt/examples/reconnecting_rpc_client.rs +++ b/subxt/examples/reconnecting_rpc_client.rs @@ -6,9 +6,10 @@ #![allow(missing_docs)] +use std::sync::Arc; use std::time::Duration; -use subxt::backend::rpc::reconnecting_rpc_client::{Client, ExponentialBackoff, PingConfig}; +use subxt::backend::rpc::reconnecting_rpc_client::{CallRetryPolicy, Client, RetryPolicy}; use subxt::backend::rpc::RpcClient; use subxt::error::{Error, RpcError}; use subxt::{OnlineClient, PolkadotConfig}; @@ -22,25 +23,19 @@ async fn main() -> Result<(), Box> { tracing_subscriber::fmt::init(); // Create a new client with with a reconnecting RPC client. - let rpc = Client::builder() - // Reconnect with exponential backoff - // - // This API is "iterator-like" so one could limit it to only - // reconnect x times and then quit. - .retry_policy(ExponentialBackoff::from_millis(100).max_delay(Duration::from_secs(10))) - // Send period WebSocket pings/pongs every 6th second and if it's not ACK:ed in 30 seconds - // then disconnect. - // - // This is just a way to ensure that the connection isn't idle if no message is sent that often - .enable_ws_ping( - PingConfig::new() - .ping_interval(Duration::from_secs(6)) - .inactive_limit(Duration::from_secs(30)), - ) - // There are other configurations as well that can be found here: - // - .build("ws://localhost:9944".to_string()) - .await?; + let rpc = Arc::new( + Client::builder() + // Reconnect with exponential backoff + .retry_policy_for_reconnect( + RetryPolicy::exponential(Duration::from_millis(100)) + .with_max_delay(Duration::from_secs(10)) + .with_max_retries(usize::MAX), + ) + // Just an example how to override the default retry policy for individual RPC calls. + .retry_policy_for_method("foo", CallRetryPolicy::Retry) + .build("ws://localhost:9944".to_string()) + .await?, + ); let api: OnlineClient = OnlineClient::from_rpc_client(RpcClient::new(rpc.clone())).await?; diff --git a/subxt/src/backend/rpc/mod.rs b/subxt/src/backend/rpc/mod.rs index 453fcf5a7f..bec5d9d86e 100644 --- a/subxt/src/backend/rpc/mod.rs +++ b/subxt/src/backend/rpc/mod.rs @@ -65,8 +65,8 @@ crate::macros::cfg_unstable_light_client! { } crate::macros::cfg_reconnecting_rpc_client! { - mod reconnecting_jsonrpsee_impl; - pub use reconnecting_jsonrpsee_ws_client as reconnecting_rpc_client; + /// reconnecting rpc client. + pub mod reconnecting_rpc_client; } mod rpc_client; diff --git a/subxt/src/backend/rpc/reconnecting_jsonrpsee_impl.rs b/subxt/src/backend/rpc/reconnecting_jsonrpsee_impl.rs deleted file mode 100644 index da37b267e0..0000000000 --- a/subxt/src/backend/rpc/reconnecting_jsonrpsee_impl.rs +++ /dev/null @@ -1,52 +0,0 @@ -// Copyright 2019-2023 Parity Technologies (UK) Ltd. -// This file is dual-licensed as Apache-2.0 or GPL-3.0. -// see LICENSE for license details. - -use super::{RawRpcFuture, RawRpcSubscription, RpcClientT}; -use crate::error::RpcError; -use futures::{FutureExt, StreamExt, TryStreamExt}; -use reconnecting_jsonrpsee_ws_client::SubscriptionId; -use serde_json::value::RawValue; - -impl RpcClientT for reconnecting_jsonrpsee_ws_client::Client { - fn request_raw<'a>( - &'a self, - method: &'a str, - params: Option>, - ) -> RawRpcFuture<'a, Box> { - async { - self.request_raw(method.to_string(), params) - .await - .map_err(|e| RpcError::ClientError(Box::new(e))) - } - .boxed() - } - - fn subscribe_raw<'a>( - &'a self, - sub: &'a str, - params: Option>, - unsub: &'a str, - ) -> RawRpcFuture<'a, RawRpcSubscription> { - async { - let sub = self - .subscribe_raw(sub.to_string(), params, unsub.to_string()) - .await - .map_err(|e| RpcError::ClientError(Box::new(e)))?; - - let id = match sub.id() { - SubscriptionId::Num(n) => n.to_string(), - SubscriptionId::Str(s) => s.to_string(), - }; - let stream = sub - .map_err(|e| RpcError::DisconnectedWillReconnect(e.to_string())) - .boxed(); - - Ok(RawRpcSubscription { - stream, - id: Some(id), - }) - } - .boxed() - } -} diff --git a/subxt/src/backend/rpc/reconnecting_rpc_client.rs b/subxt/src/backend/rpc/reconnecting_rpc_client.rs new file mode 100644 index 0000000000..a0dfac3968 --- /dev/null +++ b/subxt/src/backend/rpc/reconnecting_rpc_client.rs @@ -0,0 +1,284 @@ +// Copyright 2019-2023 Parity Technologies (UK) Ltd. +// This file is dual-licensed as Apache-2.0 or GPL-3.0. +// see LICENSE for license details. + +pub use reconnecting_jsonrpsee_ws_client::{CallRetryPolicy, RetryPolicy}; + +use super::{RawRpcFuture, RawRpcSubscription, RpcClientT}; +use crate::error::RpcError; +use futures::{FutureExt, StreamExt, TryStreamExt}; +use reconnecting_jsonrpsee_ws_client::{Client as InnerClient, SubscriptionId}; +use serde_json::value::RawValue; +use std::collections::HashMap; +use std::time::Duration; + +const NO_RETRY_ON_RECONNECT: [&str; 12] = [ + // Subscription with side-effects. + "author_submitAndWatchExtrinsic", + // chainHead doesn't work without the correct followID. + "chainHead_unstable_body", + "chainHead_unstable_call", + "chainHead_unstable_continue", + "chainHead_unstable_follow", + "chainHead_unstable_header", + "chainHead_unstable_stopOperation", + "chainHead_unstable_storage", + "chainHead_unstable_unfollow", + "chainHead_unstable_unpin", + // Subscription with side-effects. + "transactionWatch_unstable_submitAndWatch", + "transactionWatch_unstable_unwatch", +]; + +const RETRY_ON_RECONNECT: [&str; 111] = [ + "account_nextIndex", + "author_hasKey", + "author_hasSessionKeys", + "author_insertKey", + "author_pendingExtrinsics", + "author_removeExtrinsic", + "author_rotateKeys", + "author_submitExtrinsic", + "author_unwatchExtrinsic", + "babe_epochAuthorship", + "beefy_getFinalizedHead", + "beefy_subscribeJustifications", + "beefy_unsubscribeJustifications", + "chainSpec_v1_chainName", + "chainSpec_v1_genesisHash", + "chainSpec_v1_properties", + "chain_getBlock", + "chain_getBlockHash", + "chain_getFinalisedHead", + "chain_getFinalizedHead", + "chain_getHead", + "chain_getHeader", + "chain_getRuntimeVersion", + "chain_subscribeAllHeads", + "chain_subscribeFinalisedHeads", + "chain_subscribeFinalizedHeads", + "chain_subscribeNewHead", + "chain_subscribeNewHeads", + "chain_subscribeRuntimeVersion", + "chain_unsubscribeAllHeads", + "chain_unsubscribeFinalisedHeads", + "chain_unsubscribeFinalizedHeads", + "chain_unsubscribeNewHead", + "chain_unsubscribeNewHeads", + "chain_unsubscribeRuntimeVersion", + "childstate_getKeys", + "childstate_getKeysPaged", + "childstate_getKeysPagedAt", + "childstate_getStorage", + "childstate_getStorageEntries", + "childstate_getStorageHash", + "childstate_getStorageSize", + "dev_getBlockStats", + "grandpa_proveFinality", + "grandpa_roundState", + "grandpa_subscribeJustifications", + "grandpa_unsubscribeJustifications", + "mmr_generateProof", + "mmr_root", + "mmr_verifyProof", + "mmr_verifyProofStateless", + "offchain_localStorageGet", + "offchain_localStorageSet", + "payment_queryFeeDetails", + "payment_queryInfo", + "rpc_methods", + "state_call", + "state_callAt", + "state_getChildReadProof", + "state_getKeys", + "state_getKeysPaged", + "state_getKeysPagedAt", + "state_getMetadata", + "state_getPairs", + "state_getReadProof", + "state_getRuntimeVersion", + "state_getStorage", + "state_getStorageAt", + "state_getStorageHash", + "state_getStorageHashAt", + "state_getStorageSize", + "state_getStorageSizeAt", + "state_queryStorage", + "state_queryStorageAt", + "state_subscribeRuntimeVersion", + "state_subscribeStorage", + "state_traceBlock", + "state_trieMigrationStatus", + "state_unsubscribeRuntimeVersion", + "state_unsubscribeStorage", + "statement_broadcasts", + "statement_dump", + "statement_posted", + "statement_postedClear", + "statement_remove", + "statement_submit", + "subscribe_newHead", + "sync_state_genSyncSpec", + "system_accountNextIndex", + "system_addLogFilter", + "system_addReservedPeer", + "system_chain", + "system_chainType", + "system_dryRun", + "system_dryRunAt", + "system_health", + "system_localListenAddresses", + "system_localPeerId", + "system_name", + "system_nodeRoles", + "system_peers", + "system_properties", + "system_removeReservedPeer", + "system_reservedPeers", + "system_resetLogFilter", + "system_syncState", + "system_unstable_networkState", + "system_version", + "transaction_v1_broadcast", + "transaction_v1_stop", + "unsubscribe_newHead", +]; + +/// Reconnecting rpc client builder. +pub struct Builder { + retry_policy: RetryPolicy, + methods: HashMap<&'static str, CallRetryPolicy>, +} + +impl Builder { + /// Create a new builder. + pub fn new() -> Self { + let mut methods = HashMap::new(); + + for method in NO_RETRY_ON_RECONNECT.into_iter() { + methods.insert(method, CallRetryPolicy::Drop); + } + + for method in RETRY_ON_RECONNECT.into_iter() { + methods.insert(method, CallRetryPolicy::Retry); + } + + Self { + retry_policy: RetryPolicy::exponential(Duration::from_millis(10)) + .with_max_delay(Duration::from_secs(30)), + methods, + } + } + + /// Configure custom retry policy for a specific rpc call/subscription. + pub fn retry_policy_for_method( + mut self, + method: &'static str, + policy: CallRetryPolicy, + ) -> Self { + self.methods.insert(method, policy); + + self + } + + /// Set retry policy when reconnecting. + pub fn retry_policy_for_reconnect(self, retry_policy: RetryPolicy) -> Self { + Self { + retry_policy, + methods: self.methods, + } + } + + /// Build a new rpc client i.e, connect. + pub async fn build(self, url: String) -> Result { + let client = InnerClient::builder() + .retry_policy(self.retry_policy) + .build(url) + .await + .map_err(|e| RpcError::ClientError(Box::new(e)))?; + + Ok(Client { + inner: client, + methods: self.methods, + }) + } +} + +/// Reconnecting rpc client. +pub struct Client { + inner: InnerClient, + methods: HashMap<&'static str, CallRetryPolicy>, +} + +impl Client { + /// Create a builder. + pub fn builder() -> Builder { + Builder::new() + } + + /// Future that returns when the reconnection has started. + pub async fn on_reconnect(&self) { + self.inner.on_reconnect().await + } + + /// Counter to determine how many times the client has reconnected. + pub fn reconnect_count(&self) -> usize { + self.inner.reconnect_count() + } + + fn get_method_retry_policy(&self, method: &str) -> CallRetryPolicy { + if let Some(policy) = self.methods.get(method) { + *policy + } else { + tracing::warn!("unknown method `{method}`; setting retry policy to drop on reconnect"); + CallRetryPolicy::Drop + } + } +} + +impl RpcClientT for Client { + fn request_raw<'a>( + &'a self, + method: &'a str, + params: Option>, + ) -> RawRpcFuture<'a, Box> { + async { + let retry_policy = self.get_method_retry_policy(method); + self.inner + .request_raw_with_policy(method.to_string(), params, retry_policy) + .await + .map_err(|e| RpcError::ClientError(Box::new(e))) + } + .boxed() + } + + fn subscribe_raw<'a>( + &'a self, + sub: &'a str, + params: Option>, + unsub: &'a str, + ) -> RawRpcFuture<'a, RawRpcSubscription> { + async { + let retry_policy = self.get_method_retry_policy(sub); + let sub = self + .inner + .subscribe_raw_with_policy(sub.to_string(), params, unsub.to_string(), retry_policy) + .await + .map_err(|e| RpcError::ClientError(Box::new(e)))?; + + let id = match sub.id() { + SubscriptionId::Num(n) => n.to_string(), + SubscriptionId::Str(s) => s.to_string(), + }; + let stream = sub + .map_err(|e| RpcError::DisconnectedWillReconnect(e.to_string())) + .boxed(); + + Ok(RawRpcSubscription { + stream, + id: Some(id), + }) + } + .boxed() + } +}