WIP more backend work

This commit is contained in:
James Wilson
2025-11-28 17:20:02 +00:00
parent fbde20cb0e
commit b216240575
10 changed files with 425 additions and 129 deletions
+1 -11
View File
@@ -9,11 +9,6 @@
//! Specifically, the focus here is on the `archive` methods. These can only be used
//! to interact with archive nodes, but are less restrictive than the `chainHead` methods
//! in terms of the allowed operations.
//!
//! # Warning
//!
//! Everything in this module is **unstable**, meaning that it could change without
//! warning at any time.
mod storage_stream;
@@ -31,11 +26,6 @@ use subxt_rpcs::methods::chain_head::{
};
use storage_stream::ArchiveStorageStream;
/// Re-export RPC types and methods from [`subxt_rpcs::methods::chain_head`].
pub mod rpc_methods {
pub use subxt_rpcs::methods::chain_head::*;
}
// Expose the RPC methods.
pub use subxt_rpcs::methods::chain_head::ChainHeadRpcMethods as ArchiveRpcMethods;
@@ -203,7 +193,7 @@ impl<T: Config> Backend<T> for ArchiveBackend<T> {
let res = self.methods.archive_v1_call(at, method, call_parameters.unwrap_or(&[])).await?;
match res {
ArchiveCallResult::Success(bytes) => Ok(bytes.0),
ArchiveCallResult::Error(e) => Err(BackendError::Other(e)),
ArchiveCallResult::Error(e) => Err(BackendError::other(e)),
}
}
}
+1 -1
View File
@@ -143,7 +143,7 @@ impl<T: Config> Stream for ArchiveStorageStream<T> {
},
ArchiveStorageEvent::Error(e) => {
this.state = None;
return Poll::Ready(Some(Err(BackendError::Other(e.error))))
return Poll::Ready(Some(Err(BackendError::other(e.error))))
},
ArchiveStorageEvent::Done => {
this.state = None;
+3 -23
View File
@@ -7,11 +7,6 @@
//! [`rpc_methods`] for the raw API calls.
//!
//! Specifically, the focus here is on the `chainHead` methods.
//!
//! # Warning
//!
//! Everything in this module is **unstable**, meaning that it could change without
//! warning at any time.
mod follow_stream;
mod follow_stream_driver;
@@ -37,11 +32,6 @@ use subxt_rpcs::methods::chain_head::{
FollowEvent, MethodResponse, StorageQuery, StorageQueryType, StorageResultType,
};
/// Re-export RPC types and methods from [`subxt_rpcs::methods::chain_head`].
pub mod rpc_methods {
pub use subxt_rpcs::methods::chain_head::*;
}
// Expose the RPC methods.
pub use subxt_rpcs::methods::chain_head::ChainHeadRpcMethods;
@@ -162,19 +152,9 @@ impl<T: Config> ChainHeadBackendBuilder<T> {
/// - On wasm targets, this will spawn the driver on `wasm-bindgen-futures`.
#[cfg(feature = "runtime")]
pub fn build_with_background_driver(self, client: impl Into<RpcClient>) -> ChainHeadBackend<T> {
fn spawn<F: std::future::Future + Send + 'static>(future: F) {
#[cfg(not(target_family = "wasm"))]
tokio::spawn(async move {
future.await;
});
#[cfg(all(target_arch = "wasm32", target_os = "unknown"))]
wasm_bindgen_futures::spawn_local(async move {
future.await;
});
}
let (backend, mut driver) = self.build(client);
spawn(async move {
super::utils::spawn(async move {
// NOTE: we need to poll the driver until it's done i.e returns None
// to ensure that the backend is shutdown properly.
while let Some(res) = driver.next().await {
@@ -642,7 +622,7 @@ async fn submit_transaction_tracking_follow_events<T: Config>(
let start_instant = web_time::Instant::now();
// A quick helper to return a generic error.
let err_other = |s: &str| Some(Err(BackendError::Other(s.into())));
let err_other = |s: &'static str| Some(Err(BackendError::other(s)));
// Now we can attempt to associate tx events with pinned blocks.
let tx_stream = futures::stream::poll_fn(move |cx| {
+2 -3
View File
@@ -112,9 +112,8 @@ impl<Hash> FollowStream<Hash> {
let stream = methods.chainhead_v1_follow(true).await?;
// Extract the subscription ID:
let Some(sub_id) = stream.subscription_id().map(ToOwned::to_owned) else {
return Err(BackendError::Other(
return Err(BackendError::other(
"Subscription ID expected for chainHead_follow response, but not given"
.to_owned(),
));
};
// Map stream errors into the higher level subxt one:
@@ -311,7 +310,7 @@ pub mod test {
Ok(FollowEvent::Stop),
Ok(ev_new_block(1, 2)),
// Nothing should be emitted after an error:
Err(BackendError::Other("ended".to_owned())),
Err(BackendError::other("ended")),
Ok(ev_new_block(2, 3)),
]
});
@@ -537,7 +537,7 @@ mod test {
Ok(ev_new_block(0, 1)),
Ok(ev_best_block(1)),
Ok(ev_finalized([1], [])),
Err(BackendError::Other("ended".to_owned())),
Err(BackendError::other("ended")),
]
},
10,
@@ -580,7 +580,7 @@ mod test {
Ok(ev_finalized([1], [])),
Ok(ev_new_block(1, 2)),
Ok(ev_new_block(2, 3)),
Err(BackendError::Other("ended".to_owned())),
Err(BackendError::other("ended")),
]
},
10,
@@ -630,7 +630,7 @@ mod test {
Ok(ev_new_block(1, 2)),
Ok(ev_new_block(2, 3)),
Ok(ev_finalized([1], [])),
Err(BackendError::Other("ended".to_owned())),
Err(BackendError::other("ended")),
]
},
10,
@@ -668,7 +668,7 @@ mod test {
Ok(FollowEvent::Stop),
Ok(ev_initialized(1)),
Ok(ev_finalized([2], [])),
Err(BackendError::Other("ended".to_owned())),
Err(BackendError::other("ended")),
]
},
10,
@@ -714,7 +714,7 @@ mod test {
// Emulate that we missed some blocks.
Ok(ev_initialized(13)),
Ok(ev_finalized([14], [])),
Err(BackendError::Other("ended".to_owned())),
Err(BackendError::other("ended")),
]
},
10,
@@ -567,7 +567,7 @@ mod test {
Ok(ev_new_block(0, 1)),
Ok(ev_new_block(1, 2)),
Ok(ev_new_block(2, 3)),
Err(BackendError::Other("ended".to_owned())),
Err(BackendError::other("ended")),
]
},
10,
@@ -593,7 +593,7 @@ mod test {
[
Ok(ev_initialized(0)),
Ok(ev_finalized([1], [])),
Err(BackendError::Other("ended".to_owned())),
Err(BackendError::other("ended")),
]
},
3,
@@ -624,7 +624,7 @@ mod test {
Ok(ev_finalized([3], [])),
Ok(ev_finalized([4], [])),
Ok(ev_finalized([5], [])),
Err(BackendError::Other("ended".to_owned())),
Err(BackendError::other("ended")),
]
},
3,
@@ -663,7 +663,7 @@ mod test {
Ok(ev_new_block(1, 2)),
Ok(ev_finalized([1], [])),
Ok(ev_finalized([2], [])),
Err(BackendError::Other("ended".to_owned())),
Err(BackendError::other("ended")),
]
},
10,
@@ -711,7 +711,7 @@ mod test {
Ok(ev_finalized([1], [])),
Ok(ev_finalized([2], [3])),
Ok(ev_finalized([4], [])),
Err(BackendError::Other("ended".to_owned())),
Err(BackendError::other("ended")),
]
},
10,
@@ -771,7 +771,7 @@ mod test {
Ok(ev_best_block(1)),
Ok(ev_finalized([1], [])),
Ok(ev_finalized([2], [])),
Err(BackendError::Other("ended".to_owned())),
Err(BackendError::other("ended")),
]
},
10,
+1 -1
View File
@@ -157,7 +157,7 @@ impl<T: Config> Stream for StorageItems<T> {
FollowEvent::OperationError(err) if err.operation_id == *self.operation_id => {
// Something went wrong obtaining storage items; mark as done and return the error.
self.done = true;
return Poll::Ready(Some(Err(BackendError::Other(err.error))));
return Poll::Ready(Some(Err(BackendError::other(err.error))));
}
_ => {
// We don't care about this event; wait for the next.
+305
View File
@@ -0,0 +1,305 @@
// Copyright 2019-2025 Parity Technologies (UK) Ltd.
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
// see LICENSE for license details.
//! This module exposes a backend implementation which will lookup the methods available
//! to it from the RPC client and call methods accordingly.
use crate::backend::chain_head::ChainHeadBackendDriver;
use crate::backend::{
legacy::LegacyBackend,
chain_head::{ChainHeadBackend, },
archive::ArchiveBackend,
Backend, BlockRef, StorageResponse, StreamOf, StreamOfResults,
TransactionStatus, utils::retry,
};
use crate::config::{Config, HashFor, RpcConfigFor};
use crate::error::BackendError;
use async_trait::async_trait;
use futures::StreamExt;
use subxt_rpcs::RpcClient;
use subxt_rpcs::methods::chain_head::{
ArchiveStorageQuery, ArchiveCallResult, StorageQueryType,
};
use futures::Stream;
use std::task::Poll;
pub struct CombinedBackendBuilder<T: Config> {
archive: BackendChoice<ArchiveBackend<T>>,
chainhead: BackendChoice<ChainHeadBackend<T>>,
legacy: BackendChoice<LegacyBackend<T>>,
}
enum BackendChoice<V> {
Use(V),
DontUse,
UseDefault,
}
impl <T: Config> CombinedBackendBuilder<T> {
/// Create a new [`CombinedBackendBuilder`].
pub fn new() -> Self {
CombinedBackendBuilder {
archive: BackendChoice::UseDefault,
chainhead: BackendChoice::UseDefault,
legacy: BackendChoice::UseDefault,
}
}
/// Use the given [`ArchiveBackend`] where applicable.
pub fn with_archive_backend(mut self, backend: ArchiveBackend<T>) -> Self {
self.archive = BackendChoice::Use(backend);
self
}
/// Use the given [`ChainHeadBackend`] where applicable.
pub fn with_chainhead_backend(mut self, backend: ChainHeadBackend<T>) -> Self {
self.chainhead = BackendChoice::Use(backend);
self
}
/// Use the given [`LegacyBackend`] where applicable.
pub fn with_legacy_backend(mut self, backend: LegacyBackend<T>) -> Self {
self.legacy = BackendChoice::Use(backend);
self
}
/// Don't use any default backends; only use what is explicitly configured via
/// [`CombinedBackendBuilder::with_archive_backend`],
/// [`CombinedBackendBuilder::with_chainhead_backend`] and
/// [`CombinedBackendBuilder::with_legacy_backend`].
pub fn no_default_backends(mut self) -> Self {
if matches!(self.legacy, BackendChoice::UseDefault) {
self.legacy = BackendChoice::DontUse;
}
if matches!(self.archive, BackendChoice::UseDefault) {
self.archive = BackendChoice::DontUse;
}
if matches!(self.chainhead, BackendChoice::UseDefault) {
self.chainhead = BackendChoice::DontUse;
}
self
}
/// A low-level API to build the backend and driver which requires polling the driver for the backend
/// to make progress.
///
/// This is useful if you want to manage the driver yourself, for example if you want to run it in on
/// a specific runtime.
///
/// If you just want to run the driver in the background until completion in on the default runtime,
/// use [`CombinedBackendBuilder::build_with_background_driver`] instead.
pub async fn build(self, rpc_client: impl Into<RpcClient>) -> Result<(CombinedBackend<T>, CombinedBackendDriver<T>), BackendError> {
let rpc_client = rpc_client.into();
// What does the thing wer're talking to actually know about?
let methods: Vec<String> = rpc_client
.request("rpc_methods", subxt_rpcs::rpc_params![])
.await?;
let has_archive_methods = methods.iter().any(|m| m.starts_with("archive_v1_"));
let has_chainhead_methods = methods.iter().any(|m| m.starts_with("chainHead_v1"));
let mut combined_driver = CombinedBackendDriver { chainhead_driver: None };
let archive = if has_archive_methods {
match self.archive {
BackendChoice::Use(b) => Some(b),
BackendChoice::UseDefault => Some(ArchiveBackend::new(rpc_client.clone())),
BackendChoice::DontUse => None,
}
} else { None };
let chainhead = if has_chainhead_methods {
match self.chainhead {
BackendChoice::Use(b) => Some(b),
BackendChoice::UseDefault => {
let (chainhead, chainhead_driver) = ChainHeadBackend::builder().build(rpc_client.clone());
combined_driver.chainhead_driver = Some(chainhead_driver);
Some(chainhead)
},
BackendChoice::DontUse => None,
}
} else { None };
let legacy = match self.legacy {
BackendChoice::Use(b) => Some(b),
BackendChoice::UseDefault => Some(LegacyBackend::builder().build(rpc_client.clone())),
BackendChoice::DontUse => None,
};
let combined = CombinedBackend {
archive,
chainhead,
legacy
};
Ok((combined, combined_driver))
}
/// An API to build the backend and driver which will run in the background until completion
/// on the default runtime.
///
/// - On non-wasm targets, this will spawn the driver on `tokio`.
/// - On wasm targets, this will spawn the driver on `wasm-bindgen-futures`.
#[cfg(feature = "runtime")]
pub async fn build_with_background_driver(self, client: impl Into<RpcClient>) -> Result<CombinedBackend<T>, BackendError> {
let (backend, mut driver) = self.build(client).await?;
super::utils::spawn(async move {
// NOTE: we need to poll the driver until it's done i.e returns None
// to ensure that the backend is shutdown properly.
while let Some(res) = driver.next().await {
if let Err(err) = res {
tracing::debug!(target: "subxt", "chainHead backend error={err}");
}
}
tracing::debug!(target: "subxt", "combined backend was closed");
});
Ok(backend)
}
}
/// Driver for the [`CombinedBackend`]. This needs to be polled to ensure
/// that the [`CombinedBackend`] can make progress. It does not need polling
/// if [`CombinedBackendDriver::needs_polling`] returns `false`.
pub struct CombinedBackendDriver<T: Config> {
chainhead_driver: Option<ChainHeadBackendDriver<T>>
}
impl <T: Config> CombinedBackendDriver<T> {
pub fn needs_polling(&self) -> bool {
self.chainhead_driver.is_some()
}
}
impl<T: Config> Stream for CombinedBackendDriver<T> {
type Item = <ChainHeadBackendDriver<T> as Stream>::Item;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
match &mut self.chainhead_driver {
Some(driver) => driver.poll_next_unpin(cx),
None => Poll::Ready(None)
}
}
}
/// A combined backend. This selects which RPC calls to use based on the `rpc_methods`
/// available from the given RPC client we're given.
pub struct CombinedBackend<T: Config> {
archive: Option<ArchiveBackend<T>>,
chainhead: Option<ChainHeadBackend<T>>,
legacy: Option<LegacyBackend<T>>,
}
impl <T: Config> CombinedBackend<T> {
/// Configure and construct a [`CombinedBackend`].
pub fn builder() -> CombinedBackendBuilder<T> {
CombinedBackendBuilder::new()
}
}
impl<T: Config> super::sealed::Sealed for CombinedBackend<T> {}
static NO_AVAILABLE_BACKEND: &str = "No available RPC methods to use. `no_default_backends` was used, but no applicable backends were then provided.";
macro_rules! call_backends {
({$($backend_name:ident)|+}. $method_name:ident ( $($arg:expr),* )) => {{
let mut err = BackendError::other(NO_AVAILABLE_BACKEND);
$(
if let Some(backend) = &self.$backend_name {
err = match backend.$method_name($( $arg, )*).await {
Ok(res) => return Ok(res),
Err(e) => e
}
}
)+
return Err(err)
}}
}
#[async_trait]
impl<T: Config> Backend<T> for CombinedBackend<T> {
async fn storage_fetch_values(
&self,
keys: Vec<Vec<u8>>,
at: HashFor<T>,
) -> Result<StreamOfResults<StorageResponse>, BackendError> {
call_backends!({archive|chainhead|legacy}.storage_fetch_values(keys, at))
}
async fn storage_fetch_descendant_keys(
&self,
key: Vec<u8>,
at: HashFor<T>,
) -> Result<StreamOfResults<Vec<u8>>, BackendError> {
}
async fn storage_fetch_descendant_values(
&self,
key: Vec<u8>,
at: HashFor<T>,
) -> Result<StreamOfResults<StorageResponse>, BackendError> {
}
async fn genesis_hash(&self) -> Result<HashFor<T>, BackendError> {
}
async fn block_header(&self, at: HashFor<T>) -> Result<Option<T::Header>, BackendError> {
}
async fn block_body(&self, at: HashFor<T>) -> Result<Option<Vec<Vec<u8>>>, BackendError> {
}
async fn latest_finalized_block_ref(&self) -> Result<BlockRef<HashFor<T>>, BackendError> {
}
async fn stream_all_block_headers(
&self,
_hasher: T::Hasher,
) -> Result<StreamOfResults<(T::Header, BlockRef<HashFor<T>>)>, BackendError> {
}
async fn stream_best_block_headers(
&self,
_hasher: T::Hasher,
) -> Result<StreamOfResults<(T::Header, BlockRef<HashFor<T>>)>, BackendError> {
}
async fn stream_finalized_block_headers(
&self,
_hasher: T::Hasher,
) -> Result<StreamOfResults<(T::Header, BlockRef<HashFor<T>>)>, BackendError> {
}
async fn submit_transaction(
&self,
extrinsic: &[u8],
) -> Result<StreamOfResults<TransactionStatus<HashFor<T>>>, BackendError> {
}
async fn call(
&self,
method: &str,
call_parameters: Option<&[u8]>,
at: HashFor<T>,
) -> Result<Vec<u8>, BackendError> {
}
}
+94 -78
View File
@@ -1,86 +1,24 @@
//! RPC utils.
//! Backend utils.
use super::{StreamOf, StreamOfResults};
use crate::error::BackendError;
use futures::{FutureExt, Stream, StreamExt};
use std::{future::Future, pin::Pin, task::Poll};
/// Resubscribe callback.
type ResubscribeGetter<T> = Box<dyn FnMut() -> ResubscribeFuture<T> + Send>;
/// Future that resolves to a subscription stream.
type ResubscribeFuture<T> =
Pin<Box<dyn Future<Output = Result<StreamOfResults<T>, BackendError>> + Send>>;
/// Retry subscription.
struct RetrySubscription<F, R, T> {
resubscribe: F,
state: RetrySubscriptionState<R, T>,
}
enum RetrySubscriptionState<R, T> {
Init,
Pending(R),
Stream(StreamOfResults<T>),
Done,
}
impl<F, R, T> std::marker::Unpin for RetrySubscription<F, R, T> {}
impl<F, R, T> Stream for RetrySubscription<F, R, T>
where
F: FnMut() -> R,
R: Future<Output = Result<StreamOfResults<T>, BackendError>> + Unpin,
{
type Item = Result<T, BackendError>;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Option<Self::Item>> {
loop {
match &mut self.state {
RetrySubscriptionState::Init => {
self.state = RetrySubscriptionState::Pending((self.resubscribe)());
},
RetrySubscriptionState::Stream(s) => match s.poll_next_unpin(cx) {
Poll::Ready(Some(Err(err))) => {
if err.is_disconnected_will_reconnect() {
self.state = RetrySubscriptionState::Init;
}
return Poll::Ready(Some(Err(err)));
}
Poll::Ready(None) => {
return Poll::Ready(None)
}
Poll::Ready(Some(Ok(val))) => {
return Poll::Ready(Some(Ok(val)));
}
Poll::Pending => {
return Poll::Pending;
}
},
RetrySubscriptionState::Pending(fut) => match fut.poll_unpin(cx) {
Poll::Ready(Err(err)) => {
if err.is_disconnected_will_reconnect() {
self.state = RetrySubscriptionState::Init;
}
return Poll::Ready(Some(Err(err)));
}
Poll::Ready(Ok(stream)) => {
self.state = RetrySubscriptionState::Stream(stream);
continue;
}
Poll::Pending => {
return Poll::Pending;
}
},
RetrySubscriptionState::Done => {
return Poll::Ready(None)
}
};
}
}
/// Spawn a task.
///
/// - On non-wasm targets, this will spawn a task via [`tokio::spawn`].
/// - On wasm targets, this will spawn a task via [`wasm_bindgen_futures::spawn_local`].
#[cfg(feature = "runtime")]
pub(crate) fn spawn<F: std::future::Future + Send + 'static>(future: F) {
#[cfg(not(target_family = "wasm"))]
tokio::spawn(async move {
future.await;
});
#[cfg(all(target_arch = "wasm32", target_os = "unknown"))]
wasm_bindgen_futures::spawn_local(async move {
future.await;
});
}
/// Retry a future until it doesn't return a disconnected error.
@@ -180,6 +118,84 @@ where
})))
}
/// Resubscribe callback.
type ResubscribeGetter<T> = Box<dyn FnMut() -> ResubscribeFuture<T> + Send>;
/// Future that resolves to a subscription stream.
type ResubscribeFuture<T> =
Pin<Box<dyn Future<Output = Result<StreamOfResults<T>, BackendError>> + Send>>;
/// Retry subscription.
struct RetrySubscription<F, R, T> {
resubscribe: F,
state: RetrySubscriptionState<R, T>,
}
enum RetrySubscriptionState<R, T> {
Init,
Pending(R),
Stream(StreamOfResults<T>),
Done,
}
impl<F, R, T> std::marker::Unpin for RetrySubscription<F, R, T> {}
impl<F, R, T> Stream for RetrySubscription<F, R, T>
where
F: FnMut() -> R,
R: Future<Output = Result<StreamOfResults<T>, BackendError>> + Unpin,
{
type Item = Result<T, BackendError>;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Option<Self::Item>> {
loop {
match &mut self.state {
RetrySubscriptionState::Init => {
self.state = RetrySubscriptionState::Pending((self.resubscribe)());
},
RetrySubscriptionState::Stream(s) => match s.poll_next_unpin(cx) {
Poll::Ready(Some(Err(err))) => {
if err.is_disconnected_will_reconnect() {
self.state = RetrySubscriptionState::Init;
}
return Poll::Ready(Some(Err(err)));
}
Poll::Ready(None) => {
return Poll::Ready(None)
}
Poll::Ready(Some(Ok(val))) => {
return Poll::Ready(Some(Ok(val)));
}
Poll::Pending => {
return Poll::Pending;
}
},
RetrySubscriptionState::Pending(fut) => match fut.poll_unpin(cx) {
Poll::Ready(Err(err)) => {
if err.is_disconnected_will_reconnect() {
self.state = RetrySubscriptionState::Init;
}
return Poll::Ready(Some(Err(err)));
}
Poll::Ready(Ok(stream)) => {
self.state = RetrySubscriptionState::Stream(stream);
continue;
}
Poll::Pending => {
return Poll::Pending;
}
},
RetrySubscriptionState::Done => {
return Poll::Ready(None)
}
};
}
}
}
#[cfg(test)]
mod tests {
use super::*;
@@ -190,7 +206,7 @@ mod tests {
}
fn custom_err() -> BackendError {
BackendError::Other(String::new())
BackendError::other("")
}
#[tokio::test]
+7 -1
View File
@@ -8,6 +8,7 @@ mod dispatch_error;
mod hex;
use thiserror::Error as DeriveError;
use std::borrow::Cow;
#[cfg(feature = "unstable-light-client")]
pub use subxt_lightclient::LightClientError;
@@ -276,7 +277,7 @@ pub enum BackendError {
CouldNotDecodeMetadata(codec::Error),
// This is for errors in `Backend` implementations which aren't any of the "pre-defined" set above:
#[error("Custom backend error: {0}")]
Other(String),
Other(Cow<'static, str>),
}
impl BackendError {
@@ -294,6 +295,11 @@ impl BackendError {
pub fn is_rpc_limit_reached(&self) -> bool {
matches!(self, BackendError::Rpc(RpcError::LimitReached))
}
/// Create a [`BackendError::Other`] given a message.
pub fn other(message: impl Into<Cow<'static, str>>) -> Self {
BackendError::Other(message.into())
}
}
impl From<subxt_rpcs::Error> for BackendError {