Files
pezkuwi-subxt/subxt/src/blocks/blocks_client.rs
T
Niklas Adolfsson bec896d91a rpc: add full support reconnecting rpc client (#1505)
* add simple reconnecting rpc client

* initial retryable calls

* add reconnecting backend

* add reconnecting example for unstable backend

* add todo what isn't working

* FollowStream: restart on reconn

* naive fix: fetch sub_id in stream_headers

* cleanup

* remove resubscribe APIs

* cleanup and remove many wrapper streams

* remove retry backend

* legacy rpc: make it retryable

* unstable rpc: make it retryable

* fix nits

* support wasm as well

* remove deadcode

* address grumbles

* revert rpc methods

* don't create a subscription per block

* get rid off retry logic in subxt rpc

* Update subxt/Cargo.toml

* Update subxt/src/backend/legacy/mod.rs

* Update subxt/src/backend/legacy/mod.rs

* remove outdated comments

* fix bad merge

* Fix reconnecting RPC client and update dependencies

* add back retry logic and remove `finito`

* fix nits

* cleanup

* add hack for race when reconnecting

* backend: emit Stop event DisconnectWillRecoonect

* merge reconnecting client examples

* add fn retry_stream

* cleanup

* add all features from reconnecting-rpc-client

* fix build

* remove needless retry for fetch_storage

* StorageFetchDescendantKeysStream handle disconnect err

* dont retry transactions

* fetch subscription ID from FollowStreamMsg

* fix nits

* Update subxt/src/backend/legacy/mod.rs

* Update subxt/src/backend/legacy/mod.rs

* add reconn to StorageItems stream

* StorageFetchDescendantKeysStreamchore: retry storage call

* RetryStream: emit DisconnectWillReconnect msg

* runtime subscriptions ignore DisconnectWillReconn

* Update subxt/examples/setup_reconnecting_rpc_client.rs

* Update subxt/src/client/online_client.rs

* Update subxt/src/client/online_client.rs

* Add custom stream wrapper for finalized blocks

* add missing retry block

* clippy

* clippy again

* cleanup

* remove duplicate logic

* fix more grumbles

* Update subxt/examples/setup_reconnecting_rpc_client.rs

Co-authored-by: James Wilson <james@jsdw.me>

* simplify the example

* remove pin-project dep

* remove duplicate retry logic

* remove extra code

* specify trait bounds for retry api

* simplify the example

* fix weird Poll::Pending return

* fix nit in poll impl

* remove needless paths

* make retry_stream pub and add doc examples

* Update subxt/src/backend/utils.rs

---------

Co-authored-by: James Wilson <james@jsdw.me>
2024-05-08 15:12:54 +02:00

159 lines
5.1 KiB
Rust

// Copyright 2019-2023 Parity Technologies (UK) Ltd.
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
// see LICENSE for license details.
use super::Block;
use crate::{
backend::{BlockRef, StreamOfResults},
client::OnlineClientT,
config::Config,
error::{BlockError, Error},
utils::PhantomDataSendSync,
};
use derive_where::derive_where;
use futures::StreamExt;
use std::future::Future;
type BlockStream<T> = StreamOfResults<T>;
type BlockStreamRes<T> = Result<BlockStream<T>, Error>;
/// A client for working with blocks.
#[derive_where(Clone; Client)]
pub struct BlocksClient<T, Client> {
client: Client,
_marker: PhantomDataSendSync<T>,
}
impl<T, Client> BlocksClient<T, Client> {
/// Create a new [`BlocksClient`].
pub fn new(client: Client) -> Self {
Self {
client,
_marker: PhantomDataSendSync::new(),
}
}
}
impl<T, Client> BlocksClient<T, Client>
where
T: Config,
Client: OnlineClientT<T>,
{
/// Obtain block details given the provided block hash.
///
/// # Warning
///
/// This call only supports blocks produced since the most recent
/// runtime upgrade. You can attempt to retrieve older blocks,
/// but may run into errors attempting to work with them.
pub fn at(
&self,
block_ref: impl Into<BlockRef<T::Hash>>,
) -> impl Future<Output = Result<Block<T, Client>, Error>> + Send + 'static {
self.at_or_latest(Some(block_ref.into()))
}
/// Obtain block details of the latest block hash.
pub fn at_latest(
&self,
) -> impl Future<Output = Result<Block<T, Client>, Error>> + Send + 'static {
self.at_or_latest(None)
}
/// Obtain block details given the provided block hash, or the latest block if `None` is
/// provided.
fn at_or_latest(
&self,
block_ref: Option<BlockRef<T::Hash>>,
) -> impl Future<Output = Result<Block<T, Client>, Error>> + Send + 'static {
let client = self.client.clone();
async move {
// If a block ref isn't provided, we'll get the latest finalized ref to use.
let block_ref = match block_ref {
Some(r) => r,
None => client.backend().latest_finalized_block_ref().await?,
};
let block_header = match client.backend().block_header(block_ref.hash()).await? {
Some(header) => header,
None => return Err(BlockError::not_found(block_ref.hash()).into()),
};
Ok(Block::new(block_header, block_ref, client))
}
}
/// Subscribe to all new blocks imported by the node.
///
/// **Note:** You probably want to use [`Self::subscribe_finalized()`] most of
/// the time.
pub fn subscribe_all(
&self,
) -> impl Future<Output = Result<BlockStream<Block<T, Client>>, Error>> + Send + 'static
where
Client: Send + Sync + 'static,
{
let client = self.client.clone();
header_sub_fut_to_block_sub(self.clone(), async move {
let stream = client.backend().stream_all_block_headers().await?;
BlockStreamRes::Ok(stream)
})
}
/// Subscribe to all new blocks imported by the node onto the current best fork.
///
/// **Note:** You probably want to use [`Self::subscribe_finalized()`] most of
/// the time.
pub fn subscribe_best(
&self,
) -> impl Future<Output = Result<BlockStream<Block<T, Client>>, Error>> + Send + 'static
where
Client: Send + Sync + 'static,
{
let client = self.client.clone();
header_sub_fut_to_block_sub(self.clone(), async move {
let stream = client.backend().stream_best_block_headers().await?;
BlockStreamRes::Ok(stream)
})
}
/// Subscribe to finalized blocks.
pub fn subscribe_finalized(
&self,
) -> impl Future<Output = Result<BlockStream<Block<T, Client>>, Error>> + Send + 'static
where
Client: Send + Sync + 'static,
{
let client = self.client.clone();
header_sub_fut_to_block_sub(self.clone(), async move {
let stream = client.backend().stream_finalized_block_headers().await?;
BlockStreamRes::Ok(stream)
})
}
}
/// Take a promise that will return a subscription to some block headers,
/// and return a subscription to some blocks based on this.
async fn header_sub_fut_to_block_sub<T, Client, S>(
blocks_client: BlocksClient<T, Client>,
sub: S,
) -> Result<BlockStream<Block<T, Client>>, Error>
where
T: Config,
S: Future<Output = Result<BlockStream<(T::Header, BlockRef<T::Hash>)>, Error>> + Send + 'static,
Client: OnlineClientT<T> + Send + Sync + 'static,
{
let sub = sub.await?.then(move |header_and_ref| {
let client = blocks_client.client.clone();
async move {
let (header, block_ref) = match header_and_ref {
Ok(header_and_ref) => header_and_ref,
Err(e) => return Err(e),
};
Ok(Block::new(header, block_ref, client))
}
});
BlockStreamRes::Ok(StreamOfResults::new(Box::pin(sub)))
}