From a3381207d27d1ff3961ebf20eea5e137643d18ee Mon Sep 17 00:00:00 2001 From: James Wilson Date: Mon, 1 Dec 2025 16:18:45 +0000 Subject: [PATCH] WIP: Combined backend done, client in progress --- new/src/backend.rs | 27 ++- new/src/backend/archive.rs | 21 +- new/src/backend/archive/storage_stream.rs | 6 +- new/src/backend/chain_head.rs | 8 +- new/src/backend/combined.rs | 216 +++++++++++++---- new/src/backend/legacy.rs | 23 +- new/src/backend/utils.rs | 8 +- new/src/client/offline_client.rs | 3 +- new/src/client/online_client.rs | 221 ++++++++---------- .../online_client/block_number_or_ref.rs | 41 ++++ new/src/config.rs | 4 +- new/src/config/polkadot.rs | 2 +- new/src/config/substrate.rs | 12 +- new/src/error.rs | 69 ++++-- 14 files changed, 439 insertions(+), 222 deletions(-) create mode 100644 new/src/client/online_client/block_number_or_ref.rs diff --git a/new/src/backend.rs b/new/src/backend.rs index 7ba400ec28..7a8b866095 100644 --- a/new/src/backend.rs +++ b/new/src/backend.rs @@ -21,6 +21,25 @@ use std::pin::Pin; use std::sync::Arc; use subxt_metadata::Metadata; +// Expose our various backends. +pub use chain_head::{ + ChainHeadBackend, + ChainHeadBackendBuilder, + ChainHeadBackendDriver, +}; +pub use archive::{ + ArchiveBackend +}; +pub use legacy::{ + LegacyBackend, + LegacyBackendBuilder +}; +pub use combined::{ + CombinedBackend, + CombinedBackendBuilder, + CombinedBackendDriver +}; + /// Prevent the backend trait being implemented externally. #[doc(hidden)] pub(crate) mod sealed { @@ -55,7 +74,13 @@ pub trait Backend: sealed::Sealed + Send + Sync + 'static { /// Fetch the genesis hash async fn genesis_hash(&self) -> Result, BackendError>; - /// Get a block header + /// Convert a block number to a hash. This should return `None` in the event that + /// multiple block hashes correspond to the given number (ie if the number is greater + /// than that of the latest finalized block and some forks exist). Nevertheless, it could + /// still return the hash to a block on some fork that is pruned. + async fn block_number_to_hash(&self, number: u64) -> Result>>, BackendError>; + + /// Get a block header. async fn block_header(&self, at: HashFor) -> Result, BackendError>; /// Return the extrinsics found in the block. Each extrinsic is represented diff --git a/new/src/backend/archive.rs b/new/src/backend/archive.rs index e3b5aae4f7..96c44cfff9 100644 --- a/new/src/backend/archive.rs +++ b/new/src/backend/archive.rs @@ -12,6 +12,7 @@ mod storage_stream; +use subxt_rpcs::methods::ChainHeadRpcMethods; use crate::backend::{ Backend, BlockRef, StorageResponse, StreamOf, StreamOfResults, TransactionStatus, utils::retry, @@ -26,20 +27,17 @@ use subxt_rpcs::methods::chain_head::{ }; use storage_stream::ArchiveStorageStream; -// Expose the RPC methods. -pub use subxt_rpcs::methods::chain_head::ChainHeadRpcMethods as ArchiveRpcMethods; - /// The archive backend. #[derive(Debug, Clone)] pub struct ArchiveBackend { // RPC methods we'll want to call: - methods: ArchiveRpcMethods>, + methods: ChainHeadRpcMethods>, } impl ArchiveBackend { /// Configure and construct an [`ArchiveBackend`] and the associated [`ChainHeadBackendDriver`]. pub fn new(client: impl Into,) -> ArchiveBackend { - let methods = ArchiveRpcMethods::new(client.into()); + let methods = ChainHeadRpcMethods::new(client.into()); ArchiveBackend { methods } } @@ -123,6 +121,19 @@ impl Backend for ArchiveBackend { .await } + async fn block_number_to_hash(&self, number: u64) -> Result>>, BackendError> { + retry(|| async { + let mut hashes = self.methods.archive_v1_hash_by_height(number as usize).await?; + if let (Some(hash), None) = (hashes.pop(), hashes.pop()) { + // One hash; return it. + Ok(Some(BlockRef::from_hash(hash))) + } else { + // More than one; return None. + Ok(None) + } + }).await + } + async fn block_header(&self, at: HashFor) -> Result, BackendError> { retry(|| async { let header = self.methods.archive_v1_header(at).await?; diff --git a/new/src/backend/archive/storage_stream.rs b/new/src/backend/archive/storage_stream.rs index 994789bc2c..9fa37512e9 100644 --- a/new/src/backend/archive/storage_stream.rs +++ b/new/src/backend/archive/storage_stream.rs @@ -7,11 +7,11 @@ use futures::{FutureExt, Stream, StreamExt}; use std::task::{Context, Poll}; use crate::error::BackendError; use crate::config::{Config, HashFor, RpcConfigFor}; -use super::ArchiveRpcMethods; +use subxt_rpcs::methods::ChainHeadRpcMethods; pub struct ArchiveStorageStream { at: HashFor, - methods: ArchiveRpcMethods>, + methods: ChainHeadRpcMethods>, query_queue: VecDeque>>, state: Option>, } @@ -31,7 +31,7 @@ impl ArchiveStorageStream { /// Fetch descendant keys. pub fn new( at: HashFor, - methods: ArchiveRpcMethods>, + methods: ChainHeadRpcMethods>, query_queue: VecDeque>>, ) -> Self { Self { diff --git a/new/src/backend/chain_head.rs b/new/src/backend/chain_head.rs index f34ce28cae..64d7bcdad9 100644 --- a/new/src/backend/chain_head.rs +++ b/new/src/backend/chain_head.rs @@ -13,6 +13,7 @@ mod follow_stream_driver; mod follow_stream_unpin; mod storage_items; +use subxt_rpcs::methods::ChainHeadRpcMethods; use self::follow_stream_driver::FollowStreamFinalizedHeads; use crate::backend::{ Backend, BlockRef, BlockRefT, StorageResponse, StreamOf, StreamOfResults, @@ -32,9 +33,6 @@ use subxt_rpcs::methods::chain_head::{ FollowEvent, MethodResponse, StorageQuery, StorageQueryType, StorageResultType, }; -// Expose the RPC methods. -pub use subxt_rpcs::methods::chain_head::ChainHeadRpcMethods; - /// Configure and build an [`ChainHeadBackend`]. pub struct ChainHeadBackendBuilder { max_block_life: usize, @@ -375,6 +373,10 @@ impl Backend for ChainHeadBackend { .await } + async fn block_number_to_hash(&self, _number: u64) -> Result>>, BackendError> { + Err(BackendError::other("The ChainHead V1 RPCs do not support obtaining a block hash from a number.")) + } + async fn block_header(&self, at: HashFor) -> Result, BackendError> { retry(|| async { let sub_id = get_subscription_id(&self.follow_handle).await?; diff --git a/new/src/backend/combined.rs b/new/src/backend/combined.rs index 37806f072e..aae90543cc 100644 --- a/new/src/backend/combined.rs +++ b/new/src/backend/combined.rs @@ -8,19 +8,16 @@ use crate::backend::chain_head::ChainHeadBackendDriver; use crate::backend::{ legacy::LegacyBackend, - chain_head::{ChainHeadBackend, }, + chain_head::ChainHeadBackend, archive::ArchiveBackend, - Backend, BlockRef, StorageResponse, StreamOf, StreamOfResults, - TransactionStatus, utils::retry, + Backend, BlockRef, StorageResponse, StreamOfResults, + TransactionStatus, }; -use crate::config::{Config, HashFor, RpcConfigFor}; -use crate::error::BackendError; +use crate::config::{Config, HashFor}; +use crate::error::{BackendError, CombinedBackendError}; use async_trait::async_trait; use futures::StreamExt; use subxt_rpcs::RpcClient; -use subxt_rpcs::methods::chain_head::{ - ArchiveStorageQuery, ArchiveCallResult, StorageQueryType, -}; use futures::Stream; use std::task::Poll; @@ -89,13 +86,14 @@ impl CombinedBackendBuilder { /// /// If you just want to run the driver in the background until completion in on the default runtime, /// use [`CombinedBackendBuilder::build_with_background_driver`] instead. - pub async fn build(self, rpc_client: impl Into) -> Result<(CombinedBackend, CombinedBackendDriver), BackendError> { + pub async fn build(self, rpc_client: impl Into) -> Result<(CombinedBackend, CombinedBackendDriver), CombinedBackendError> { let rpc_client = rpc_client.into(); // What does the thing wer're talking to actually know about? let methods: Vec = rpc_client .request("rpc_methods", subxt_rpcs::rpc_params![]) - .await?; + .await + .map_err(CombinedBackendError::CouldNotObtainRpcMethodList)?; let has_archive_methods = methods.iter().any(|m| m.starts_with("archive_v1_")); let has_chainhead_methods = methods.iter().any(|m| m.starts_with("chainHead_v1")); @@ -143,8 +141,10 @@ impl CombinedBackendBuilder { /// - On non-wasm targets, this will spawn the driver on `tokio`. /// - On wasm targets, this will spawn the driver on `wasm-bindgen-futures`. #[cfg(feature = "runtime")] - pub async fn build_with_background_driver(self, client: impl Into) -> Result, BackendError> { - let (backend, mut driver) = self.build(client).await?; + pub async fn build_with_background_driver(self, client: impl Into) -> Result, CombinedBackendError> { + let (backend, mut driver) = self + .build(client) + .await?; super::utils::spawn(async move { // NOTE: we need to poll the driver until it's done i.e returns None @@ -201,29 +201,41 @@ impl CombinedBackend { pub fn builder() -> CombinedBackendBuilder { CombinedBackendBuilder::new() } + + fn archive(&self) -> Option<&dyn Backend> { + self.archive.as_ref().map(|a| { + let a: &dyn Backend = a; + a + }) + } + + fn chainhead(&self) -> Option<&dyn Backend> { + self.chainhead.as_ref().map(|a| { + let a: &dyn Backend = a; + a + }) + } + + fn legacy(&self) -> Option<&dyn Backend> { + self.legacy.as_ref().map(|a| { + let a: &dyn Backend = a; + a + }) + } } impl super::sealed::Sealed for CombinedBackend {} -static NO_AVAILABLE_BACKEND: &str = "No available RPC methods to use. `no_default_backends` was used, but no applicable backends were then provided."; - -macro_rules! call_backends { - ({$($backend_name:ident)|+}. $method_name:ident ( $($arg:expr),* )) => {{ - let mut err = BackendError::other(NO_AVAILABLE_BACKEND); - - $( - if let Some(backend) = &self.$backend_name { - err = match backend.$method_name($( $arg, )*).await { - Ok(res) => return Ok(res), - Err(e) => e - } - } - )+ - - return Err(err) - }} -} - +// Our default behaviour: +// - Try the archive backend first if it's available. Why? It has all block headers/bodies +// etc so it's mroe likely to succeed than chainHead backend and give back things that won't +// expire. +// - If archive calls aren't available, fall back to the chainHead backend. Blocks given back +// by this are more likely to expire. +// - If neither exists / works, we fall back to the legacy methods. These have some limits on +// what is available (often fewer limits than chainHead though) but tend to do the job. We'd +// rather not use these as they are old and should go away, but until then they are a good +// fallback. #[async_trait] impl Backend for CombinedBackend { async fn storage_fetch_values( @@ -231,7 +243,13 @@ impl Backend for CombinedBackend { keys: Vec>, at: HashFor, ) -> Result, BackendError> { - call_backends!({archive|chainhead|legacy}.storage_fetch_values(keys, at)) + try_backends(&[ + self.archive(), + self.chainhead(), + self.legacy() + ], async |b: &dyn Backend| { + b.storage_fetch_values(keys.clone(), at).await + }).await } async fn storage_fetch_descendant_keys( @@ -239,7 +257,13 @@ impl Backend for CombinedBackend { key: Vec, at: HashFor, ) -> Result>, BackendError> { - + try_backends(&[ + self.archive(), + self.chainhead(), + self.legacy() + ], async |b: &dyn Backend| { + b.storage_fetch_descendant_keys(key.clone(), at).await + }).await } async fn storage_fetch_descendant_values( @@ -247,51 +271,123 @@ impl Backend for CombinedBackend { key: Vec, at: HashFor, ) -> Result, BackendError> { - + try_backends(&[ + self.archive(), + self.chainhead(), + self.legacy() + ], async |b: &dyn Backend| { + b.storage_fetch_descendant_values(key.clone(), at).await + }).await } async fn genesis_hash(&self) -> Result, BackendError> { + try_backends(&[ + self.archive(), + self.chainhead(), + self.legacy() + ], async |b: &dyn Backend| { + b.genesis_hash().await + }).await + } + async fn block_number_to_hash(&self, number: u64) -> Result>>, BackendError> { + try_backends(&[ + self.archive(), + self.legacy(), + // chainHead last as it cannot handle this request and will fail, so it's here + // just to hand back a more relevant error in case the above two backends aren't + // enabled or have some issue. + self.chainhead() + ], async |b: &dyn Backend| { + b.block_number_to_hash(number).await + }).await } async fn block_header(&self, at: HashFor) -> Result, BackendError> { - + try_backends(&[ + self.archive(), + self.chainhead(), + self.legacy() + ], async |b: &dyn Backend| { + b.block_header(at).await + }).await } async fn block_body(&self, at: HashFor) -> Result>>, BackendError> { - + try_backends(&[ + self.archive(), + self.chainhead(), + self.legacy() + ], async |b: &dyn Backend| { + b.block_body(at).await + }).await } async fn latest_finalized_block_ref(&self) -> Result>, BackendError> { - + try_backends(&[ + // Prioritize chainHead backend since it's streaming these things; save another call. + self.chainhead(), + self.archive(), + self.legacy() + ], async |b: &dyn Backend| { + b.latest_finalized_block_ref().await + }).await } async fn stream_all_block_headers( &self, - _hasher: T::Hasher, + hasher: T::Hasher, ) -> Result>)>, BackendError> { - + try_backends(&[ + // Ignore archive backend; it doesn't support this. + self.chainhead(), + self.legacy() + ], async |b: &dyn Backend| { + b.stream_all_block_headers(hasher.clone()).await + }).await } async fn stream_best_block_headers( &self, - _hasher: T::Hasher, + hasher: T::Hasher, ) -> Result>)>, BackendError> { - + try_backends(&[ + // Ignore archive backend; it doesn't support this. + self.chainhead(), + self.legacy() + ], async |b: &dyn Backend| { + b.stream_best_block_headers(hasher.clone()).await + }).await } async fn stream_finalized_block_headers( &self, - _hasher: T::Hasher, + hasher: T::Hasher, ) -> Result>)>, BackendError> { - + try_backends(&[ + // Ignore archive backend; it doesn't support this. + self.chainhead(), + self.legacy() + ], async |b: &dyn Backend| { + b.stream_finalized_block_headers(hasher.clone()).await + }).await } async fn submit_transaction( &self, extrinsic: &[u8], ) -> Result>>, BackendError> { - + try_backends(&[ + // chainHead first as it does the same as the archive backend, but with better + // guarantees around the block handed back being pinned & ready to access. + self.chainhead(), + self.legacy(), + // archive last just incase chainHead & legacy fail or aren't provided for some + // reason. + self.archive(), + ], async |b: &dyn Backend| { + b.submit_transaction(extrinsic).await + }).await } async fn call( @@ -300,6 +396,36 @@ impl Backend for CombinedBackend { call_parameters: Option<&[u8]>, at: HashFor, ) -> Result, BackendError> { - + try_backends(&[ + self.archive(), + self.chainhead(), + self.legacy() + ], async |b: &dyn Backend| { + b.call(method, call_parameters, at).await + }).await } } + +/// Call one backend after the other in the list until we get a successful result back. +async fn try_backends<'s, 'b, T, Func, Fut, O>( + backends: &'s [Option<&'b dyn Backend>], + mut f: Func +) -> Result +where + 'b: 's, + T: Config, + Func: FnMut(&'b dyn Backend) -> Fut, + Fut: Future> + 'b, +{ + static NO_AVAILABLE_BACKEND: &str = "None of the configured backends are capable of handling this request"; + let mut err = BackendError::other(NO_AVAILABLE_BACKEND); + + for backend in backends.into_iter().filter_map(|b| *b) { + match f(backend).await { + Ok(res) => return Ok(res), + Err(e) => { err = e } + } + } + + Err(err) +} \ No newline at end of file diff --git a/new/src/backend/legacy.rs b/new/src/backend/legacy.rs index 75c62eee17..b8b853e33a 100644 --- a/new/src/backend/legacy.rs +++ b/new/src/backend/legacy.rs @@ -7,7 +7,7 @@ mod descendant_streams; -use self::rpc_methods::TransactionStatus as RpcTransactionStatus; +use subxt_rpcs::methods::legacy::{ TransactionStatus as RpcTransactionStatus, LegacyRpcMethods }; use crate::backend::utils::{retry, retry_stream}; use crate::backend::{ Backend, BlockRef, StorageResponse, StreamOf, StreamOfResults, @@ -19,17 +19,10 @@ use async_trait::async_trait; use futures::TryStreamExt; use futures::{Future, Stream, StreamExt, future, future::Either, stream}; use subxt_rpcs::RpcClient; +use subxt_rpcs::methods::legacy::NumberOrHex; use codec::Encode; use descendant_streams::{StorageFetchDescendantKeysStream, StorageFetchDescendantValuesStream}; -/// Re-export legacy RPC types and methods from [`subxt_rpcs::methods::legacy`]. -pub mod rpc_methods { - pub use subxt_rpcs::methods::legacy::*; -} - -// Expose the RPC methods. -pub use rpc_methods::LegacyRpcMethods; - /// Configure and build an [`LegacyBackend`]. pub struct LegacyBackendBuilder { storage_page_size: u32, @@ -184,6 +177,18 @@ impl Backend for LegacyBackend { .await } + async fn block_number_to_hash(&self, number: u64) -> Result>>, BackendError> { + retry(|| async { + let number_or_hash = NumberOrHex::Number(number); + let hash = self + .methods + .chain_get_block_hash(Some(number_or_hash)) + .await? + .map(BlockRef::from_hash); + Ok(hash) + }).await + } + async fn block_header(&self, at: HashFor) -> Result, BackendError> { retry(|| async { let header = self.methods.chain_get_header(Some(at)).await?; diff --git a/new/src/backend/utils.rs b/new/src/backend/utils.rs index ef120b5149..18d51969f7 100644 --- a/new/src/backend/utils.rs +++ b/new/src/backend/utils.rs @@ -118,13 +118,6 @@ where }))) } -/// Resubscribe callback. -type ResubscribeGetter = Box ResubscribeFuture + Send>; - -/// Future that resolves to a subscription stream. -type ResubscribeFuture = - Pin, BackendError>> + Send>>; - /// Retry subscription. struct RetrySubscription { resubscribe: F, @@ -164,6 +157,7 @@ where return Poll::Ready(Some(Err(err))); } Poll::Ready(None) => { + self.state = RetrySubscriptionState::Done; return Poll::Ready(None) } Poll::Ready(Some(Ok(val))) => { diff --git a/new/src/client/offline_client.rs b/new/src/client/offline_client.rs index 1ac0a41a11..ebdf6a8068 100644 --- a/new/src/client/offline_client.rs +++ b/new/src/client/offline_client.rs @@ -22,8 +22,9 @@ impl OfflineClient { /// [`OfflineClient`] it's called on, and so cannot outlive it. pub fn at( &self, - block_number: u32, + block_number: impl Into, ) -> Result, OfflineClientAtBlockError> { + let block_number = block_number.into(); let spec_version = self .config .spec_version_for_block_number(block_number) diff --git a/new/src/client/online_client.rs b/new/src/client/online_client.rs index e2afa6c0b3..dc35f2a40f 100644 --- a/new/src/client/online_client.rs +++ b/new/src/client/online_client.rs @@ -1,11 +1,13 @@ +mod block_number_or_ref; + use super::ClientAtBlock; use super::OfflineClientAtBlockT; +use crate::config::Header; use crate::config::{ Config, HashFor, RpcConfigFor }; use crate::error::OnlineClientAtBlockError; -use crate::backend::Backend; +use crate::backend::{ Backend, CombinedBackend, BlockRef }; use codec::{Compact, Decode, Encode}; use frame_metadata::{RuntimeMetadata, RuntimeMetadataPrefixed}; -use scale_info_legacy::TypeRegistrySet; use std::sync::Arc; use subxt_rpcs::methods::chain_head::ArchiveCallResult; use subxt_rpcs::{ChainHeadRpcMethods, RpcClient}; @@ -15,6 +17,8 @@ use subxt_metadata::Metadata; #[cfg_attr(docsrs, doc(cfg(feature = "jsonrpsee")))] use crate::error::OnlineClientError; +pub use block_number_or_ref::BlockNumberOrRef; + /// A client which exposes the means to decode historic data on a chain online. #[derive(Clone, Debug)] pub struct OnlineClient { @@ -42,14 +46,14 @@ impl OnlineClient { /// point to a locally running node on `ws://127.0.0.1:9944`. /// /// **Note:** This will only work if the local node is an archive node. - #[cfg(feature = "jsonrpsee")] + #[cfg(all(feature = "jsonrpsee", feature = "runtime"))] pub async fn new(config: T) -> Result, OnlineClientError> { let url = "ws://127.0.0.1:9944"; OnlineClient::from_url(config, url).await } /// Construct a new [`OnlineClient`], providing a URL to connect to. - #[cfg(feature = "jsonrpsee")] + #[cfg(all(feature = "jsonrpsee", feature = "runtime"))] pub async fn from_url( config: T, url: impl AsRef, @@ -69,13 +73,13 @@ impl OnlineClient { /// Construct a new [`OnlineClient`], providing a URL to connect to. /// /// Allows insecure URLs without SSL encryption, e.g. (http:// and ws:// URLs). - #[cfg(feature = "jsonrpsee")] + #[cfg(all(feature = "jsonrpsee", feature = "runtime"))] pub async fn from_insecure_url( config: T, url: impl AsRef, ) -> Result, OnlineClientError> { let rpc_client = RpcClient::from_insecure_url(url).await?; - Ok(OnlineClient::from_rpc_client(config, rpc_client)) + OnlineClient::from_rpc_client(config, rpc_client).await } fn is_url_secure(url: &url::Url) -> bool { @@ -90,19 +94,23 @@ impl OnlineClient { /// Construct a new [`OnlineClient`] by providing an [`RpcClient`] to drive the connection. /// This will use the current default [`Backend`], which may change in future releases. - #[cfg(feature = "jsonrpsee")] - pub fn from_rpc_client( + #[cfg(all(feature = "jsonrpsee", feature = "runtime"))] + pub async fn from_rpc_client( config: T, rpc_client: impl Into, - ) -> OnlineClient { + ) -> Result, OnlineClientError> { let rpc_client = rpc_client.into(); - let backend = Arc::new(LegacyBackend::builder().build(rpc_client)); - OnlineClient::from_backend(config, backend) + let backend = CombinedBackend::builder() + .build_with_background_driver(rpc_client) + .await + .map_err(OnlineClientError::CannotBuildCombinedBackend)?; + let backend: Arc> = Arc::new(backend); + Ok(OnlineClient::from_backend(config, backend)) } /// Construct a new [`OnlineClient`] by providing an underlying [`Backend`] /// implementation to power it. - pub fn from_backend>( + pub fn from_backend( config: T, backend: impl Into>>, ) -> OnlineClient { @@ -118,56 +126,86 @@ impl OnlineClient { /// [`OnlineClient`] it's called on, and so cannot outlive it. pub async fn at_block( &self, - block_number: u32, + number_or_hash: impl Into>, ) -> Result, T>, OnlineClientAtBlockError> { - let config = &self.inner.config; - let rpc_methods = &self.inner.rpc_methods; + let number_or_hash = number_or_hash.into(); - let block_hash = rpc_methods - .archive_v1_hash_by_height(block_number as usize) - .await - .map_err(|e| OnlineClientAtBlockError::CannotGetBlockHash { - block_number, - reason: e, - })? - .pop() - .ok_or_else(|| OnlineClientAtBlockError::BlockNotFound { block_number })? - .into(); - - // Get our configuration, or fetch from the node if not available. - let spec_version = - if let Some(spec_version) = config.spec_version_for_block_number(block_number) { - spec_version - } else { - // Fetch spec version. Caching this doesn't really make sense, so either - // details are provided offline or we fetch them every time. - get_spec_version(rpc_methods, block_hash).await? - }; - let metadata = if let Some(metadata) = config.metadata_for_spec_version(spec_version) { - metadata - } else { - // Fetch and then give our config the opportunity to cache this metadata. - let metadata = get_metadata(rpc_methods, block_hash).await?; - let metadata = Arc::new(metadata); - config.set_metadata_for_spec_version(spec_version, metadata.clone()); - metadata + // We are given either a block hash or number. We need both. + let (block_ref, block_num) = match number_or_hash { + BlockNumberOrRef::BlockRef(block_ref) => { + let block_hash = block_ref.hash(); + let block_header = self + .inner + .backend + .block_header(block_hash) + .await + .map_err(|e| OnlineClientAtBlockError::CannotGetBlockHeader { + block_hash: block_hash.into(), + reason: e + })? + .ok_or(OnlineClientAtBlockError::BlockHeaderNotFound { + block_hash: block_hash.into() + })?; + (block_ref, block_header.number()) + }, + BlockNumberOrRef::Number(block_num) => { + let block_ref = self + .inner + .backend + .block_number_to_hash(block_num) + .await + .map_err(|e| OnlineClientAtBlockError::CannotGetBlockHash { + block_number: block_num, + reason: e + })? + .ok_or(OnlineClientAtBlockError::BlockNotFound { + block_number: block_num + })?; + (block_ref, block_num) + } }; - let mut historic_types = config.legacy_types_for_spec_version(spec_version); - // The metadata can be used to construct call and event types instead of us having to hardcode them all for every spec version: - let types_from_metadata = frame_decode::helpers::type_registry_from_metadata_any(&metadata) - .map_err( - |parse_error| OnlineClientAtBlockError::CannotInjectMetadataTypes { parse_error }, - )?; - historic_types.prepend(types_from_metadata); + // Obtain the spec version so that we know which metadata to use at this block. + let spec_version = match self.inner.config.spec_version_for_block_number(block_num) { + Some(version) => version, + None => { + let block_hash = block_ref.hash(); + let spec_version_bytes = self + .inner + .backend + .call("Core_version", None, block_hash) + .await + .map_err(|e| OnlineClientAtBlockError::CannotGetSpecVersion { + block_hash: block_hash.into(), + reason: e + })?; - Ok(ClientAtBlock::new(OnlineClientAtBlock { - config, - historic_types, - metadata, - rpc_methods, - block_hash, - })) + #[derive(codec::Decode)] + struct SpecVersionHeader { + _spec_name: String, + _impl_name: String, + _authoring_version: u32, + spec_version: u32, + } + SpecVersionHeader::decode(&mut &spec_version_bytes[..]) + .map_err(|e| OnlineClientAtBlockError::CannotDecodeSpecVersion { + block_hash: block_hash.into(), + reason: e, + })? + .spec_version + } + }; + + // Obtain the metadata for the block, allowing our config to cache it. + let metadata = match self.inner.config.metadata_for_spec_version(spec_version) { + Some(metadata) => metadata, + None => { + //self.inner.backend. + todo!() + } + }; + + todo!() } } @@ -189,7 +227,7 @@ pub struct OnlineClientAtBlock { metadata: Arc, backend: Arc>, hasher: T::Hasher, - block_hash: HashFor, + block_ref: BlockRef>, } impl OnlineClientAtBlockT for OnlineClientAtBlock { @@ -197,7 +235,7 @@ impl OnlineClientAtBlockT for OnlineClientAtBlock { &*self.backend } fn block_hash(&self) -> HashFor { - self.block_hash + self.block_ref.hash() } } @@ -207,66 +245,15 @@ impl OfflineClientAtBlockT for OnlineClientAtBlock { } } -async fn get_spec_version( - rpc_methods: &ChainHeadRpcMethods>, - block_hash: HashFor, -) -> Result { - use codec::Decode; - use subxt_rpcs::methods::chain_head::ArchiveCallResult; - - // make a runtime call to get the version information. This is also a constant - // in the metadata and so we could fetch it from there to avoid the call, but it would be a - // bit more effort. - let spec_version_bytes = { - let call_res = rpc_methods - .archive_v1_call(block_hash.into(), "Core_version", &[]) - .await - .map_err(|e| OnlineClientAtBlockError::CannotGetSpecVersion { - block_hash: block_hash.to_string(), - reason: format!("Error calling Core_version: {e}"), - })?; - match call_res { - ArchiveCallResult::Success(bytes) => bytes.0, - ArchiveCallResult::Error(e) => { - return Err(OnlineClientAtBlockError::CannotGetSpecVersion { - block_hash: block_hash.to_string(), - reason: format!("Core_version returned an error: {e}"), - }); - } - } - }; - - // We only care about the spec version, so just decode enough of this version information - // to be able to pluck out what we want, and ignore the rest. - let spec_version = { - #[derive(codec::Decode)] - struct SpecVersionHeader { - _spec_name: String, - _impl_name: String, - _authoring_version: u32, - spec_version: u32, - } - SpecVersionHeader::decode(&mut &spec_version_bytes[..]) - .map_err(|e| OnlineClientAtBlockError::CannotGetSpecVersion { - block_hash: block_hash.to_string(), - reason: format!("Error decoding Core_version response: {e}"), - })? - .spec_version - }; - - Ok(spec_version) -} - async fn get_metadata( - rpc_methods: &ChainHeadRpcMethods>, + backend: &dyn Backend, block_hash: HashFor, ) -> Result { // First, try to use the "modern" metadata APIs to get the most recent version we can. - let version_to_get = rpc_methods - .archive_v1_call(block_hash.into(), "Metadata_metadata_versions", &[]) + let version_to_get = backend + .call("Metadata_metadata_versions", None, block_hash) .await .ok() - .and_then(|res| res.as_success()) .and_then(|res| >::decode(&mut &res[..]).ok()) .and_then(|versions| { // We want to filter out the "unstable" version, which is represented by u32::MAX. @@ -276,12 +263,8 @@ async fn get_metadata( // We had success calling the above API, so we expect the "modern" metadata API to work. if let Some(version_to_get) = version_to_get { let version_bytes = version_to_get.encode(); - let rpc_response = rpc_methods - .archive_v1_call( - block_hash.into(), - "Metadata_metadata_at_version", - &version_bytes, - ) + let rpc_response = backend + .call("Metadata_metadata_at_version", Some(&version_bytes), block_hash) .await .map_err(|e| OnlineClientAtBlockError::CannotGetMetadata { block_hash: block_hash.to_string(), diff --git a/new/src/client/online_client/block_number_or_ref.rs b/new/src/client/online_client/block_number_or_ref.rs new file mode 100644 index 0000000000..93b3837659 --- /dev/null +++ b/new/src/client/online_client/block_number_or_ref.rs @@ -0,0 +1,41 @@ +use crate::config::{ Config, HashFor, Hasher }; +use crate::backend::BlockRef; + +/// This represents either a block number or a reference +/// to a block, which is essentially a block hash. +pub enum BlockNumberOrRef { + /// A block number. + Number(u64), + /// A block ref / hash. + BlockRef(BlockRef>) +} + +impl From for BlockNumberOrRef { + fn from(value: u32) -> Self { + BlockNumberOrRef::Number(value.into()) + } +} + +impl From for BlockNumberOrRef { + fn from(value: u64) -> Self { + BlockNumberOrRef::Number(value) + } +} + +impl From>> for BlockNumberOrRef { + fn from(block_ref: BlockRef>) -> Self { + BlockNumberOrRef::BlockRef(block_ref) + } +} + +// Ideally we'd have `impl From> for BlockNumberOrRef` but since our config +// could set _any_ hash type, this boils down to `impl From for ..` which is too general. +// Thus, we target our current concrete hash type. +impl From for BlockNumberOrRef +where + ::Hash: From +{ + fn from(hash: crate::config::substrate::H256) -> Self { + BlockNumberOrRef::BlockRef(BlockRef::from_hash(hash.into())) + } +} diff --git a/new/src/config.rs b/new/src/config.rs index 2c1c2a5461..0e13713bfb 100644 --- a/new/src/config.rs +++ b/new/src/config.rs @@ -63,7 +63,7 @@ pub trait Config: Clone + Debug + Sized + Send + Sync + 'static { /// /// The [`crate::client::OnlineClient`] will look this up on chain if it's not available here, /// but the [`crate::client::OfflineClient`] will error if this is not available for the required block number. - fn spec_version_for_block_number(&self, _block_number: u32) -> Option { + fn spec_version_for_block_number(&self, _block_number: u64) -> Option { None } @@ -170,5 +170,5 @@ pub trait Hasher: Debug + Clone + Send + Sync + 'static { /// This represents the block header type used by a node. pub trait Header: Sized + Encode + Decode + Debug + Sync + Send + DeserializeOwned + Clone { /// Return the block number of this header. - fn number(&self) -> u32; + fn number(&self) -> u64; } diff --git a/new/src/config/polkadot.rs b/new/src/config/polkadot.rs index 88ba9507fc..d64e0a0364 100644 --- a/new/src/config/polkadot.rs +++ b/new/src/config/polkadot.rs @@ -75,7 +75,7 @@ impl Config for PolkadotConfig { self.0.legacy_types_for_spec_version(spec_version) } - fn spec_version_for_block_number(&self, block_number: u32) -> Option { + fn spec_version_for_block_number(&self, block_number: u64) -> Option { self.0.spec_version_for_block_number(block_number) } diff --git a/new/src/config/substrate.rs b/new/src/config/substrate.rs index 96990f517b..29bb0dc3fe 100644 --- a/new/src/config/substrate.rs +++ b/new/src/config/substrate.rs @@ -20,7 +20,7 @@ use std::sync::Mutex; /// Construct a [`SubstrateConfig`] using this. pub struct SubstrateConfigBuilder { legacy_types: Option, - spec_version_for_block_number: RangeMap, + spec_version_for_block_number: RangeMap, metadata_for_spec_version: Mutex>>, use_old_v9_hashers_before_spec_version: u32, } @@ -107,7 +107,7 @@ impl SubstrateConfigBuilder { pub struct SpecVersionForRange { /// The block range that this spec version applies to. Inclusive of the start /// and exclusive of the enc. - pub block_range: std::ops::Range, + pub block_range: std::ops::Range, /// The spec version at this block range. pub spec_version: u32, } @@ -122,7 +122,7 @@ pub struct SubstrateConfig { #[derive(Debug)] struct SubstrateConfigInner { legacy_types: Option, - spec_version_for_block_number: RangeMap, + spec_version_for_block_number: RangeMap, metadata_for_spec_version: Mutex>>, } @@ -149,7 +149,7 @@ impl Config for SubstrateConfig { .map(|types| types.for_spec_version(spec_version as u64)) } - fn spec_version_for_block_number(&self, block_number: u32) -> Option { + fn spec_version_for_block_number(&self, block_number: u64) -> Option { self.inner.spec_version_for_block_number .get(block_number) .copied() @@ -268,7 +268,7 @@ pub struct SubstrateHeader { deserialize_with = "deserialize_number" )] #[codec(compact)] - pub number: u32, + pub number: u64, /// The state trie merkle root pub state_root: Hash, /// The merkle root of the extrinsics. @@ -282,7 +282,7 @@ where H: Hash, SubstrateHeader: Encode + Decode, { - fn number(&self) -> u32 { + fn number(&self) -> u64 { self.number.into() } } diff --git a/new/src/error.rs b/new/src/error.rs index b68057930b..8886efc5b6 100644 --- a/new/src/error.rs +++ b/new/src/error.rs @@ -171,7 +171,7 @@ pub enum OfflineClientAtBlockError { )] SpecVersionNotFound { /// The block number for which the spec version was not found. - block_number: u32, + block_number: u64, }, #[error( "Cannot construct OfflineClientAtBlock: metadata not found for spec version {spec_version}" @@ -193,6 +193,8 @@ pub enum OnlineClientError { }, #[error("Cannot construct OnlineClient: {0}")] RpcError(#[from] subxt_rpcs::Error), + #[error("Could not construct the CombinedBackend: {0}")] + CannotBuildCombinedBackend(CombinedBackendError), #[error( "Cannot construct OnlineClient: Cannot fetch latest finalized block to obtain init details from: {0}" )] @@ -229,38 +231,57 @@ pub enum OnlineClientAtBlockError { /// Block number we failed to get the hash for. block_number: u64, /// The error we encountered. - reason: subxt_rpcs::Error, + reason: BackendError, }, #[error("Cannot construct OnlineClientAtBlock: block number {block_number} not found")] BlockNotFound { /// The block number for which a block was not found. block_number: u64, }, - #[error( - "Cannot construct OnlineClientAtBlock: failed to get spec version for block hash {block_hash}: {reason}" - )] + #[error("Cannot construct OnlineClientAtBlock: cannot get the block header for block {block_hash}: {reason}")] + CannotGetBlockHeader { + /// Block hash that we failed to fetch the header for. + block_hash: Hex, + /// The error we encountered. + reason: BackendError, + }, + #[error("Cannot construct OnlineClientAtBlock: cannot find the block header for block {block_hash}")] + BlockHeaderNotFound { + /// Block hash that we failed to find the header for. + block_hash: Hex, + }, + #[error("Cannot construct OnlineClientAtBlock: failed to obtain spec version for block {block_hash}: {reason}")] CannotGetSpecVersion { - /// The block hash for which we failed to get the spec version. - block_hash: String, + /// The block hash for which we failed to obtain the spec version. + block_hash: Hex, /// The error we encountered. - reason: String, + reason: BackendError, }, #[error( - "Cannot construct OnlineClientAtBlock: failed to get metadata for block hash {block_hash}: {reason}" + "Cannot construct OnlineClientAtBlock: failed to decode spec version for block {block_hash}: {reason}" )] - CannotGetMetadata { - /// The block hash for which we failed to get the metadata. - block_hash: String, + CannotDecodeSpecVersion { + /// The block hash for which we failed to decode the spec version. + block_hash: Hex, /// The error we encountered. - reason: String, - }, - #[error( - "Cannot inject types from metadata: failure to parse a type found in the metadata: {parse_error}" - )] - CannotInjectMetadataTypes { - /// Error parsing a type found in the metadata. - parse_error: scale_info_legacy::lookup_name::ParseError, + reason: codec::Error, }, + // #[error( + // "Cannot construct OnlineClientAtBlock: failed to get metadata for block {block_hash}: {reason}" + // )] + // CannotGetMetadata { + // /// The block hash for which we failed to get the metadata. + // block_hash: Hex, + // /// The error we encountered. + // reason: String, + // }, + // #[error( + // "Cannot construct OnlineClientAtBlock: cannot inject types from metadata: failure to parse a type found in the metadata: {parse_error}" + // )] + // CannotInjectMetadataTypes { + // /// Error parsing a type found in the metadata. + // parse_error: scale_info_legacy::lookup_name::ParseError, + // }, } #[derive(Debug, thiserror::Error)] @@ -308,6 +329,14 @@ impl From for BackendError { } } +#[derive(Debug, thiserror::Error)] +#[non_exhaustive] +#[allow(missing_docs)] +pub enum CombinedBackendError { + #[error("Could not obtain the list of RPC methods to determine which backends can be used")] + CouldNotObtainRpcMethodList(subxt_rpcs::Error) +} + /// An RPC error. Since we are generic over the RPC client that is used, /// the error is boxed and could be casted. #[derive(Debug, thiserror::Error)]