rpc: add full support reconnecting rpc client (#1505)

* add simple reconnecting rpc client

* initial retryable calls

* add reconnecting backend

* add reconnecting example for unstable backend

* add todo what isn't working

* FollowStream: restart on reconn

* naive fix: fetch sub_id in stream_headers

* cleanup

* remove resubscribe APIs

* cleanup and remove many wrapper streams

* remove retry backend

* legacy rpc: make it retryable

* unstable rpc: make it retryable

* fix nits

* support wasm as well

* remove deadcode

* address grumbles

* revert rpc methods

* don't create a subscription per block

* get rid of retry logic in subxt rpc

* Update subxt/Cargo.toml

* Update subxt/src/backend/legacy/mod.rs

* Update subxt/src/backend/legacy/mod.rs

* remove outdated comments

* fix bad merge

* Fix reconnecting RPC client and update dependencies

* add back retry logic and remove `finito`

* fix nits

* cleanup

* add hack for race when reconnecting

* backend: emit Stop event on DisconnectWillReconnect

* merge reconnecting client examples

* add fn retry_stream

* cleanup

* add all features from reconnecting-rpc-client

* fix build

* remove needless retry for fetch_storage

* StorageFetchDescendantKeysStream handle disconnect err

* dont retry transactions

* fetch subscription ID from FollowStreamMsg

* fix nits

* Update subxt/src/backend/legacy/mod.rs

* Update subxt/src/backend/legacy/mod.rs

* add reconn to StorageItems stream

* StorageFetchDescendantKeysStream: retry storage call

* RetryStream: emit DisconnectWillReconnect msg

* runtime subscriptions ignore DisconnectWillReconnect

* Update subxt/examples/setup_reconnecting_rpc_client.rs

* Update subxt/src/client/online_client.rs

* Update subxt/src/client/online_client.rs

* Add custom stream wrapper for finalized blocks

* add missing retry block

* clippy

* clippy again

* cleanup

* remove duplicate logic

* fix more grumbles

* Update subxt/examples/setup_reconnecting_rpc_client.rs

Co-authored-by: James Wilson <james@jsdw.me>

* simplify the example

* remove pin-project dep

* remove duplicate retry logic

* remove extra code

* specify trait bounds for retry api

* simplify the example

* fix weird Poll::Pending return

* fix nit in poll impl

* remove needless paths

* make retry_stream pub and add doc examples

* Update subxt/src/backend/utils.rs

---------

Co-authored-by: James Wilson <james@jsdw.me>
This commit is contained in:
Niklas Adolfsson
2024-05-08 15:12:54 +02:00
committed by GitHub
parent f034ac486c
commit bec896d91a
19 changed files with 1231 additions and 363 deletions
Generated
+13
View File
@@ -2389,6 +2389,7 @@ dependencies = [
"jsonrpsee-core",
"jsonrpsee-http-client",
"jsonrpsee-types",
"jsonrpsee-wasm-client",
"jsonrpsee-ws-client",
]
@@ -2472,6 +2473,17 @@ dependencies = [
"thiserror",
]
[[package]]
name = "jsonrpsee-wasm-client"
version = "0.22.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f448d8eacd945cc17b6c0b42c361531ca36a962ee186342a97cdb8fca679cd77"
dependencies = [
"jsonrpsee-client-transport",
"jsonrpsee-core",
"jsonrpsee-types",
]
[[package]]
name = "jsonrpsee-ws-client"
version = "0.22.5"
@@ -3397,6 +3409,7 @@ dependencies = [
"thiserror",
"tokio",
"tracing",
"wasm-bindgen-futures",
]
[[package]]
+7 -5
View File
@@ -28,7 +28,8 @@ native = [
"jsonrpsee?/async-client",
"jsonrpsee?/client-ws-transport-native-tls",
"subxt-lightclient?/native",
"tokio-util"
"tokio-util",
"reconnecting-jsonrpsee-ws-client?/native",
]
# Enable this for web/wasm builds.
@@ -39,7 +40,8 @@ web = [
"getrandom/js",
"subxt-lightclient?/web",
"subxt-macro/web",
"instant/wasm-bindgen"
"instant/wasm-bindgen",
"reconnecting-jsonrpsee-ws-client?/web",
]
# Enable this to use the reconnecting rpc client
@@ -99,7 +101,7 @@ subxt-metadata = { workspace = true, features = ["std"] }
subxt-lightclient = { workspace = true, optional = true, default-features = false }
# Reconnecting jsonrpc ws client
reconnecting-jsonrpsee-ws-client = { version = "0.4", optional = true }
reconnecting-jsonrpsee-ws-client = { version = "0.4", optional = true, default-features = false }
# For parsing urls to disallow insecure schemes
url = { workspace = true }
@@ -137,8 +139,8 @@ path = "examples/light_client_local_node.rs"
required-features = ["unstable-light-client", "jsonrpsee", "native"]
[[example]]
name = "reconnecting_rpc_client"
path = "examples/reconnecting_rpc_client.rs"
name = "setup_reconnecting_rpc_client"
path = "examples/setup_reconnecting_rpc_client.rs"
required-features = ["unstable-reconnecting-rpc-client"]
[package.metadata.docs.rs]
-73
View File
@@ -1,73 +0,0 @@
//! Example to utilize the `reconnecting rpc client` in subxt,
//! which is hidden behind the `unstable-reconnecting-rpc-client` feature flag.
//!
//! To get the full logs from the RPC client, run with:
//! `RUST_LOG="jsonrpsee=trace,reconnecting_jsonrpsee_ws_client=trace"`
#![allow(missing_docs)]
use std::time::Duration;
use subxt::backend::rpc::reconnecting_rpc_client::{Client, ExponentialBackoff, PingConfig};
use subxt::backend::rpc::RpcClient;
use subxt::error::{Error, RpcError};
use subxt::{OnlineClient, PolkadotConfig};
// Generate an interface that we can use from the node's metadata.
#[subxt::subxt(runtime_metadata_path = "../artifacts/polkadot_metadata_small.scale")]
pub mod polkadot {}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    tracing_subscriber::fmt::init();
    // Create a new client with a reconnecting RPC client.
    let rpc = Client::builder()
        // Reconnect with exponential backoff.
        //
        // This API is "iterator-like" so one could limit it to only
        // reconnect x times and then quit.
        .retry_policy(ExponentialBackoff::from_millis(100).max_delay(Duration::from_secs(10)))
        // Send periodic WebSocket pings/pongs every 6th second, and disconnect
        // if a ping is not ACK:ed within 30 seconds.
        //
        // This is just a way to ensure that the connection isn't idle if no message is sent that often.
        .enable_ws_ping(
            PingConfig::new()
                .ping_interval(Duration::from_secs(6))
                .inactive_limit(Duration::from_secs(30)),
        )
        // There are other configurations as well that can be found here:
        // <https://docs.rs/reconnecting-jsonrpsee-ws-client/latest/reconnecting_jsonrpsee_ws_client/struct.ClientBuilder.html>
        .build("ws://localhost:9944".to_string())
        .await?;
    let api: OnlineClient<PolkadotConfig> =
        OnlineClient::from_rpc_client(RpcClient::new(rpc.clone())).await?;
    // Subscribe to all finalized blocks:
    let mut blocks_sub = api.blocks().subscribe_finalized().await?;
    // For each block, print a bunch of information about it:
    while let Some(block) = blocks_sub.next().await {
        let block = match block {
            Ok(b) => b,
            // The connection was lost; the client will reconnect and the
            // subscription continues, but some blocks may have been missed.
            Err(Error::Rpc(RpcError::DisconnectedWillReconnect(err))) => {
                println!("{err}");
                continue;
            }
            Err(e) => {
                return Err(e.into());
            }
        };
        let block_number = block.header().number;
        let block_hash = block.hash();
        println!("Block #{block_number} ({block_hash})");
    }
    println!("RPC client reconnected `{}` times", rpc.reconnect_count());
    Ok(())
}
@@ -0,0 +1,102 @@
//! Example to utilize the `reconnecting rpc client` in subxt,
//! which is hidden behind the `unstable-reconnecting-rpc-client` feature flag.
//!
//! To get the full logs from the RPC client, run with:
//! `RUST_LOG="jsonrpsee=trace,reconnecting_jsonrpsee_ws_client=trace"`
#![allow(missing_docs)]
use std::time::Duration;
use futures::StreamExt;
use subxt::backend::rpc::reconnecting_rpc_client::{Client, ExponentialBackoff};
use subxt::{OnlineClient, PolkadotConfig};
// Generate an interface that we can use from the node's metadata.
#[subxt::subxt(runtime_metadata_path = "../artifacts/polkadot_metadata_small.scale")]
pub mod polkadot {}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    tracing_subscriber::fmt::init();
    // Create a new client with a reconnecting RPC client.
    let rpc = Client::builder()
        // Reconnect with exponential backoff.
        //
        // This API is "iterator-like" and we use `take` to limit the number of retries.
        .retry_policy(
            ExponentialBackoff::from_millis(100)
                .max_delay(Duration::from_secs(10))
                .take(3),
        )
        // There are other configurations as well that can be found at [`reconnecting_rpc_client::ClientBuilder`].
        .build("ws://localhost:9944".to_string())
        .await?;
    // If you want to use the unstable backend with the reconnecting RPC client, you can do so like this:
    //
    // ```
    // use subxt::backend::unstable::UnstableBackend;
    // use subxt::OnlineClient;
    //
    // let (backend, mut driver) = UnstableBackend::builder().build(RpcClient::new(rpc.clone()));
    // tokio::spawn(async move {
    //     while let Some(val) = driver.next().await {
    //         if let Err(e) = val {
    //             eprintln!("Error driving unstable backend: {e}; terminating client");
    //         }
    //     }
    // });
    // let api: OnlineClient<PolkadotConfig> = OnlineClient::from_backend(Arc::new(backend)).await?;
    // ```
    let api: OnlineClient<PolkadotConfig> = OnlineClient::from_rpc_client(rpc.clone()).await?;
    // Optionally print if the RPC client reconnects.
    let rpc2 = rpc.clone();
    tokio::spawn(async move {
        loop {
            // The connection was lost and the client is trying to reconnect.
            let reconnected = rpc2.reconnect_initiated().await;
            let now = std::time::Instant::now();
            // The connection was re-established.
            reconnected.await;
            println!(
                "RPC client reconnection took `{}s`",
                now.elapsed().as_secs()
            );
        }
    });
    // Run for at most 100 blocks and print a bunch of information about each one.
    //
    // The subscription is automatically re-started when the RPC client has reconnected.
    // You can test that by stopping the polkadot node and restarting it.
    let mut blocks_sub = api.blocks().subscribe_finalized().await?.take(100);
    while let Some(block) = blocks_sub.next().await {
        let block = match block {
            Ok(b) => b,
            Err(e) => {
                // This can only happen on the legacy backend; the unstable
                // backend handles reconnection internally.
                if e.is_disconnected_will_reconnect() {
                    println!("The RPC connection was lost and we may have missed a few blocks");
                    continue;
                }
                return Err(e.into());
            }
        };
        let block_number = block.number();
        let block_hash = block.hash();
        println!("Block #{block_number} ({block_hash})");
    }
    println!("RPC client reconnected `{}` times", rpc.reconnect_count());
    Ok(())
}
+158 -76
View File
@@ -8,10 +8,12 @@
pub mod rpc_methods;
use self::rpc_methods::TransactionStatus as RpcTransactionStatus;
use crate::backend::utils::{retry, retry_stream};
use crate::backend::{
rpc::RpcClient, Backend, BlockRef, RuntimeVersion, StorageResponse, StreamOf, StreamOfResults,
TransactionStatus,
};
use crate::error::RpcError;
use crate::{config::Header, Config, Error};
use async_trait::async_trait;
use futures::{future, future::Either, stream, Future, FutureExt, Stream, StreamExt};
@@ -62,12 +64,21 @@ impl<T: Config> LegacyBackendBuilder<T> {
}
/// The legacy backend.
#[derive(Debug, Clone)]
#[derive(Debug)]
pub struct LegacyBackend<T> {
storage_page_size: u32,
methods: LegacyRpcMethods<T>,
}
impl<T> Clone for LegacyBackend<T> {
fn clone(&self) -> LegacyBackend<T> {
LegacyBackend {
storage_page_size: self.storage_page_size,
methods: self.methods.clone(),
}
}
}
impl<T: Config> LegacyBackend<T> {
/// Configure and construct an [`LegacyBackend`].
pub fn builder() -> LegacyBackendBuilder<T> {
@@ -84,24 +95,28 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for LegacyBackend<T> {
keys: Vec<Vec<u8>>,
at: T::Hash,
) -> Result<StreamOfResults<StorageResponse>, Error> {
let methods = self.methods.clone();
retry(|| async {
let keys = keys.clone();
let methods = self.methods.clone();
// For each key, return it + a future to get the result.
let iter = keys.into_iter().map(move |key| {
let methods = methods.clone();
async move {
let res = methods.state_get_storage(&key, Some(at)).await?;
Ok(res.map(|value| StorageResponse { key, value }))
}
});
// For each key, return it + a future to get the result.
let iter = keys.into_iter().map(move |key| {
let methods = methods.clone();
async move {
let res = methods.state_get_storage(&key, Some(at)).await?;
Ok(res.map(|value| StorageResponse { key, value }))
}
});
let s = stream::iter(iter)
// Resolve the future
.then(|fut| fut)
// Filter any Options out (ie if we didn't find a value at some key we return nothing for it).
.filter_map(|r| future::ready(r.transpose()));
let s = stream::iter(iter)
// Resolve the future
.then(|fut| fut)
// Filter any Options out (ie if we didn't find a value at some key we return nothing for it).
.filter_map(|r| future::ready(r.transpose()));
Ok(StreamOf(Box::pin(s)))
Ok(StreamOf(Box::pin(s)))
})
.await
}
async fn storage_fetch_descendant_keys(
@@ -158,99 +173,159 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for LegacyBackend<T> {
}
async fn genesis_hash(&self) -> Result<T::Hash, Error> {
self.methods.genesis_hash().await
retry(|| self.methods.genesis_hash()).await
}
async fn block_header(&self, at: T::Hash) -> Result<Option<T::Header>, Error> {
self.methods.chain_get_header(Some(at)).await
retry(|| self.methods.chain_get_header(Some(at))).await
}
async fn block_body(&self, at: T::Hash) -> Result<Option<Vec<Vec<u8>>>, Error> {
let Some(details) = self.methods.chain_get_block(Some(at)).await? else {
return Ok(None);
};
Ok(Some(
details.block.extrinsics.into_iter().map(|b| b.0).collect(),
))
retry(|| async {
let Some(details) = self.methods.chain_get_block(Some(at)).await? else {
return Ok(None);
};
Ok(Some(
details.block.extrinsics.into_iter().map(|b| b.0).collect(),
))
})
.await
}
async fn latest_finalized_block_ref(&self) -> Result<BlockRef<T::Hash>, Error> {
let hash = self.methods.chain_get_finalized_head().await?;
Ok(BlockRef::from_hash(hash))
retry(|| async {
let hash = self.methods.chain_get_finalized_head().await?;
Ok(BlockRef::from_hash(hash))
})
.await
}
async fn current_runtime_version(&self) -> Result<RuntimeVersion, Error> {
let details = self.methods.state_get_runtime_version(None).await?;
Ok(RuntimeVersion {
spec_version: details.spec_version,
transaction_version: details.transaction_version,
retry(|| async {
let details = self.methods.state_get_runtime_version(None).await?;
Ok(RuntimeVersion {
spec_version: details.spec_version,
transaction_version: details.transaction_version,
})
})
.await
}
async fn stream_runtime_version(&self) -> Result<StreamOfResults<RuntimeVersion>, Error> {
let sub = self.methods.state_subscribe_runtime_version().await?;
let sub = sub.map(|r| {
r.map(|v| RuntimeVersion {
spec_version: v.spec_version,
transaction_version: v.transaction_version,
let methods = self.methods.clone();
let retry_sub = retry_stream(move || {
let methods = methods.clone();
Box::pin(async move {
let sub = methods.state_subscribe_runtime_version().await?;
let sub = sub.map(|r| {
r.map(|v| RuntimeVersion {
spec_version: v.spec_version,
transaction_version: v.transaction_version,
})
});
Ok(StreamOf(Box::pin(sub)))
})
})
.await?;
// For runtime version subscriptions we omit the `DisconnectedWillReconnect` error
// because the once it resubscribes it will emit the latest runtime version.
//
// Thus, it's technically possible that a runtime version can be missed if
// two runtime upgrades happen in quick succession, but this is very unlikely.
let stream = retry_sub.filter(|r| {
let forward = !matches!(r, Err(Error::Rpc(RpcError::DisconnectedWillReconnect(_))));
async move { forward }
});
Ok(StreamOf(Box::pin(sub)))
Ok(StreamOf(Box::pin(stream)))
}
async fn stream_all_block_headers(
&self,
) -> Result<StreamOfResults<(T::Header, BlockRef<T::Hash>)>, Error> {
let sub = self.methods.chain_subscribe_all_heads().await?;
let sub = sub.map(|r| {
r.map(|h| {
let hash = h.hash();
(h, BlockRef::from_hash(hash))
let methods = self.methods.clone();
let retry_sub = retry_stream(move || {
let methods = methods.clone();
Box::pin(async move {
let sub = methods.chain_subscribe_all_heads().await?;
let sub = sub.map(|r| {
r.map(|h| {
let hash = h.hash();
(h, BlockRef::from_hash(hash))
})
});
Ok(StreamOf(Box::pin(sub)))
})
});
Ok(StreamOf(Box::pin(sub)))
})
.await?;
Ok(retry_sub)
}
async fn stream_best_block_headers(
&self,
) -> Result<StreamOfResults<(T::Header, BlockRef<T::Hash>)>, Error> {
let sub = self.methods.chain_subscribe_new_heads().await?;
let sub = sub.map(|r| {
r.map(|h| {
let hash = h.hash();
(h, BlockRef::from_hash(hash))
let methods = self.methods.clone();
let retry_sub = retry_stream(move || {
let methods = methods.clone();
Box::pin(async move {
let sub = methods.chain_subscribe_new_heads().await?;
let sub = sub.map(|r| {
r.map(|h| {
let hash = h.hash();
(h, BlockRef::from_hash(hash))
})
});
Ok(StreamOf(Box::pin(sub)))
})
});
Ok(StreamOf(Box::pin(sub)))
})
.await?;
Ok(retry_sub)
}
async fn stream_finalized_block_headers(
&self,
) -> Result<StreamOfResults<(T::Header, BlockRef<T::Hash>)>, Error> {
let sub: super::rpc::RpcSubscription<<T as Config>::Header> =
self.methods.chain_subscribe_finalized_heads().await?;
let this = self.clone();
// Get the last finalized block immediately so that the stream will emit every finalized block after this.
let last_finalized_block_ref = self.latest_finalized_block_ref().await?;
let last_finalized_block_num = self
.block_header(last_finalized_block_ref.hash())
.await?
.map(|h| h.number().into());
let retry_sub = retry_stream(move || {
let this = this.clone();
Box::pin(async move {
let sub = this.methods.chain_subscribe_finalized_heads().await?;
// Fill in any missing blocks, because the backend may not emit every finalized block; just the latest ones which
// are finalized each time.
let sub = subscribe_to_block_headers_filling_in_gaps(
self.methods.clone(),
sub,
last_finalized_block_num,
);
let sub = sub.map(|r| {
r.map(|h| {
let hash = h.hash();
(h, BlockRef::from_hash(hash))
// Get the last finalized block immediately so that the stream will emit every finalized block after this.
let last_finalized_block_ref = this.latest_finalized_block_ref().await?;
let last_finalized_block_num = this
.block_header(last_finalized_block_ref.hash())
.await?
.map(|h| h.number().into());
// Fill in any missing blocks, because the backend may not emit every finalized block; just the latest ones which
// are finalized each time.
let sub = subscribe_to_block_headers_filling_in_gaps(
this.methods.clone(),
sub,
last_finalized_block_num,
);
let sub = sub.map(|r| {
r.map(|h| {
let hash = h.hash();
(h, BlockRef::from_hash(hash))
})
});
Ok(StreamOf(Box::pin(sub)))
})
});
Ok(StreamOf(Box::pin(sub)))
})
.await?;
Ok(retry_sub)
}
async fn submit_transaction(
@@ -261,6 +336,7 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for LegacyBackend<T> {
.methods
.author_submit_and_watch_extrinsic(extrinsic)
.await?;
let sub = sub.filter_map(|r| {
let mapped = r
.map(|tx| {
@@ -309,7 +385,8 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for LegacyBackend<T> {
future::ready(mapped)
});
Ok(StreamOf(Box::pin(sub)))
Ok(StreamOf::new(Box::pin(sub)))
}
async fn call(
@@ -318,9 +395,7 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for LegacyBackend<T> {
call_parameters: Option<&[u8]>,
at: T::Hash,
) -> Result<Vec<u8>, Error> {
self.methods
.state_call(method, call_parameters, Some(at))
.await
retry(|| self.methods.state_call(method, call_parameters, Some(at))).await
}
}
@@ -431,6 +506,11 @@ impl<T: Config> Stream for StorageFetchDescendantKeysStream<T> {
return Poll::Ready(Some(Ok(keys)));
}
Err(e) => {
if e.is_disconnected_will_reconnect() {
this.keys_fut = Some(keys_fut);
continue;
}
// Error getting keys? Return it.
return Poll::Ready(Some(Err(e)));
}
@@ -513,7 +593,9 @@ impl<T: Config> Stream for StorageFetchDescendantValuesStream<T> {
let at = this.keys.at;
let results_fut = async move {
let keys = keys.iter().map(|k| &**k);
let values = methods.state_query_storage_at(keys, Some(at)).await?;
let values =
retry(|| methods.state_query_storage_at(keys.clone(), Some(at)))
.await?;
let values: VecDeque<_> = values
.into_iter()
.flat_map(|v| {
+1
View File
@@ -9,6 +9,7 @@
pub mod legacy;
pub mod rpc;
pub mod unstable;
pub mod utils;
use subxt_core::client::RuntimeVersion;
+2 -2
View File
@@ -65,8 +65,8 @@ crate::macros::cfg_unstable_light_client! {
}
crate::macros::cfg_reconnecting_rpc_client! {
mod reconnecting_jsonrpsee_impl;
pub use reconnecting_jsonrpsee_ws_client as reconnecting_rpc_client;
/// reconnecting rpc client.
pub mod reconnecting_rpc_client;
}
mod rpc_client;
@@ -1,52 +0,0 @@
// Copyright 2019-2023 Parity Technologies (UK) Ltd.
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
// see LICENSE for license details.
use super::{RawRpcFuture, RawRpcSubscription, RpcClientT};
use crate::error::RpcError;
use futures::{FutureExt, StreamExt, TryStreamExt};
use reconnecting_jsonrpsee_ws_client::SubscriptionId;
use serde_json::value::RawValue;
impl RpcClientT for reconnecting_jsonrpsee_ws_client::Client {
    fn request_raw<'a>(
        &'a self,
        method: &'a str,
        params: Option<Box<RawValue>>,
    ) -> RawRpcFuture<'a, Box<RawValue>> {
        async {
            // Forward the call to the reconnecting client, surfacing any
            // failure as a generic client error.
            self.request_raw(method.to_string(), params)
                .await
                .map_err(|e| RpcError::ClientError(Box::new(e)))
        }
        .boxed()
    }
    fn subscribe_raw<'a>(
        &'a self,
        sub: &'a str,
        params: Option<Box<RawValue>>,
        unsub: &'a str,
    ) -> RawRpcFuture<'a, RawRpcSubscription> {
        async {
            let sub = self
                .subscribe_raw(sub.to_string(), params, unsub.to_string())
                .await
                .map_err(|e| RpcError::ClientError(Box::new(e)))?;
            // Normalize the subscription ID to a string regardless of its wire type.
            let id = match sub.id() {
                SubscriptionId::Num(n) => n.to_string(),
                SubscriptionId::Str(s) => s.to_string(),
            };
            // Errors on the subscription stream itself indicate the connection
            // was lost and the client will attempt to reconnect.
            let stream = sub
                .map_err(|e| RpcError::DisconnectedWillReconnect(e.to_string()))
                .boxed();
            Ok(RawRpcSubscription {
                stream,
                id: Some(id),
            })
        }
        .boxed()
    }
}
@@ -0,0 +1,270 @@
// Copyright 2019-2024 Parity Technologies (UK) Ltd.
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
// see LICENSE for license details.
use super::{RawRpcFuture, RawRpcSubscription, RpcClientT};
use crate::error::RpcError;
use futures::{Future, FutureExt, StreamExt, TryStreamExt};
use reconnecting_jsonrpsee_ws_client::{CallRetryPolicy, Client as InnerClient, SubscriptionId};
use serde_json::value::RawValue;
use std::time::Duration;
pub use reconnecting_jsonrpsee_ws_client::{
ExponentialBackoff, FibonacciBackoff, FixedInterval, IdKind,
};
#[cfg(feature = "native")]
use reconnecting_jsonrpsee_ws_client::{HeaderMap, PingConfig};
/// Builder for [`Client`].
#[derive(Debug, Clone)]
pub struct Builder<P> {
    // Maximum size (bytes) of an outgoing websocket message.
    max_request_size: u32,
    // Maximum size (bytes) of an incoming websocket message.
    max_response_size: u32,
    // Iterator of delays between reconnection attempts.
    retry_policy: P,
    // Max number of redirections to follow before the connection is regarded as failed.
    max_redirections: u32,
    // Data type of the request object ID (number or string).
    id_kind: IdKind,
    // Max length for logged calls/responses before truncation.
    max_log_len: u32,
    // Max number of concurrent in-flight method calls.
    max_concurrent_requests: u32,
    // How long until a method call is regarded as failed.
    request_timeout: Duration,
    // Connection timeout for the WebSocket handshake.
    connection_timeout: Duration,
    // Optional WebSocket ping/pong configuration (native builds only).
    #[cfg(feature = "native")]
    ping_config: Option<PingConfig>,
    // Custom headers for the WebSocket handshake (native builds only).
    #[cfg(feature = "native")]
    headers: HeaderMap,
}
impl Default for Builder<ExponentialBackoff> {
    // Default configuration: 10 MB message limits, exponential backoff
    // starting at 10ms capped at 60s, and (on native) pings enabled.
    fn default() -> Self {
        Self {
            max_request_size: 10 * 1024 * 1024,
            max_response_size: 10 * 1024 * 1024,
            retry_policy: ExponentialBackoff::from_millis(10).max_delay(Duration::from_secs(60)),
            max_redirections: 5,
            id_kind: IdKind::Number,
            max_log_len: 1024,
            max_concurrent_requests: 1024,
            request_timeout: Duration::from_secs(60),
            connection_timeout: Duration::from_secs(10),
            #[cfg(feature = "native")]
            ping_config: Some(PingConfig::new()),
            #[cfg(feature = "native")]
            headers: HeaderMap::new(),
        }
    }
}
impl Builder<ExponentialBackoff> {
    /// Create a new builder with the default configuration
    /// (see [`Builder::default`]).
    pub fn new() -> Self {
        Self::default()
    }
}
impl<P> Builder<P>
where
    P: Iterator<Item = Duration> + Send + Sync + 'static + Clone,
{
    /// Configure the max request size for a websocket message.
    ///
    /// Default: 10MB
    pub fn max_request_size(mut self, max: u32) -> Self {
        self.max_request_size = max;
        self
    }
    /// Configure the max response size for a websocket message.
    ///
    /// Default: 10MB
    pub fn max_response_size(mut self, max: u32) -> Self {
        self.max_response_size = max;
        self
    }
    /// Set the max number of redirections to perform until a connection is regarded as failed.
    ///
    /// Default: 5
    pub fn max_redirections(mut self, redirect: u32) -> Self {
        self.max_redirections = redirect;
        self
    }
    /// Configure how many concurrent method calls are allowed.
    ///
    /// Default: 1024
    pub fn max_concurrent_requests(mut self, max: u32) -> Self {
        self.max_concurrent_requests = max;
        self
    }
    /// Configure how long until a method call is regarded as failed.
    ///
    /// Default: 1 minute
    pub fn request_timeout(mut self, timeout: Duration) -> Self {
        self.request_timeout = timeout;
        self
    }
    /// Set connection timeout for the WebSocket handshake.
    ///
    /// Default: 10 seconds
    pub fn connection_timeout(mut self, timeout: Duration) -> Self {
        self.connection_timeout = timeout;
        self
    }
    /// Configure the data type of the request object ID.
    ///
    /// Default: number
    pub fn id_format(mut self, kind: IdKind) -> Self {
        self.id_kind = kind;
        self
    }
    /// Set maximum length for logging calls and responses.
    /// Logs bigger than this limit will be truncated.
    ///
    /// Default: 1024
    pub fn set_max_logging_length(mut self, max: u32) -> Self {
        self.max_log_len = max;
        self
    }
    /// Configure which retry policy to use.
    ///
    /// Default: Exponential backoff 10ms
    pub fn retry_policy<T: Iterator<Item = Duration> + Send + Sync + 'static + Clone>(
        self,
        retry_policy: T,
    ) -> Builder<T> {
        // The retry policy changes the builder's type parameter, so the struct
        // is rebuilt field-by-field rather than mutated in place.
        Builder {
            max_request_size: self.max_request_size,
            max_response_size: self.max_response_size,
            retry_policy,
            max_redirections: self.max_redirections,
            max_log_len: self.max_log_len,
            id_kind: self.id_kind,
            max_concurrent_requests: self.max_concurrent_requests,
            request_timeout: self.request_timeout,
            connection_timeout: self.connection_timeout,
            #[cfg(feature = "native")]
            ping_config: self.ping_config,
            #[cfg(feature = "native")]
            headers: self.headers,
        }
    }
    #[cfg(feature = "native")]
    #[cfg_attr(docsrs, doc(cfg(feature = "native")))]
    /// Configure the WebSocket ping/pong interval.
    ///
    /// Default: 30 seconds.
    pub fn enable_ws_ping(mut self, ping_config: PingConfig) -> Self {
        self.ping_config = Some(ping_config);
        self
    }
    #[cfg(feature = "native")]
    #[cfg_attr(docsrs, doc(cfg(feature = "native")))]
    /// Disable WebSocket ping/pongs.
    pub fn disable_ws_ping(mut self) -> Self {
        self.ping_config = None;
        self
    }
    #[cfg(feature = "native")]
    #[cfg_attr(docsrs, doc(cfg(feature = "native")))]
    /// Configure custom headers to use in the WebSocket handshake.
    pub fn set_headers(mut self, headers: HeaderMap) -> Self {
        self.headers = headers;
        self
    }
    /// Build and connect to the target.
    ///
    /// # Errors
    ///
    /// Returns an error if the initial connection cannot be established.
    pub async fn build(self, url: String) -> Result<Client, RpcError> {
        // NOTE(review): only the retry policy is forwarded to the underlying
        // client here; the other configured options (message sizes, timeouts,
        // id format, logging length, ping config, headers) are silently
        // dropped — TODO wire them through to `InnerClient::builder()`.
        let client = InnerClient::builder()
            .retry_policy(self.retry_policy)
            .build(url)
            .await
            .map_err(|e| RpcError::ClientError(Box::new(e)))?;
        Ok(Client(client))
    }
}
/// Reconnecting rpc client.
///
/// A thin wrapper around the underlying reconnecting jsonrpsee client
/// that subxt can use as an RPC client.
#[derive(Debug, Clone)]
pub struct Client(InnerClient);
impl Client {
    /// Create a builder.
    pub fn builder() -> Builder<ExponentialBackoff> {
        Builder::new()
    }
    /// A future that resolves when the client has initiated a reconnection.
    /// This method returns another future that resolves when the client has reconnected.
    ///
    /// This may be called multiple times.
    pub async fn reconnect_initiated(&self) -> impl Future<Output = ()> + '_ {
        // Wait until a reconnection has started, then hand back a future that
        // completes once the connection is re-established.
        self.0.reconnect_started().await;
        self.0.reconnected()
    }
    /// Get how many times the client has reconnected successfully.
    pub fn reconnect_count(&self) -> usize {
        self.0.reconnect_count()
    }
}
impl RpcClientT for Client {
    fn request_raw<'a>(
        &'a self,
        method: &'a str,
        params: Option<Box<RawValue>>,
    ) -> RawRpcFuture<'a, Box<RawValue>> {
        async {
            // `CallRetryPolicy::Drop`: the call is not retried by the
            // reconnecting client itself; retry decisions are made by the caller.
            self.0
                .request_raw_with_policy(method.to_string(), params, CallRetryPolicy::Drop)
                .await
                // NOTE(review): every error here is surfaced as
                // `DisconnectedWillReconnect`, whereas `subscribe_raw` below maps
                // call errors to `ClientError` — confirm this asymmetry is intentional.
                .map_err(|e| RpcError::DisconnectedWillReconnect(e.to_string()))
        }
        .boxed()
    }
    fn subscribe_raw<'a>(
        &'a self,
        sub: &'a str,
        params: Option<Box<RawValue>>,
        unsub: &'a str,
    ) -> RawRpcFuture<'a, RawRpcSubscription> {
        async {
            let sub = self
                .0
                .subscribe_raw_with_policy(
                    sub.to_string(),
                    params,
                    unsub.to_string(),
                    CallRetryPolicy::Drop,
                )
                .await
                .map_err(|e| RpcError::ClientError(Box::new(e)))?;
            // Normalize the subscription ID to a string regardless of its wire type.
            let id = match sub.id() {
                SubscriptionId::Num(n) => n.to_string(),
                SubscriptionId::Str(s) => s.to_string(),
            };
            // Errors on the subscription stream itself indicate the connection
            // was lost and the client will attempt to reconnect.
            let stream = sub
                .map_err(|e| RpcError::DisconnectedWillReconnect(e.to_string()))
                .boxed();
            Ok(RawRpcSubscription {
                stream,
                id: Some(id),
            })
        }
        .boxed()
    }
}
@@ -148,6 +148,12 @@ impl<Hash> Stream for FollowStream<Hash> {
continue;
}
Poll::Ready(Err(e)) => {
// Re-start if a reconnecting backend was enabled.
if e.is_disconnected_will_reconnect() {
this.stream = InnerStreamState::Stopped;
continue;
}
// Finish forever if there's an error, passing it on.
this.stream = InnerStreamState::Finished;
return Poll::Ready(Some(Err(e)));
@@ -182,6 +188,12 @@ impl<Hash> Stream for FollowStream<Hash> {
return Poll::Ready(Some(Ok(FollowStreamMsg::Event(ev))));
}
Poll::Ready(Some(Err(e))) => {
// Re-start if a reconnecting backend was enabled.
if e.is_disconnected_will_reconnect() {
this.stream = InnerStreamState::Stopped;
continue;
}
// Finish forever if there's an error, passing it on.
this.stream = InnerStreamState::Finished;
return Poll::Ready(Some(Err(e)));
@@ -5,7 +5,7 @@
use super::follow_stream_unpin::{BlockRef, FollowStreamMsg, FollowStreamUnpin};
use crate::backend::unstable::rpc_methods::{FollowEvent, Initialized, RuntimeEvent};
use crate::config::BlockHash;
use crate::error::Error;
use crate::error::{Error, RpcError};
use futures::stream::{Stream, StreamExt};
use std::collections::{HashMap, HashSet, VecDeque};
use std::ops::DerefMut;
@@ -380,6 +380,103 @@ struct SubscriberDetails<Hash: BlockHash> {
waker: Option<Waker>,
}
/// A stream that subscribes to finalized blocks
/// and indicates whether a block was missed if it was restarted.
#[derive(Debug)]
pub struct FollowStreamFinalizedHeads<Hash: BlockHash, F> {
    // Underlying subscription to follow-stream messages.
    stream: FollowStreamDriverSubscription<Hash>,
    // Subscription ID from the most recent `Ready` message, once seen.
    sub_id: Option<String>,
    // Last finalized block emitted; used to detect missed blocks across a restart.
    last_seen_block: Option<BlockRef<Hash>>,
    // Maps a follow event to the block refs that should be emitted for it.
    f: F,
    // Set once the underlying stream ends; the stream then yields `None` forever.
    is_done: bool,
}
impl<Hash: BlockHash, F> Unpin for FollowStreamFinalizedHeads<Hash, F> {}
impl<Hash, F> FollowStreamFinalizedHeads<Hash, F>
where
    Hash: BlockHash,
    F: Fn(FollowEvent<BlockRef<Hash>>) -> Vec<BlockRef<Hash>>,
{
    /// Construct a new [`FollowStreamFinalizedHeads`] from a driver subscription
    /// and a function extracting block refs from follow events.
    pub fn new(stream: FollowStreamDriverSubscription<Hash>, f: F) -> Self {
        Self {
            stream,
            sub_id: None,
            last_seen_block: None,
            f,
            is_done: false,
        }
    }
}
impl<Hash, F> Stream for FollowStreamFinalizedHeads<Hash, F>
where
    Hash: BlockHash,
    F: Fn(FollowEvent<BlockRef<Hash>>) -> Vec<BlockRef<Hash>>,
{
    type Item = Result<(String, Vec<BlockRef<Hash>>), Error>;
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        if self.is_done {
            return Poll::Ready(None);
        }
        loop {
            // If the underlying stream ends, finish forever.
            let Some(ev) = futures::ready!(self.stream.poll_next_unpin(cx)) else {
                self.is_done = true;
                return Poll::Ready(None);
            };
            let block_refs = match ev {
                // Remember the subscription ID; it is emitted alongside each batch of blocks.
                FollowStreamMsg::Ready(sub_id) => {
                    self.sub_id = Some(sub_id);
                    continue;
                }
                FollowStreamMsg::Event(FollowEvent::Finalized(finalized)) => {
                    // Track the newest finalized block so a later restart can
                    // tell whether anything was missed.
                    self.last_seen_block = finalized.finalized_block_hashes.last().cloned();
                    (self.f)(FollowEvent::Finalized(finalized))
                }
                FollowStreamMsg::Event(FollowEvent::Initialized(mut init)) => {
                    // An `Initialized` event after blocks have been seen means the
                    // follow subscription was restarted.
                    let prev = self.last_seen_block.take();
                    self.last_seen_block = init.finalized_block_hashes.last().cloned();
                    if let Some(p) = prev {
                        // If the previously seen block is not in the new initial set,
                        // at least one finalized block was missed while disconnected.
                        let Some(pos) = init
                            .finalized_block_hashes
                            .iter()
                            .position(|b| b.hash() == p.hash())
                        else {
                            return Poll::Ready(Some(Err(RpcError::DisconnectedWillReconnect(
                                "Missed at least one block when the connection was lost".to_owned(),
                            )
                            .into())));
                        };
                        // If we got older blocks than `prev`, we need to remove them
                        // because they should already have been sent at this point.
                        init.finalized_block_hashes.drain(0..=pos);
                    }
                    (self.f)(FollowEvent::Initialized(init))
                }
                FollowStreamMsg::Event(ev) => (self.f)(ev),
            };
            // Skip events that produce no blocks to emit.
            if block_refs.is_empty() {
                continue;
            }
            let sub_id = self
                .sub_id
                .clone()
                .expect("Ready is always emitted before any other event");
            return Poll::Ready(Some(Ok((sub_id, block_refs))));
        }
    }
}
#[cfg(test)]
mod test_utils {
use super::super::follow_stream_unpin::test_utils::test_unpin_stream_getter;
@@ -402,6 +499,9 @@ mod test_utils {
#[cfg(test)]
mod test {
use futures::TryStreamExt;
use sp_core::H256;
use super::super::follow_stream::test_utils::{
ev_best_block, ev_finalized, ev_initialized, ev_new_block,
};
@@ -545,4 +645,101 @@ mod test {
];
assert_eq!(evs, expected);
}
#[tokio::test]
async fn subscribe_finalized_blocks_restart_works() {
    // Drive a follow stream that emits a `Stop` mid-way, forcing the
    // subscription to restart under a new subscription ID.
    let mut driver = test_follow_stream_driver_getter(
        || {
            [
                Ok(ev_initialized(0)),
                Ok(ev_new_block(0, 1)),
                Ok(ev_best_block(1)),
                Ok(ev_finalized([1], [])),
                Ok(FollowEvent::Stop),
                Ok(ev_initialized(1)),
                Ok(ev_finalized([2], [])),
                Err(Error::Other("ended".to_owned())),
            ]
        },
        10,
    );
    let handle = driver.handle();
    tokio::spawn(async move { while driver.next().await.is_some() {} });
    // Extract finalized block hashes from each follow event.
    let f = |ev| match ev {
        FollowEvent::Finalized(ev) => ev.finalized_block_hashes,
        FollowEvent::Initialized(ev) => ev.finalized_block_hashes,
        _ => vec![],
    };
    let stream = FollowStreamFinalizedHeads::new(handle.subscribe(), f);
    let evs: Vec<_> = stream.try_collect().await.unwrap();
    // The restart shows up as a new subscription ID ("sub_id_5"), and no
    // blocks are lost across it.
    let expected = vec![
        (
            "sub_id_0".to_string(),
            vec![BlockRef::new(H256::from_low_u64_le(0))],
        ),
        (
            "sub_id_0".to_string(),
            vec![BlockRef::new(H256::from_low_u64_le(1))],
        ),
        (
            "sub_id_5".to_string(),
            vec![BlockRef::new(H256::from_low_u64_le(2))],
        ),
    ];
    assert_eq!(evs, expected);
}
#[tokio::test]
async fn subscribe_finalized_blocks_restart_with_missed_blocks() {
let mut driver = test_follow_stream_driver_getter(
|| {
[
Ok(ev_initialized(0)),
Ok(FollowEvent::Stop),
// Emulate that we missed some blocks.
Ok(ev_initialized(13)),
Ok(ev_finalized([14], [])),
Err(Error::Other("ended".to_owned())),
]
},
10,
);
let handle = driver.handle();
tokio::spawn(async move { while driver.next().await.is_some() {} });
let f = |ev| match ev {
FollowEvent::Finalized(ev) => ev.finalized_block_hashes,
FollowEvent::Initialized(ev) => ev.finalized_block_hashes,
_ => vec![],
};
let evs: Vec<_> = FollowStreamFinalizedHeads::new(handle.subscribe(), f)
.collect()
.await;
assert_eq!(
evs[0].as_ref().unwrap(),
&(
"sub_id_0".to_string(),
vec![BlockRef::new(H256::from_low_u64_le(0))]
)
);
assert!(
matches!(&evs[1], Err(Error::Rpc(RpcError::DisconnectedWillReconnect(e))) if e.contains("Missed at least one block when the connection was lost"))
);
assert_eq!(
evs[2].as_ref().unwrap(),
&(
"sub_id_2".to_string(),
vec![BlockRef::new(H256::from_low_u64_le(14))]
)
);
}
}
@@ -474,7 +474,7 @@ pub(super) mod test_utils {
pub type UnpinRx<Hash> = std::sync::mpsc::Receiver<(Hash, Arc<str>)>;
/// Get a [`FollowStreamUnpin`] from an iterator over events.
pub fn test_unpin_stream_getter<Hash, F, I>(
events: F,
max_life: usize,
+178 -144
View File
@@ -18,21 +18,22 @@ mod storage_items;
pub mod rpc_methods;
use self::follow_stream_driver::FollowStreamFinalizedHeads;
use self::rpc_methods::{
FollowEvent, MethodResponse, RuntimeEvent, StorageQuery, StorageQueryType, StorageResultType,
};
use crate::backend::{
rpc::RpcClient, Backend, BlockRef, BlockRefT, RuntimeVersion, StorageResponse, StreamOf,
StreamOfResults, TransactionStatus,
rpc::RpcClient, utils::retry, Backend, BlockRef, BlockRefT, RuntimeVersion, StorageResponse,
StreamOf, StreamOfResults, TransactionStatus,
};
use crate::config::BlockHash;
use crate::error::{Error, RpcError};
use crate::Config;
use async_trait::async_trait;
use follow_stream_driver::{FollowStreamDriver, FollowStreamDriverHandle};
use futures::future::Either;
use futures::{Stream, StreamExt};
use std::collections::HashMap;
use std::sync::Arc;
use std::task::Poll;
use storage_items::StorageItems;
@@ -136,43 +137,50 @@ impl<T: Config> UnstableBackend<T> {
}
/// Stream block headers based on the provided filter fn
async fn stream_headers<F, I>(
async fn stream_headers<F>(
&self,
f: F,
) -> Result<StreamOfResults<(T::Header, BlockRef<T::Hash>)>, Error>
where
F: Fn(FollowEvent<follow_stream_unpin::BlockRef<T::Hash>>) -> I + Copy + Send + 'static,
I: IntoIterator<Item = follow_stream_unpin::BlockRef<T::Hash>> + Send + 'static,
<I as IntoIterator>::IntoIter: Send,
F: Fn(
FollowEvent<follow_stream_unpin::BlockRef<T::Hash>>,
) -> Vec<follow_stream_unpin::BlockRef<T::Hash>>
+ Send
+ Sync
+ 'static,
{
let sub_id = get_subscription_id(&self.follow_handle).await?;
let sub_id = Arc::new(sub_id);
let methods = self.methods.clone();
let headers = self.follow_handle.subscribe().events().flat_map(move |ev| {
let sub_id = sub_id.clone();
let methods = methods.clone();
let block_refs = f(ev).into_iter();
futures::stream::iter(block_refs).filter_map(move |block_ref| {
let sub_id = sub_id.clone();
let headers =
FollowStreamFinalizedHeads::new(self.follow_handle.subscribe(), f).flat_map(move |r| {
let methods = methods.clone();
async move {
let res = methods
.chainhead_v1_header(&sub_id, block_ref.hash())
.await
.transpose()?;
let (sub_id, block_refs) = match r {
Ok(ev) => ev,
Err(e) => return Either::Left(futures::stream::once(async { Err(e) })),
};
let header = match res {
Ok(header) => header,
Err(e) => return Some(Err(e)),
};
Either::Right(
futures::stream::iter(block_refs).filter_map(move |block_ref| {
let methods = methods.clone();
let sub_id = sub_id.clone();
Some(Ok((header, block_ref.into())))
}
})
});
async move {
let res = methods
.chainhead_v1_header(&sub_id, block_ref.hash())
.await
.transpose()?;
let header = match res {
Ok(header) => header,
Err(e) => return Some(Err(e)),
};
Some(Ok((header, block_ref.into())))
}
}),
)
});
Ok(StreamOf(Box::pin(headers)))
}
@@ -194,31 +202,34 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for UnstableBackend<T> {
keys: Vec<Vec<u8>>,
at: T::Hash,
) -> Result<StreamOfResults<StorageResponse>, Error> {
let queries = keys.iter().map(|key| StorageQuery {
key: &**key,
query_type: StorageQueryType::Value,
});
retry(|| async {
let queries = keys.iter().map(|key| StorageQuery {
key: &**key,
query_type: StorageQueryType::Value,
});
let storage_items =
StorageItems::from_methods(queries, at, &self.follow_handle, self.methods.clone())
.await?;
let storage_items =
StorageItems::from_methods(queries, at, &self.follow_handle, self.methods.clone())
.await?;
let storage_result_stream = storage_items.filter_map(|val| async move {
let val = match val {
Ok(val) => val,
Err(e) => return Some(Err(e)),
};
let stream = storage_items.filter_map(|val| async move {
let val = match val {
Ok(val) => val,
Err(e) => return Some(Err(e)),
};
let StorageResultType::Value(result) = val.result else {
return None;
};
Some(Ok(StorageResponse {
key: val.key.0,
value: result.0,
}))
});
let StorageResultType::Value(result) = val.result else {
return None;
};
Some(Ok(StorageResponse {
key: val.key.0,
value: result.0,
}))
});
Ok(StreamOf(Box::pin(storage_result_stream)))
Ok(StreamOf(Box::pin(stream)))
})
.await
}
async fn storage_fetch_descendant_keys(
@@ -226,22 +237,25 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for UnstableBackend<T> {
key: Vec<u8>,
at: T::Hash,
) -> Result<StreamOfResults<Vec<u8>>, Error> {
// Ask for hashes, and then just ignore them and return the keys that come back.
let query = StorageQuery {
key: &*key,
query_type: StorageQueryType::DescendantsHashes,
};
retry(|| async {
// Ask for hashes, and then just ignore them and return the keys that come back.
let query = StorageQuery {
key: &*key,
query_type: StorageQueryType::DescendantsHashes,
};
let storage_items = StorageItems::from_methods(
std::iter::once(query),
at,
&self.follow_handle,
self.methods.clone(),
)
.await?;
let storage_items = StorageItems::from_methods(
std::iter::once(query),
at,
&self.follow_handle,
self.methods.clone(),
)
.await?;
let storage_result_stream = storage_items.map(|val| val.map(|v| v.key.0));
Ok(StreamOf(Box::pin(storage_result_stream)))
let storage_result_stream = storage_items.map(|val| val.map(|v| v.key.0));
Ok(StreamOf(Box::pin(storage_result_stream)))
})
.await
}
async fn storage_fetch_descendant_values(
@@ -249,72 +263,81 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for UnstableBackend<T> {
key: Vec<u8>,
at: T::Hash,
) -> Result<StreamOfResults<StorageResponse>, Error> {
let query = StorageQuery {
key: &*key,
query_type: StorageQueryType::DescendantsValues,
};
let storage_items = StorageItems::from_methods(
std::iter::once(query),
at,
&self.follow_handle,
self.methods.clone(),
)
.await?;
let storage_result_stream = storage_items.filter_map(|val| async move {
let val = match val {
Ok(val) => val,
Err(e) => return Some(Err(e)),
retry(|| async {
let query = StorageQuery {
key: &*key,
query_type: StorageQueryType::DescendantsValues,
};
let StorageResultType::Value(result) = val.result else {
return None;
};
Some(Ok(StorageResponse {
key: val.key.0,
value: result.0,
}))
});
let storage_items = StorageItems::from_methods(
std::iter::once(query),
at,
&self.follow_handle,
self.methods.clone(),
)
.await?;
Ok(StreamOf(Box::pin(storage_result_stream)))
let storage_result_stream = storage_items.filter_map(|val| async move {
let val = match val {
Ok(val) => val,
Err(e) => return Some(Err(e)),
};
let StorageResultType::Value(result) = val.result else {
return None;
};
Some(Ok(StorageResponse {
key: val.key.0,
value: result.0,
}))
});
Ok(StreamOf(Box::pin(storage_result_stream)))
})
.await
}
async fn genesis_hash(&self) -> Result<T::Hash, Error> {
self.methods.chainspec_v1_genesis_hash().await
retry(|| self.methods.chainspec_v1_genesis_hash()).await
}
async fn block_header(&self, at: T::Hash) -> Result<Option<T::Header>, Error> {
let sub_id = get_subscription_id(&self.follow_handle).await?;
self.methods.chainhead_v1_header(&sub_id, at).await
retry(|| async {
let sub_id = get_subscription_id(&self.follow_handle).await?;
self.methods.chainhead_v1_header(&sub_id, at).await
})
.await
}
async fn block_body(&self, at: T::Hash) -> Result<Option<Vec<Vec<u8>>>, Error> {
let sub_id = get_subscription_id(&self.follow_handle).await?;
retry(|| async {
let sub_id = get_subscription_id(&self.follow_handle).await?;
// Subscribe to the body response and get our operationId back.
let follow_events = self.follow_handle.subscribe().events();
let status = self.methods.chainhead_v1_body(&sub_id, at).await?;
let operation_id = match status {
MethodResponse::LimitReached => {
return Err(RpcError::request_rejected("limit reached").into())
}
MethodResponse::Started(s) => s.operation_id,
};
// Wait for the response to come back with the correct operationId.
let mut exts_stream = follow_events.filter_map(|ev| {
let FollowEvent::OperationBodyDone(body) = ev else {
return std::future::ready(None);
// Subscribe to the body response and get our operationId back.
let follow_events = self.follow_handle.subscribe().events();
let status = self.methods.chainhead_v1_body(&sub_id, at).await?;
let operation_id = match status {
MethodResponse::LimitReached => {
return Err(RpcError::request_rejected("limit reached").into())
}
MethodResponse::Started(s) => s.operation_id,
};
if body.operation_id != operation_id {
return std::future::ready(None);
}
let exts: Vec<_> = body.value.into_iter().map(|ext| ext.0).collect();
std::future::ready(Some(exts))
});
Ok(exts_stream.next().await)
// Wait for the response to come back with the correct operationId.
let mut exts_stream = follow_events.filter_map(|ev| {
let FollowEvent::OperationBodyDone(body) = ev else {
return std::future::ready(None);
};
if body.operation_id != operation_id {
return std::future::ready(None);
}
let exts: Vec<_> = body.value.into_iter().map(|ext| ext.0).collect();
std::future::ready(Some(exts))
});
Ok(exts_stream.next().await)
})
.await
}
async fn latest_finalized_block_ref(&self) -> Result<BlockRef<T::Hash>, Error> {
@@ -423,12 +446,16 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for UnstableBackend<T> {
std::future::ready(Some(Ok(runtime_version)))
});
Ok(StreamOf(Box::pin(runtime_stream)))
Ok(StreamOf::new(Box::pin(runtime_stream)))
}
async fn stream_all_block_headers(
&self,
) -> Result<StreamOfResults<(T::Header, BlockRef<T::Hash>)>, Error> {
// TODO: https://github.com/paritytech/subxt/issues/1568
//
// It's possible that blocks may be silently missed if
// a reconnection occurs because it's restarted by the unstable backend.
self.stream_headers(|ev| match ev {
FollowEvent::Initialized(init) => init.finalized_block_hashes,
FollowEvent::NewBlock(ev) => {
@@ -442,6 +469,10 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for UnstableBackend<T> {
async fn stream_best_block_headers(
&self,
) -> Result<StreamOfResults<(T::Header, BlockRef<T::Hash>)>, Error> {
// TODO: https://github.com/paritytech/subxt/issues/1568
//
// It's possible that blocks may be silently missed if
// a reconnection occurs because it's restarted by the unstable backend.
self.stream_headers(|ev| match ev {
FollowEvent::Initialized(init) => init.finalized_block_hashes,
FollowEvent::BestBlockChanged(ev) => vec![ev.best_block_hash],
@@ -638,37 +669,40 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for UnstableBackend<T> {
call_parameters: Option<&[u8]>,
at: T::Hash,
) -> Result<Vec<u8>, Error> {
let sub_id = get_subscription_id(&self.follow_handle).await?;
retry(|| async {
let sub_id = get_subscription_id(&self.follow_handle).await?;
// Subscribe to the body response and get our operationId back.
let follow_events = self.follow_handle.subscribe().events();
let call_parameters = call_parameters.unwrap_or(&[]);
let status = self
.methods
.chainhead_v1_call(&sub_id, at, method, call_parameters)
.await?;
let operation_id = match status {
MethodResponse::LimitReached => {
return Err(RpcError::request_rejected("limit reached").into())
}
MethodResponse::Started(s) => s.operation_id,
};
// Wait for the response to come back with the correct operationId.
let mut call_data_stream = follow_events.filter_map(|ev| {
let FollowEvent::OperationCallDone(body) = ev else {
return std::future::ready(None);
// Subscribe to the body response and get our operationId back.
let follow_events = self.follow_handle.subscribe().events();
let call_parameters = call_parameters.unwrap_or(&[]);
let status = self
.methods
.chainhead_v1_call(&sub_id, at, method, call_parameters)
.await?;
let operation_id = match status {
MethodResponse::LimitReached => {
return Err(RpcError::request_rejected("limit reached").into())
}
MethodResponse::Started(s) => s.operation_id,
};
if body.operation_id != operation_id {
return std::future::ready(None);
}
std::future::ready(Some(body.output.0))
});
call_data_stream
.next()
.await
.ok_or_else(|| RpcError::SubscriptionDropped.into())
// Wait for the response to come back with the correct operationId.
let mut call_data_stream = follow_events.filter_map(|ev| {
let FollowEvent::OperationCallDone(body) = ev else {
return std::future::ready(None);
};
if body.operation_id != operation_id {
return std::future::ready(None);
}
std::future::ready(Some(body.output.0))
});
call_data_stream
.next()
.await
.ok_or_else(|| RpcError::SubscriptionDropped.into())
})
.await
}
}
@@ -111,6 +111,11 @@ impl<T: Config> Stream for StorageItems<T> {
return Poll::Pending;
}
Poll::Ready(Err(e)) => {
if e.is_disconnected_will_reconnect() {
self.continue_fut = Some((self.continue_call)());
continue;
}
self.done = true;
return Poll::Ready(Some(Err(e)));
}
+271
View File
@@ -0,0 +1,271 @@
//! RPC utils.
use super::{StreamOf, StreamOfResults};
use crate::error::Error;
use futures::future::BoxFuture;
use futures::{FutureExt, Stream, StreamExt};
use std::{future::Future, pin::Pin, task::Poll};
/// Resubscribe callback.
type ResubscribeGetter<T> = Box<dyn FnMut() -> ResubscribeFuture<T> + Send>;
/// Future that resolves to a subscription stream.
type ResubscribeFuture<T> = Pin<Box<dyn Future<Output = Result<StreamOfResults<T>, Error>> + Send>>;
/// Either a future that will resolve to a fresh subscription stream
/// (while re-subscribing after a disconnect) or an already-established
/// subscription stream being polled for items.
pub(crate) enum PendingOrStream<T> {
    // Waiting for a (re)subscription call to complete.
    Pending(BoxFuture<'static, Result<StreamOfResults<T>, Error>>),
    // An active subscription stream.
    Stream(StreamOfResults<T>),
}
impl<T> std::fmt::Debug for PendingOrStream<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
PendingOrStream::Pending(_) => write!(f, "Pending"),
PendingOrStream::Stream(_) => write!(f, "Stream"),
}
}
}
/// Retry subscription.
///
/// A stream wrapper that transparently re-subscribes (via `resubscribe`)
/// whenever the underlying stream fails with a "disconnected, will
/// reconnect" error. Any other error terminates the stream.
struct RetrySubscription<T> {
    // Callback used to establish a fresh subscription after a disconnect.
    resubscribe: ResubscribeGetter<T>,
    // Current state: a pending resubscription future, an active stream,
    // or `None` once the stream has terminated.
    state: Option<PendingOrStream<T>>,
}

// NOTE(review): both fields hold only boxed values, so the manual Unpin
// impl appears structurally sound — confirm if fields ever change.
impl<T> std::marker::Unpin for RetrySubscription<T> {}
impl<T> Stream for RetrySubscription<T> {
    type Item = Result<T, Error>;

    /// Poll the current state: forward items from the active stream, and on a
    /// reconnectable disconnect swap in a resubscription future to be polled
    /// on subsequent calls.
    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        loop {
            // `state == None` means the stream has already terminated.
            let Some(mut this) = self.state.take() else {
                return Poll::Ready(None);
            };

            match this {
                PendingOrStream::Stream(ref mut s) => match s.poll_next_unpin(cx) {
                    Poll::Ready(Some(Err(err))) => {
                        // On a reconnectable disconnect, queue a resubscription
                        // for the next poll. The error is still surfaced so the
                        // caller knows items may have been missed. For any other
                        // error, `state` stays `None` and the stream ends.
                        if err.is_disconnected_will_reconnect() {
                            self.state = Some(PendingOrStream::Pending((self.resubscribe)()));
                        }
                        return Poll::Ready(Some(Err(err)));
                    }
                    Poll::Ready(None) => return Poll::Ready(None),
                    Poll::Ready(Some(Ok(val))) => {
                        // Put the stream back and yield the item.
                        self.state = Some(this);
                        return Poll::Ready(Some(Ok(val)));
                    }
                    Poll::Pending => {
                        self.state = Some(this);
                        return Poll::Pending;
                    }
                },
                PendingOrStream::Pending(mut fut) => match fut.poll_unpin(cx) {
                    Poll::Ready(Ok(stream)) => {
                        // Resubscription succeeded: switch to streaming and poll
                        // the new stream on the next loop iteration.
                        self.state = Some(PendingOrStream::Stream(stream));
                        continue;
                    }
                    Poll::Ready(Err(err)) => {
                        // Same policy as above: only retry on disconnects.
                        if err.is_disconnected_will_reconnect() {
                            self.state = Some(PendingOrStream::Pending((self.resubscribe)()));
                        }
                        return Poll::Ready(Some(Err(err)));
                    }
                    Poll::Pending => {
                        self.state = Some(PendingOrStream::Pending(fut));
                        return Poll::Pending;
                    }
                },
            };
        }
    }
}
/// Retry a future until it doesn't return a disconnected error.
///
/// Disconnect errors are retried indefinitely (the reconnecting RPC client is
/// expected to re-establish the connection between attempts), "request
/// rejected" errors are retried a bounded number of times, and any other
/// error is returned to the caller immediately.
///
/// # Example
///
/// ```no_run
/// use subxt::backend::utils::retry;
///
/// async fn some_future() -> Result<(), subxt::error::Error> {
///     Ok(())
/// }
///
/// #[tokio::main]
/// async fn main() {
///     let result = retry(|| some_future()).await;
/// }
/// ```
pub async fn retry<T, F, R>(mut retry_future: F) -> Result<R, Error>
where
    F: FnMut() -> T,
    T: Future<Output = Result<R, Error>>,
{
    // Upper bound on retries for rejected calls; see the hack note below.
    const REJECTED_MAX_RETRIES: usize = 10;
    let mut rejected_retries = 0;

    loop {
        match retry_future().await {
            Ok(v) => return Ok(v),
            Err(e) => {
                // A disconnect the client will recover from: just retry.
                if e.is_disconnected_will_reconnect() {
                    continue;
                }

                // TODO: https://github.com/paritytech/subxt/issues/1567
                // This is a hack because if a reconnection occurs
                // the order of pending calls is not guaranteed.
                //
                // Such that it's possible that a pending future completes
                // before `chainHead_follow` is established with a fresh
                // subscription id.
                //
                if e.is_rejected() && rejected_retries < REJECTED_MAX_RETRIES {
                    rejected_retries += 1;
                    continue;
                }

                return Err(e);
            }
        }
    }
}
/// Create a retry stream that will resubscribe on disconnect.
///
/// It's important to note that this function is intended to work only for stateless subscriptions.
/// If the subscription takes input or modifies state, this function should not be used.
///
/// # Example
///
/// ```no_run
/// use subxt::backend::{utils::retry_stream, StreamOf};
/// use futures::future::FutureExt;
///
/// #[tokio::main]
/// async fn main() {
/// retry_stream(|| {
/// // This needs to return a stream of results but if you are using
/// // the subxt backend already it will return StreamOf so you can just
/// // return it directly in the async block below.
/// async move { Ok(StreamOf::new(Box::pin(futures::stream::iter([Ok(2)])))) }.boxed()
/// }).await;
/// }
/// ```
pub async fn retry_stream<F, R>(sub_stream: F) -> Result<StreamOfResults<R>, Error>
where
F: FnMut() -> ResubscribeFuture<R> + Send + 'static + Clone,
R: Send + 'static,
{
let stream = retry(sub_stream.clone()).await?;
let resubscribe = Box::new(move || {
let sub_stream = sub_stream.clone();
async move { retry(sub_stream).await }.boxed()
});
// The extra Box is to encapsulate the retry subscription type
Ok(StreamOf::new(Box::pin(RetrySubscription {
state: Some(PendingOrStream::Stream(stream)),
resubscribe,
})))
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::backend::StreamOf;

    /// An error that signals "connection lost but will be re-established".
    fn disconnect_err() -> Error {
        Error::Rpc(crate::error::RpcError::DisconnectedWillReconnect(
            String::new(),
        ))
    }

    /// An arbitrary non-reconnect error.
    fn custom_err() -> Error {
        Error::Other(String::new())
    }

    #[tokio::test]
    async fn retry_stream_works() {
        let stream = retry_stream(|| {
            async {
                Ok(StreamOf::new(Box::pin(futures::stream::iter([
                    Ok(1),
                    Ok(2),
                    Ok(3),
                    Err(disconnect_err()),
                ]))))
            }
            .boxed()
        })
        .await
        .unwrap();

        // After the disconnect error the stream resubscribes and starts
        // over, so the fifth item is `1` again.
        let out: Vec<Result<usize, Error>> = stream.take(5).collect().await;

        assert!(matches!(out[0], Ok(1)));
        assert!(matches!(out[1], Ok(2)));
        assert!(matches!(out[2], Ok(3)));
        assert!(matches!(out[3], Err(ref e) if e.is_disconnected_will_reconnect()));
        assert!(matches!(out[4], Ok(1)));
    }

    #[tokio::test]
    async fn retry_sub_works() {
        // The disconnect triggers one resubscription which then yields `2`.
        let sub = RetrySubscription {
            state: Some(PendingOrStream::Stream(StreamOf::new(Box::pin(
                futures::stream::iter([Ok(1), Err(disconnect_err())]),
            )))),
            resubscribe: Box::new(|| {
                async { Ok(StreamOf::new(Box::pin(futures::stream::iter([Ok(2)])))) }.boxed()
            }),
        };

        let out: Vec<_> = sub.collect().await;

        assert!(matches!(out[0], Ok(1)));
        assert!(matches!(out[1], Err(ref e) if e.is_disconnected_will_reconnect()));
        assert!(matches!(out[2], Ok(2)));
    }

    #[tokio::test]
    async fn retry_sub_err_terminates_stream() {
        // The initial stream simply finishes: `resubscribe` is never
        // invoked and exactly one item is produced.
        let sub = RetrySubscription {
            state: Some(PendingOrStream::Stream(StreamOf::new(Box::pin(
                futures::stream::iter([Ok(1)]),
            )))),
            resubscribe: Box::new(|| async { Err(custom_err()) }.boxed()),
        };

        assert_eq!(sub.count().await, 1);
    }

    #[tokio::test]
    async fn retry_sub_resubscribe_err() {
        // A disconnect triggers a resubscription whose failure (a custom,
        // non-reconnect error) is yielded and then ends the stream.
        let sub = RetrySubscription {
            state: Some(PendingOrStream::Stream(StreamOf::new(Box::pin(
                futures::stream::iter([Ok(1), Err(disconnect_err())]),
            )))),
            resubscribe: Box::new(|| async { Err(custom_err()) }.boxed()),
        };

        let out: Vec<_> = sub.collect().await;

        assert!(matches!(out[0], Ok(1)));
        assert!(matches!(out[1], Err(ref e) if e.is_disconnected_will_reconnect()));
        assert!(matches!(out[2], Err(ref e) if matches!(e, Error::Other(_))));
    }
}
+6 -6
View File
@@ -95,8 +95,8 @@ where
{
let client = self.client.clone();
header_sub_fut_to_block_sub(self.clone(), async move {
let sub = client.backend().stream_all_block_headers().await?;
BlockStreamRes::Ok(sub)
let stream = client.backend().stream_all_block_headers().await?;
BlockStreamRes::Ok(stream)
})
}
@@ -112,8 +112,8 @@ where
{
let client = self.client.clone();
header_sub_fut_to_block_sub(self.clone(), async move {
let sub = client.backend().stream_best_block_headers().await?;
BlockStreamRes::Ok(sub)
let stream = client.backend().stream_best_block_headers().await?;
BlockStreamRes::Ok(stream)
})
}
@@ -126,8 +126,8 @@ where
{
let client = self.client.clone();
header_sub_fut_to_block_sub(self.clone(), async move {
let sub = client.backend().stream_finalized_block_headers().await?;
BlockStreamRes::Ok(sub)
let stream = client.backend().stream_finalized_block_headers().await?;
BlockStreamRes::Ok(stream)
})
}
}
+1 -2
View File
@@ -432,9 +432,8 @@ impl<T: Config> ClientRuntimeUpdater<T> {
/// Instead that's up to the user of this API to decide when to update and
/// to perform the actual updating.
pub async fn runtime_updates(&self) -> Result<RuntimeUpdaterStream<T>, Error> {
let stream = self.0.backend().stream_runtime_version().await?;
Ok(RuntimeUpdaterStream {
stream,
stream: self.0.backend().stream_runtime_version().await?,
client: self.0.clone(),
})
}
+5
View File
@@ -125,6 +125,11 @@ impl Error {
pub fn is_disconnected_will_reconnect(&self) -> bool {
matches!(self, Error::Rpc(RpcError::DisconnectedWillReconnect(_)))
}
/// Checks whether the error was caused by a RPC request being rejected.
pub fn is_rejected(&self) -> bool {
matches!(self, Error::Rpc(RpcError::RequestRejected(_)))
}
}
/// An RPC error. Since we are generic over the RPC client that is used,
+1 -1
View File
@@ -11,11 +11,11 @@ use crate::{
client::OnlineClientT,
error::{DispatchError, Error, RpcError, TransactionError},
events::EventsClient,
utils::strip_compact_prefix,
Config,
};
use derive_where::derive_where;
use futures::{Stream, StreamExt};
use subxt_core::utils::strip_compact_prefix;
/// This struct represents a subscription to the progress of some transaction.
pub struct TxProgress<T: Config, C> {