From e4028e5a774b76ed9fa167d7756174ab59272f0d Mon Sep 17 00:00:00 2001 From: James Wilson Date: Tue, 2 Dec 2025 13:11:36 +0000 Subject: [PATCH] Add ability to stream blocks and tidy errors --- historic/src/storage.rs | 12 +- new/src/backend.rs | 32 +- new/src/backend/archive.rs | 118 ++++--- new/src/backend/archive/storage_stream.rs | 115 +++--- new/src/backend/chain_head.rs | 58 +-- new/src/backend/chain_head/follow_stream.rs | 6 +- new/src/backend/combined.rs | 268 +++++++------- new/src/backend/legacy.rs | 21 +- new/src/backend/legacy/descendant_streams.rs | 12 +- new/src/backend/utils.rs | 12 +- new/src/client.rs | 28 +- new/src/client/offline_client.rs | 28 +- new/src/client/online_client.rs | 334 +++++++++++------- .../online_client/block_number_or_ref.rs | 16 +- new/src/client/online_client/blocks.rs | 76 ++++ new/src/config.rs | 31 +- new/src/config/extrinsic_params.rs | 4 +- new/src/config/polkadot.rs | 15 +- new/src/config/substrate.rs | 31 +- new/src/config/transaction_extensions.rs | 4 +- new/src/error.rs | 280 ++++++++------- new/src/error/dispatch_error.rs | 2 +- new/src/lib.rs | 4 +- new/src/utils.rs | 4 +- new/src/utils/range_map.rs | 6 +- new/src/utils/unchecked_extrinsic.rs | 6 +- rpcs/src/methods/chain_head.rs | 8 +- 27 files changed, 888 insertions(+), 643 deletions(-) create mode 100644 new/src/client/online_client/blocks.rs diff --git a/historic/src/storage.rs b/historic/src/storage.rs index 2d818b9b4e..cf09303c47 100644 --- a/historic/src/storage.rs +++ b/historic/src/storage.rs @@ -216,7 +216,7 @@ where > { use futures::stream::StreamExt; use subxt_rpcs::methods::chain_head::{ - ArchiveStorageEvent, StorageQuery, StorageQueryType, + ArchiveStorageEvent, ArchiveStorageQuery, StorageQueryType, }; let expected_num_keys = with_info!(info = &*self.info => { @@ -235,9 +235,10 @@ where let block_hash = self.client.block_hash(); let key_bytes = self.key(keys)?; - let items = std::iter::once(StorageQuery { + let items = std::iter::once(ArchiveStorageQuery { key: &*key_bytes, query_type: StorageQueryType::DescendantsValues, + pagination_start_key: None, }); let sub = self @@ -297,11 +298,14 @@ where T: Config + 'atblock, Client: OnlineClientAtBlockT<'atblock, T>, { - use subxt_rpcs::methods::chain_head::{ArchiveStorageEvent, StorageQuery, StorageQueryType}; + use subxt_rpcs::methods::chain_head::{ + ArchiveStorageEvent, ArchiveStorageQuery, StorageQueryType, + }; - let query = StorageQuery { + let query = ArchiveStorageQuery { key: key_bytes, query_type: StorageQueryType::Value, + pagination_start_key: None, }; let mut response_stream = client diff --git a/new/src/backend.rs b/new/src/backend.rs index 7a8b866095..7a324c7bc4 100644 --- a/new/src/backend.rs +++ b/new/src/backend.rs @@ -6,10 +6,10 @@ //! the necessary information (probably from a JSON-RPC API, but that's up to the //! implementation). -mod chain_head; mod archive; -mod legacy; +mod chain_head; mod combined; +mod legacy; mod utils; use crate::config::{Config, HashFor}; @@ -22,23 +22,10 @@ use std::sync::Arc; use subxt_metadata::Metadata; // Expose our various backends. -pub use chain_head::{ - ChainHeadBackend, - ChainHeadBackendBuilder, - ChainHeadBackendDriver, -}; -pub use archive::{ - ArchiveBackend -}; -pub use legacy::{ - LegacyBackend, - LegacyBackendBuilder -}; -pub use combined::{ - CombinedBackend, - CombinedBackendBuilder, - CombinedBackendDriver -}; +pub use archive::ArchiveBackend; +pub use chain_head::{ChainHeadBackend, ChainHeadBackendBuilder, ChainHeadBackendDriver}; +pub use combined::{CombinedBackend, CombinedBackendBuilder, CombinedBackendDriver}; +pub use legacy::{LegacyBackend, LegacyBackendBuilder}; /// Prevent the backend trait being implemented externally. #[doc(hidden)] @@ -77,8 +64,11 @@ pub trait Backend: sealed::Sealed + Send + Sync + 'static { /// Convert a block number to a hash. This should return `None` in the event that /// multiple block hashes correspond to the given number (ie if the number is greater /// than that of the latest finalized block and some forks exist). Nevertheless, it could - /// still return the hash to a block on some fork that is pruned. - async fn block_number_to_hash(&self, number: u64) -> Result>>, BackendError>; + /// still return the hash to a block on some fork that is pruned. + async fn block_number_to_hash( + &self, + number: u64, + ) -> Result>>, BackendError>; /// Get a block header. async fn block_header(&self, at: HashFor) -> Result, BackendError>; diff --git a/new/src/backend/archive.rs b/new/src/backend/archive.rs index 96c44cfff9..6dd8b91252 100644 --- a/new/src/backend/archive.rs +++ b/new/src/backend/archive.rs @@ -12,20 +12,17 @@ mod storage_stream; -use subxt_rpcs::methods::ChainHeadRpcMethods; use crate::backend::{ - Backend, BlockRef, StorageResponse, StreamOf, StreamOfResults, - TransactionStatus, utils::retry, + Backend, BlockRef, StorageResponse, StreamOf, StreamOfResults, TransactionStatus, utils::retry, }; use crate::config::{Config, HashFor, RpcConfigFor}; use crate::error::BackendError; use async_trait::async_trait; use futures::StreamExt; -use subxt_rpcs::RpcClient; -use subxt_rpcs::methods::chain_head::{ - ArchiveStorageQuery, ArchiveCallResult, StorageQueryType, -}; use storage_stream::ArchiveStorageStream; +use subxt_rpcs::RpcClient; +use subxt_rpcs::methods::ChainHeadRpcMethods; +use subxt_rpcs::methods::chain_head::{ArchiveCallResult, ArchiveStorageQuery, StorageQueryType}; /// The archive backend. #[derive(Debug, Clone)] @@ -36,7 +33,7 @@ pub struct ArchiveBackend { impl ArchiveBackend { /// Configure and construct an [`ArchiveBackend`] and the associated [`ChainHeadBackendDriver`]. - pub fn new(client: impl Into,) -> ArchiveBackend { + pub fn new(client: impl Into) -> ArchiveBackend { let methods = ChainHeadRpcMethods::new(client.into()); ArchiveBackend { methods } @@ -50,7 +47,8 @@ impl Backend for ArchiveBackend { keys: Vec>, at: HashFor, ) -> Result, BackendError> { - let queries = keys.into_iter() + let queries = keys + .into_iter() .map(|key| ArchiveStorageQuery { key: key, query_type: StorageQueryType::Value, @@ -58,12 +56,17 @@ impl Backend for ArchiveBackend { }) .collect(); - let stream = ArchiveStorageStream::new(at, self.methods.clone(), queries).map(|item| { - match item { + let stream = ArchiveStorageStream::new(at, self.methods.clone(), queries) + .map(|item| match item { Err(e) => Some(Err(e)), - Ok(item) => item.value.map(|val| Ok(StorageResponse { key: item.key.0, value: val.0 })) - } - }).filter_map(async |item| item); + Ok(item) => item.value.map(|val| { + Ok(StorageResponse { + key: item.key.0, + value: val.0, + }) + }), + }) + .filter_map(async |item| item); Ok(StreamOf(Box::pin(stream))) } @@ -74,19 +77,18 @@ impl Backend for ArchiveBackend { at: HashFor, ) -> Result>, BackendError> { let queries = std::iter::once(ArchiveStorageQuery { - key: key, - // Just ask for the hash and then ignore it and return keys - query_type: StorageQueryType::DescendantsHashes, - pagination_start_key: None, - }) - .collect(); + key: key, + // Just ask for the hash and then ignore it and return keys + query_type: StorageQueryType::DescendantsHashes, + pagination_start_key: None, + }) + .collect(); - let stream = ArchiveStorageStream::new(at, self.methods.clone(), queries).map(|item| { - match item { + let stream = + ArchiveStorageStream::new(at, self.methods.clone(), queries).map(|item| match item { Err(e) => Err(e), - Ok(item) => Ok(item.key.0) - } - }); + Ok(item) => Ok(item.key.0), + }); Ok(StreamOf(Box::pin(stream))) } @@ -97,18 +99,23 @@ impl Backend for ArchiveBackend { at: HashFor, ) -> Result, BackendError> { let queries = std::iter::once(ArchiveStorageQuery { - key: key, - query_type: StorageQueryType::DescendantsValues, - pagination_start_key: None, - }) - .collect(); + key: key, + query_type: StorageQueryType::DescendantsValues, + pagination_start_key: None, + }) + .collect(); - let stream = ArchiveStorageStream::new(at, self.methods.clone(), queries).map(|item| { - match item { + let stream = ArchiveStorageStream::new(at, self.methods.clone(), queries) + .map(|item| match item { Err(e) => Some(Err(e)), - Ok(item) => item.value.map(|val| Ok(StorageResponse { key: item.key.0, value: val.0 })) - } - }).filter_map(async |item| item); + Ok(item) => item.value.map(|val| { + Ok(StorageResponse { + key: item.key.0, + value: val.0, + }) + }), + }) + .filter_map(async |item| item); Ok(StreamOf(Box::pin(stream))) } @@ -121,9 +128,15 @@ impl Backend for ArchiveBackend { .await } - async fn block_number_to_hash(&self, number: u64) -> Result>>, BackendError> { + async fn block_number_to_hash( + &self, + number: u64, + ) -> Result>>, BackendError> { retry(|| async { - let mut hashes = self.methods.archive_v1_hash_by_height(number as usize).await?; + let mut hashes = self + .methods + .archive_v1_hash_by_height(number as usize) + .await?; if let (Some(hash), None) = (hashes.pop(), hashes.pop()) { // One hash; return it. Ok(Some(BlockRef::from_hash(hash))) @@ -131,7 +144,8 @@ impl Backend for ArchiveBackend { // More than one; return None. Ok(None) } - }).await + }) + .await } async fn block_header(&self, at: HashFor) -> Result, BackendError> { @@ -147,9 +161,7 @@ impl Backend for ArchiveBackend { let Some(exts) = self.methods.archive_v1_body(at).await? else { return Ok(None); }; - Ok(Some( - exts.into_iter().map(|ext| ext.0).collect() - )) + Ok(Some(exts.into_iter().map(|ext| ext.0).collect())) }) .await } @@ -159,7 +171,9 @@ impl Backend for ArchiveBackend { let height = self.methods.archive_v1_finalized_height().await?; let mut hashes = self.methods.archive_v1_hash_by_height(height).await?; let Some(hash) = hashes.pop() else { - return Err(BackendError::Other("Multiple hashes not expected at a finalized height".into())) + return Err(BackendError::Other( + "Multiple hashes not expected at a finalized height".into(), + )); }; Ok(BlockRef::from_hash(hash)) }) @@ -170,21 +184,27 @@ impl Backend for ArchiveBackend { &self, _hasher: T::Hasher, ) -> Result>)>, BackendError> { - Err(BackendError::Other("The archive backend cannot stream block headers".into())) + Err(BackendError::Other( + "The archive backend cannot stream block headers".into(), + )) } async fn stream_best_block_headers( &self, _hasher: T::Hasher, ) -> Result>)>, BackendError> { - Err(BackendError::Other("The archive backend cannot stream block headers".into())) + Err(BackendError::Other( + "The archive backend cannot stream block headers".into(), + )) } async fn stream_finalized_block_headers( &self, _hasher: T::Hasher, ) -> Result>)>, BackendError> { - Err(BackendError::Other("The archive backend cannot stream block headers".into())) + Err(BackendError::Other( + "The archive backend cannot stream block headers".into(), + )) } async fn submit_transaction( @@ -201,7 +221,10 @@ impl Backend for ArchiveBackend { call_parameters: Option<&[u8]>, at: HashFor, ) -> Result, BackendError> { - let res = self.methods.archive_v1_call(at, method, call_parameters.unwrap_or(&[])).await?; + let res = self + .methods + .archive_v1_call(at, method, call_parameters.unwrap_or(&[])) + .await?; match res { ArchiveCallResult::Success(bytes) => Ok(bytes.0), ArchiveCallResult::Error(e) => Err(BackendError::other(e)), @@ -209,5 +232,4 @@ impl Backend for ArchiveBackend { } } - impl crate::backend::sealed::Sealed for ArchiveBackend {} - +impl crate::backend::sealed::Sealed for ArchiveBackend {} diff --git a/new/src/backend/archive/storage_stream.rs b/new/src/backend/archive/storage_stream.rs index 9fa37512e9..0d3b8c7591 100644 --- a/new/src/backend/archive/storage_stream.rs +++ b/new/src/backend/archive/storage_stream.rs @@ -1,13 +1,15 @@ -use std::collections::VecDeque; -use subxt_rpcs::Error as RpcError; -use subxt_rpcs::methods::chain_head::{ArchiveStorageQuery, ArchiveStorageSubscription, ArchiveStorageEvent, ArchiveStorageEventItem}; -use std::pin::Pin; -use std::future::Future; -use futures::{FutureExt, Stream, StreamExt}; -use std::task::{Context, Poll}; -use crate::error::BackendError; use crate::config::{Config, HashFor, RpcConfigFor}; +use crate::error::BackendError; +use futures::{FutureExt, Stream, StreamExt}; +use std::collections::VecDeque; +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; +use subxt_rpcs::Error as RpcError; use subxt_rpcs::methods::ChainHeadRpcMethods; +use subxt_rpcs::methods::chain_head::{ + ArchiveStorageEvent, ArchiveStorageEventItem, ArchiveStorageQuery, ArchiveStorageSubscription, +}; pub struct ArchiveStorageStream { at: HashFor, @@ -19,15 +21,21 @@ pub struct ArchiveStorageStream { enum StreamState { GetSubscription { current_query: ArchiveStorageQuery>, - sub_fut: Pin>, RpcError>> + Send + 'static>> + sub_fut: Pin< + Box< + dyn Future>, RpcError>> + + Send + + 'static, + >, + >, }, RunSubscription { current_query: ArchiveStorageQuery>, - sub: ArchiveStorageSubscription> + sub: ArchiveStorageSubscription>, }, } -impl ArchiveStorageStream { +impl ArchiveStorageStream { /// Fetch descendant keys. pub fn new( at: HashFor, @@ -58,38 +66,34 @@ impl Stream for ArchiveStorageStream { let Some(query) = this.query_queue.pop_front() else { return Poll::Ready(None); }; - + let at = this.at; let methods = this.methods.clone(); let current_query = query.clone(); let sub_fut = async move { - let query = std::iter::once(ArchiveStorageQuery { - key: query.key.as_ref(), - query_type: query.query_type, - pagination_start_key: query.pagination_start_key.as_deref(), + let query = std::iter::once(ArchiveStorageQuery { + key: query.key.as_ref(), + query_type: query.query_type, + pagination_start_key: query.pagination_start_key.as_deref(), }); - methods.archive_v1_storage( - at, - query, - None - ).await + methods.archive_v1_storage(at, query, None).await }; - - this.state = Some(StreamState::GetSubscription { + + this.state = Some(StreamState::GetSubscription { current_query, - sub_fut: Box::pin(sub_fut) + sub_fut: Box::pin(sub_fut), }); - }, + } // We're getting our subscription stream for the current query. - Some(StreamState::GetSubscription { current_query, mut sub_fut }) => { + Some(StreamState::GetSubscription { + current_query, + mut sub_fut, + }) => { match sub_fut.poll_unpin(cx) { Poll::Ready(Ok(sub)) => { - this.state = Some(StreamState::RunSubscription { - current_query, - sub - }); - }, + this.state = Some(StreamState::RunSubscription { current_query, sub }); + } Poll::Ready(Err(e)) => { if e.is_disconnected_will_reconnect() { // Push the query back onto the queue to try again @@ -98,34 +102,37 @@ impl Stream for ArchiveStorageStream { } this.state = None; - return Poll::Ready(Some(Err(e.into()))) + return Poll::Ready(Some(Err(e.into()))); } Poll::Pending => { - this.state = Some(StreamState::GetSubscription { - current_query, - sub_fut + this.state = Some(StreamState::GetSubscription { + current_query, + sub_fut, }); - return Poll::Pending - }, + return Poll::Pending; + } } - }, + } // Running the subscription and returning results. - Some(StreamState::RunSubscription { current_query, mut sub }) => { + Some(StreamState::RunSubscription { + current_query, + mut sub, + }) => { match sub.poll_next_unpin(cx) { Poll::Ready(Some(Ok(val))) => { match val { ArchiveStorageEvent::Item(item) => { - this.state = Some(StreamState::RunSubscription { - current_query: ArchiveStorageQuery { - key: current_query.key, - query_type: current_query.query_type, + this.state = Some(StreamState::RunSubscription { + current_query: ArchiveStorageQuery { + key: current_query.key, + query_type: current_query.query_type, // In the event of error, we resume from the last seen value. // At the time of writing, it's not clear if paginationStartKey // starts from the key itself or the first key after it: // https://github.com/paritytech/json-rpc-interface-spec/issues/176 - pagination_start_key: Some(item.key.0.clone()) - }, - sub + pagination_start_key: Some(item.key.0.clone()), + }, + sub, }); // We treat `paginationStartKey` as being the key we want results to begin _after_. @@ -140,17 +147,17 @@ impl Stream for ArchiveStorageStream { } return Poll::Ready(Some(Ok(item))); - }, + } ArchiveStorageEvent::Error(e) => { this.state = None; - return Poll::Ready(Some(Err(BackendError::other(e.error)))) - }, + return Poll::Ready(Some(Err(BackendError::other(e.error)))); + } ArchiveStorageEvent::Done => { this.state = None; continue; - }, + } } - }, + } Poll::Ready(Some(Err(e))) => { if e.is_disconnected_will_reconnect() { // Put the current query back into the queue and retry. @@ -167,11 +174,9 @@ impl Stream for ArchiveStorageStream { this.state = None; continue; } - Poll::Pending => { - return Poll::Pending - }, + Poll::Pending => return Poll::Pending, } - }, + } } } } diff --git a/new/src/backend/chain_head.rs b/new/src/backend/chain_head.rs index 64d7bcdad9..d218ab30c0 100644 --- a/new/src/backend/chain_head.rs +++ b/new/src/backend/chain_head.rs @@ -5,7 +5,7 @@ //! This module exposes a backend implementation based on the new APIs //! described at . See //! [`rpc_methods`] for the raw API calls. -//! +//! //! Specifically, the focus here is on the `chainHead` methods. mod follow_stream; @@ -13,11 +13,10 @@ mod follow_stream_driver; mod follow_stream_unpin; mod storage_items; -use subxt_rpcs::methods::ChainHeadRpcMethods; use self::follow_stream_driver::FollowStreamFinalizedHeads; use crate::backend::{ - Backend, BlockRef, BlockRefT, StorageResponse, StreamOf, StreamOfResults, - TransactionStatus, utils::retry, + Backend, BlockRef, BlockRefT, StorageResponse, StreamOf, StreamOfResults, TransactionStatus, + utils::retry, }; use crate::config::{Config, Hash, HashFor, RpcConfigFor}; use crate::error::{BackendError, RpcError}; @@ -29,6 +28,7 @@ use std::collections::HashMap; use std::task::Poll; use storage_items::StorageItems; use subxt_rpcs::RpcClient; +use subxt_rpcs::methods::ChainHeadRpcMethods; use subxt_rpcs::methods::chain_head::{ FollowEvent, MethodResponse, StorageQuery, StorageQueryType, StorageResultType, }; @@ -151,7 +151,7 @@ impl ChainHeadBackendBuilder { #[cfg(feature = "runtime")] pub fn build_with_background_driver(self, client: impl Into) -> ChainHeadBackend { let (backend, mut driver) = self.build(client); - + super::utils::spawn(async move { // NOTE: we need to poll the driver until it's done i.e returns None // to ensure that the backend is shutdown properly. @@ -373,8 +373,13 @@ impl Backend for ChainHeadBackend { .await } - async fn block_number_to_hash(&self, _number: u64) -> Result>>, BackendError> { - Err(BackendError::other("The ChainHead V1 RPCs do not support obtaining a block hash from a number.")) + async fn block_number_to_hash( + &self, + _number: u64, + ) -> Result>>, BackendError> { + Err(BackendError::other( + "The ChainHead V1 RPCs do not support obtaining a block hash from a number.", + )) } async fn block_header(&self, at: HashFor) -> Result, BackendError> { @@ -567,24 +572,29 @@ pub(crate) async fn submit_transaction_ignoring_follow_events( RpcTransactionStatus::Broadcasted => TransactionStatus::Broadcasted, RpcTransactionStatus::BestChainBlockIncluded { block: None } => { TransactionStatus::NoLongerInBestBlock - }, + } RpcTransactionStatus::BestChainBlockIncluded { block: Some(block) } => { - TransactionStatus::InBestBlock { hash: BlockRef::from_hash(block.hash) } - }, + TransactionStatus::InBestBlock { + hash: BlockRef::from_hash(block.hash), + } + } RpcTransactionStatus::Finalized { block } => { - TransactionStatus::InFinalizedBlock { hash: BlockRef::from_hash(block.hash) } - }, + TransactionStatus::InFinalizedBlock { + hash: BlockRef::from_hash(block.hash), + } + } RpcTransactionStatus::Error { error } => { TransactionStatus::Error { message: error } - }, + } RpcTransactionStatus::Invalid { error } => { TransactionStatus::Invalid { message: error } - }, + } RpcTransactionStatus::Dropped { error } => { TransactionStatus::Dropped { message: error } - }, + } } - }).map_err(Into::into) + }) + .map_err(Into::into) }); Ok(StreamOf(Box::pin(tx_progress))) @@ -644,9 +654,7 @@ async fn submit_transaction_tracking_follow_events( // Poll for a follow event, and error if the stream has unexpectedly ended. let follow_ev_poll = match seen_blocks_sub.poll_next_unpin(cx) { Poll::Ready(None) => { - return Poll::Ready(err_other( - "chainHead_follow stream ended unexpectedly", - )); + return Poll::Ready(err_other("chainHead_follow stream ended unexpectedly")); } Poll::Ready(Some(follow_ev)) => Poll::Ready(follow_ev), Poll::Pending => Poll::Pending, @@ -671,10 +679,8 @@ async fn submit_transaction_tracking_follow_events( } FollowEvent::Finalized(ev) => { for block_ref in ev.finalized_block_hashes { - seen_blocks.insert( - block_ref.hash(), - (SeenBlockMarker::Finalized, block_ref), - ); + seen_blocks + .insert(block_ref.hash(), (SeenBlockMarker::Finalized, block_ref)); } } FollowEvent::Stop => { @@ -694,9 +700,7 @@ async fn submit_transaction_tracking_follow_events( // If we have a finalized hash, we are done looking for tx events and we are just waiting // for a pinned block with a matching hash (which must appear eventually given it's finalized). if let Some(hash) = &finalized_hash { - if let Some((SeenBlockMarker::Finalized, block_ref)) = - seen_blocks.remove(hash) - { + if let Some((SeenBlockMarker::Finalized, block_ref)) = seen_blocks.remove(hash) { // Found it! Hand back the event with a pinned block. We're done. done = true; let ev = TransactionStatus::InFinalizedBlock { @@ -768,4 +772,4 @@ async fn submit_transaction_tracking_follow_events( }); Ok(StreamOf(Box::pin(tx_stream))) -} \ No newline at end of file +} diff --git a/new/src/backend/chain_head/follow_stream.rs b/new/src/backend/chain_head/follow_stream.rs index 874f13395c..b763a6270b 100644 --- a/new/src/backend/chain_head/follow_stream.rs +++ b/new/src/backend/chain_head/follow_stream.rs @@ -103,7 +103,9 @@ impl FollowStream { } /// Create a new [`FollowStream`] given the RPC methods. - pub fn from_methods(methods: ChainHeadRpcMethods>) -> FollowStream> { + pub fn from_methods( + methods: ChainHeadRpcMethods>, + ) -> FollowStream> { FollowStream { stream_getter: Box::new(move || { let methods = methods.clone(); @@ -113,7 +115,7 @@ impl FollowStream { // Extract the subscription ID: let Some(sub_id) = stream.subscription_id().map(ToOwned::to_owned) else { return Err(BackendError::other( - "Subscription ID expected for chainHead_follow response, but not given" + "Subscription ID expected for chainHead_follow response, but not given", )); }; // Map stream errors into the higher level subxt one: diff --git a/new/src/backend/combined.rs b/new/src/backend/combined.rs index aae90543cc..7870c37b5f 100644 --- a/new/src/backend/combined.rs +++ b/new/src/backend/combined.rs @@ -7,19 +7,16 @@ use crate::backend::chain_head::ChainHeadBackendDriver; use crate::backend::{ - legacy::LegacyBackend, - chain_head::ChainHeadBackend, - archive::ArchiveBackend, - Backend, BlockRef, StorageResponse, StreamOfResults, - TransactionStatus, + Backend, BlockRef, StorageResponse, StreamOfResults, TransactionStatus, + archive::ArchiveBackend, chain_head::ChainHeadBackend, legacy::LegacyBackend, }; use crate::config::{Config, HashFor}; use crate::error::{BackendError, CombinedBackendError}; use async_trait::async_trait; -use futures::StreamExt; -use subxt_rpcs::RpcClient; use futures::Stream; +use futures::StreamExt; use std::task::Poll; +use subxt_rpcs::RpcClient; pub struct CombinedBackendBuilder { archive: BackendChoice>, @@ -33,7 +30,7 @@ enum BackendChoice { UseDefault, } -impl CombinedBackendBuilder { +impl CombinedBackendBuilder { /// Create a new [`CombinedBackendBuilder`]. pub fn new() -> Self { CombinedBackendBuilder { @@ -86,9 +83,12 @@ impl CombinedBackendBuilder { /// /// If you just want to run the driver in the background until completion in on the default runtime, /// use [`CombinedBackendBuilder::build_with_background_driver`] instead. - pub async fn build(self, rpc_client: impl Into) -> Result<(CombinedBackend, CombinedBackendDriver), CombinedBackendError> { + pub async fn build( + self, + rpc_client: impl Into, + ) -> Result<(CombinedBackend, CombinedBackendDriver), CombinedBackendError> { let rpc_client = rpc_client.into(); - + // What does the thing wer're talking to actually know about? let methods: Vec = rpc_client .request("rpc_methods", subxt_rpcs::rpc_params![]) @@ -98,7 +98,9 @@ impl CombinedBackendBuilder { let has_archive_methods = methods.iter().any(|m| m.starts_with("archive_v1_")); let has_chainhead_methods = methods.iter().any(|m| m.starts_with("chainHead_v1")); - let mut combined_driver = CombinedBackendDriver { chainhead_driver: None }; + let mut combined_driver = CombinedBackendDriver { + chainhead_driver: None, + }; let archive = if has_archive_methods { match self.archive { @@ -106,19 +108,24 @@ impl CombinedBackendBuilder { BackendChoice::UseDefault => Some(ArchiveBackend::new(rpc_client.clone())), BackendChoice::DontUse => None, } - } else { None }; + } else { + None + }; let chainhead = if has_chainhead_methods { match self.chainhead { BackendChoice::Use(b) => Some(b), BackendChoice::UseDefault => { - let (chainhead, chainhead_driver) = ChainHeadBackend::builder().build(rpc_client.clone()); + let (chainhead, chainhead_driver) = + ChainHeadBackend::builder().build(rpc_client.clone()); combined_driver.chainhead_driver = Some(chainhead_driver); Some(chainhead) - }, + } BackendChoice::DontUse => None, } - } else { None }; + } else { + None + }; let legacy = match self.legacy { BackendChoice::Use(b) => Some(b), @@ -129,7 +136,7 @@ impl CombinedBackendBuilder { let combined = CombinedBackend { archive, chainhead, - legacy + legacy, }; Ok((combined, combined_driver)) @@ -141,10 +148,11 @@ impl CombinedBackendBuilder { /// - On non-wasm targets, this will spawn the driver on `tokio`. /// - On wasm targets, this will spawn the driver on `wasm-bindgen-futures`. #[cfg(feature = "runtime")] - pub async fn build_with_background_driver(self, client: impl Into) -> Result, CombinedBackendError> { - let (backend, mut driver) = self - .build(client) - .await?; + pub async fn build_with_background_driver( + self, + client: impl Into, + ) -> Result, CombinedBackendError> { + let (backend, mut driver) = self.build(client).await?; super::utils::spawn(async move { // NOTE: we need to poll the driver until it's done i.e returns None @@ -166,10 +174,10 @@ impl CombinedBackendBuilder { /// that the [`CombinedBackend`] can make progress. It does not need polling /// if [`CombinedBackendDriver::needs_polling`] returns `false`. pub struct CombinedBackendDriver { - chainhead_driver: Option> + chainhead_driver: Option>, } -impl CombinedBackendDriver { +impl CombinedBackendDriver { pub fn needs_polling(&self) -> bool { self.chainhead_driver.is_some() } @@ -183,7 +191,7 @@ impl Stream for CombinedBackendDriver { ) -> std::task::Poll> { match &mut self.chainhead_driver { Some(driver) => driver.poll_next_unpin(cx), - None => Poll::Ready(None) + None => Poll::Ready(None), } } } @@ -196,7 +204,7 @@ pub struct CombinedBackend { legacy: Option>, } -impl CombinedBackend { +impl CombinedBackend { /// Configure and construct a [`CombinedBackend`]. pub fn builder() -> CombinedBackendBuilder { CombinedBackendBuilder::new() @@ -234,7 +242,7 @@ impl super::sealed::Sealed for CombinedBackend {} // by this are more likely to expire. // - If neither exists / works, we fall back to the legacy methods. These have some limits on // what is available (often fewer limits than chainHead though) but tend to do the job. We'd -// rather not use these as they are old and should go away, but until then they are a good +// rather not use these as they are old and should go away, but until then they are a good // fallback. #[async_trait] impl Backend for CombinedBackend { @@ -243,13 +251,11 @@ impl Backend for CombinedBackend { keys: Vec>, at: HashFor, ) -> Result, BackendError> { - try_backends(&[ - self.archive(), - self.chainhead(), - self.legacy() - ], async |b: &dyn Backend| { - b.storage_fetch_values(keys.clone(), at).await - }).await + try_backends( + &[self.archive(), self.chainhead(), self.legacy()], + async |b: &dyn Backend| b.storage_fetch_values(keys.clone(), at).await, + ) + .await } async fn storage_fetch_descendant_keys( @@ -257,13 +263,11 @@ impl Backend for CombinedBackend { key: Vec, at: HashFor, ) -> Result>, BackendError> { - try_backends(&[ - self.archive(), - self.chainhead(), - self.legacy() - ], async |b: &dyn Backend| { - b.storage_fetch_descendant_keys(key.clone(), at).await - }).await + try_backends( + &[self.archive(), self.chainhead(), self.legacy()], + async |b: &dyn Backend| b.storage_fetch_descendant_keys(key.clone(), at).await, + ) + .await } async fn storage_fetch_descendant_values( @@ -271,123 +275,130 @@ impl Backend for CombinedBackend { key: Vec, at: HashFor, ) -> Result, BackendError> { - try_backends(&[ - self.archive(), - self.chainhead(), - self.legacy() - ], async |b: &dyn Backend| { - b.storage_fetch_descendant_values(key.clone(), at).await - }).await + try_backends( + &[self.archive(), self.chainhead(), self.legacy()], + async |b: &dyn Backend| b.storage_fetch_descendant_values(key.clone(), at).await, + ) + .await } async fn genesis_hash(&self) -> Result, BackendError> { - try_backends(&[ - self.archive(), - self.chainhead(), - self.legacy() - ], async |b: &dyn Backend| { - b.genesis_hash().await - }).await + try_backends( + &[self.archive(), self.chainhead(), self.legacy()], + async |b: &dyn Backend| b.genesis_hash().await, + ) + .await } - async fn block_number_to_hash(&self, number: u64) -> Result>>, BackendError> { - try_backends(&[ - self.archive(), - self.legacy(), - // chainHead last as it cannot handle this request and will fail, so it's here - // just to hand back a more relevant error in case the above two backends aren't - // enabled or have some issue. - self.chainhead() - ], async |b: &dyn Backend| { - b.block_number_to_hash(number).await - }).await + async fn block_number_to_hash( + &self, + number: u64, + ) -> Result>>, BackendError> { + try_backends( + &[ + self.archive(), + self.legacy(), + // chainHead last as it cannot handle this request and will fail, so it's here + // just to hand back a more relevant error in case the above two backends aren't + // enabled or have some issue. + self.chainhead(), + ], + async |b: &dyn Backend| b.block_number_to_hash(number).await, + ) + .await } async fn block_header(&self, at: HashFor) -> Result, BackendError> { - try_backends(&[ - self.archive(), - self.chainhead(), - self.legacy() - ], async |b: &dyn Backend| { - b.block_header(at).await - }).await + try_backends( + &[self.archive(), self.chainhead(), self.legacy()], + async |b: &dyn Backend| b.block_header(at).await, + ) + .await } async fn block_body(&self, at: HashFor) -> Result>>, BackendError> { - try_backends(&[ - self.archive(), - self.chainhead(), - self.legacy() - ], async |b: &dyn Backend| { - b.block_body(at).await - }).await + try_backends( + &[self.archive(), self.chainhead(), self.legacy()], + async |b: &dyn Backend| b.block_body(at).await, + ) + .await } async fn latest_finalized_block_ref(&self) -> Result>, BackendError> { - try_backends(&[ - // Prioritize chainHead backend since it's streaming these things; save another call. - self.chainhead(), - self.archive(), - self.legacy() - ], async |b: &dyn Backend| { - b.latest_finalized_block_ref().await - }).await + try_backends( + &[ + // Prioritize chainHead backend since it's streaming these things; save another call. + self.chainhead(), + self.archive(), + self.legacy(), + ], + async |b: &dyn Backend| b.latest_finalized_block_ref().await, + ) + .await } async fn stream_all_block_headers( &self, hasher: T::Hasher, ) -> Result>)>, BackendError> { - try_backends(&[ - // Ignore archive backend; it doesn't support this. - self.chainhead(), - self.legacy() - ], async |b: &dyn Backend| { - b.stream_all_block_headers(hasher.clone()).await - }).await + try_backends( + &[ + // Ignore archive backend; it doesn't support this. + self.chainhead(), + self.legacy(), + ], + async |b: &dyn Backend| b.stream_all_block_headers(hasher.clone()).await, + ) + .await } async fn stream_best_block_headers( &self, hasher: T::Hasher, ) -> Result>)>, BackendError> { - try_backends(&[ - // Ignore archive backend; it doesn't support this. - self.chainhead(), - self.legacy() - ], async |b: &dyn Backend| { - b.stream_best_block_headers(hasher.clone()).await - }).await + try_backends( + &[ + // Ignore archive backend; it doesn't support this. + self.chainhead(), + self.legacy(), + ], + async |b: &dyn Backend| b.stream_best_block_headers(hasher.clone()).await, + ) + .await } async fn stream_finalized_block_headers( &self, hasher: T::Hasher, ) -> Result>)>, BackendError> { - try_backends(&[ - // Ignore archive backend; it doesn't support this. - self.chainhead(), - self.legacy() - ], async |b: &dyn Backend| { - b.stream_finalized_block_headers(hasher.clone()).await - }).await + try_backends( + &[ + // Ignore archive backend; it doesn't support this. + self.chainhead(), + self.legacy(), + ], + async |b: &dyn Backend| b.stream_finalized_block_headers(hasher.clone()).await, + ) + .await } async fn submit_transaction( &self, extrinsic: &[u8], ) -> Result>>, BackendError> { - try_backends(&[ - // chainHead first as it does the same as the archive backend, but with better - // guarantees around the block handed back being pinned & ready to access. - self.chainhead(), - self.legacy(), - // archive last just incase chainHead & legacy fail or aren't provided for some - // reason. - self.archive(), - ], async |b: &dyn Backend| { - b.submit_transaction(extrinsic).await - }).await + try_backends( + &[ + // chainHead first as it does the same as the archive backend, but with better + // guarantees around the block handed back being pinned & ready to access. + self.chainhead(), + self.legacy(), + // archive last just incase chainHead & legacy fail or aren't provided for some + // reason. + self.archive(), + ], + async |b: &dyn Backend| b.submit_transaction(extrinsic).await, + ) + .await } async fn call( @@ -396,20 +407,18 @@ impl Backend for CombinedBackend { call_parameters: Option<&[u8]>, at: HashFor, ) -> Result, BackendError> { - try_backends(&[ - self.archive(), - self.chainhead(), - self.legacy() - ], async |b: &dyn Backend| { - b.call(method, call_parameters, at).await - }).await + try_backends( + &[self.archive(), self.chainhead(), self.legacy()], + async |b: &dyn Backend| b.call(method, call_parameters, at).await, + ) + .await } } /// Call one backend after the other in the list until we get a successful result back. async fn try_backends<'s, 'b, T, Func, Fut, O>( backends: &'s [Option<&'b dyn Backend>], - mut f: Func + mut f: Func, ) -> Result where 'b: 's, @@ -417,15 +426,16 @@ where Func: FnMut(&'b dyn Backend) -> Fut, Fut: Future> + 'b, { - static NO_AVAILABLE_BACKEND: &str = "None of the configured backends are capable of handling this request"; + static NO_AVAILABLE_BACKEND: &str = + "None of the configured backends are capable of handling this request"; let mut err = BackendError::other(NO_AVAILABLE_BACKEND); for backend in backends.into_iter().filter_map(|b| *b) { match f(backend).await { Ok(res) => return Ok(res), - Err(e) => { err = e } + Err(e) => err = e, } } Err(err) -} \ No newline at end of file +} diff --git a/new/src/backend/legacy.rs b/new/src/backend/legacy.rs index b8b853e33a..d7aadf4989 100644 --- a/new/src/backend/legacy.rs +++ b/new/src/backend/legacy.rs @@ -7,21 +7,20 @@ mod descendant_streams; -use subxt_rpcs::methods::legacy::{ TransactionStatus as RpcTransactionStatus, LegacyRpcMethods }; use crate::backend::utils::{retry, retry_stream}; use crate::backend::{ - Backend, BlockRef, StorageResponse, StreamOf, StreamOfResults, - TransactionStatus, + Backend, BlockRef, StorageResponse, StreamOf, StreamOfResults, TransactionStatus, }; use crate::config::{Config, HashFor, Hasher, Header, RpcConfigFor}; use crate::error::BackendError; use async_trait::async_trait; +use codec::Encode; +use descendant_streams::{StorageFetchDescendantKeysStream, StorageFetchDescendantValuesStream}; use futures::TryStreamExt; use futures::{Future, Stream, StreamExt, future, future::Either, stream}; use subxt_rpcs::RpcClient; use subxt_rpcs::methods::legacy::NumberOrHex; -use codec::Encode; -use descendant_streams::{StorageFetchDescendantKeysStream, StorageFetchDescendantValuesStream}; +use subxt_rpcs::methods::legacy::{LegacyRpcMethods, TransactionStatus as RpcTransactionStatus}; /// Configure and build an [`LegacyBackend`]. pub struct LegacyBackendBuilder { @@ -135,7 +134,7 @@ impl Backend for LegacyBackend { self.methods.clone(), key, at, - self.storage_page_size + self.storage_page_size, ); let keys = keys.flat_map(|keys| { @@ -163,7 +162,7 @@ impl Backend for LegacyBackend { self.methods.clone(), key, at, - self.storage_page_size + self.storage_page_size, ); Ok(StreamOf(Box::pin(values_stream))) @@ -177,7 +176,10 @@ impl Backend for LegacyBackend { .await } - async fn block_number_to_hash(&self, number: u64) -> Result>>, BackendError> { + async fn block_number_to_hash( + &self, + number: u64, + ) -> Result>>, BackendError> { retry(|| async { let number_or_hash = NumberOrHex::Number(number); let hash = self @@ -186,7 +188,8 @@ impl Backend for LegacyBackend { .await? .map(BlockRef::from_hash); Ok(hash) - }).await + }) + .await } async fn block_header(&self, at: HashFor) -> Result, BackendError> { diff --git a/new/src/backend/legacy/descendant_streams.rs b/new/src/backend/legacy/descendant_streams.rs index dab5e48508..5c96159e0c 100644 --- a/new/src/backend/legacy/descendant_streams.rs +++ b/new/src/backend/legacy/descendant_streams.rs @@ -1,12 +1,12 @@ -use crate::backend::utils::retry; +use super::LegacyRpcMethods; use crate::backend::StorageResponse; +use crate::backend::utils::retry; use crate::config::{Config, HashFor, RpcConfigFor}; use crate::error::BackendError; use futures::{Future, FutureExt, Stream, StreamExt}; use std::collections::VecDeque; use std::pin::Pin; use std::task::{Context, Poll}; -use super::LegacyRpcMethods; /// This provides a stream of values given some prefix `key`. It /// internally manages pagination and such. @@ -26,7 +26,7 @@ pub struct StorageFetchDescendantKeysStream { done: bool, } -impl StorageFetchDescendantKeysStream { +impl StorageFetchDescendantKeysStream { /// Fetch descendant keys. pub fn new( methods: LegacyRpcMethods>, @@ -91,7 +91,7 @@ impl Stream for StorageFetchDescendantKeysStream { // Error getting keys? Return it. return Poll::Ready(Some(Err(e))); - }, + } Poll::Pending => { this.keys_fut = Some(keys_fut); return Poll::Pending; @@ -142,7 +142,7 @@ pub struct StorageFetchDescendantValuesStream { results: VecDeque<(Vec, Vec)>, } -impl StorageFetchDescendantValuesStream { +impl StorageFetchDescendantValuesStream { /// Fetch descendant values. pub fn new( methods: LegacyRpcMethods>, @@ -201,7 +201,7 @@ impl Stream for StorageFetchDescendantValuesStream { continue; } - return Poll::Ready(Some(Err(e))) + return Poll::Ready(Some(Err(e))); } Poll::Pending => { this.results_fut = Some(results_fut); diff --git a/new/src/backend/utils.rs b/new/src/backend/utils.rs index 18d51969f7..d687c734b8 100644 --- a/new/src/backend/utils.rs +++ b/new/src/backend/utils.rs @@ -6,7 +6,7 @@ use futures::{FutureExt, Stream, StreamExt}; use std::{future::Future, pin::Pin, task::Poll}; /// Spawn a task. -/// +/// /// - On non-wasm targets, this will spawn a task via [`tokio::spawn`]. /// - On wasm targets, this will spawn a task via [`wasm_bindgen_futures::spawn_local`]. #[cfg(feature = "runtime")] @@ -133,7 +133,7 @@ enum RetrySubscriptionState { impl std::marker::Unpin for RetrySubscription {} -impl Stream for RetrySubscription +impl Stream for RetrySubscription where F: FnMut() -> R, R: Future, BackendError>> + Unpin, @@ -148,7 +148,7 @@ where match &mut self.state { RetrySubscriptionState::Init => { self.state = RetrySubscriptionState::Pending((self.resubscribe)()); - }, + } RetrySubscriptionState::Stream(s) => match s.poll_next_unpin(cx) { Poll::Ready(Some(Err(err))) => { if err.is_disconnected_will_reconnect() { @@ -158,7 +158,7 @@ where } Poll::Ready(None) => { self.state = RetrySubscriptionState::Done; - return Poll::Ready(None) + return Poll::Ready(None); } Poll::Ready(Some(Ok(val))) => { return Poll::Ready(Some(Ok(val))); @@ -182,9 +182,7 @@ where return Poll::Pending; } }, - RetrySubscriptionState::Done => { - return Poll::Ready(None) - } + RetrySubscriptionState::Done => return Poll::Ready(None), }; } } diff --git a/new/src/client.rs b/new/src/client.rs index d2e347249c..fd5dfe9d97 100644 --- a/new/src/client.rs +++ b/new/src/client.rs @@ -1,15 +1,17 @@ mod offline_client; mod online_client; +use crate::config::{Config, HashFor}; use core::marker::PhantomData; +use subxt_metadata::Metadata; // We keep these traits internal, so that we can mess with them later if needed, // and instead only the concrete types are public which wrap these trait impls. pub(crate) use offline_client::OfflineClientAtBlockT; pub(crate) use online_client::OnlineClientAtBlockT; -pub use offline_client::OfflineClient; -pub use online_client::OnlineClient; +pub use offline_client::{OfflineClient, OfflineClientAtBlock}; +pub use online_client::{OnlineClient, OnlineClientAtBlock}; /// This represents a client at a specific block number. #[derive(Clone, Debug)] @@ -26,4 +28,24 @@ impl ClientAtBlock { marker: PhantomData, } } -} \ No newline at end of file +} + +impl ClientAtBlock +where + T: Config, + Client: OfflineClientAtBlockT, +{ + pub fn metadata_ref(&self) -> &Metadata { + self.client.metadata_ref() + } +} + +impl ClientAtBlock +where + T: Config, + Client: OnlineClientAtBlockT, +{ + pub fn block_hash(&self) -> HashFor { + self.client.block_hash() + } +} diff --git a/new/src/client/offline_client.rs b/new/src/client/offline_client.rs index 3a25d15ccf..9698640185 100644 --- a/new/src/client/offline_client.rs +++ b/new/src/client/offline_client.rs @@ -1,8 +1,8 @@ -use crate::config::Config; use crate::client::ClientAtBlock; +use crate::config::Config; use crate::error::OfflineClientAtBlockError; -use subxt_metadata::Metadata; use std::sync::Arc; +use subxt_metadata::Metadata; #[derive(Clone, Debug)] pub struct OfflineClient { @@ -13,9 +13,7 @@ pub struct OfflineClient { impl OfflineClient { /// Create a new [`OfflineClient`] with the given configuration. pub fn new(config: T) -> Self { - OfflineClient { - config, - } + OfflineClient { config } } /// Pick the block height at which to operate. This references data from the @@ -35,25 +33,29 @@ impl OfflineClient { .metadata_for_spec_version(spec_version) .ok_or(OfflineClientAtBlockError::MetadataNotFound { spec_version })?; - Ok(ClientAtBlock::new(OfflineClientAtBlock { - metadata, - })) + Ok(ClientAtBlock::new(OfflineClientAtBlock { metadata })) } } +#[derive(Clone)] pub struct OfflineClientAtBlock { metadata: Arc, } /// This represents an offline-only client at a specific block. #[doc(hidden)] -pub trait OfflineClientAtBlockT { - /// Get the metadata appropriate for this block. - fn metadata(&self) -> &Metadata; +pub trait OfflineClientAtBlockT: Clone { + /// Get a reference to the metadata appropriate for this block. + fn metadata_ref(&self) -> &Metadata; + /// Get a clone of the metadata appropriate for this block. + fn metadata(&self) -> Arc; } impl OfflineClientAtBlockT for OfflineClientAtBlock { - fn metadata(&self) -> &Metadata { + fn metadata_ref(&self) -> &Metadata { &self.metadata } -} \ No newline at end of file + fn metadata(&self) -> Arc { + self.metadata.clone() + } +} diff --git a/new/src/client/online_client.rs b/new/src/client/online_client.rs index 109807b862..0f822b1361 100644 --- a/new/src/client/online_client.rs +++ b/new/src/client/online_client.rs @@ -1,17 +1,19 @@ mod block_number_or_ref; +mod blocks; -use core::marker::PhantomData; use super::ClientAtBlock; use super::OfflineClientAtBlockT; -use crate::config::{ Config, HashFor, Header, Hasher }; -use crate::error::OnlineClientAtBlockError; -use crate::backend::{ Backend, CombinedBackend, BlockRef }; +use crate::backend::{Backend, BlockRef, CombinedBackend}; +use crate::config::{Config, HashFor, Hasher, Header}; +use crate::error::{BlocksError, OnlineClientAtBlockError}; +use blocks::Blocks; use codec::{Compact, Decode, Encode}; +use core::marker::PhantomData; use frame_metadata::{RuntimeMetadata, RuntimeMetadataPrefixed}; use scale_info_legacy::TypeRegistrySet; use std::sync::Arc; -use subxt_rpcs::RpcClient; use subxt_metadata::Metadata; +use subxt_rpcs::RpcClient; #[cfg(feature = "jsonrpsee")] #[cfg_attr(docsrs, doc(cfg(feature = "jsonrpsee")))] @@ -32,7 +34,7 @@ struct OnlineClientInner { backend: Arc>, } -impl std::fmt::Debug for OnlineClientInner { +impl std::fmt::Debug for OnlineClientInner { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("OnlineClientInner") .field("config", &"") @@ -63,9 +65,9 @@ impl OnlineClient { url: url_str.to_string(), })?; if !Self::is_url_secure(&url) { - return Err(OnlineClientError::RpcError( - subxt_rpcs::Error::InsecureUrl(url_str.to_string()), - )); + return Err(OnlineClientError::RpcError(subxt_rpcs::Error::InsecureUrl( + url_str.to_string(), + ))); } OnlineClient::from_insecure_url(config, url).await } @@ -110,18 +112,98 @@ impl OnlineClient { /// Construct a new [`OnlineClient`] by providing an underlying [`Backend`] /// implementation to power it. - pub fn from_backend( - config: T, - backend: impl Into>>, - ) -> OnlineClient { + pub fn from_backend(config: T, backend: impl Into>>) -> OnlineClient { OnlineClient { inner: Arc::new(OnlineClientInner { config, - backend: backend.into() - }) + backend: backend.into(), + }), } } + /// Obtain a stream of all blocks imported by the node. + /// + /// **Note:** You probably want to use [`Self::stream_blocks()`] most of + /// the time. Blocks returned here may be pruned at any time and become inaccessible, + /// leading to errors when trying to work with them. + pub async fn stream_all_blocks(&self) -> Result, BlocksError> { + // We need a hasher to know how to hash things. Thus, we need metadata to instantiate + // the hasher, so let's use the current block. + let current_block = self + .at_current_block() + .await + .map_err(BlocksError::CannotGetCurrentBlock)?; + let hasher = current_block.client.hasher.clone(); + + let stream = self + .inner + .backend + .stream_all_block_headers(hasher) + .await + .map_err(BlocksError::CannotGetBlockHeaderStream)?; + + Ok(Blocks::from_headers_stream(self.clone(), stream)) + } + + /// Obtain a stream of blocks imported by the node onto the current best fork. + /// + /// **Note:** You probably want to use [`Self::stream_blocks()`] most of + /// the time. Blocks returned here may be pruned at any time and become inaccessible, + /// leading to errors when trying to work with them. + pub async fn stream_best_blocks(&self) -> Result, BlocksError> { + // We need a hasher to know how to hash things. Thus, we need metadata to instantiate + // the hasher, so let's use the current block. + let current_block = self + .at_current_block() + .await + .map_err(BlocksError::CannotGetCurrentBlock)?; + let hasher = current_block.client.hasher.clone(); + + let stream = self + .inner + .backend + .stream_best_block_headers(hasher) + .await + .map_err(BlocksError::CannotGetBlockHeaderStream)?; + + Ok(Blocks::from_headers_stream(self.clone(), stream)) + } + + /// Obtain a stream of finalized blocks. + pub async fn stream_blocks(&self) -> Result, BlocksError> { + // We need a hasher to know how to hash things. Thus, we need metadata to instantiate + // the hasher, so let's use the current block. + let current_block = self + .at_current_block() + .await + .map_err(BlocksError::CannotGetCurrentBlock)?; + let hasher = current_block.client.hasher.clone(); + + let stream = self + .inner + .backend + .stream_finalized_block_headers(hasher) + .await + .map_err(BlocksError::CannotGetBlockHeaderStream)?; + + Ok(Blocks::from_headers_stream(self.clone(), stream)) + } + + /// Instantiate a client to work at the current finalized block _at the time of instantiation_. + /// This does not track new blocks. + pub async fn at_current_block( + &self, + ) -> Result, T>, OnlineClientAtBlockError> { + let latest_block = self + .inner + .backend + .latest_finalized_block_ref() + .await + .map_err(|e| OnlineClientAtBlockError::CannotGetCurrentBlock { reason: e })?; + + self.at_block(latest_block).await + } + /// Instantiate a client for working at a specific block. pub async fn at_block( &self, @@ -138,27 +220,27 @@ impl OnlineClient { .backend .block_header(block_hash) .await - .map_err(|e| OnlineClientAtBlockError::CannotGetBlockHeader { - block_hash: block_hash.into(), - reason: e + .map_err(|e| OnlineClientAtBlockError::CannotGetBlockHeader { + block_hash: block_hash.into(), + reason: e, })? - .ok_or(OnlineClientAtBlockError::BlockHeaderNotFound { - block_hash: block_hash.into() + .ok_or(OnlineClientAtBlockError::BlockHeaderNotFound { + block_hash: block_hash.into(), })?; (block_ref, block_header.number()) - }, + } BlockNumberOrRef::Number(block_num) => { let block_ref = self .inner .backend .block_number_to_hash(block_num) .await - .map_err(|e| OnlineClientAtBlockError::CannotGetBlockHash { - block_number: block_num, - reason: e + .map_err(|e| OnlineClientAtBlockError::CannotGetBlockHash { + block_number: block_num, + reason: e, })? - .ok_or(OnlineClientAtBlockError::BlockNotFound { - block_number: block_num + .ok_or(OnlineClientAtBlockError::BlockNotFound { + block_number: block_num, })?; (block_ref, block_num) } @@ -174,9 +256,9 @@ impl OnlineClient { .backend .call("Core_version", None, block_hash) .await - .map_err(|e| OnlineClientAtBlockError::CannotGetSpecVersion { - block_hash: block_hash.into(), - reason: e + .map_err(|e| OnlineClientAtBlockError::CannotGetSpecVersion { + block_hash: block_hash.into(), + reason: e, })?; #[derive(codec::Decode)] @@ -199,101 +281,107 @@ impl OnlineClient { let metadata = match self.inner.config.metadata_for_spec_version(spec_version) { Some(metadata) => metadata, None => { - let metadata: Metadata = match get_metadata(&*self.inner.backend, block_hash).await? { - m @ RuntimeMetadata::V0(_) | - m @ RuntimeMetadata::V1(_) | - m @ RuntimeMetadata::V2(_) | - m @ RuntimeMetadata::V3(_) | - m @ RuntimeMetadata::V4(_) | - m @ RuntimeMetadata::V5(_) | - m @ RuntimeMetadata::V6(_) | - m @ RuntimeMetadata::V7(_) => { - return Err(OnlineClientAtBlockError::UnsupportedMetadataVersion { - block_hash: block_hash.into(), - version: m.version() - }) - }, - RuntimeMetadata::V8(m) => { - let types = get_legacy_types(self, spec_version)?; - Metadata::from_v8(&m, &types) - .map_err(|e| OnlineClientAtBlockError::CannotConvertLegacyMetadata { + let metadata: Metadata = + match get_metadata(&*self.inner.backend, block_hash).await? { + m @ RuntimeMetadata::V0(_) + | m @ RuntimeMetadata::V1(_) + | m @ RuntimeMetadata::V2(_) + | m @ RuntimeMetadata::V3(_) + | m @ RuntimeMetadata::V4(_) + | m @ RuntimeMetadata::V5(_) + | m @ RuntimeMetadata::V6(_) + | m @ RuntimeMetadata::V7(_) => { + return Err(OnlineClientAtBlockError::UnsupportedMetadataVersion { block_hash: block_hash.into(), - metadata_version: 8, - reason: e + version: m.version(), + }); + } + RuntimeMetadata::V8(m) => { + let types = get_legacy_types(self, spec_version)?; + Metadata::from_v8(&m, &types).map_err(|e| { + OnlineClientAtBlockError::CannotConvertLegacyMetadata { + block_hash: block_hash.into(), + metadata_version: 8, + reason: e, + } })? - }, - RuntimeMetadata::V9(m) => { - let types = get_legacy_types(self, spec_version)?; - Metadata::from_v9(&m, &types) - .map_err(|e| OnlineClientAtBlockError::CannotConvertLegacyMetadata { - block_hash: block_hash.into(), - metadata_version: 9, - reason: e + } + RuntimeMetadata::V9(m) => { + let types = get_legacy_types(self, spec_version)?; + Metadata::from_v9(&m, &types).map_err(|e| { + OnlineClientAtBlockError::CannotConvertLegacyMetadata { + block_hash: block_hash.into(), + metadata_version: 9, + reason: e, + } })? - }, - RuntimeMetadata::V10(m) => { - let types = get_legacy_types(self, spec_version)?; - Metadata::from_v10(&m, &types) - .map_err(|e| OnlineClientAtBlockError::CannotConvertLegacyMetadata { - block_hash: block_hash.into(), - metadata_version: 10, - reason: e + } + RuntimeMetadata::V10(m) => { + let types = get_legacy_types(self, spec_version)?; + Metadata::from_v10(&m, &types).map_err(|e| { + OnlineClientAtBlockError::CannotConvertLegacyMetadata { + block_hash: block_hash.into(), + metadata_version: 10, + reason: e, + } })? - }, - RuntimeMetadata::V11(m) => { - let types = get_legacy_types(self, spec_version)?; - Metadata::from_v11(&m, &types) - .map_err(|e| OnlineClientAtBlockError::CannotConvertLegacyMetadata { - block_hash: block_hash.into(), - metadata_version: 11, - reason: e + } + RuntimeMetadata::V11(m) => { + let types = get_legacy_types(self, spec_version)?; + Metadata::from_v11(&m, &types).map_err(|e| { + OnlineClientAtBlockError::CannotConvertLegacyMetadata { + block_hash: block_hash.into(), + metadata_version: 11, + reason: e, + } })? - }, - RuntimeMetadata::V12(m) => { - let types = get_legacy_types(self, spec_version)?; - Metadata::from_v12(&m, &types) - .map_err(|e| OnlineClientAtBlockError::CannotConvertLegacyMetadata { - block_hash: block_hash.into(), - metadata_version: 12, - reason: e + } + RuntimeMetadata::V12(m) => { + let types = get_legacy_types(self, spec_version)?; + Metadata::from_v12(&m, &types).map_err(|e| { + OnlineClientAtBlockError::CannotConvertLegacyMetadata { + block_hash: block_hash.into(), + metadata_version: 12, + reason: e, + } })? - }, - RuntimeMetadata::V13(m) => { - let types = get_legacy_types(self, spec_version)?; - Metadata::from_v13(&m, &types) - .map_err(|e| OnlineClientAtBlockError::CannotConvertLegacyMetadata { - block_hash: block_hash.into(), - metadata_version: 13, - reason: e + } + RuntimeMetadata::V13(m) => { + let types = get_legacy_types(self, spec_version)?; + Metadata::from_v13(&m, &types).map_err(|e| { + OnlineClientAtBlockError::CannotConvertLegacyMetadata { + block_hash: block_hash.into(), + metadata_version: 13, + reason: e, + } })? - }, - RuntimeMetadata::V14(m) => { - Metadata::from_v14(m) - .map_err(|e| OnlineClientAtBlockError::CannotConvertModernMetadata { + } + RuntimeMetadata::V14(m) => Metadata::from_v14(m).map_err(|e| { + OnlineClientAtBlockError::CannotConvertModernMetadata { block_hash: block_hash.into(), metadata_version: 14, - reason: e - })? - }, - RuntimeMetadata::V15(m) => { - Metadata::from_v15(m) - .map_err(|e| OnlineClientAtBlockError::CannotConvertModernMetadata { + reason: e, + } + })?, + RuntimeMetadata::V15(m) => Metadata::from_v15(m).map_err(|e| { + OnlineClientAtBlockError::CannotConvertModernMetadata { block_hash: block_hash.into(), metadata_version: 15, - reason: e - })? - }, - RuntimeMetadata::V16(m) => { - Metadata::from_v16(m) - .map_err(|e| OnlineClientAtBlockError::CannotConvertModernMetadata { + reason: e, + } + })?, + RuntimeMetadata::V16(m) => Metadata::from_v16(m).map_err(|e| { + OnlineClientAtBlockError::CannotConvertModernMetadata { block_hash: block_hash.into(), metadata_version: 16, - reason: e - })? - }, - }; + reason: e, + } + })?, + }; let metadata = Arc::new(metadata); - self.inner.config.set_metadata_for_spec_version(spec_version, metadata.clone()); + self.inner + .config + .set_metadata_for_spec_version(spec_version, metadata.clone()); metadata } }; @@ -305,17 +393,16 @@ impl OnlineClient { block_ref, }; - Ok(ClientAtBlock { - client: online_client_at_block, - marker: PhantomData + Ok(ClientAtBlock { + client: online_client_at_block, + marker: PhantomData, }) } } /// This represents an online client at a specific block. #[doc(hidden)] -pub trait OnlineClientAtBlockT: OfflineClientAtBlockT -{ +pub trait OnlineClientAtBlockT: OfflineClientAtBlockT { /// Return the RPC methods we'll use to interact with the node. fn backend(&self) -> &dyn Backend; /// Return the block hash for the current block. @@ -325,6 +412,7 @@ pub trait OnlineClientAtBlockT: OfflineClientAtBlockT } /// The inner type providing the necessary data to work online at a specific block. +#[derive(Clone)] pub struct OnlineClientAtBlock { metadata: Arc, backend: Arc>, @@ -345,12 +433,18 @@ impl OnlineClientAtBlockT for OnlineClientAtBlock { } impl OfflineClientAtBlockT for OnlineClientAtBlock { - fn metadata(&self) -> &Metadata { + fn metadata_ref(&self) -> &Metadata { &self.metadata } + fn metadata(&self) -> Arc { + self.metadata.clone() + } } -fn get_legacy_types(client: &OnlineClient, spec_version: u32) -> Result, OnlineClientAtBlockError> { +fn get_legacy_types( + client: &OnlineClient, + spec_version: u32, +) -> Result, OnlineClientAtBlockError> { client .inner .config @@ -377,7 +471,11 @@ async fn get_metadata( if let Some(version_to_get) = version_to_get { let version_bytes = version_to_get.encode(); let rpc_response = backend - .call("Metadata_metadata_at_version", Some(&version_bytes), block_hash) + .call( + "Metadata_metadata_at_version", + Some(&version_bytes), + block_hash, + ) .await .map_err(|e| OnlineClientAtBlockError::CannotGetMetadata { block_hash: block_hash.into(), diff --git a/new/src/client/online_client/block_number_or_ref.rs b/new/src/client/online_client/block_number_or_ref.rs index 93b3837659..cc35f95649 100644 --- a/new/src/client/online_client/block_number_or_ref.rs +++ b/new/src/client/online_client/block_number_or_ref.rs @@ -1,28 +1,28 @@ -use crate::config::{ Config, HashFor, Hasher }; use crate::backend::BlockRef; +use crate::config::{Config, HashFor, Hasher}; -/// This represents either a block number or a reference +/// This represents either a block number or a reference /// to a block, which is essentially a block hash. pub enum BlockNumberOrRef { /// A block number. Number(u64), /// A block ref / hash. - BlockRef(BlockRef>) + BlockRef(BlockRef>), } -impl From for BlockNumberOrRef { +impl From for BlockNumberOrRef { fn from(value: u32) -> Self { BlockNumberOrRef::Number(value.into()) } } -impl From for BlockNumberOrRef { +impl From for BlockNumberOrRef { fn from(value: u64) -> Self { BlockNumberOrRef::Number(value) } } -impl From>> for BlockNumberOrRef { +impl From>> for BlockNumberOrRef { fn from(block_ref: BlockRef>) -> Self { BlockNumberOrRef::BlockRef(block_ref) } @@ -31,9 +31,9 @@ impl From>> for BlockNumberOrRef { // Ideally we'd have `impl From> for BlockNumberOrRef` but since our config // could set _any_ hash type, this boils down to `impl From for ..` which is too general. // Thus, we target our current concrete hash type. -impl From for BlockNumberOrRef +impl From for BlockNumberOrRef where - ::Hash: From + ::Hash: From, { fn from(hash: crate::config::substrate::H256) -> Self { BlockNumberOrRef::BlockRef(BlockRef::from_hash(hash.into())) diff --git a/new/src/client/online_client/blocks.rs b/new/src/client/online_client/blocks.rs new file mode 100644 index 0000000000..aea34eede9 --- /dev/null +++ b/new/src/client/online_client/blocks.rs @@ -0,0 +1,76 @@ +use crate::backend::{BlockRef, StreamOfResults}; +use crate::client::{ClientAtBlock, OnlineClient, OnlineClientAtBlock}; +use crate::config::{Config, HashFor, Header}; +use crate::error::{BlocksError, OnlineClientAtBlockError}; +use futures::{Stream, StreamExt}; +use std::pin::Pin; +use std::task::{Context, Poll}; + +/// A stream of blocks. +pub struct Blocks { + client: OnlineClient, + stream: StreamOfResults<(T::Header, BlockRef>)>, +} + +impl Blocks { + pub(crate) fn from_headers_stream( + client: OnlineClient, + stream: StreamOfResults<(T::Header, BlockRef>)>, + ) -> Self { + Blocks { client, stream } + } +} + +impl Stream for Blocks { + type Item = Result, BlocksError>; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let item = match self.stream.poll_next_unpin(cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(None) => return Poll::Ready(None), + Poll::Ready(Some(item)) => item, + }; + + let res = match item { + Ok((block_header, block_ref)) => Ok(Block { + block_ref, + block_header, + client: self.client.clone(), + }), + Err(e) => Err(BlocksError::CannotGetBlockHeader(e)), + }; + + Poll::Ready(Some(res)) + } +} + +/// A block from the stream of blocks. +pub struct Block { + block_ref: BlockRef>, + block_header: T::Header, + client: OnlineClient, +} + +impl Block { + /// The block hash + pub fn hash(&self) -> HashFor { + self.block_ref.hash() + } + + /// The block number. + pub fn number(&self) -> u64 { + self.block_header.number() + } + + /// The block header. + pub fn header(&self) -> &T::Header { + &self.block_header + } + + /// Instantiate a client at this block. + pub async fn client( + &self, + ) -> Result, T>, OnlineClientAtBlockError> { + self.client.at_block(self.block_ref.clone()).await + } +} diff --git a/new/src/config.rs b/new/src/config.rs index 0e13713bfb..20818bf150 100644 --- a/new/src/config.rs +++ b/new/src/config.rs @@ -19,10 +19,10 @@ use codec::{Decode, Encode}; use core::fmt::Debug; use scale_decode::DecodeAsType; use scale_encode::EncodeAsType; -use serde::{Serialize, de::DeserializeOwned}; -use subxt_metadata::Metadata; -use std::{fmt::Display, marker::PhantomData, sync::Arc}; use scale_info_legacy::TypeRegistrySet; +use serde::{Serialize, de::DeserializeOwned}; +use std::{fmt::Display, marker::PhantomData, sync::Arc}; +use subxt_metadata::Metadata; use subxt_rpcs::RpcConfig; pub use default_extrinsic_params::{DefaultExtrinsicParams, DefaultExtrinsicParamsBuilder}; @@ -32,7 +32,7 @@ pub use substrate::{SubstrateConfig, SubstrateExtrinsicParams, SubstrateExtrinsi pub use transaction_extensions::TransactionExtension; /// Configuration for a given chain and the runtimes within. This consists of the -/// type information needed to work at the head of the chain (namely submitting +/// type information needed to work at the head of the chain (namely submitting /// transactions), as well as functionality which we might wish to customize for a /// given chain. pub trait Config: Clone + Debug + Sized + Send + Sync + 'static { @@ -72,27 +72,20 @@ pub trait Config: Clone + Debug + Sized + Send + Sync + 'static { /// The [`crate::client::OnlineClient`] will look this up on chain if it's not available here, and then /// call [`Config::set_metadata_for_spec_version`] to give the configuration the opportunity to cache it. /// The [`crate::client::OfflineClient`] will error if this is not available for the required spec version. - fn metadata_for_spec_version( - &self, - _spec_version: u32, - ) -> Option> { + fn metadata_for_spec_version(&self, _spec_version: u32) -> Option> { None } /// Set some metadata for a given spec version. the [`crate::client::OnlineClient`] will call this if it has /// to retrieve metadata from the chain, to give this the opportunity to cache it. The configuration can /// do nothing if it prefers. - fn set_metadata_for_spec_version( - &self, - _spec_version: u32, - _metadata: Arc, - ) {} + fn set_metadata_for_spec_version(&self, _spec_version: u32, _metadata: Arc) {} /// Return legacy types (ie types to use with Runtimes that return pre-V14 metadata) for a given spec version. - /// If this returns `None`, [`subxt`] will return an error if type definitions are needed to access some older + /// If this returns `None`, [`subxt`] will return an error if type definitions are needed to access some older /// block. - /// - /// This doesn't need to live for long; it will be used to translate any older metadata returned from the node + /// + /// This doesn't need to live for long; it will be used to translate any older metadata returned from the node /// into our [`Metadata`] type, which will then be used. fn legacy_types_for_spec_version<'this>( &'this self, @@ -105,14 +98,14 @@ pub trait Config: Clone + Debug + Sized + Send + Sync + 'static { /// `RpcConfigFor` can be used anywhere which requires an implementation of [`subxt_rpcs::RpcConfig`]. /// This is only needed at the type level, and so there is no way to construct this. pub struct RpcConfigFor { - marker: PhantomData + marker: PhantomData, } -impl RpcConfig for RpcConfigFor { +impl RpcConfig for RpcConfigFor { type Hash = HashFor; type Header = T::Header; type AccountId = T::AccountId; -} +} /// Given some [`Config`], this returns the type of hash used. pub type HashFor = <::Hasher as Hasher>::Hash; diff --git a/new/src/config/extrinsic_params.rs b/new/src/config/extrinsic_params.rs index 0789dca22e..8261d015d4 100644 --- a/new/src/config/extrinsic_params.rs +++ b/new/src/config/extrinsic_params.rs @@ -11,9 +11,9 @@ use crate::{ config::{Config, HashFor}, error::ExtrinsicParamsError, }; -use subxt_metadata::Metadata; -use std::sync::Arc; use core::any::Any; +use std::sync::Arc; +use subxt_metadata::Metadata; /// This provides access to some relevant client state in transaction extensions, /// and is just a combination of some of the available properties. diff --git a/new/src/config/polkadot.rs b/new/src/config/polkadot.rs index d64e0a0364..b5f907fd6d 100644 --- a/new/src/config/polkadot.rs +++ b/new/src/config/polkadot.rs @@ -7,11 +7,11 @@ use super::{Config, DefaultExtrinsicParams, DefaultExtrinsicParamsBuilder}; use crate::config::substrate::{SubstrateConfig, SubstrateConfigBuilder}; -use std::sync::Arc; use scale_info_legacy::TypeRegistrySet; +use std::sync::Arc; use subxt_metadata::Metadata; -pub use crate::config::substrate::{ SpecVersionForRange, SubstrateHeader }; +pub use crate::config::substrate::{SpecVersionForRange, SubstrateHeader}; pub use crate::utils::{AccountId32, MultiAddress, MultiSignature}; pub use primitive_types::{H256, U256}; @@ -79,18 +79,11 @@ impl Config for PolkadotConfig { self.0.spec_version_for_block_number(block_number) } - fn metadata_for_spec_version( - &self, - spec_version: u32, - ) -> Option> { + fn metadata_for_spec_version(&self, spec_version: u32) -> Option> { self.0.metadata_for_spec_version(spec_version) } - fn set_metadata_for_spec_version( - &self, - spec_version: u32, - metadata: Arc, - ) { + fn set_metadata_for_spec_version(&self, spec_version: u32, metadata: Arc) { self.0.set_metadata_for_spec_version(spec_version, metadata) } } diff --git a/new/src/config/substrate.rs b/new/src/config/substrate.rs index 29bb0dc3fe..c116dab15f 100644 --- a/new/src/config/substrate.rs +++ b/new/src/config/substrate.rs @@ -6,16 +6,16 @@ use super::{Config, DefaultExtrinsicParams, DefaultExtrinsicParamsBuilder, Hasher, Header}; use crate::config::Hash; +use crate::utils::RangeMap; pub use crate::utils::{AccountId32, MultiAddress, MultiSignature}; use codec::{Decode, Encode}; pub use primitive_types::{H256, U256}; -use serde::{Deserialize, Serialize}; -use subxt_metadata::Metadata; -use crate::utils::RangeMap; use scale_info_legacy::{ChainTypeRegistry, TypeRegistrySet}; +use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::sync::Arc; use std::sync::Mutex; +use subxt_metadata::Metadata; /// Construct a [`SubstrateConfig`] using this. pub struct SubstrateConfigBuilder { @@ -81,22 +81,19 @@ impl SubstrateConfigBuilder { /// The storage hasher encoding/decoding changed during V9 metadata. By default we support the "new" version /// of things. We can use this option to support the old version of things prior to a given spec version. - pub fn use_old_v9_hashers_before_spec_version( - mut self, - spec_version: u32 - ) -> Self { + pub fn use_old_v9_hashers_before_spec_version(mut self, spec_version: u32) -> Self { self.use_old_v9_hashers_before_spec_version = spec_version; self } /// Construct the [`SubstrateConfig`] from this builder. pub fn build(self) -> SubstrateConfig { - SubstrateConfig { + SubstrateConfig { inner: Arc::new(SubstrateConfigInner { legacy_types: self.legacy_types, spec_version_for_block_number: self.spec_version_for_block_number, metadata_for_spec_version: self.metadata_for_spec_version, - }) + }), } } } @@ -116,7 +113,7 @@ pub struct SpecVersionForRange { /// that have not customized the block hash type). #[derive(Debug, Clone)] pub struct SubstrateConfig { - inner: Arc + inner: Arc, } #[derive(Debug)] @@ -150,15 +147,13 @@ impl Config for SubstrateConfig { } fn spec_version_for_block_number(&self, block_number: u64) -> Option { - self.inner.spec_version_for_block_number + self.inner + .spec_version_for_block_number .get(block_number) .copied() } - fn metadata_for_spec_version( - &self, - spec_version: u32, - ) -> Option> { + fn metadata_for_spec_version(&self, spec_version: u32) -> Option> { self.inner .metadata_for_spec_version .lock() @@ -167,11 +162,7 @@ impl Config for SubstrateConfig { .cloned() } - fn set_metadata_for_spec_version( - &self, - spec_version: u32, - metadata: Arc, - ) { + fn set_metadata_for_spec_version(&self, spec_version: u32, metadata: Arc) { self.inner .metadata_for_spec_version .lock() diff --git a/new/src/config/transaction_extensions.rs b/new/src/config/transaction_extensions.rs index 2fe85aba25..617ccbd3be 100644 --- a/new/src/config/transaction_extensions.rs +++ b/new/src/config/transaction_extensions.rs @@ -7,7 +7,7 @@ //! [`AnyOf`] to configure the set of transaction extensions which are known about //! when interacting with a chain. -use super::extrinsic_params::{ ExtrinsicParams, ClientState }; +use super::extrinsic_params::{ClientState, ExtrinsicParams}; use crate::config::ExtrinsicParamsEncoder; use crate::config::{Config, HashFor}; use crate::error::ExtrinsicParamsError; @@ -16,9 +16,9 @@ use codec::{Compact, Encode}; use core::any::Any; use core::fmt::Debug; use derive_where::derive_where; -use std::collections::HashMap; use scale_decode::DecodeAsType; use scale_info::PortableRegistry; +use std::collections::HashMap; // Re-export this here; it's a bit generically named to be re-exported from ::config. pub use super::extrinsic_params::Params; diff --git a/new/src/error.rs b/new/src/error.rs index 7bc244557a..422f37e5f4 100644 --- a/new/src/error.rs +++ b/new/src/error.rs @@ -7,8 +7,8 @@ mod dispatch_error; mod hex; -use thiserror::Error as DeriveError; use std::borrow::Cow; +use thiserror::Error as DeriveError; #[cfg(feature = "unstable-light-client")] pub use subxt_lightclient::LightClientError; @@ -19,10 +19,10 @@ pub use dispatch_error::{ }; // Re-expose the errors we use from other crates here: -pub use subxt_metadata::Metadata; pub use hex::Hex; pub use scale_decode::Error as DecodeError; pub use scale_encode::Error as EncodeError; +pub use subxt_metadata::Metadata; pub use subxt_metadata::TryFromError as MetadataTryFromError; /// A global error type. Any of the errors exposed here can convert into this @@ -50,14 +50,10 @@ pub enum Error { #[error(transparent)] BackendError(#[from] BackendError), #[error(transparent)] - BlockError(#[from] BlockError), + BlocksError(#[from] BlocksError), #[error(transparent)] AccountNonceError(#[from] AccountNonceError), #[error(transparent)] - RuntimeUpdaterError(#[from] RuntimeUpdaterError), - #[error(transparent)] - RuntimeUpdateeApplyError(#[from] RuntimeUpdateeApplyError), - #[error(transparent)] RuntimeApiError(#[from] RuntimeApiError), #[error(transparent)] EventsError(#[from] EventsError), @@ -143,10 +139,12 @@ impl Error { fn backend_error(&self) -> Option<&BackendError> { match self { - Error::BlockError(e) => e.backend_error(), + // Many of these contain no backend error, but keep the checks next to + // the actual error types to make it harder to miss adding any, and be exhaustive + // here so new error variants are not missed as easily. + Error::BlocksError(e) => e.backend_error(), Error::AccountNonceError(e) => e.backend_error(), Error::OnlineClientError(e) => e.backend_error(), - Error::RuntimeUpdaterError(e) => e.backend_error(), Error::RuntimeApiError(e) => e.backend_error(), Error::EventsError(e) => e.backend_error(), Error::ExtrinsicError(e) => e.backend_error(), @@ -155,8 +153,23 @@ impl Error { Error::TransactionEventsError(e) => e.backend_error(), Error::TransactionFinalizedSuccessError(e) => e.backend_error(), Error::StorageError(e) => e.backend_error(), - // Any errors that **don't** return a BackendError anywhere will return None: - _ => None, + Error::OfflineClientAtBlockError(e) => e.backend_error(), + Error::OnlineClientAtBlockError(e) => e.backend_error(), + Error::ExtrinsicDecodeErrorAt(e) => e.backend_error(), + Error::ConstantError(e) => e.backend_error(), + Error::CustomValueError(e) => e.backend_error(), + Error::StorageKeyError(e) => e.backend_error(), + Error::StorageValueError(e) => e.backend_error(), + Error::TransactionStatusError(e) => e.backend_error(), + Error::ModuleErrorDetailsError(e) => e.backend_error(), + Error::ModuleErrorDecodeError(e) => e.backend_error(), + Error::DispatchErrorDecodeError(e) => e.backend_error(), + // BackendError is always a BackendError: + Error::BackendError(e) => Some(e), + // Other errors come from different crates so can never contain a BackendError: + Error::OtherRpcClientError(_) => None, + Error::OtherCodecError(_) => None, + Error::Other(_) => None, } } } @@ -182,6 +195,12 @@ pub enum OfflineClientAtBlockError { }, } +impl OfflineClientAtBlockError { + fn backend_error(&self) -> Option<&BackendError> { + None + } +} + #[derive(Debug, thiserror::Error)] #[non_exhaustive] #[allow(missing_docs)] @@ -219,11 +238,39 @@ impl OnlineClientError { } } +/// Errors constructing streams of blocks. +#[allow(missing_docs)] +#[derive(Debug, thiserror::Error)] +#[non_exhaustive] +pub enum BlocksError { + #[error("Cannot construct block stream: cannot get the current block: {0}")] + CannotGetCurrentBlock(OnlineClientAtBlockError), + #[error("Cannot construct block stream: cannot get block header stream: {0}")] + CannotGetBlockHeaderStream(BackendError), + #[error("Error streaming blocks: cannot get the next block header: {0}")] + CannotGetBlockHeader(BackendError), +} + +impl BlocksError { + fn backend_error(&self) -> Option<&BackendError> { + match self { + BlocksError::CannotGetCurrentBlock(e) => e.backend_error(), + BlocksError::CannotGetBlockHeaderStream(e) => Some(e), + BlocksError::CannotGetBlockHeader(e) => Some(e), + } + } +} + /// Errors constructing an online client at a specific block number. #[allow(missing_docs)] #[derive(Debug, thiserror::Error)] #[non_exhaustive] pub enum OnlineClientAtBlockError { + #[error("Cannot construct OnlineClientAtBlock: cannot get the current block: {reason}")] + CannotGetCurrentBlock { + /// The error we encountered. + reason: BackendError, + }, #[error( "Cannot construct OnlineClientAtBlock: failed to get block hash from node for block {block_number}: {reason}" )] @@ -238,19 +285,25 @@ pub enum OnlineClientAtBlockError { /// The block number for which a block was not found. block_number: u64, }, - #[error("Cannot construct OnlineClientAtBlock: cannot get the block header for block {block_hash}: {reason}")] + #[error( + "Cannot construct OnlineClientAtBlock: cannot get the block header for block {block_hash}: {reason}" + )] CannotGetBlockHeader { - /// Block hash that we failed to fetch the header for. + /// Block hash that we failed to fetch the header for. block_hash: Hex, /// The error we encountered. reason: BackendError, }, - #[error("Cannot construct OnlineClientAtBlock: cannot find the block header for block {block_hash}")] + #[error( + "Cannot construct OnlineClientAtBlock: cannot find the block header for block {block_hash}" + )] BlockHeaderNotFound { - /// Block hash that we failed to find the header for. + /// Block hash that we failed to find the header for. block_hash: Hex, }, - #[error("Cannot construct OnlineClientAtBlock: failed to obtain spec version for block {block_hash}: {reason}")] + #[error( + "Cannot construct OnlineClientAtBlock: failed to obtain spec version for block {block_hash}: {reason}" + )] CannotGetSpecVersion { /// The block hash for which we failed to obtain the spec version. block_hash: Hex, @@ -275,41 +328,59 @@ pub enum OnlineClientAtBlockError { /// The error we encountered. reason: String, }, - #[error("Cannot construct OnlineClientAtBlock: Metadata V{version} (required at block {block_hash} is not supported.")] + #[error( + "Cannot construct OnlineClientAtBlock: Metadata V{version} (required at block {block_hash} is not supported." + )] UnsupportedMetadataVersion { /// The block hash that requires the unsupported version. block_hash: Hex, /// The unsupported metadata version. version: u32, }, - #[error("Cannot construct OnlineClientAtBlock: No legacy types were provided but we're trying to access a block that requires them.")] + #[error( + "Cannot construct OnlineClientAtBlock: No legacy types were provided but we're trying to access a block that requires them." + )] MissingLegacyTypes, - #[error("Cannot construct OnlineClientAtBlock: unable to convert legacy metadata (required at block {block_hash}): {reason}")] + #[error( + "Cannot construct OnlineClientAtBlock: unable to convert legacy metadata (required at block {block_hash}): {reason}" + )] CannotConvertLegacyMetadata { /// The block hash that requires legacy types. block_hash: Hex, /// The metadata version. metadata_version: u32, /// Reason the conversion failed. - reason: subxt_metadata::LegacyFromError + reason: subxt_metadata::LegacyFromError, }, - #[error("Cannot construct OnlineClientAtBlock: unable to convert modern metadata (required at block {block_hash}): {reason}")] + #[error( + "Cannot construct OnlineClientAtBlock: unable to convert modern metadata (required at block {block_hash}): {reason}" + )] CannotConvertModernMetadata { /// The block hash that requires legacy types. block_hash: Hex, /// The metadata version. metadata_version: u32, /// Reason the conversion failed. - reason: subxt_metadata::TryFromError - } + reason: subxt_metadata::TryFromError, + }, // #[error( + // "Cannot construct OnlineClientAtBlock: cannot inject types from metadata: failure to parse a type found in the metadata: {parse_error}" + // )] + // CannotInjectMetadataTypes { + // /// Error parsing a type found in the metadata. + // parse_error: scale_info_legacy::lookup_name::ParseError, + // }, +} - // #[error( - // "Cannot construct OnlineClientAtBlock: cannot inject types from metadata: failure to parse a type found in the metadata: {parse_error}" - // )] - // CannotInjectMetadataTypes { - // /// Error parsing a type found in the metadata. - // parse_error: scale_info_legacy::lookup_name::ParseError, - // }, +impl OnlineClientAtBlockError { + fn backend_error(&self) -> Option<&BackendError> { + match self { + OnlineClientAtBlockError::CannotGetCurrentBlock { reason } + | OnlineClientAtBlockError::CannotGetBlockHash { reason, .. } + | OnlineClientAtBlockError::CannotGetBlockHeader { reason, .. } + | OnlineClientAtBlockError::CannotGetSpecVersion { reason, .. } => Some(reason), + _ => None, + } + } } #[derive(Debug, thiserror::Error)] @@ -362,7 +433,7 @@ impl From for BackendError { #[allow(missing_docs)] pub enum CombinedBackendError { #[error("Could not obtain the list of RPC methods to determine which backends can be used")] - CouldNotObtainRpcMethodList(subxt_rpcs::Error) + CouldNotObtainRpcMethodList(subxt_rpcs::Error), } /// An RPC error. Since we are generic over the RPC client that is used, @@ -382,49 +453,6 @@ pub enum RpcError { SubscriptionDropped, } -/// Block error -#[derive(Debug, thiserror::Error)] -#[non_exhaustive] -#[allow(missing_docs)] -pub enum BlockError { - #[error( - "Could not find the block body with hash {block_hash} (perhaps it was on a non-finalized fork?)" - )] - BlockNotFound { block_hash: Hex }, - #[error("Could not download the block header with hash {block_hash}: {reason}")] - CouldNotGetBlockHeader { - block_hash: Hex, - reason: BackendError, - }, - #[error("Could not download the latest block header: {0}")] - CouldNotGetLatestBlock(BackendError), - #[error("Could not subscribe to all blocks: {0}")] - CouldNotSubscribeToAllBlocks(BackendError), - #[error("Could not subscribe to best blocks: {0}")] - CouldNotSubscribeToBestBlocks(BackendError), - #[error("Could not subscribe to finalized blocks: {0}")] - CouldNotSubscribeToFinalizedBlocks(BackendError), - #[error("Error getting account nonce at block {block_hash}")] - AccountNonceError { - block_hash: Hex, - account_id: Hex, - reason: AccountNonceError, - }, -} - -impl BlockError { - fn backend_error(&self) -> Option<&BackendError> { - match self { - BlockError::CouldNotGetBlockHeader { reason: e, .. } - | BlockError::CouldNotGetLatestBlock(e) - | BlockError::CouldNotSubscribeToAllBlocks(e) - | BlockError::CouldNotSubscribeToBestBlocks(e) - | BlockError::CouldNotSubscribeToFinalizedBlocks(e) => Some(e), - _ => None, - } - } -} - #[derive(Debug, thiserror::Error)] #[non_exhaustive] #[allow(missing_docs)] @@ -446,56 +474,6 @@ impl AccountNonceError { } } -#[derive(Debug, thiserror::Error)] -#[non_exhaustive] -#[allow(missing_docs)] -pub enum RuntimeUpdaterError { - #[error("Error subscribing to runtime updates: The update stream ended unexpectedly")] - UnexpectedEndOfUpdateStream, - #[error("Error subscribing to runtime updates: The finalized block stream ended unexpectedly")] - UnexpectedEndOfBlockStream, - #[error("Error subscribing to runtime updates: Can't stream runtime version: {0}")] - CannotStreamRuntimeVersion(BackendError), - #[error("Error subscribing to runtime updates: Can't get next runtime version in stream: {0}")] - CannotGetNextRuntimeVersion(BackendError), - #[error("Error subscribing to runtime updates: Cannot stream finalized blocks: {0}")] - CannotStreamFinalizedBlocks(BackendError), - #[error("Error subscribing to runtime updates: Cannot get next finalized block in stream: {0}")] - CannotGetNextFinalizedBlock(BackendError), - #[error("Cannot fetch new metadata for runtime update: {0}")] - CannotFetchNewMetadata(BackendError), - #[error( - "Error subscribing to runtime updates: Cannot find the System.LastRuntimeUpgrade storage entry" - )] - CantFindSystemLastRuntimeUpgrade, - #[error("Error subscribing to runtime updates: Cannot fetch last runtime upgrade: {0}")] - CantFetchLastRuntimeUpgrade(StorageError), - #[error("Error subscribing to runtime updates: Cannot decode last runtime upgrade: {0}")] - CannotDecodeLastRuntimeUpgrade(StorageValueError), -} - -impl RuntimeUpdaterError { - fn backend_error(&self) -> Option<&BackendError> { - match self { - RuntimeUpdaterError::CannotStreamRuntimeVersion(e) - | RuntimeUpdaterError::CannotGetNextRuntimeVersion(e) - | RuntimeUpdaterError::CannotStreamFinalizedBlocks(e) - | RuntimeUpdaterError::CannotGetNextFinalizedBlock(e) - | RuntimeUpdaterError::CannotFetchNewMetadata(e) => Some(e), - _ => None, - } - } -} - -/// Error that can occur during upgrade. -#[non_exhaustive] -#[derive(Debug, thiserror::Error)] -#[allow(missing_docs)] -pub enum RuntimeUpdateeApplyError { - #[error("The proposed runtime update is the same as the current version")] - SameVersion, -} - /// Error working with Runtime APIs #[non_exhaustive] #[derive(Debug, thiserror::Error)] @@ -704,6 +682,12 @@ pub enum CustomValueError { CouldNotDecodeCustomValue(frame_decode::custom_values::CustomValueDecodeError), } +impl CustomValueError { + fn backend_error(&self) -> Option<&BackendError> { + None + } +} + /// Error working with View Functions. #[non_exhaustive] #[derive(Debug, thiserror::Error)] @@ -781,6 +765,12 @@ pub enum TransactionStatusError { Dropped(String), } +impl TransactionStatusError { + fn backend_error(&self) -> Option<&BackendError> { + None + } +} + /// Error fetching events for a just-submitted transaction #[derive(Debug, thiserror::Error)] #[non_exhaustive] @@ -876,6 +866,12 @@ pub enum ModuleErrorDetailsError { }, } +impl ModuleErrorDetailsError { + fn backend_error(&self) -> Option<&BackendError> { + None + } +} + /// Error decoding the [`ModuleError`] #[derive(Debug, thiserror::Error)] #[non_exhaustive] @@ -883,6 +879,12 @@ pub enum ModuleErrorDetailsError { #[error("Could not decode the DispatchError::Module payload into the given type: {0}")] pub struct ModuleErrorDecodeError(scale_decode::Error); +impl ModuleErrorDecodeError { + fn backend_error(&self) -> Option<&BackendError> { + None + } +} + /// Error decoding the [`DispatchError`] #[derive(Debug, thiserror::Error)] #[non_exhaustive] @@ -901,6 +903,12 @@ pub enum DispatchErrorDecodeError { }, } +impl DispatchErrorDecodeError { + fn backend_error(&self) -> Option<&BackendError> { + None + } +} + /// Error working with storage. #[derive(Debug, thiserror::Error)] #[non_exhaustive] @@ -984,6 +992,12 @@ pub enum ConstantError { ConstantInfoError(frame_decode::constants::ConstantInfoError<'static>), } +impl ConstantError { + fn backend_error(&self) -> Option<&BackendError> { + None + } +} + #[derive(Debug, DeriveError)] #[non_exhaustive] #[allow(missing_docs)] @@ -1006,6 +1020,12 @@ pub enum StorageKeyError { }, } +impl StorageKeyError { + fn backend_error(&self) -> Option<&BackendError> { + None + } +} + #[derive(Debug, DeriveError)] #[non_exhaustive] #[allow(missing_docs)] @@ -1018,6 +1038,12 @@ pub enum StorageValueError { LeftoverBytes { bytes: Vec }, } +impl StorageValueError { + fn backend_error(&self) -> Option<&BackendError> { + None + } +} + #[derive(Debug, thiserror::Error)] #[non_exhaustive] #[allow(missing_docs)] @@ -1027,6 +1053,12 @@ pub struct ExtrinsicDecodeErrorAt { pub error: ExtrinsicDecodeErrorAtReason, } +impl ExtrinsicDecodeErrorAt { + fn backend_error(&self) -> Option<&BackendError> { + None + } +} + #[derive(Debug, thiserror::Error)] #[non_exhaustive] #[allow(missing_docs)] diff --git a/new/src/error/dispatch_error.rs b/new/src/error/dispatch_error.rs index c7e9900321..f29dcae575 100644 --- a/new/src/error/dispatch_error.rs +++ b/new/src/error/dispatch_error.rs @@ -6,10 +6,10 @@ //! something fails in trying to submit/execute a transaction. use super::{DispatchErrorDecodeError, ModuleErrorDecodeError, ModuleErrorDetailsError}; -use subxt_metadata::Metadata; use core::fmt::Debug; use scale_decode::{DecodeAsType, TypeResolver, visitor::DecodeAsTypeResult}; use std::{borrow::Cow, marker::PhantomData}; +use subxt_metadata::Metadata; /// An error dispatching a transaction. #[derive(Debug, thiserror::Error, PartialEq, Eq)] diff --git a/new/src/lib.rs b/new/src/lib.rs index b648a7a9db..80ce58a3da 100644 --- a/new/src/lib.rs +++ b/new/src/lib.rs @@ -32,11 +32,11 @@ mod only_used_in_docs_or_tests { // #[macro_use] // mod macros; -pub mod config; +pub mod backend; pub mod client; +pub mod config; pub mod error; pub mod utils; -pub mod backend; // pub mod book; // pub mod blocks; // pub mod constants; diff --git a/new/src/utils.rs b/new/src/utils.rs index ef92650ea5..17264ce025 100644 --- a/new/src/utils.rs +++ b/new/src/utils.rs @@ -10,22 +10,22 @@ pub mod bits; mod era; mod multi_address; mod multi_signature; +mod range_map; mod static_type; mod unchecked_extrinsic; mod wrapper_opaque; mod yesnomaybe; -mod range_map; use codec::{Compact, Decode, Encode}; use derive_where::derive_where; -pub use range_map::{ RangeMap, RangeMapBuilder, RangeMapError }; pub use account_id::AccountId32; pub use account_id20::AccountId20; pub use era::Era; pub use multi_address::MultiAddress; pub use multi_signature::MultiSignature; pub use primitive_types::{H160, H256, H512}; +pub use range_map::{RangeMap, RangeMapBuilder, RangeMapError}; pub use static_type::Static; pub use unchecked_extrinsic::UncheckedExtrinsic; pub use wrapper_opaque::WrapperKeepOpaque; diff --git a/new/src/utils/range_map.rs b/new/src/utils/range_map.rs index a98674a288..2bb800c75f 100644 --- a/new/src/utils/range_map.rs +++ b/new/src/utils/range_map.rs @@ -106,11 +106,11 @@ pub enum RangeMapError { EmptyRange(K), /// An error indicating that the proposed block range overlaps with an existing one. #[error("Overlapping block ranges are not allowed: proposed range is {}..{}, but we already have {}..{}", proposed.0, proposed.1, existing.0, existing.1)] - OverlappingRanges { + OverlappingRanges { /// The range being proposed / added. - proposed: (K, K), + proposed: (K, K), /// The existing range which overlaps. - existing: (K, K) + existing: (K, K), }, } diff --git a/new/src/utils/unchecked_extrinsic.rs b/new/src/utils/unchecked_extrinsic.rs index 16ce942a2f..eef9803c7a 100644 --- a/new/src/utils/unchecked_extrinsic.rs +++ b/new/src/utils/unchecked_extrinsic.rs @@ -9,10 +9,10 @@ //! runtime APIs. Deriving `EncodeAsType` would lead to the inner //! bytes to be re-encoded (length prefixed). -use core::marker::PhantomData; -use codec::{Decode, Encode}; -use scale_decode::{DecodeAsType, IntoVisitor, TypeResolver, Visitor, visitor::DecodeAsTypeResult}; use super::{Encoded, Static}; +use codec::{Decode, Encode}; +use core::marker::PhantomData; +use scale_decode::{DecodeAsType, IntoVisitor, TypeResolver, Visitor, visitor::DecodeAsTypeResult}; /// The unchecked extrinsic from substrate. #[derive(Clone, Debug, Eq, PartialEq, Encode)] diff --git a/rpcs/src/methods/chain_head.rs b/rpcs/src/methods/chain_head.rs index 69863cd3d5..5aba9f8dbb 100644 --- a/rpcs/src/methods/chain_head.rs +++ b/rpcs/src/methods/chain_head.rs @@ -15,8 +15,8 @@ use serde::{Deserialize, Deserializer, Serialize}; use std::collections::{HashMap, VecDeque}; use std::task::Poll; -/// An interface to call the new ["chainHead" RPC methods](https://paritytech.github.io/json-rpc-interface-spec/). -/// This interface is instantiated with some `T: RpcConfig` trait which determines some of the types that +/// An interface to call the new ["chainHead" RPC methods](https://paritytech.github.io/json-rpc-interface-spec/). +/// This interface is instantiated with some `T: RpcConfig` trait which determines some of the types that /// the RPC methods will take or hand back. #[derive_where(Clone, Debug)] pub struct ChainHeadRpcMethods { @@ -730,8 +730,8 @@ pub struct ArchiveStorageQuery { /// The type of the storage query. #[serde(rename = "type")] pub query_type: StorageQueryType, - /// This parameter is optional and should be a string containing the hexadecimal-encoded key - /// from which the storage iteration should resume. This parameter is only valid in the context + /// This parameter is optional and should be a string containing the hexadecimal-encoded key + /// from which the storage iteration should resume. This parameter is only valid in the context /// of `descendantsValues` and `descendantsHashes`. #[serde(skip_serializing_if = "Option::is_none")] pub pagination_start_key: Option,