diff --git a/new/src/backend/archive.rs b/new/src/backend/archive.rs index 3a1cecb73d..e3b5aae4f7 100644 --- a/new/src/backend/archive.rs +++ b/new/src/backend/archive.rs @@ -9,11 +9,6 @@ //! Specifically, the focus here is on the `archive` methods. These can only be used //! to interact with archive nodes, but are less restrictive than the `chainHead` methods //! in terms of the allowed operations. -//! -//! # Warning -//! -//! Everything in this module is **unstable**, meaning that it could change without -//! warning at any time. mod storage_stream; @@ -31,11 +26,6 @@ use subxt_rpcs::methods::chain_head::{ }; use storage_stream::ArchiveStorageStream; -/// Re-export RPC types and methods from [`subxt_rpcs::methods::chain_head`]. -pub mod rpc_methods { - pub use subxt_rpcs::methods::chain_head::*; -} - // Expose the RPC methods. pub use subxt_rpcs::methods::chain_head::ChainHeadRpcMethods as ArchiveRpcMethods; @@ -203,7 +193,7 @@ impl Backend for ArchiveBackend { let res = self.methods.archive_v1_call(at, method, call_parameters.unwrap_or(&[])).await?; match res { ArchiveCallResult::Success(bytes) => Ok(bytes.0), - ArchiveCallResult::Error(e) => Err(BackendError::Other(e)), + ArchiveCallResult::Error(e) => Err(BackendError::other(e)), } } } diff --git a/new/src/backend/archive/storage_stream.rs b/new/src/backend/archive/storage_stream.rs index 7839402abe..994789bc2c 100644 --- a/new/src/backend/archive/storage_stream.rs +++ b/new/src/backend/archive/storage_stream.rs @@ -143,7 +143,7 @@ impl Stream for ArchiveStorageStream { }, ArchiveStorageEvent::Error(e) => { this.state = None; - return Poll::Ready(Some(Err(BackendError::Other(e.error)))) + return Poll::Ready(Some(Err(BackendError::other(e.error)))) }, ArchiveStorageEvent::Done => { this.state = None; diff --git a/new/src/backend/chain_head.rs b/new/src/backend/chain_head.rs index 8b2eba8481..f34ce28cae 100644 --- a/new/src/backend/chain_head.rs +++ b/new/src/backend/chain_head.rs @@ -7,11 +7,6 @@ //! [`rpc_methods`] for the raw API calls. //! //! Specifically, the focus here is on the `chainHead` methods. -//! -//! # Warning -//! -//! Everything in this module is **unstable**, meaning that it could change without -//! warning at any time. mod follow_stream; mod follow_stream_driver; @@ -37,11 +32,6 @@ use subxt_rpcs::methods::chain_head::{ FollowEvent, MethodResponse, StorageQuery, StorageQueryType, StorageResultType, }; -/// Re-export RPC types and methods from [`subxt_rpcs::methods::chain_head`]. -pub mod rpc_methods { - pub use subxt_rpcs::methods::chain_head::*; -} - // Expose the RPC methods. pub use subxt_rpcs::methods::chain_head::ChainHeadRpcMethods; @@ -162,19 +152,9 @@ impl ChainHeadBackendBuilder { /// - On wasm targets, this will spawn the driver on `wasm-bindgen-futures`. #[cfg(feature = "runtime")] pub fn build_with_background_driver(self, client: impl Into) -> ChainHeadBackend { - fn spawn(future: F) { - #[cfg(not(target_family = "wasm"))] - tokio::spawn(async move { - future.await; - }); - #[cfg(all(target_arch = "wasm32", target_os = "unknown"))] - wasm_bindgen_futures::spawn_local(async move { - future.await; - }); - } - let (backend, mut driver) = self.build(client); - spawn(async move { + + super::utils::spawn(async move { // NOTE: we need to poll the driver until it's done i.e returns None // to ensure that the backend is shutdown properly. while let Some(res) = driver.next().await { @@ -642,7 +622,7 @@ async fn submit_transaction_tracking_follow_events( let start_instant = web_time::Instant::now(); // A quick helper to return a generic error. - let err_other = |s: &str| Some(Err(BackendError::Other(s.into()))); + let err_other = |s: &'static str| Some(Err(BackendError::other(s))); // Now we can attempt to associate tx events with pinned blocks. let tx_stream = futures::stream::poll_fn(move |cx| { diff --git a/new/src/backend/chain_head/follow_stream.rs b/new/src/backend/chain_head/follow_stream.rs index 958e923477..874f13395c 100644 --- a/new/src/backend/chain_head/follow_stream.rs +++ b/new/src/backend/chain_head/follow_stream.rs @@ -112,9 +112,8 @@ impl FollowStream { let stream = methods.chainhead_v1_follow(true).await?; // Extract the subscription ID: let Some(sub_id) = stream.subscription_id().map(ToOwned::to_owned) else { - return Err(BackendError::Other( + return Err(BackendError::other( "Subscription ID expected for chainHead_follow response, but not given" - .to_owned(), )); }; // Map stream errors into the higher level subxt one: @@ -311,7 +310,7 @@ pub mod test { Ok(FollowEvent::Stop), Ok(ev_new_block(1, 2)), // Nothing should be emitted after an error: - Err(BackendError::Other("ended".to_owned())), + Err(BackendError::other("ended")), Ok(ev_new_block(2, 3)), ] }); diff --git a/new/src/backend/chain_head/follow_stream_driver.rs b/new/src/backend/chain_head/follow_stream_driver.rs index f1ff507729..0324f5ea35 100644 --- a/new/src/backend/chain_head/follow_stream_driver.rs +++ b/new/src/backend/chain_head/follow_stream_driver.rs @@ -537,7 +537,7 @@ mod test { Ok(ev_new_block(0, 1)), Ok(ev_best_block(1)), Ok(ev_finalized([1], [])), - Err(BackendError::Other("ended".to_owned())), + Err(BackendError::other("ended")), ] }, 10, @@ -580,7 +580,7 @@ mod test { Ok(ev_finalized([1], [])), Ok(ev_new_block(1, 2)), Ok(ev_new_block(2, 3)), - Err(BackendError::Other("ended".to_owned())), + Err(BackendError::other("ended")), ] }, 10, @@ -630,7 +630,7 @@ mod test { Ok(ev_new_block(1, 2)), Ok(ev_new_block(2, 3)), Ok(ev_finalized([1], [])), - Err(BackendError::Other("ended".to_owned())), + Err(BackendError::other("ended")), ] }, 10, @@ -668,7 +668,7 @@ mod test { Ok(FollowEvent::Stop), Ok(ev_initialized(1)), Ok(ev_finalized([2], [])), - Err(BackendError::Other("ended".to_owned())), + Err(BackendError::other("ended")), ] }, 10, @@ -714,7 +714,7 @@ mod test { // Emulate that we missed some blocks. Ok(ev_initialized(13)), Ok(ev_finalized([14], [])), - Err(BackendError::Other("ended".to_owned())), + Err(BackendError::other("ended")), ] }, 10, diff --git a/new/src/backend/chain_head/follow_stream_unpin.rs b/new/src/backend/chain_head/follow_stream_unpin.rs index 3be783552d..b8e9c144f8 100644 --- a/new/src/backend/chain_head/follow_stream_unpin.rs +++ b/new/src/backend/chain_head/follow_stream_unpin.rs @@ -567,7 +567,7 @@ mod test { Ok(ev_new_block(0, 1)), Ok(ev_new_block(1, 2)), Ok(ev_new_block(2, 3)), - Err(BackendError::Other("ended".to_owned())), + Err(BackendError::other("ended")), ] }, 10, @@ -593,7 +593,7 @@ mod test { [ Ok(ev_initialized(0)), Ok(ev_finalized([1], [])), - Err(BackendError::Other("ended".to_owned())), + Err(BackendError::other("ended")), ] }, 3, @@ -624,7 +624,7 @@ mod test { Ok(ev_finalized([3], [])), Ok(ev_finalized([4], [])), Ok(ev_finalized([5], [])), - Err(BackendError::Other("ended".to_owned())), + Err(BackendError::other("ended")), ] }, 3, @@ -663,7 +663,7 @@ mod test { Ok(ev_new_block(1, 2)), Ok(ev_finalized([1], [])), Ok(ev_finalized([2], [])), - Err(BackendError::Other("ended".to_owned())), + Err(BackendError::other("ended")), ] }, 10, @@ -711,7 +711,7 @@ mod test { Ok(ev_finalized([1], [])), Ok(ev_finalized([2], [3])), Ok(ev_finalized([4], [])), - Err(BackendError::Other("ended".to_owned())), + Err(BackendError::other("ended")), ] }, 10, @@ -771,7 +771,7 @@ mod test { Ok(ev_best_block(1)), Ok(ev_finalized([1], [])), Ok(ev_finalized([2], [])), - Err(BackendError::Other("ended".to_owned())), + Err(BackendError::other("ended")), ] }, 10, diff --git a/new/src/backend/chain_head/storage_items.rs b/new/src/backend/chain_head/storage_items.rs index 917beacda2..31cbea8c09 100644 --- a/new/src/backend/chain_head/storage_items.rs +++ b/new/src/backend/chain_head/storage_items.rs @@ -157,7 +157,7 @@ impl Stream for StorageItems { FollowEvent::OperationError(err) if err.operation_id == *self.operation_id => { // Something went wrong obtaining storage items; mark as done and return the error. self.done = true; - return Poll::Ready(Some(Err(BackendError::Other(err.error)))); + return Poll::Ready(Some(Err(BackendError::other(err.error)))); } _ => { // We don't care about this event; wait for the next. diff --git a/new/src/backend/combined.rs b/new/src/backend/combined.rs index e69de29bb2..37806f072e 100644 --- a/new/src/backend/combined.rs +++ b/new/src/backend/combined.rs @@ -0,0 +1,305 @@ +// Copyright 2019-2025 Parity Technologies (UK) Ltd. +// This file is dual-licensed as Apache-2.0 or GPL-3.0. +// see LICENSE for license details. + +//! This module exposes a backend implementation which will lookup the methods available +//! to it from the RPC client and call methods accordingly. + +use crate::backend::chain_head::ChainHeadBackendDriver; +use crate::backend::{ + legacy::LegacyBackend, + chain_head::{ChainHeadBackend, }, + archive::ArchiveBackend, + Backend, BlockRef, StorageResponse, StreamOf, StreamOfResults, + TransactionStatus, utils::retry, +}; +use crate::config::{Config, HashFor, RpcConfigFor}; +use crate::error::BackendError; +use async_trait::async_trait; +use futures::StreamExt; +use subxt_rpcs::RpcClient; +use subxt_rpcs::methods::chain_head::{ + ArchiveStorageQuery, ArchiveCallResult, StorageQueryType, +}; +use futures::Stream; +use std::task::Poll; + +pub struct CombinedBackendBuilder { + archive: BackendChoice>, + chainhead: BackendChoice>, + legacy: BackendChoice>, +} + +enum BackendChoice { + Use(V), + DontUse, + UseDefault, +} + +impl CombinedBackendBuilder { + /// Create a new [`CombinedBackendBuilder`]. + pub fn new() -> Self { + CombinedBackendBuilder { + archive: BackendChoice::UseDefault, + chainhead: BackendChoice::UseDefault, + legacy: BackendChoice::UseDefault, + } + } + + /// Use the given [`ArchiveBackend`] where applicable. + pub fn with_archive_backend(mut self, backend: ArchiveBackend) -> Self { + self.archive = BackendChoice::Use(backend); + self + } + + /// Use the given [`ChainHeadBackend`] where applicable. + pub fn with_chainhead_backend(mut self, backend: ChainHeadBackend) -> Self { + self.chainhead = BackendChoice::Use(backend); + self + } + + /// Use the given [`LegacyBackend`] where applicable. + pub fn with_legacy_backend(mut self, backend: LegacyBackend) -> Self { + self.legacy = BackendChoice::Use(backend); + self + } + + /// Don't use any default backends; only use what is explicitly configured via + /// [`CombinedBackendBuilder::with_archive_backend`], + /// [`CombinedBackendBuilder::with_chainhead_backend`] and + /// [`CombinedBackendBuilder::with_legacy_backend`]. + pub fn no_default_backends(mut self) -> Self { + if matches!(self.legacy, BackendChoice::UseDefault) { + self.legacy = BackendChoice::DontUse; + } + if matches!(self.archive, BackendChoice::UseDefault) { + self.archive = BackendChoice::DontUse; + } + if matches!(self.chainhead, BackendChoice::UseDefault) { + self.chainhead = BackendChoice::DontUse; + } + self + } + + /// A low-level API to build the backend and driver which requires polling the driver for the backend + /// to make progress. + /// + /// This is useful if you want to manage the driver yourself, for example if you want to run it in on + /// a specific runtime. + /// + /// If you just want to run the driver in the background until completion in on the default runtime, + /// use [`CombinedBackendBuilder::build_with_background_driver`] instead. + pub async fn build(self, rpc_client: impl Into) -> Result<(CombinedBackend, CombinedBackendDriver), BackendError> { + let rpc_client = rpc_client.into(); + + // What does the thing wer're talking to actually know about? + let methods: Vec = rpc_client + .request("rpc_methods", subxt_rpcs::rpc_params![]) + .await?; + + let has_archive_methods = methods.iter().any(|m| m.starts_with("archive_v1_")); + let has_chainhead_methods = methods.iter().any(|m| m.starts_with("chainHead_v1")); + + let mut combined_driver = CombinedBackendDriver { chainhead_driver: None }; + + let archive = if has_archive_methods { + match self.archive { + BackendChoice::Use(b) => Some(b), + BackendChoice::UseDefault => Some(ArchiveBackend::new(rpc_client.clone())), + BackendChoice::DontUse => None, + } + } else { None }; + + let chainhead = if has_chainhead_methods { + match self.chainhead { + BackendChoice::Use(b) => Some(b), + BackendChoice::UseDefault => { + let (chainhead, chainhead_driver) = ChainHeadBackend::builder().build(rpc_client.clone()); + combined_driver.chainhead_driver = Some(chainhead_driver); + Some(chainhead) + }, + BackendChoice::DontUse => None, + } + } else { None }; + + let legacy = match self.legacy { + BackendChoice::Use(b) => Some(b), + BackendChoice::UseDefault => Some(LegacyBackend::builder().build(rpc_client.clone())), + BackendChoice::DontUse => None, + }; + + let combined = CombinedBackend { + archive, + chainhead, + legacy + }; + + Ok((combined, combined_driver)) + } + + /// An API to build the backend and driver which will run in the background until completion + /// on the default runtime. + /// + /// - On non-wasm targets, this will spawn the driver on `tokio`. + /// - On wasm targets, this will spawn the driver on `wasm-bindgen-futures`. + #[cfg(feature = "runtime")] + pub async fn build_with_background_driver(self, client: impl Into) -> Result, BackendError> { + let (backend, mut driver) = self.build(client).await?; + + super::utils::spawn(async move { + // NOTE: we need to poll the driver until it's done i.e returns None + // to ensure that the backend is shutdown properly. + while let Some(res) = driver.next().await { + if let Err(err) = res { + tracing::debug!(target: "subxt", "chainHead backend error={err}"); + } + } + + tracing::debug!(target: "subxt", "combined backend was closed"); + }); + + Ok(backend) + } +} + +/// Driver for the [`CombinedBackend`]. This needs to be polled to ensure +/// that the [`CombinedBackend`] can make progress. It does not need polling +/// if [`CombinedBackendDriver::needs_polling`] returns `false`. +pub struct CombinedBackendDriver { + chainhead_driver: Option> +} + +impl CombinedBackendDriver { + pub fn needs_polling(&self) -> bool { + self.chainhead_driver.is_some() + } +} + +impl Stream for CombinedBackendDriver { + type Item = as Stream>::Item; + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + match &mut self.chainhead_driver { + Some(driver) => driver.poll_next_unpin(cx), + None => Poll::Ready(None) + } + } +} + +/// A combined backend. This selects which RPC calls to use based on the `rpc_methods` +/// available from the given RPC client we're given. +pub struct CombinedBackend { + archive: Option>, + chainhead: Option>, + legacy: Option>, +} + +impl CombinedBackend { + /// Configure and construct a [`CombinedBackend`]. + pub fn builder() -> CombinedBackendBuilder { + CombinedBackendBuilder::new() + } +} + +impl super::sealed::Sealed for CombinedBackend {} + +static NO_AVAILABLE_BACKEND: &str = "No available RPC methods to use. `no_default_backends` was used, but no applicable backends were then provided."; + +macro_rules! call_backends { + ({$($backend_name:ident)|+}. $method_name:ident ( $($arg:expr),* )) => {{ + let mut err = BackendError::other(NO_AVAILABLE_BACKEND); + + $( + if let Some(backend) = &self.$backend_name { + err = match backend.$method_name($( $arg, )*).await { + Ok(res) => return Ok(res), + Err(e) => e + } + } + )+ + + return Err(err) + }} +} + +#[async_trait] +impl Backend for CombinedBackend { + async fn storage_fetch_values( + &self, + keys: Vec>, + at: HashFor, + ) -> Result, BackendError> { + call_backends!({archive|chainhead|legacy}.storage_fetch_values(keys, at)) + } + + async fn storage_fetch_descendant_keys( + &self, + key: Vec, + at: HashFor, + ) -> Result>, BackendError> { + + } + + async fn storage_fetch_descendant_values( + &self, + key: Vec, + at: HashFor, + ) -> Result, BackendError> { + + } + + async fn genesis_hash(&self) -> Result, BackendError> { + + } + + async fn block_header(&self, at: HashFor) -> Result, BackendError> { + + } + + async fn block_body(&self, at: HashFor) -> Result>>, BackendError> { + + } + + async fn latest_finalized_block_ref(&self) -> Result>, BackendError> { + + } + + async fn stream_all_block_headers( + &self, + _hasher: T::Hasher, + ) -> Result>)>, BackendError> { + + } + + async fn stream_best_block_headers( + &self, + _hasher: T::Hasher, + ) -> Result>)>, BackendError> { + + } + + async fn stream_finalized_block_headers( + &self, + _hasher: T::Hasher, + ) -> Result>)>, BackendError> { + + } + + async fn submit_transaction( + &self, + extrinsic: &[u8], + ) -> Result>>, BackendError> { + + } + + async fn call( + &self, + method: &str, + call_parameters: Option<&[u8]>, + at: HashFor, + ) -> Result, BackendError> { + + } +} diff --git a/new/src/backend/utils.rs b/new/src/backend/utils.rs index 54f6ba669f..ef120b5149 100644 --- a/new/src/backend/utils.rs +++ b/new/src/backend/utils.rs @@ -1,86 +1,24 @@ -//! RPC utils. +//! Backend utils. use super::{StreamOf, StreamOfResults}; use crate::error::BackendError; use futures::{FutureExt, Stream, StreamExt}; use std::{future::Future, pin::Pin, task::Poll}; -/// Resubscribe callback. -type ResubscribeGetter = Box ResubscribeFuture + Send>; - -/// Future that resolves to a subscription stream. -type ResubscribeFuture = - Pin, BackendError>> + Send>>; - -/// Retry subscription. -struct RetrySubscription { - resubscribe: F, - state: RetrySubscriptionState, -} - -enum RetrySubscriptionState { - Init, - Pending(R), - Stream(StreamOfResults), - Done, -} - -impl std::marker::Unpin for RetrySubscription {} - -impl Stream for RetrySubscription -where - F: FnMut() -> R, - R: Future, BackendError>> + Unpin, -{ - type Item = Result; - - fn poll_next( - mut self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> Poll> { - loop { - match &mut self.state { - RetrySubscriptionState::Init => { - self.state = RetrySubscriptionState::Pending((self.resubscribe)()); - }, - RetrySubscriptionState::Stream(s) => match s.poll_next_unpin(cx) { - Poll::Ready(Some(Err(err))) => { - if err.is_disconnected_will_reconnect() { - self.state = RetrySubscriptionState::Init; - } - return Poll::Ready(Some(Err(err))); - } - Poll::Ready(None) => { - return Poll::Ready(None) - } - Poll::Ready(Some(Ok(val))) => { - return Poll::Ready(Some(Ok(val))); - } - Poll::Pending => { - return Poll::Pending; - } - }, - RetrySubscriptionState::Pending(fut) => match fut.poll_unpin(cx) { - Poll::Ready(Err(err)) => { - if err.is_disconnected_will_reconnect() { - self.state = RetrySubscriptionState::Init; - } - return Poll::Ready(Some(Err(err))); - } - Poll::Ready(Ok(stream)) => { - self.state = RetrySubscriptionState::Stream(stream); - continue; - } - Poll::Pending => { - return Poll::Pending; - } - }, - RetrySubscriptionState::Done => { - return Poll::Ready(None) - } - }; - } - } +/// Spawn a task. +/// +/// - On non-wasm targets, this will spawn a task via [`tokio::spawn`]. +/// - On wasm targets, this will spawn a task via [`wasm_bindgen_futures::spawn_local`]. +#[cfg(feature = "runtime")] +pub(crate) fn spawn(future: F) { + #[cfg(not(target_family = "wasm"))] + tokio::spawn(async move { + future.await; + }); + #[cfg(all(target_arch = "wasm32", target_os = "unknown"))] + wasm_bindgen_futures::spawn_local(async move { + future.await; + }); } /// Retry a future until it doesn't return a disconnected error. @@ -180,6 +118,84 @@ where }))) } +/// Resubscribe callback. +type ResubscribeGetter = Box ResubscribeFuture + Send>; + +/// Future that resolves to a subscription stream. +type ResubscribeFuture = + Pin, BackendError>> + Send>>; + +/// Retry subscription. +struct RetrySubscription { + resubscribe: F, + state: RetrySubscriptionState, +} + +enum RetrySubscriptionState { + Init, + Pending(R), + Stream(StreamOfResults), + Done, +} + +impl std::marker::Unpin for RetrySubscription {} + +impl Stream for RetrySubscription +where + F: FnMut() -> R, + R: Future, BackendError>> + Unpin, +{ + type Item = Result; + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + loop { + match &mut self.state { + RetrySubscriptionState::Init => { + self.state = RetrySubscriptionState::Pending((self.resubscribe)()); + }, + RetrySubscriptionState::Stream(s) => match s.poll_next_unpin(cx) { + Poll::Ready(Some(Err(err))) => { + if err.is_disconnected_will_reconnect() { + self.state = RetrySubscriptionState::Init; + } + return Poll::Ready(Some(Err(err))); + } + Poll::Ready(None) => { + return Poll::Ready(None) + } + Poll::Ready(Some(Ok(val))) => { + return Poll::Ready(Some(Ok(val))); + } + Poll::Pending => { + return Poll::Pending; + } + }, + RetrySubscriptionState::Pending(fut) => match fut.poll_unpin(cx) { + Poll::Ready(Err(err)) => { + if err.is_disconnected_will_reconnect() { + self.state = RetrySubscriptionState::Init; + } + return Poll::Ready(Some(Err(err))); + } + Poll::Ready(Ok(stream)) => { + self.state = RetrySubscriptionState::Stream(stream); + continue; + } + Poll::Pending => { + return Poll::Pending; + } + }, + RetrySubscriptionState::Done => { + return Poll::Ready(None) + } + }; + } + } +} + #[cfg(test)] mod tests { use super::*; @@ -190,7 +206,7 @@ mod tests { } fn custom_err() -> BackendError { - BackendError::Other(String::new()) + BackendError::other("") } #[tokio::test] diff --git a/new/src/error.rs b/new/src/error.rs index 07b385fdef..b68057930b 100644 --- a/new/src/error.rs +++ b/new/src/error.rs @@ -8,6 +8,7 @@ mod dispatch_error; mod hex; use thiserror::Error as DeriveError; +use std::borrow::Cow; #[cfg(feature = "unstable-light-client")] pub use subxt_lightclient::LightClientError; @@ -276,7 +277,7 @@ pub enum BackendError { CouldNotDecodeMetadata(codec::Error), // This is for errors in `Backend` implementations which aren't any of the "pre-defined" set above: #[error("Custom backend error: {0}")] - Other(String), + Other(Cow<'static, str>), } impl BackendError { @@ -294,6 +295,11 @@ impl BackendError { pub fn is_rpc_limit_reached(&self) -> bool { matches!(self, BackendError::Rpc(RpcError::LimitReached)) } + + /// Create a [`BackendError::Other`] given a message. + pub fn other(message: impl Into>) -> Self { + BackendError::Other(message.into()) + } } impl From for BackendError {