Add ability to stream blocks and tidy errors

This commit is contained in:
James Wilson
2025-12-02 13:11:36 +00:00
parent a6a384c5bc
commit e4028e5a77
27 changed files with 888 additions and 643 deletions
+8 -4
View File
@@ -216,7 +216,7 @@ where
> { > {
use futures::stream::StreamExt; use futures::stream::StreamExt;
use subxt_rpcs::methods::chain_head::{ use subxt_rpcs::methods::chain_head::{
ArchiveStorageEvent, StorageQuery, StorageQueryType, ArchiveStorageEvent, ArchiveStorageQuery, StorageQueryType,
}; };
let expected_num_keys = with_info!(info = &*self.info => { let expected_num_keys = with_info!(info = &*self.info => {
@@ -235,9 +235,10 @@ where
let block_hash = self.client.block_hash(); let block_hash = self.client.block_hash();
let key_bytes = self.key(keys)?; let key_bytes = self.key(keys)?;
let items = std::iter::once(StorageQuery { let items = std::iter::once(ArchiveStorageQuery {
key: &*key_bytes, key: &*key_bytes,
query_type: StorageQueryType::DescendantsValues, query_type: StorageQueryType::DescendantsValues,
pagination_start_key: None,
}); });
let sub = self let sub = self
@@ -297,11 +298,14 @@ where
T: Config + 'atblock, T: Config + 'atblock,
Client: OnlineClientAtBlockT<'atblock, T>, Client: OnlineClientAtBlockT<'atblock, T>,
{ {
use subxt_rpcs::methods::chain_head::{ArchiveStorageEvent, StorageQuery, StorageQueryType}; use subxt_rpcs::methods::chain_head::{
ArchiveStorageEvent, ArchiveStorageQuery, StorageQueryType,
};
let query = StorageQuery { let query = ArchiveStorageQuery {
key: key_bytes, key: key_bytes,
query_type: StorageQueryType::Value, query_type: StorageQueryType::Value,
pagination_start_key: None,
}; };
let mut response_stream = client let mut response_stream = client
+10 -20
View File
@@ -6,10 +6,10 @@
//! the necessary information (probably from a JSON-RPC API, but that's up to the //! the necessary information (probably from a JSON-RPC API, but that's up to the
//! implementation). //! implementation).
mod chain_head;
mod archive; mod archive;
mod legacy; mod chain_head;
mod combined; mod combined;
mod legacy;
mod utils; mod utils;
use crate::config::{Config, HashFor}; use crate::config::{Config, HashFor};
@@ -22,23 +22,10 @@ use std::sync::Arc;
use subxt_metadata::Metadata; use subxt_metadata::Metadata;
// Expose our various backends. // Expose our various backends.
pub use chain_head::{ pub use archive::ArchiveBackend;
ChainHeadBackend, pub use chain_head::{ChainHeadBackend, ChainHeadBackendBuilder, ChainHeadBackendDriver};
ChainHeadBackendBuilder, pub use combined::{CombinedBackend, CombinedBackendBuilder, CombinedBackendDriver};
ChainHeadBackendDriver, pub use legacy::{LegacyBackend, LegacyBackendBuilder};
};
pub use archive::{
ArchiveBackend
};
pub use legacy::{
LegacyBackend,
LegacyBackendBuilder
};
pub use combined::{
CombinedBackend,
CombinedBackendBuilder,
CombinedBackendDriver
};
/// Prevent the backend trait being implemented externally. /// Prevent the backend trait being implemented externally.
#[doc(hidden)] #[doc(hidden)]
@@ -78,7 +65,10 @@ pub trait Backend<T: Config>: sealed::Sealed + Send + Sync + 'static {
/// multiple block hashes correspond to the given number (ie if the number is greater /// multiple block hashes correspond to the given number (ie if the number is greater
/// than that of the latest finalized block and some forks exist). Nevertheless, it could /// than that of the latest finalized block and some forks exist). Nevertheless, it could
/// still return the hash to a block on some fork that is pruned. /// still return the hash to a block on some fork that is pruned.
async fn block_number_to_hash(&self, number: u64) -> Result<Option<BlockRef<HashFor<T>>>, BackendError>; async fn block_number_to_hash(
&self,
number: u64,
) -> Result<Option<BlockRef<HashFor<T>>>, BackendError>;
/// Get a block header. /// Get a block header.
async fn block_header(&self, at: HashFor<T>) -> Result<Option<T::Header>, BackendError>; async fn block_header(&self, at: HashFor<T>) -> Result<Option<T::Header>, BackendError>;
+70 -48
View File
@@ -12,20 +12,17 @@
mod storage_stream; mod storage_stream;
use subxt_rpcs::methods::ChainHeadRpcMethods;
use crate::backend::{ use crate::backend::{
Backend, BlockRef, StorageResponse, StreamOf, StreamOfResults, Backend, BlockRef, StorageResponse, StreamOf, StreamOfResults, TransactionStatus, utils::retry,
TransactionStatus, utils::retry,
}; };
use crate::config::{Config, HashFor, RpcConfigFor}; use crate::config::{Config, HashFor, RpcConfigFor};
use crate::error::BackendError; use crate::error::BackendError;
use async_trait::async_trait; use async_trait::async_trait;
use futures::StreamExt; use futures::StreamExt;
use subxt_rpcs::RpcClient;
use subxt_rpcs::methods::chain_head::{
ArchiveStorageQuery, ArchiveCallResult, StorageQueryType,
};
use storage_stream::ArchiveStorageStream; use storage_stream::ArchiveStorageStream;
use subxt_rpcs::RpcClient;
use subxt_rpcs::methods::ChainHeadRpcMethods;
use subxt_rpcs::methods::chain_head::{ArchiveCallResult, ArchiveStorageQuery, StorageQueryType};
/// The archive backend. /// The archive backend.
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
@@ -36,7 +33,7 @@ pub struct ArchiveBackend<T: Config> {
impl<T: Config> ArchiveBackend<T> { impl<T: Config> ArchiveBackend<T> {
/// Configure and construct an [`ArchiveBackend`] and the associated [`ChainHeadBackendDriver`]. /// Configure and construct an [`ArchiveBackend`] and the associated [`ChainHeadBackendDriver`].
pub fn new(client: impl Into<RpcClient>,) -> ArchiveBackend<T> { pub fn new(client: impl Into<RpcClient>) -> ArchiveBackend<T> {
let methods = ChainHeadRpcMethods::new(client.into()); let methods = ChainHeadRpcMethods::new(client.into());
ArchiveBackend { methods } ArchiveBackend { methods }
@@ -50,7 +47,8 @@ impl<T: Config> Backend<T> for ArchiveBackend<T> {
keys: Vec<Vec<u8>>, keys: Vec<Vec<u8>>,
at: HashFor<T>, at: HashFor<T>,
) -> Result<StreamOfResults<StorageResponse>, BackendError> { ) -> Result<StreamOfResults<StorageResponse>, BackendError> {
let queries = keys.into_iter() let queries = keys
.into_iter()
.map(|key| ArchiveStorageQuery { .map(|key| ArchiveStorageQuery {
key: key, key: key,
query_type: StorageQueryType::Value, query_type: StorageQueryType::Value,
@@ -58,12 +56,17 @@ impl<T: Config> Backend<T> for ArchiveBackend<T> {
}) })
.collect(); .collect();
let stream = ArchiveStorageStream::new(at, self.methods.clone(), queries).map(|item| { let stream = ArchiveStorageStream::new(at, self.methods.clone(), queries)
match item { .map(|item| match item {
Err(e) => Some(Err(e)), Err(e) => Some(Err(e)),
Ok(item) => item.value.map(|val| Ok(StorageResponse { key: item.key.0, value: val.0 })) Ok(item) => item.value.map(|val| {
} Ok(StorageResponse {
}).filter_map(async |item| item); key: item.key.0,
value: val.0,
})
}),
})
.filter_map(async |item| item);
Ok(StreamOf(Box::pin(stream))) Ok(StreamOf(Box::pin(stream)))
} }
@@ -74,19 +77,18 @@ impl<T: Config> Backend<T> for ArchiveBackend<T> {
at: HashFor<T>, at: HashFor<T>,
) -> Result<StreamOfResults<Vec<u8>>, BackendError> { ) -> Result<StreamOfResults<Vec<u8>>, BackendError> {
let queries = std::iter::once(ArchiveStorageQuery { let queries = std::iter::once(ArchiveStorageQuery {
key: key, key: key,
// Just ask for the hash and then ignore it and return keys // Just ask for the hash and then ignore it and return keys
query_type: StorageQueryType::DescendantsHashes, query_type: StorageQueryType::DescendantsHashes,
pagination_start_key: None, pagination_start_key: None,
}) })
.collect(); .collect();
let stream = ArchiveStorageStream::new(at, self.methods.clone(), queries).map(|item| { let stream =
match item { ArchiveStorageStream::new(at, self.methods.clone(), queries).map(|item| match item {
Err(e) => Err(e), Err(e) => Err(e),
Ok(item) => Ok(item.key.0) Ok(item) => Ok(item.key.0),
} });
});
Ok(StreamOf(Box::pin(stream))) Ok(StreamOf(Box::pin(stream)))
} }
@@ -97,18 +99,23 @@ impl<T: Config> Backend<T> for ArchiveBackend<T> {
at: HashFor<T>, at: HashFor<T>,
) -> Result<StreamOfResults<StorageResponse>, BackendError> { ) -> Result<StreamOfResults<StorageResponse>, BackendError> {
let queries = std::iter::once(ArchiveStorageQuery { let queries = std::iter::once(ArchiveStorageQuery {
key: key, key: key,
query_type: StorageQueryType::DescendantsValues, query_type: StorageQueryType::DescendantsValues,
pagination_start_key: None, pagination_start_key: None,
}) })
.collect(); .collect();
let stream = ArchiveStorageStream::new(at, self.methods.clone(), queries).map(|item| { let stream = ArchiveStorageStream::new(at, self.methods.clone(), queries)
match item { .map(|item| match item {
Err(e) => Some(Err(e)), Err(e) => Some(Err(e)),
Ok(item) => item.value.map(|val| Ok(StorageResponse { key: item.key.0, value: val.0 })) Ok(item) => item.value.map(|val| {
} Ok(StorageResponse {
}).filter_map(async |item| item); key: item.key.0,
value: val.0,
})
}),
})
.filter_map(async |item| item);
Ok(StreamOf(Box::pin(stream))) Ok(StreamOf(Box::pin(stream)))
} }
@@ -121,9 +128,15 @@ impl<T: Config> Backend<T> for ArchiveBackend<T> {
.await .await
} }
async fn block_number_to_hash(&self, number: u64) -> Result<Option<BlockRef<HashFor<T>>>, BackendError> { async fn block_number_to_hash(
&self,
number: u64,
) -> Result<Option<BlockRef<HashFor<T>>>, BackendError> {
retry(|| async { retry(|| async {
let mut hashes = self.methods.archive_v1_hash_by_height(number as usize).await?; let mut hashes = self
.methods
.archive_v1_hash_by_height(number as usize)
.await?;
if let (Some(hash), None) = (hashes.pop(), hashes.pop()) { if let (Some(hash), None) = (hashes.pop(), hashes.pop()) {
// One hash; return it. // One hash; return it.
Ok(Some(BlockRef::from_hash(hash))) Ok(Some(BlockRef::from_hash(hash)))
@@ -131,7 +144,8 @@ impl<T: Config> Backend<T> for ArchiveBackend<T> {
// More than one; return None. // More than one; return None.
Ok(None) Ok(None)
} }
}).await })
.await
} }
async fn block_header(&self, at: HashFor<T>) -> Result<Option<T::Header>, BackendError> { async fn block_header(&self, at: HashFor<T>) -> Result<Option<T::Header>, BackendError> {
@@ -147,9 +161,7 @@ impl<T: Config> Backend<T> for ArchiveBackend<T> {
let Some(exts) = self.methods.archive_v1_body(at).await? else { let Some(exts) = self.methods.archive_v1_body(at).await? else {
return Ok(None); return Ok(None);
}; };
Ok(Some( Ok(Some(exts.into_iter().map(|ext| ext.0).collect()))
exts.into_iter().map(|ext| ext.0).collect()
))
}) })
.await .await
} }
@@ -159,7 +171,9 @@ impl<T: Config> Backend<T> for ArchiveBackend<T> {
let height = self.methods.archive_v1_finalized_height().await?; let height = self.methods.archive_v1_finalized_height().await?;
let mut hashes = self.methods.archive_v1_hash_by_height(height).await?; let mut hashes = self.methods.archive_v1_hash_by_height(height).await?;
let Some(hash) = hashes.pop() else { let Some(hash) = hashes.pop() else {
return Err(BackendError::Other("Multiple hashes not expected at a finalized height".into())) return Err(BackendError::Other(
"Multiple hashes not expected at a finalized height".into(),
));
}; };
Ok(BlockRef::from_hash(hash)) Ok(BlockRef::from_hash(hash))
}) })
@@ -170,21 +184,27 @@ impl<T: Config> Backend<T> for ArchiveBackend<T> {
&self, &self,
_hasher: T::Hasher, _hasher: T::Hasher,
) -> Result<StreamOfResults<(T::Header, BlockRef<HashFor<T>>)>, BackendError> { ) -> Result<StreamOfResults<(T::Header, BlockRef<HashFor<T>>)>, BackendError> {
Err(BackendError::Other("The archive backend cannot stream block headers".into())) Err(BackendError::Other(
"The archive backend cannot stream block headers".into(),
))
} }
async fn stream_best_block_headers( async fn stream_best_block_headers(
&self, &self,
_hasher: T::Hasher, _hasher: T::Hasher,
) -> Result<StreamOfResults<(T::Header, BlockRef<HashFor<T>>)>, BackendError> { ) -> Result<StreamOfResults<(T::Header, BlockRef<HashFor<T>>)>, BackendError> {
Err(BackendError::Other("The archive backend cannot stream block headers".into())) Err(BackendError::Other(
"The archive backend cannot stream block headers".into(),
))
} }
async fn stream_finalized_block_headers( async fn stream_finalized_block_headers(
&self, &self,
_hasher: T::Hasher, _hasher: T::Hasher,
) -> Result<StreamOfResults<(T::Header, BlockRef<HashFor<T>>)>, BackendError> { ) -> Result<StreamOfResults<(T::Header, BlockRef<HashFor<T>>)>, BackendError> {
Err(BackendError::Other("The archive backend cannot stream block headers".into())) Err(BackendError::Other(
"The archive backend cannot stream block headers".into(),
))
} }
async fn submit_transaction( async fn submit_transaction(
@@ -201,7 +221,10 @@ impl<T: Config> Backend<T> for ArchiveBackend<T> {
call_parameters: Option<&[u8]>, call_parameters: Option<&[u8]>,
at: HashFor<T>, at: HashFor<T>,
) -> Result<Vec<u8>, BackendError> { ) -> Result<Vec<u8>, BackendError> {
let res = self.methods.archive_v1_call(at, method, call_parameters.unwrap_or(&[])).await?; let res = self
.methods
.archive_v1_call(at, method, call_parameters.unwrap_or(&[]))
.await?;
match res { match res {
ArchiveCallResult::Success(bytes) => Ok(bytes.0), ArchiveCallResult::Success(bytes) => Ok(bytes.0),
ArchiveCallResult::Error(e) => Err(BackendError::other(e)), ArchiveCallResult::Error(e) => Err(BackendError::other(e)),
@@ -209,5 +232,4 @@ impl<T: Config> Backend<T> for ArchiveBackend<T> {
} }
} }
impl<T: Config> crate::backend::sealed::Sealed for ArchiveBackend<T> {} impl<T: Config> crate::backend::sealed::Sealed for ArchiveBackend<T> {}
+46 -41
View File
@@ -1,13 +1,15 @@
use std::collections::VecDeque;
use subxt_rpcs::Error as RpcError;
use subxt_rpcs::methods::chain_head::{ArchiveStorageQuery, ArchiveStorageSubscription, ArchiveStorageEvent, ArchiveStorageEventItem};
use std::pin::Pin;
use std::future::Future;
use futures::{FutureExt, Stream, StreamExt};
use std::task::{Context, Poll};
use crate::error::BackendError;
use crate::config::{Config, HashFor, RpcConfigFor}; use crate::config::{Config, HashFor, RpcConfigFor};
use crate::error::BackendError;
use futures::{FutureExt, Stream, StreamExt};
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use subxt_rpcs::Error as RpcError;
use subxt_rpcs::methods::ChainHeadRpcMethods; use subxt_rpcs::methods::ChainHeadRpcMethods;
use subxt_rpcs::methods::chain_head::{
ArchiveStorageEvent, ArchiveStorageEventItem, ArchiveStorageQuery, ArchiveStorageSubscription,
};
pub struct ArchiveStorageStream<T: Config> { pub struct ArchiveStorageStream<T: Config> {
at: HashFor<T>, at: HashFor<T>,
@@ -19,15 +21,21 @@ pub struct ArchiveStorageStream<T: Config> {
enum StreamState<T: Config> { enum StreamState<T: Config> {
GetSubscription { GetSubscription {
current_query: ArchiveStorageQuery<Vec<u8>>, current_query: ArchiveStorageQuery<Vec<u8>>,
sub_fut: Pin<Box<dyn Future<Output = Result<ArchiveStorageSubscription<HashFor<T>>, RpcError>> + Send + 'static>> sub_fut: Pin<
Box<
dyn Future<Output = Result<ArchiveStorageSubscription<HashFor<T>>, RpcError>>
+ Send
+ 'static,
>,
>,
}, },
RunSubscription { RunSubscription {
current_query: ArchiveStorageQuery<Vec<u8>>, current_query: ArchiveStorageQuery<Vec<u8>>,
sub: ArchiveStorageSubscription<HashFor<T>> sub: ArchiveStorageSubscription<HashFor<T>>,
}, },
} }
impl <T: Config> ArchiveStorageStream<T> { impl<T: Config> ArchiveStorageStream<T> {
/// Fetch descendant keys. /// Fetch descendant keys.
pub fn new( pub fn new(
at: HashFor<T>, at: HashFor<T>,
@@ -69,27 +77,23 @@ impl<T: Config> Stream for ArchiveStorageStream<T> {
pagination_start_key: query.pagination_start_key.as_deref(), pagination_start_key: query.pagination_start_key.as_deref(),
}); });
methods.archive_v1_storage( methods.archive_v1_storage(at, query, None).await
at,
query,
None
).await
}; };
this.state = Some(StreamState::GetSubscription { this.state = Some(StreamState::GetSubscription {
current_query, current_query,
sub_fut: Box::pin(sub_fut) sub_fut: Box::pin(sub_fut),
}); });
}, }
// We're getting our subscription stream for the current query. // We're getting our subscription stream for the current query.
Some(StreamState::GetSubscription { current_query, mut sub_fut }) => { Some(StreamState::GetSubscription {
current_query,
mut sub_fut,
}) => {
match sub_fut.poll_unpin(cx) { match sub_fut.poll_unpin(cx) {
Poll::Ready(Ok(sub)) => { Poll::Ready(Ok(sub)) => {
this.state = Some(StreamState::RunSubscription { this.state = Some(StreamState::RunSubscription { current_query, sub });
current_query, }
sub
});
},
Poll::Ready(Err(e)) => { Poll::Ready(Err(e)) => {
if e.is_disconnected_will_reconnect() { if e.is_disconnected_will_reconnect() {
// Push the query back onto the queue to try again // Push the query back onto the queue to try again
@@ -98,19 +102,22 @@ impl<T: Config> Stream for ArchiveStorageStream<T> {
} }
this.state = None; this.state = None;
return Poll::Ready(Some(Err(e.into()))) return Poll::Ready(Some(Err(e.into())));
} }
Poll::Pending => { Poll::Pending => {
this.state = Some(StreamState::GetSubscription { this.state = Some(StreamState::GetSubscription {
current_query, current_query,
sub_fut sub_fut,
}); });
return Poll::Pending return Poll::Pending;
}, }
} }
}, }
// Running the subscription and returning results. // Running the subscription and returning results.
Some(StreamState::RunSubscription { current_query, mut sub }) => { Some(StreamState::RunSubscription {
current_query,
mut sub,
}) => {
match sub.poll_next_unpin(cx) { match sub.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(val))) => { Poll::Ready(Some(Ok(val))) => {
match val { match val {
@@ -123,9 +130,9 @@ impl<T: Config> Stream for ArchiveStorageStream<T> {
// At the time of writing, it's not clear if paginationStartKey // At the time of writing, it's not clear if paginationStartKey
// starts from the key itself or the first key after it: // starts from the key itself or the first key after it:
// https://github.com/paritytech/json-rpc-interface-spec/issues/176 // https://github.com/paritytech/json-rpc-interface-spec/issues/176
pagination_start_key: Some(item.key.0.clone()) pagination_start_key: Some(item.key.0.clone()),
}, },
sub sub,
}); });
// We treat `paginationStartKey` as being the key we want results to begin _after_. // We treat `paginationStartKey` as being the key we want results to begin _after_.
@@ -140,17 +147,17 @@ impl<T: Config> Stream for ArchiveStorageStream<T> {
} }
return Poll::Ready(Some(Ok(item))); return Poll::Ready(Some(Ok(item)));
}, }
ArchiveStorageEvent::Error(e) => { ArchiveStorageEvent::Error(e) => {
this.state = None; this.state = None;
return Poll::Ready(Some(Err(BackendError::other(e.error)))) return Poll::Ready(Some(Err(BackendError::other(e.error))));
}, }
ArchiveStorageEvent::Done => { ArchiveStorageEvent::Done => {
this.state = None; this.state = None;
continue; continue;
}, }
} }
}, }
Poll::Ready(Some(Err(e))) => { Poll::Ready(Some(Err(e))) => {
if e.is_disconnected_will_reconnect() { if e.is_disconnected_will_reconnect() {
// Put the current query back into the queue and retry. // Put the current query back into the queue and retry.
@@ -167,11 +174,9 @@ impl<T: Config> Stream for ArchiveStorageStream<T> {
this.state = None; this.state = None;
continue; continue;
} }
Poll::Pending => { Poll::Pending => return Poll::Pending,
return Poll::Pending
},
} }
}, }
} }
} }
} }
+28 -24
View File
@@ -13,11 +13,10 @@ mod follow_stream_driver;
mod follow_stream_unpin; mod follow_stream_unpin;
mod storage_items; mod storage_items;
use subxt_rpcs::methods::ChainHeadRpcMethods;
use self::follow_stream_driver::FollowStreamFinalizedHeads; use self::follow_stream_driver::FollowStreamFinalizedHeads;
use crate::backend::{ use crate::backend::{
Backend, BlockRef, BlockRefT, StorageResponse, StreamOf, StreamOfResults, Backend, BlockRef, BlockRefT, StorageResponse, StreamOf, StreamOfResults, TransactionStatus,
TransactionStatus, utils::retry, utils::retry,
}; };
use crate::config::{Config, Hash, HashFor, RpcConfigFor}; use crate::config::{Config, Hash, HashFor, RpcConfigFor};
use crate::error::{BackendError, RpcError}; use crate::error::{BackendError, RpcError};
@@ -29,6 +28,7 @@ use std::collections::HashMap;
use std::task::Poll; use std::task::Poll;
use storage_items::StorageItems; use storage_items::StorageItems;
use subxt_rpcs::RpcClient; use subxt_rpcs::RpcClient;
use subxt_rpcs::methods::ChainHeadRpcMethods;
use subxt_rpcs::methods::chain_head::{ use subxt_rpcs::methods::chain_head::{
FollowEvent, MethodResponse, StorageQuery, StorageQueryType, StorageResultType, FollowEvent, MethodResponse, StorageQuery, StorageQueryType, StorageResultType,
}; };
@@ -373,8 +373,13 @@ impl<T: Config> Backend<T> for ChainHeadBackend<T> {
.await .await
} }
async fn block_number_to_hash(&self, _number: u64) -> Result<Option<BlockRef<HashFor<T>>>, BackendError> { async fn block_number_to_hash(
Err(BackendError::other("The ChainHead V1 RPCs do not support obtaining a block hash from a number.")) &self,
_number: u64,
) -> Result<Option<BlockRef<HashFor<T>>>, BackendError> {
Err(BackendError::other(
"The ChainHead V1 RPCs do not support obtaining a block hash from a number.",
))
} }
async fn block_header(&self, at: HashFor<T>) -> Result<Option<T::Header>, BackendError> { async fn block_header(&self, at: HashFor<T>) -> Result<Option<T::Header>, BackendError> {
@@ -567,24 +572,29 @@ pub(crate) async fn submit_transaction_ignoring_follow_events<T: Config>(
RpcTransactionStatus::Broadcasted => TransactionStatus::Broadcasted, RpcTransactionStatus::Broadcasted => TransactionStatus::Broadcasted,
RpcTransactionStatus::BestChainBlockIncluded { block: None } => { RpcTransactionStatus::BestChainBlockIncluded { block: None } => {
TransactionStatus::NoLongerInBestBlock TransactionStatus::NoLongerInBestBlock
}, }
RpcTransactionStatus::BestChainBlockIncluded { block: Some(block) } => { RpcTransactionStatus::BestChainBlockIncluded { block: Some(block) } => {
TransactionStatus::InBestBlock { hash: BlockRef::from_hash(block.hash) } TransactionStatus::InBestBlock {
}, hash: BlockRef::from_hash(block.hash),
}
}
RpcTransactionStatus::Finalized { block } => { RpcTransactionStatus::Finalized { block } => {
TransactionStatus::InFinalizedBlock { hash: BlockRef::from_hash(block.hash) } TransactionStatus::InFinalizedBlock {
}, hash: BlockRef::from_hash(block.hash),
}
}
RpcTransactionStatus::Error { error } => { RpcTransactionStatus::Error { error } => {
TransactionStatus::Error { message: error } TransactionStatus::Error { message: error }
}, }
RpcTransactionStatus::Invalid { error } => { RpcTransactionStatus::Invalid { error } => {
TransactionStatus::Invalid { message: error } TransactionStatus::Invalid { message: error }
}, }
RpcTransactionStatus::Dropped { error } => { RpcTransactionStatus::Dropped { error } => {
TransactionStatus::Dropped { message: error } TransactionStatus::Dropped { message: error }
}, }
} }
}).map_err(Into::into) })
.map_err(Into::into)
}); });
Ok(StreamOf(Box::pin(tx_progress))) Ok(StreamOf(Box::pin(tx_progress)))
@@ -644,9 +654,7 @@ async fn submit_transaction_tracking_follow_events<T: Config>(
// Poll for a follow event, and error if the stream has unexpectedly ended. // Poll for a follow event, and error if the stream has unexpectedly ended.
let follow_ev_poll = match seen_blocks_sub.poll_next_unpin(cx) { let follow_ev_poll = match seen_blocks_sub.poll_next_unpin(cx) {
Poll::Ready(None) => { Poll::Ready(None) => {
return Poll::Ready(err_other( return Poll::Ready(err_other("chainHead_follow stream ended unexpectedly"));
"chainHead_follow stream ended unexpectedly",
));
} }
Poll::Ready(Some(follow_ev)) => Poll::Ready(follow_ev), Poll::Ready(Some(follow_ev)) => Poll::Ready(follow_ev),
Poll::Pending => Poll::Pending, Poll::Pending => Poll::Pending,
@@ -671,10 +679,8 @@ async fn submit_transaction_tracking_follow_events<T: Config>(
} }
FollowEvent::Finalized(ev) => { FollowEvent::Finalized(ev) => {
for block_ref in ev.finalized_block_hashes { for block_ref in ev.finalized_block_hashes {
seen_blocks.insert( seen_blocks
block_ref.hash(), .insert(block_ref.hash(), (SeenBlockMarker::Finalized, block_ref));
(SeenBlockMarker::Finalized, block_ref),
);
} }
} }
FollowEvent::Stop => { FollowEvent::Stop => {
@@ -694,9 +700,7 @@ async fn submit_transaction_tracking_follow_events<T: Config>(
// If we have a finalized hash, we are done looking for tx events and we are just waiting // If we have a finalized hash, we are done looking for tx events and we are just waiting
// for a pinned block with a matching hash (which must appear eventually given it's finalized). // for a pinned block with a matching hash (which must appear eventually given it's finalized).
if let Some(hash) = &finalized_hash { if let Some(hash) = &finalized_hash {
if let Some((SeenBlockMarker::Finalized, block_ref)) = if let Some((SeenBlockMarker::Finalized, block_ref)) = seen_blocks.remove(hash) {
seen_blocks.remove(hash)
{
// Found it! Hand back the event with a pinned block. We're done. // Found it! Hand back the event with a pinned block. We're done.
done = true; done = true;
let ev = TransactionStatus::InFinalizedBlock { let ev = TransactionStatus::InFinalizedBlock {
+4 -2
View File
@@ -103,7 +103,9 @@ impl<Hash> FollowStream<Hash> {
} }
/// Create a new [`FollowStream`] given the RPC methods. /// Create a new [`FollowStream`] given the RPC methods.
pub fn from_methods<T: Config>(methods: ChainHeadRpcMethods<RpcConfigFor<T>>) -> FollowStream<HashFor<T>> { pub fn from_methods<T: Config>(
methods: ChainHeadRpcMethods<RpcConfigFor<T>>,
) -> FollowStream<HashFor<T>> {
FollowStream { FollowStream {
stream_getter: Box::new(move || { stream_getter: Box::new(move || {
let methods = methods.clone(); let methods = methods.clone();
@@ -113,7 +115,7 @@ impl<Hash> FollowStream<Hash> {
// Extract the subscription ID: // Extract the subscription ID:
let Some(sub_id) = stream.subscription_id().map(ToOwned::to_owned) else { let Some(sub_id) = stream.subscription_id().map(ToOwned::to_owned) else {
return Err(BackendError::other( return Err(BackendError::other(
"Subscription ID expected for chainHead_follow response, but not given" "Subscription ID expected for chainHead_follow response, but not given",
)); ));
}; };
// Map stream errors into the higher level subxt one: // Map stream errors into the higher level subxt one:
+136 -126
View File
@@ -7,19 +7,16 @@
use crate::backend::chain_head::ChainHeadBackendDriver; use crate::backend::chain_head::ChainHeadBackendDriver;
use crate::backend::{ use crate::backend::{
legacy::LegacyBackend, Backend, BlockRef, StorageResponse, StreamOfResults, TransactionStatus,
chain_head::ChainHeadBackend, archive::ArchiveBackend, chain_head::ChainHeadBackend, legacy::LegacyBackend,
archive::ArchiveBackend,
Backend, BlockRef, StorageResponse, StreamOfResults,
TransactionStatus,
}; };
use crate::config::{Config, HashFor}; use crate::config::{Config, HashFor};
use crate::error::{BackendError, CombinedBackendError}; use crate::error::{BackendError, CombinedBackendError};
use async_trait::async_trait; use async_trait::async_trait;
use futures::StreamExt;
use subxt_rpcs::RpcClient;
use futures::Stream; use futures::Stream;
use futures::StreamExt;
use std::task::Poll; use std::task::Poll;
use subxt_rpcs::RpcClient;
pub struct CombinedBackendBuilder<T: Config> { pub struct CombinedBackendBuilder<T: Config> {
archive: BackendChoice<ArchiveBackend<T>>, archive: BackendChoice<ArchiveBackend<T>>,
@@ -33,7 +30,7 @@ enum BackendChoice<V> {
UseDefault, UseDefault,
} }
impl <T: Config> CombinedBackendBuilder<T> { impl<T: Config> CombinedBackendBuilder<T> {
/// Create a new [`CombinedBackendBuilder`]. /// Create a new [`CombinedBackendBuilder`].
pub fn new() -> Self { pub fn new() -> Self {
CombinedBackendBuilder { CombinedBackendBuilder {
@@ -86,7 +83,10 @@ impl <T: Config> CombinedBackendBuilder<T> {
/// ///
/// If you just want to run the driver in the background until completion in on the default runtime, /// If you just want to run the driver in the background until completion in on the default runtime,
/// use [`CombinedBackendBuilder::build_with_background_driver`] instead. /// use [`CombinedBackendBuilder::build_with_background_driver`] instead.
pub async fn build(self, rpc_client: impl Into<RpcClient>) -> Result<(CombinedBackend<T>, CombinedBackendDriver<T>), CombinedBackendError> { pub async fn build(
self,
rpc_client: impl Into<RpcClient>,
) -> Result<(CombinedBackend<T>, CombinedBackendDriver<T>), CombinedBackendError> {
let rpc_client = rpc_client.into(); let rpc_client = rpc_client.into();
// What does the thing wer're talking to actually know about? // What does the thing wer're talking to actually know about?
@@ -98,7 +98,9 @@ impl <T: Config> CombinedBackendBuilder<T> {
let has_archive_methods = methods.iter().any(|m| m.starts_with("archive_v1_")); let has_archive_methods = methods.iter().any(|m| m.starts_with("archive_v1_"));
let has_chainhead_methods = methods.iter().any(|m| m.starts_with("chainHead_v1")); let has_chainhead_methods = methods.iter().any(|m| m.starts_with("chainHead_v1"));
let mut combined_driver = CombinedBackendDriver { chainhead_driver: None }; let mut combined_driver = CombinedBackendDriver {
chainhead_driver: None,
};
let archive = if has_archive_methods { let archive = if has_archive_methods {
match self.archive { match self.archive {
@@ -106,19 +108,24 @@ impl <T: Config> CombinedBackendBuilder<T> {
BackendChoice::UseDefault => Some(ArchiveBackend::new(rpc_client.clone())), BackendChoice::UseDefault => Some(ArchiveBackend::new(rpc_client.clone())),
BackendChoice::DontUse => None, BackendChoice::DontUse => None,
} }
} else { None }; } else {
None
};
let chainhead = if has_chainhead_methods { let chainhead = if has_chainhead_methods {
match self.chainhead { match self.chainhead {
BackendChoice::Use(b) => Some(b), BackendChoice::Use(b) => Some(b),
BackendChoice::UseDefault => { BackendChoice::UseDefault => {
let (chainhead, chainhead_driver) = ChainHeadBackend::builder().build(rpc_client.clone()); let (chainhead, chainhead_driver) =
ChainHeadBackend::builder().build(rpc_client.clone());
combined_driver.chainhead_driver = Some(chainhead_driver); combined_driver.chainhead_driver = Some(chainhead_driver);
Some(chainhead) Some(chainhead)
}, }
BackendChoice::DontUse => None, BackendChoice::DontUse => None,
} }
} else { None }; } else {
None
};
let legacy = match self.legacy { let legacy = match self.legacy {
BackendChoice::Use(b) => Some(b), BackendChoice::Use(b) => Some(b),
@@ -129,7 +136,7 @@ impl <T: Config> CombinedBackendBuilder<T> {
let combined = CombinedBackend { let combined = CombinedBackend {
archive, archive,
chainhead, chainhead,
legacy legacy,
}; };
Ok((combined, combined_driver)) Ok((combined, combined_driver))
@@ -141,10 +148,11 @@ impl <T: Config> CombinedBackendBuilder<T> {
/// - On non-wasm targets, this will spawn the driver on `tokio`. /// - On non-wasm targets, this will spawn the driver on `tokio`.
/// - On wasm targets, this will spawn the driver on `wasm-bindgen-futures`. /// - On wasm targets, this will spawn the driver on `wasm-bindgen-futures`.
#[cfg(feature = "runtime")] #[cfg(feature = "runtime")]
pub async fn build_with_background_driver(self, client: impl Into<RpcClient>) -> Result<CombinedBackend<T>, CombinedBackendError> { pub async fn build_with_background_driver(
let (backend, mut driver) = self self,
.build(client) client: impl Into<RpcClient>,
.await?; ) -> Result<CombinedBackend<T>, CombinedBackendError> {
let (backend, mut driver) = self.build(client).await?;
super::utils::spawn(async move { super::utils::spawn(async move {
// NOTE: we need to poll the driver until it's done i.e returns None // NOTE: we need to poll the driver until it's done i.e returns None
@@ -166,10 +174,10 @@ impl <T: Config> CombinedBackendBuilder<T> {
/// that the [`CombinedBackend`] can make progress. It does not need polling /// that the [`CombinedBackend`] can make progress. It does not need polling
/// if [`CombinedBackendDriver::needs_polling`] returns `false`. /// if [`CombinedBackendDriver::needs_polling`] returns `false`.
pub struct CombinedBackendDriver<T: Config> { pub struct CombinedBackendDriver<T: Config> {
chainhead_driver: Option<ChainHeadBackendDriver<T>> chainhead_driver: Option<ChainHeadBackendDriver<T>>,
} }
impl <T: Config> CombinedBackendDriver<T> { impl<T: Config> CombinedBackendDriver<T> {
pub fn needs_polling(&self) -> bool { pub fn needs_polling(&self) -> bool {
self.chainhead_driver.is_some() self.chainhead_driver.is_some()
} }
@@ -183,7 +191,7 @@ impl<T: Config> Stream for CombinedBackendDriver<T> {
) -> std::task::Poll<Option<Self::Item>> { ) -> std::task::Poll<Option<Self::Item>> {
match &mut self.chainhead_driver { match &mut self.chainhead_driver {
Some(driver) => driver.poll_next_unpin(cx), Some(driver) => driver.poll_next_unpin(cx),
None => Poll::Ready(None) None => Poll::Ready(None),
} }
} }
} }
@@ -196,7 +204,7 @@ pub struct CombinedBackend<T: Config> {
legacy: Option<LegacyBackend<T>>, legacy: Option<LegacyBackend<T>>,
} }
impl <T: Config> CombinedBackend<T> { impl<T: Config> CombinedBackend<T> {
/// Configure and construct a [`CombinedBackend`]. /// Configure and construct a [`CombinedBackend`].
pub fn builder() -> CombinedBackendBuilder<T> { pub fn builder() -> CombinedBackendBuilder<T> {
CombinedBackendBuilder::new() CombinedBackendBuilder::new()
@@ -243,13 +251,11 @@ impl<T: Config> Backend<T> for CombinedBackend<T> {
keys: Vec<Vec<u8>>, keys: Vec<Vec<u8>>,
at: HashFor<T>, at: HashFor<T>,
) -> Result<StreamOfResults<StorageResponse>, BackendError> { ) -> Result<StreamOfResults<StorageResponse>, BackendError> {
try_backends(&[ try_backends(
self.archive(), &[self.archive(), self.chainhead(), self.legacy()],
self.chainhead(), async |b: &dyn Backend<T>| b.storage_fetch_values(keys.clone(), at).await,
self.legacy() )
], async |b: &dyn Backend<T>| { .await
b.storage_fetch_values(keys.clone(), at).await
}).await
} }
async fn storage_fetch_descendant_keys( async fn storage_fetch_descendant_keys(
@@ -257,13 +263,11 @@ impl<T: Config> Backend<T> for CombinedBackend<T> {
key: Vec<u8>, key: Vec<u8>,
at: HashFor<T>, at: HashFor<T>,
) -> Result<StreamOfResults<Vec<u8>>, BackendError> { ) -> Result<StreamOfResults<Vec<u8>>, BackendError> {
try_backends(&[ try_backends(
self.archive(), &[self.archive(), self.chainhead(), self.legacy()],
self.chainhead(), async |b: &dyn Backend<T>| b.storage_fetch_descendant_keys(key.clone(), at).await,
self.legacy() )
], async |b: &dyn Backend<T>| { .await
b.storage_fetch_descendant_keys(key.clone(), at).await
}).await
} }
async fn storage_fetch_descendant_values( async fn storage_fetch_descendant_values(
@@ -271,123 +275,130 @@ impl<T: Config> Backend<T> for CombinedBackend<T> {
key: Vec<u8>, key: Vec<u8>,
at: HashFor<T>, at: HashFor<T>,
) -> Result<StreamOfResults<StorageResponse>, BackendError> { ) -> Result<StreamOfResults<StorageResponse>, BackendError> {
try_backends(&[ try_backends(
self.archive(), &[self.archive(), self.chainhead(), self.legacy()],
self.chainhead(), async |b: &dyn Backend<T>| b.storage_fetch_descendant_values(key.clone(), at).await,
self.legacy() )
], async |b: &dyn Backend<T>| { .await
b.storage_fetch_descendant_values(key.clone(), at).await
}).await
} }
async fn genesis_hash(&self) -> Result<HashFor<T>, BackendError> { async fn genesis_hash(&self) -> Result<HashFor<T>, BackendError> {
try_backends(&[ try_backends(
self.archive(), &[self.archive(), self.chainhead(), self.legacy()],
self.chainhead(), async |b: &dyn Backend<T>| b.genesis_hash().await,
self.legacy() )
], async |b: &dyn Backend<T>| { .await
b.genesis_hash().await
}).await
} }
async fn block_number_to_hash(&self, number: u64) -> Result<Option<BlockRef<HashFor<T>>>, BackendError> { async fn block_number_to_hash(
try_backends(&[ &self,
self.archive(), number: u64,
self.legacy(), ) -> Result<Option<BlockRef<HashFor<T>>>, BackendError> {
// chainHead last as it cannot handle this request and will fail, so it's here try_backends(
// just to hand back a more relevant error in case the above two backends aren't &[
// enabled or have some issue. self.archive(),
self.chainhead() self.legacy(),
], async |b: &dyn Backend<T>| { // chainHead last as it cannot handle this request and will fail, so it's here
b.block_number_to_hash(number).await // just to hand back a more relevant error in case the above two backends aren't
}).await // enabled or have some issue.
self.chainhead(),
],
async |b: &dyn Backend<T>| b.block_number_to_hash(number).await,
)
.await
} }
async fn block_header(&self, at: HashFor<T>) -> Result<Option<T::Header>, BackendError> { async fn block_header(&self, at: HashFor<T>) -> Result<Option<T::Header>, BackendError> {
try_backends(&[ try_backends(
self.archive(), &[self.archive(), self.chainhead(), self.legacy()],
self.chainhead(), async |b: &dyn Backend<T>| b.block_header(at).await,
self.legacy() )
], async |b: &dyn Backend<T>| { .await
b.block_header(at).await
}).await
} }
async fn block_body(&self, at: HashFor<T>) -> Result<Option<Vec<Vec<u8>>>, BackendError> { async fn block_body(&self, at: HashFor<T>) -> Result<Option<Vec<Vec<u8>>>, BackendError> {
try_backends(&[ try_backends(
self.archive(), &[self.archive(), self.chainhead(), self.legacy()],
self.chainhead(), async |b: &dyn Backend<T>| b.block_body(at).await,
self.legacy() )
], async |b: &dyn Backend<T>| { .await
b.block_body(at).await
}).await
} }
async fn latest_finalized_block_ref(&self) -> Result<BlockRef<HashFor<T>>, BackendError> { async fn latest_finalized_block_ref(&self) -> Result<BlockRef<HashFor<T>>, BackendError> {
try_backends(&[ try_backends(
// Prioritize chainHead backend since it's streaming these things; save another call. &[
self.chainhead(), // Prioritize chainHead backend since it's streaming these things; save another call.
self.archive(), self.chainhead(),
self.legacy() self.archive(),
], async |b: &dyn Backend<T>| { self.legacy(),
b.latest_finalized_block_ref().await ],
}).await async |b: &dyn Backend<T>| b.latest_finalized_block_ref().await,
)
.await
} }
async fn stream_all_block_headers( async fn stream_all_block_headers(
&self, &self,
hasher: T::Hasher, hasher: T::Hasher,
) -> Result<StreamOfResults<(T::Header, BlockRef<HashFor<T>>)>, BackendError> { ) -> Result<StreamOfResults<(T::Header, BlockRef<HashFor<T>>)>, BackendError> {
try_backends(&[ try_backends(
// Ignore archive backend; it doesn't support this. &[
self.chainhead(), // Ignore archive backend; it doesn't support this.
self.legacy() self.chainhead(),
], async |b: &dyn Backend<T>| { self.legacy(),
b.stream_all_block_headers(hasher.clone()).await ],
}).await async |b: &dyn Backend<T>| b.stream_all_block_headers(hasher.clone()).await,
)
.await
} }
async fn stream_best_block_headers( async fn stream_best_block_headers(
&self, &self,
hasher: T::Hasher, hasher: T::Hasher,
) -> Result<StreamOfResults<(T::Header, BlockRef<HashFor<T>>)>, BackendError> { ) -> Result<StreamOfResults<(T::Header, BlockRef<HashFor<T>>)>, BackendError> {
try_backends(&[ try_backends(
// Ignore archive backend; it doesn't support this. &[
self.chainhead(), // Ignore archive backend; it doesn't support this.
self.legacy() self.chainhead(),
], async |b: &dyn Backend<T>| { self.legacy(),
b.stream_best_block_headers(hasher.clone()).await ],
}).await async |b: &dyn Backend<T>| b.stream_best_block_headers(hasher.clone()).await,
)
.await
} }
async fn stream_finalized_block_headers( async fn stream_finalized_block_headers(
&self, &self,
hasher: T::Hasher, hasher: T::Hasher,
) -> Result<StreamOfResults<(T::Header, BlockRef<HashFor<T>>)>, BackendError> { ) -> Result<StreamOfResults<(T::Header, BlockRef<HashFor<T>>)>, BackendError> {
try_backends(&[ try_backends(
// Ignore archive backend; it doesn't support this. &[
self.chainhead(), // Ignore archive backend; it doesn't support this.
self.legacy() self.chainhead(),
], async |b: &dyn Backend<T>| { self.legacy(),
b.stream_finalized_block_headers(hasher.clone()).await ],
}).await async |b: &dyn Backend<T>| b.stream_finalized_block_headers(hasher.clone()).await,
)
.await
} }
async fn submit_transaction( async fn submit_transaction(
&self, &self,
extrinsic: &[u8], extrinsic: &[u8],
) -> Result<StreamOfResults<TransactionStatus<HashFor<T>>>, BackendError> { ) -> Result<StreamOfResults<TransactionStatus<HashFor<T>>>, BackendError> {
try_backends(&[ try_backends(
// chainHead first as it does the same as the archive backend, but with better &[
// guarantees around the block handed back being pinned & ready to access. // chainHead first as it does the same as the archive backend, but with better
self.chainhead(), // guarantees around the block handed back being pinned & ready to access.
self.legacy(), self.chainhead(),
// archive last just incase chainHead & legacy fail or aren't provided for some self.legacy(),
// reason. // archive last just incase chainHead & legacy fail or aren't provided for some
self.archive(), // reason.
], async |b: &dyn Backend<T>| { self.archive(),
b.submit_transaction(extrinsic).await ],
}).await async |b: &dyn Backend<T>| b.submit_transaction(extrinsic).await,
)
.await
} }
async fn call( async fn call(
@@ -396,20 +407,18 @@ impl<T: Config> Backend<T> for CombinedBackend<T> {
call_parameters: Option<&[u8]>, call_parameters: Option<&[u8]>,
at: HashFor<T>, at: HashFor<T>,
) -> Result<Vec<u8>, BackendError> { ) -> Result<Vec<u8>, BackendError> {
try_backends(&[ try_backends(
self.archive(), &[self.archive(), self.chainhead(), self.legacy()],
self.chainhead(), async |b: &dyn Backend<T>| b.call(method, call_parameters, at).await,
self.legacy() )
], async |b: &dyn Backend<T>| { .await
b.call(method, call_parameters, at).await
}).await
} }
} }
/// Call one backend after the other in the list until we get a successful result back. /// Call one backend after the other in the list until we get a successful result back.
async fn try_backends<'s, 'b, T, Func, Fut, O>( async fn try_backends<'s, 'b, T, Func, Fut, O>(
backends: &'s [Option<&'b dyn Backend<T>>], backends: &'s [Option<&'b dyn Backend<T>>],
mut f: Func mut f: Func,
) -> Result<O, BackendError> ) -> Result<O, BackendError>
where where
'b: 's, 'b: 's,
@@ -417,13 +426,14 @@ where
Func: FnMut(&'b dyn Backend<T>) -> Fut, Func: FnMut(&'b dyn Backend<T>) -> Fut,
Fut: Future<Output = Result<O, BackendError>> + 'b, Fut: Future<Output = Result<O, BackendError>> + 'b,
{ {
static NO_AVAILABLE_BACKEND: &str = "None of the configured backends are capable of handling this request"; static NO_AVAILABLE_BACKEND: &str =
"None of the configured backends are capable of handling this request";
let mut err = BackendError::other(NO_AVAILABLE_BACKEND); let mut err = BackendError::other(NO_AVAILABLE_BACKEND);
for backend in backends.into_iter().filter_map(|b| *b) { for backend in backends.into_iter().filter_map(|b| *b) {
match f(backend).await { match f(backend).await {
Ok(res) => return Ok(res), Ok(res) => return Ok(res),
Err(e) => { err = e } Err(e) => err = e,
} }
} }
+12 -9
View File
@@ -7,21 +7,20 @@
mod descendant_streams; mod descendant_streams;
use subxt_rpcs::methods::legacy::{ TransactionStatus as RpcTransactionStatus, LegacyRpcMethods };
use crate::backend::utils::{retry, retry_stream}; use crate::backend::utils::{retry, retry_stream};
use crate::backend::{ use crate::backend::{
Backend, BlockRef, StorageResponse, StreamOf, StreamOfResults, Backend, BlockRef, StorageResponse, StreamOf, StreamOfResults, TransactionStatus,
TransactionStatus,
}; };
use crate::config::{Config, HashFor, Hasher, Header, RpcConfigFor}; use crate::config::{Config, HashFor, Hasher, Header, RpcConfigFor};
use crate::error::BackendError; use crate::error::BackendError;
use async_trait::async_trait; use async_trait::async_trait;
use codec::Encode;
use descendant_streams::{StorageFetchDescendantKeysStream, StorageFetchDescendantValuesStream};
use futures::TryStreamExt; use futures::TryStreamExt;
use futures::{Future, Stream, StreamExt, future, future::Either, stream}; use futures::{Future, Stream, StreamExt, future, future::Either, stream};
use subxt_rpcs::RpcClient; use subxt_rpcs::RpcClient;
use subxt_rpcs::methods::legacy::NumberOrHex; use subxt_rpcs::methods::legacy::NumberOrHex;
use codec::Encode; use subxt_rpcs::methods::legacy::{LegacyRpcMethods, TransactionStatus as RpcTransactionStatus};
use descendant_streams::{StorageFetchDescendantKeysStream, StorageFetchDescendantValuesStream};
/// Configure and build an [`LegacyBackend`]. /// Configure and build an [`LegacyBackend`].
pub struct LegacyBackendBuilder<T> { pub struct LegacyBackendBuilder<T> {
@@ -135,7 +134,7 @@ impl<T: Config> Backend<T> for LegacyBackend<T> {
self.methods.clone(), self.methods.clone(),
key, key,
at, at,
self.storage_page_size self.storage_page_size,
); );
let keys = keys.flat_map(|keys| { let keys = keys.flat_map(|keys| {
@@ -163,7 +162,7 @@ impl<T: Config> Backend<T> for LegacyBackend<T> {
self.methods.clone(), self.methods.clone(),
key, key,
at, at,
self.storage_page_size self.storage_page_size,
); );
Ok(StreamOf(Box::pin(values_stream))) Ok(StreamOf(Box::pin(values_stream)))
@@ -177,7 +176,10 @@ impl<T: Config> Backend<T> for LegacyBackend<T> {
.await .await
} }
async fn block_number_to_hash(&self, number: u64) -> Result<Option<BlockRef<HashFor<T>>>, BackendError> { async fn block_number_to_hash(
&self,
number: u64,
) -> Result<Option<BlockRef<HashFor<T>>>, BackendError> {
retry(|| async { retry(|| async {
let number_or_hash = NumberOrHex::Number(number); let number_or_hash = NumberOrHex::Number(number);
let hash = self let hash = self
@@ -186,7 +188,8 @@ impl<T: Config> Backend<T> for LegacyBackend<T> {
.await? .await?
.map(BlockRef::from_hash); .map(BlockRef::from_hash);
Ok(hash) Ok(hash)
}).await })
.await
} }
async fn block_header(&self, at: HashFor<T>) -> Result<Option<T::Header>, BackendError> { async fn block_header(&self, at: HashFor<T>) -> Result<Option<T::Header>, BackendError> {
+6 -6
View File
@@ -1,12 +1,12 @@
use crate::backend::utils::retry; use super::LegacyRpcMethods;
use crate::backend::StorageResponse; use crate::backend::StorageResponse;
use crate::backend::utils::retry;
use crate::config::{Config, HashFor, RpcConfigFor}; use crate::config::{Config, HashFor, RpcConfigFor};
use crate::error::BackendError; use crate::error::BackendError;
use futures::{Future, FutureExt, Stream, StreamExt}; use futures::{Future, FutureExt, Stream, StreamExt};
use std::collections::VecDeque; use std::collections::VecDeque;
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use super::LegacyRpcMethods;
/// This provides a stream of values given some prefix `key`. It /// This provides a stream of values given some prefix `key`. It
/// internally manages pagination and such. /// internally manages pagination and such.
@@ -26,7 +26,7 @@ pub struct StorageFetchDescendantKeysStream<T: Config> {
done: bool, done: bool,
} }
impl <T: Config> StorageFetchDescendantKeysStream<T> { impl<T: Config> StorageFetchDescendantKeysStream<T> {
/// Fetch descendant keys. /// Fetch descendant keys.
pub fn new( pub fn new(
methods: LegacyRpcMethods<RpcConfigFor<T>>, methods: LegacyRpcMethods<RpcConfigFor<T>>,
@@ -91,7 +91,7 @@ impl<T: Config> Stream for StorageFetchDescendantKeysStream<T> {
// Error getting keys? Return it. // Error getting keys? Return it.
return Poll::Ready(Some(Err(e))); return Poll::Ready(Some(Err(e)));
}, }
Poll::Pending => { Poll::Pending => {
this.keys_fut = Some(keys_fut); this.keys_fut = Some(keys_fut);
return Poll::Pending; return Poll::Pending;
@@ -142,7 +142,7 @@ pub struct StorageFetchDescendantValuesStream<T: Config> {
results: VecDeque<(Vec<u8>, Vec<u8>)>, results: VecDeque<(Vec<u8>, Vec<u8>)>,
} }
impl <T: Config> StorageFetchDescendantValuesStream<T> { impl<T: Config> StorageFetchDescendantValuesStream<T> {
/// Fetch descendant values. /// Fetch descendant values.
pub fn new( pub fn new(
methods: LegacyRpcMethods<RpcConfigFor<T>>, methods: LegacyRpcMethods<RpcConfigFor<T>>,
@@ -201,7 +201,7 @@ impl<T: Config> Stream for StorageFetchDescendantValuesStream<T> {
continue; continue;
} }
return Poll::Ready(Some(Err(e))) return Poll::Ready(Some(Err(e)));
} }
Poll::Pending => { Poll::Pending => {
this.results_fut = Some(results_fut); this.results_fut = Some(results_fut);
+3 -5
View File
@@ -148,7 +148,7 @@ where
match &mut self.state { match &mut self.state {
RetrySubscriptionState::Init => { RetrySubscriptionState::Init => {
self.state = RetrySubscriptionState::Pending((self.resubscribe)()); self.state = RetrySubscriptionState::Pending((self.resubscribe)());
}, }
RetrySubscriptionState::Stream(s) => match s.poll_next_unpin(cx) { RetrySubscriptionState::Stream(s) => match s.poll_next_unpin(cx) {
Poll::Ready(Some(Err(err))) => { Poll::Ready(Some(Err(err))) => {
if err.is_disconnected_will_reconnect() { if err.is_disconnected_will_reconnect() {
@@ -158,7 +158,7 @@ where
} }
Poll::Ready(None) => { Poll::Ready(None) => {
self.state = RetrySubscriptionState::Done; self.state = RetrySubscriptionState::Done;
return Poll::Ready(None) return Poll::Ready(None);
} }
Poll::Ready(Some(Ok(val))) => { Poll::Ready(Some(Ok(val))) => {
return Poll::Ready(Some(Ok(val))); return Poll::Ready(Some(Ok(val)));
@@ -182,9 +182,7 @@ where
return Poll::Pending; return Poll::Pending;
} }
}, },
RetrySubscriptionState::Done => { RetrySubscriptionState::Done => return Poll::Ready(None),
return Poll::Ready(None)
}
}; };
} }
} }
+24 -2
View File
@@ -1,15 +1,17 @@
mod offline_client; mod offline_client;
mod online_client; mod online_client;
use crate::config::{Config, HashFor};
use core::marker::PhantomData; use core::marker::PhantomData;
use subxt_metadata::Metadata;
// We keep these traits internal, so that we can mess with them later if needed, // We keep these traits internal, so that we can mess with them later if needed,
// and instead only the concrete types are public which wrap these trait impls. // and instead only the concrete types are public which wrap these trait impls.
pub(crate) use offline_client::OfflineClientAtBlockT; pub(crate) use offline_client::OfflineClientAtBlockT;
pub(crate) use online_client::OnlineClientAtBlockT; pub(crate) use online_client::OnlineClientAtBlockT;
pub use offline_client::OfflineClient; pub use offline_client::{OfflineClient, OfflineClientAtBlock};
pub use online_client::OnlineClient; pub use online_client::{OnlineClient, OnlineClientAtBlock};
/// This represents a client at a specific block number. /// This represents a client at a specific block number.
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
@@ -27,3 +29,23 @@ impl<Client, T> ClientAtBlock<Client, T> {
} }
} }
} }
impl<Client, T> ClientAtBlock<Client, T>
where
T: Config,
Client: OfflineClientAtBlockT,
{
pub fn metadata_ref(&self) -> &Metadata {
self.client.metadata_ref()
}
}
impl<Client, T> ClientAtBlock<Client, T>
where
T: Config,
Client: OnlineClientAtBlockT<T>,
{
pub fn block_hash(&self) -> HashFor<T> {
self.client.block_hash()
}
}
+14 -12
View File
@@ -1,8 +1,8 @@
use crate::config::Config;
use crate::client::ClientAtBlock; use crate::client::ClientAtBlock;
use crate::config::Config;
use crate::error::OfflineClientAtBlockError; use crate::error::OfflineClientAtBlockError;
use subxt_metadata::Metadata;
use std::sync::Arc; use std::sync::Arc;
use subxt_metadata::Metadata;
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct OfflineClient<T: Config> { pub struct OfflineClient<T: Config> {
@@ -13,9 +13,7 @@ pub struct OfflineClient<T: Config> {
impl<T: Config> OfflineClient<T> { impl<T: Config> OfflineClient<T> {
/// Create a new [`OfflineClient`] with the given configuration. /// Create a new [`OfflineClient`] with the given configuration.
pub fn new(config: T) -> Self { pub fn new(config: T) -> Self {
OfflineClient { OfflineClient { config }
config,
}
} }
/// Pick the block height at which to operate. This references data from the /// Pick the block height at which to operate. This references data from the
@@ -35,25 +33,29 @@ impl<T: Config> OfflineClient<T> {
.metadata_for_spec_version(spec_version) .metadata_for_spec_version(spec_version)
.ok_or(OfflineClientAtBlockError::MetadataNotFound { spec_version })?; .ok_or(OfflineClientAtBlockError::MetadataNotFound { spec_version })?;
Ok(ClientAtBlock::new(OfflineClientAtBlock { Ok(ClientAtBlock::new(OfflineClientAtBlock { metadata }))
metadata,
}))
} }
} }
#[derive(Clone)]
pub struct OfflineClientAtBlock { pub struct OfflineClientAtBlock {
metadata: Arc<Metadata>, metadata: Arc<Metadata>,
} }
/// This represents an offline-only client at a specific block. /// This represents an offline-only client at a specific block.
#[doc(hidden)] #[doc(hidden)]
pub trait OfflineClientAtBlockT { pub trait OfflineClientAtBlockT: Clone {
/// Get the metadata appropriate for this block. /// Get a reference to the metadata appropriate for this block.
fn metadata(&self) -> &Metadata; fn metadata_ref(&self) -> &Metadata;
/// Get a clone of the metadata appropriate for this block.
fn metadata(&self) -> Arc<Metadata>;
} }
impl OfflineClientAtBlockT for OfflineClientAtBlock { impl OfflineClientAtBlockT for OfflineClientAtBlock {
fn metadata(&self) -> &Metadata { fn metadata_ref(&self) -> &Metadata {
&self.metadata &self.metadata
} }
fn metadata(&self) -> Arc<Metadata> {
self.metadata.clone()
}
} }
+206 -108
View File
@@ -1,17 +1,19 @@
mod block_number_or_ref; mod block_number_or_ref;
mod blocks;
use core::marker::PhantomData;
use super::ClientAtBlock; use super::ClientAtBlock;
use super::OfflineClientAtBlockT; use super::OfflineClientAtBlockT;
use crate::config::{ Config, HashFor, Header, Hasher }; use crate::backend::{Backend, BlockRef, CombinedBackend};
use crate::error::OnlineClientAtBlockError; use crate::config::{Config, HashFor, Hasher, Header};
use crate::backend::{ Backend, CombinedBackend, BlockRef }; use crate::error::{BlocksError, OnlineClientAtBlockError};
use blocks::Blocks;
use codec::{Compact, Decode, Encode}; use codec::{Compact, Decode, Encode};
use core::marker::PhantomData;
use frame_metadata::{RuntimeMetadata, RuntimeMetadataPrefixed}; use frame_metadata::{RuntimeMetadata, RuntimeMetadataPrefixed};
use scale_info_legacy::TypeRegistrySet; use scale_info_legacy::TypeRegistrySet;
use std::sync::Arc; use std::sync::Arc;
use subxt_rpcs::RpcClient;
use subxt_metadata::Metadata; use subxt_metadata::Metadata;
use subxt_rpcs::RpcClient;
#[cfg(feature = "jsonrpsee")] #[cfg(feature = "jsonrpsee")]
#[cfg_attr(docsrs, doc(cfg(feature = "jsonrpsee")))] #[cfg_attr(docsrs, doc(cfg(feature = "jsonrpsee")))]
@@ -32,7 +34,7 @@ struct OnlineClientInner<T: Config> {
backend: Arc<dyn Backend<T>>, backend: Arc<dyn Backend<T>>,
} }
impl <T: Config> std::fmt::Debug for OnlineClientInner<T> { impl<T: Config> std::fmt::Debug for OnlineClientInner<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("OnlineClientInner") f.debug_struct("OnlineClientInner")
.field("config", &"<config>") .field("config", &"<config>")
@@ -63,9 +65,9 @@ impl<T: Config> OnlineClient<T> {
url: url_str.to_string(), url: url_str.to_string(),
})?; })?;
if !Self::is_url_secure(&url) { if !Self::is_url_secure(&url) {
return Err(OnlineClientError::RpcError( return Err(OnlineClientError::RpcError(subxt_rpcs::Error::InsecureUrl(
subxt_rpcs::Error::InsecureUrl(url_str.to_string()), url_str.to_string(),
)); )));
} }
OnlineClient::from_insecure_url(config, url).await OnlineClient::from_insecure_url(config, url).await
} }
@@ -110,18 +112,98 @@ impl<T: Config> OnlineClient<T> {
/// Construct a new [`OnlineClient`] by providing an underlying [`Backend`] /// Construct a new [`OnlineClient`] by providing an underlying [`Backend`]
/// implementation to power it. /// implementation to power it.
pub fn from_backend( pub fn from_backend(config: T, backend: impl Into<Arc<dyn Backend<T>>>) -> OnlineClient<T> {
config: T,
backend: impl Into<Arc<dyn Backend<T>>>,
) -> OnlineClient<T> {
OnlineClient { OnlineClient {
inner: Arc::new(OnlineClientInner { inner: Arc::new(OnlineClientInner {
config, config,
backend: backend.into() backend: backend.into(),
}) }),
} }
} }
/// Obtain a stream of all blocks imported by the node.
///
/// **Note:** You probably want to use [`Self::stream_blocks()`] most of
/// the time. Blocks returned here may be pruned at any time and become inaccessible,
/// leading to errors when trying to work with them.
pub async fn stream_all_blocks(&self) -> Result<Blocks<T>, BlocksError> {
// We need a hasher to know how to hash things. Thus, we need metadata to instantiate
// the hasher, so let's use the current block.
let current_block = self
.at_current_block()
.await
.map_err(BlocksError::CannotGetCurrentBlock)?;
let hasher = current_block.client.hasher.clone();
let stream = self
.inner
.backend
.stream_all_block_headers(hasher)
.await
.map_err(BlocksError::CannotGetBlockHeaderStream)?;
Ok(Blocks::from_headers_stream(self.clone(), stream))
}
/// Obtain a stream of blocks imported by the node onto the current best fork.
///
/// **Note:** You probably want to use [`Self::stream_blocks()`] most of
/// the time. Blocks returned here may be pruned at any time and become inaccessible,
/// leading to errors when trying to work with them.
pub async fn stream_best_blocks(&self) -> Result<Blocks<T>, BlocksError> {
// We need a hasher to know how to hash things. Thus, we need metadata to instantiate
// the hasher, so let's use the current block.
let current_block = self
.at_current_block()
.await
.map_err(BlocksError::CannotGetCurrentBlock)?;
let hasher = current_block.client.hasher.clone();
let stream = self
.inner
.backend
.stream_best_block_headers(hasher)
.await
.map_err(BlocksError::CannotGetBlockHeaderStream)?;
Ok(Blocks::from_headers_stream(self.clone(), stream))
}
/// Obtain a stream of finalized blocks.
pub async fn stream_blocks(&self) -> Result<Blocks<T>, BlocksError> {
// We need a hasher to know how to hash things. Thus, we need metadata to instantiate
// the hasher, so let's use the current block.
let current_block = self
.at_current_block()
.await
.map_err(BlocksError::CannotGetCurrentBlock)?;
let hasher = current_block.client.hasher.clone();
let stream = self
.inner
.backend
.stream_finalized_block_headers(hasher)
.await
.map_err(BlocksError::CannotGetBlockHeaderStream)?;
Ok(Blocks::from_headers_stream(self.clone(), stream))
}
/// Instantiate a client to work at the current finalized block _at the time of instantiation_.
/// This does not track new blocks.
pub async fn at_current_block(
&self,
) -> Result<ClientAtBlock<OnlineClientAtBlock<T>, T>, OnlineClientAtBlockError> {
let latest_block = self
.inner
.backend
.latest_finalized_block_ref()
.await
.map_err(|e| OnlineClientAtBlockError::CannotGetCurrentBlock { reason: e })?;
self.at_block(latest_block).await
}
/// Instantiate a client for working at a specific block. /// Instantiate a client for working at a specific block.
pub async fn at_block( pub async fn at_block(
&self, &self,
@@ -140,13 +222,13 @@ impl<T: Config> OnlineClient<T> {
.await .await
.map_err(|e| OnlineClientAtBlockError::CannotGetBlockHeader { .map_err(|e| OnlineClientAtBlockError::CannotGetBlockHeader {
block_hash: block_hash.into(), block_hash: block_hash.into(),
reason: e reason: e,
})? })?
.ok_or(OnlineClientAtBlockError::BlockHeaderNotFound { .ok_or(OnlineClientAtBlockError::BlockHeaderNotFound {
block_hash: block_hash.into() block_hash: block_hash.into(),
})?; })?;
(block_ref, block_header.number()) (block_ref, block_header.number())
}, }
BlockNumberOrRef::Number(block_num) => { BlockNumberOrRef::Number(block_num) => {
let block_ref = self let block_ref = self
.inner .inner
@@ -155,10 +237,10 @@ impl<T: Config> OnlineClient<T> {
.await .await
.map_err(|e| OnlineClientAtBlockError::CannotGetBlockHash { .map_err(|e| OnlineClientAtBlockError::CannotGetBlockHash {
block_number: block_num, block_number: block_num,
reason: e reason: e,
})? })?
.ok_or(OnlineClientAtBlockError::BlockNotFound { .ok_or(OnlineClientAtBlockError::BlockNotFound {
block_number: block_num block_number: block_num,
})?; })?;
(block_ref, block_num) (block_ref, block_num)
} }
@@ -176,7 +258,7 @@ impl<T: Config> OnlineClient<T> {
.await .await
.map_err(|e| OnlineClientAtBlockError::CannotGetSpecVersion { .map_err(|e| OnlineClientAtBlockError::CannotGetSpecVersion {
block_hash: block_hash.into(), block_hash: block_hash.into(),
reason: e reason: e,
})?; })?;
#[derive(codec::Decode)] #[derive(codec::Decode)]
@@ -199,101 +281,107 @@ impl<T: Config> OnlineClient<T> {
let metadata = match self.inner.config.metadata_for_spec_version(spec_version) { let metadata = match self.inner.config.metadata_for_spec_version(spec_version) {
Some(metadata) => metadata, Some(metadata) => metadata,
None => { None => {
let metadata: Metadata = match get_metadata(&*self.inner.backend, block_hash).await? { let metadata: Metadata =
m @ RuntimeMetadata::V0(_) | match get_metadata(&*self.inner.backend, block_hash).await? {
m @ RuntimeMetadata::V1(_) | m @ RuntimeMetadata::V0(_)
m @ RuntimeMetadata::V2(_) | | m @ RuntimeMetadata::V1(_)
m @ RuntimeMetadata::V3(_) | | m @ RuntimeMetadata::V2(_)
m @ RuntimeMetadata::V4(_) | | m @ RuntimeMetadata::V3(_)
m @ RuntimeMetadata::V5(_) | | m @ RuntimeMetadata::V4(_)
m @ RuntimeMetadata::V6(_) | | m @ RuntimeMetadata::V5(_)
m @ RuntimeMetadata::V7(_) => { | m @ RuntimeMetadata::V6(_)
return Err(OnlineClientAtBlockError::UnsupportedMetadataVersion { | m @ RuntimeMetadata::V7(_) => {
block_hash: block_hash.into(), return Err(OnlineClientAtBlockError::UnsupportedMetadataVersion {
version: m.version()
})
},
RuntimeMetadata::V8(m) => {
let types = get_legacy_types(self, spec_version)?;
Metadata::from_v8(&m, &types)
.map_err(|e| OnlineClientAtBlockError::CannotConvertLegacyMetadata {
block_hash: block_hash.into(), block_hash: block_hash.into(),
metadata_version: 8, version: m.version(),
reason: e });
}
RuntimeMetadata::V8(m) => {
let types = get_legacy_types(self, spec_version)?;
Metadata::from_v8(&m, &types).map_err(|e| {
OnlineClientAtBlockError::CannotConvertLegacyMetadata {
block_hash: block_hash.into(),
metadata_version: 8,
reason: e,
}
})? })?
}, }
RuntimeMetadata::V9(m) => { RuntimeMetadata::V9(m) => {
let types = get_legacy_types(self, spec_version)?; let types = get_legacy_types(self, spec_version)?;
Metadata::from_v9(&m, &types) Metadata::from_v9(&m, &types).map_err(|e| {
.map_err(|e| OnlineClientAtBlockError::CannotConvertLegacyMetadata { OnlineClientAtBlockError::CannotConvertLegacyMetadata {
block_hash: block_hash.into(), block_hash: block_hash.into(),
metadata_version: 9, metadata_version: 9,
reason: e reason: e,
}
})? })?
}, }
RuntimeMetadata::V10(m) => { RuntimeMetadata::V10(m) => {
let types = get_legacy_types(self, spec_version)?; let types = get_legacy_types(self, spec_version)?;
Metadata::from_v10(&m, &types) Metadata::from_v10(&m, &types).map_err(|e| {
.map_err(|e| OnlineClientAtBlockError::CannotConvertLegacyMetadata { OnlineClientAtBlockError::CannotConvertLegacyMetadata {
block_hash: block_hash.into(), block_hash: block_hash.into(),
metadata_version: 10, metadata_version: 10,
reason: e reason: e,
}
})? })?
}, }
RuntimeMetadata::V11(m) => { RuntimeMetadata::V11(m) => {
let types = get_legacy_types(self, spec_version)?; let types = get_legacy_types(self, spec_version)?;
Metadata::from_v11(&m, &types) Metadata::from_v11(&m, &types).map_err(|e| {
.map_err(|e| OnlineClientAtBlockError::CannotConvertLegacyMetadata { OnlineClientAtBlockError::CannotConvertLegacyMetadata {
block_hash: block_hash.into(), block_hash: block_hash.into(),
metadata_version: 11, metadata_version: 11,
reason: e reason: e,
}
})? })?
}, }
RuntimeMetadata::V12(m) => { RuntimeMetadata::V12(m) => {
let types = get_legacy_types(self, spec_version)?; let types = get_legacy_types(self, spec_version)?;
Metadata::from_v12(&m, &types) Metadata::from_v12(&m, &types).map_err(|e| {
.map_err(|e| OnlineClientAtBlockError::CannotConvertLegacyMetadata { OnlineClientAtBlockError::CannotConvertLegacyMetadata {
block_hash: block_hash.into(), block_hash: block_hash.into(),
metadata_version: 12, metadata_version: 12,
reason: e reason: e,
}
})? })?
}, }
RuntimeMetadata::V13(m) => { RuntimeMetadata::V13(m) => {
let types = get_legacy_types(self, spec_version)?; let types = get_legacy_types(self, spec_version)?;
Metadata::from_v13(&m, &types) Metadata::from_v13(&m, &types).map_err(|e| {
.map_err(|e| OnlineClientAtBlockError::CannotConvertLegacyMetadata { OnlineClientAtBlockError::CannotConvertLegacyMetadata {
block_hash: block_hash.into(), block_hash: block_hash.into(),
metadata_version: 13, metadata_version: 13,
reason: e reason: e,
}
})? })?
}, }
RuntimeMetadata::V14(m) => { RuntimeMetadata::V14(m) => Metadata::from_v14(m).map_err(|e| {
Metadata::from_v14(m) OnlineClientAtBlockError::CannotConvertModernMetadata {
.map_err(|e| OnlineClientAtBlockError::CannotConvertModernMetadata {
block_hash: block_hash.into(), block_hash: block_hash.into(),
metadata_version: 14, metadata_version: 14,
reason: e reason: e,
})? }
}, })?,
RuntimeMetadata::V15(m) => { RuntimeMetadata::V15(m) => Metadata::from_v15(m).map_err(|e| {
Metadata::from_v15(m) OnlineClientAtBlockError::CannotConvertModernMetadata {
.map_err(|e| OnlineClientAtBlockError::CannotConvertModernMetadata {
block_hash: block_hash.into(), block_hash: block_hash.into(),
metadata_version: 15, metadata_version: 15,
reason: e reason: e,
})? }
}, })?,
RuntimeMetadata::V16(m) => { RuntimeMetadata::V16(m) => Metadata::from_v16(m).map_err(|e| {
Metadata::from_v16(m) OnlineClientAtBlockError::CannotConvertModernMetadata {
.map_err(|e| OnlineClientAtBlockError::CannotConvertModernMetadata {
block_hash: block_hash.into(), block_hash: block_hash.into(),
metadata_version: 16, metadata_version: 16,
reason: e reason: e,
})? }
}, })?,
}; };
let metadata = Arc::new(metadata); let metadata = Arc::new(metadata);
self.inner.config.set_metadata_for_spec_version(spec_version, metadata.clone()); self.inner
.config
.set_metadata_for_spec_version(spec_version, metadata.clone());
metadata metadata
} }
}; };
@@ -307,15 +395,14 @@ impl<T: Config> OnlineClient<T> {
Ok(ClientAtBlock { Ok(ClientAtBlock {
client: online_client_at_block, client: online_client_at_block,
marker: PhantomData marker: PhantomData,
}) })
} }
} }
/// This represents an online client at a specific block. /// This represents an online client at a specific block.
#[doc(hidden)] #[doc(hidden)]
pub trait OnlineClientAtBlockT<T: Config>: OfflineClientAtBlockT pub trait OnlineClientAtBlockT<T: Config>: OfflineClientAtBlockT {
{
/// Return the RPC methods we'll use to interact with the node. /// Return the RPC methods we'll use to interact with the node.
fn backend(&self) -> &dyn Backend<T>; fn backend(&self) -> &dyn Backend<T>;
/// Return the block hash for the current block. /// Return the block hash for the current block.
@@ -325,6 +412,7 @@ pub trait OnlineClientAtBlockT<T: Config>: OfflineClientAtBlockT
} }
/// The inner type providing the necessary data to work online at a specific block. /// The inner type providing the necessary data to work online at a specific block.
#[derive(Clone)]
pub struct OnlineClientAtBlock<T: Config> { pub struct OnlineClientAtBlock<T: Config> {
metadata: Arc<Metadata>, metadata: Arc<Metadata>,
backend: Arc<dyn Backend<T>>, backend: Arc<dyn Backend<T>>,
@@ -345,12 +433,18 @@ impl<T: Config> OnlineClientAtBlockT<T> for OnlineClientAtBlock<T> {
} }
impl<T: Config> OfflineClientAtBlockT for OnlineClientAtBlock<T> { impl<T: Config> OfflineClientAtBlockT for OnlineClientAtBlock<T> {
fn metadata(&self) -> &Metadata { fn metadata_ref(&self) -> &Metadata {
&self.metadata &self.metadata
} }
fn metadata(&self) -> Arc<Metadata> {
self.metadata.clone()
}
} }
fn get_legacy_types<T: Config>(client: &OnlineClient<T>, spec_version: u32) -> Result<TypeRegistrySet<'_>, OnlineClientAtBlockError> { fn get_legacy_types<T: Config>(
client: &OnlineClient<T>,
spec_version: u32,
) -> Result<TypeRegistrySet<'_>, OnlineClientAtBlockError> {
client client
.inner .inner
.config .config
@@ -377,7 +471,11 @@ async fn get_metadata<T: Config>(
if let Some(version_to_get) = version_to_get { if let Some(version_to_get) = version_to_get {
let version_bytes = version_to_get.encode(); let version_bytes = version_to_get.encode();
let rpc_response = backend let rpc_response = backend
.call("Metadata_metadata_at_version", Some(&version_bytes), block_hash) .call(
"Metadata_metadata_at_version",
Some(&version_bytes),
block_hash,
)
.await .await
.map_err(|e| OnlineClientAtBlockError::CannotGetMetadata { .map_err(|e| OnlineClientAtBlockError::CannotGetMetadata {
block_hash: block_hash.into(), block_hash: block_hash.into(),
@@ -1,5 +1,5 @@
use crate::config::{ Config, HashFor, Hasher };
use crate::backend::BlockRef; use crate::backend::BlockRef;
use crate::config::{Config, HashFor, Hasher};
/// This represents either a block number or a reference /// This represents either a block number or a reference
/// to a block, which is essentially a block hash. /// to a block, which is essentially a block hash.
@@ -7,22 +7,22 @@ pub enum BlockNumberOrRef<T: Config> {
/// A block number. /// A block number.
Number(u64), Number(u64),
/// A block ref / hash. /// A block ref / hash.
BlockRef(BlockRef<HashFor<T>>) BlockRef(BlockRef<HashFor<T>>),
} }
impl <T: Config> From<u32> for BlockNumberOrRef<T> { impl<T: Config> From<u32> for BlockNumberOrRef<T> {
fn from(value: u32) -> Self { fn from(value: u32) -> Self {
BlockNumberOrRef::Number(value.into()) BlockNumberOrRef::Number(value.into())
} }
} }
impl <T: Config> From<u64> for BlockNumberOrRef<T> { impl<T: Config> From<u64> for BlockNumberOrRef<T> {
fn from(value: u64) -> Self { fn from(value: u64) -> Self {
BlockNumberOrRef::Number(value) BlockNumberOrRef::Number(value)
} }
} }
impl <T: Config> From<BlockRef<HashFor<T>>> for BlockNumberOrRef<T> { impl<T: Config> From<BlockRef<HashFor<T>>> for BlockNumberOrRef<T> {
fn from(block_ref: BlockRef<HashFor<T>>) -> Self { fn from(block_ref: BlockRef<HashFor<T>>) -> Self {
BlockNumberOrRef::BlockRef(block_ref) BlockNumberOrRef::BlockRef(block_ref)
} }
@@ -31,9 +31,9 @@ impl <T: Config> From<BlockRef<HashFor<T>>> for BlockNumberOrRef<T> {
// Ideally we'd have `impl From<HashFor<T>> for BlockNumberOrRef<T>` but since our config // Ideally we'd have `impl From<HashFor<T>> for BlockNumberOrRef<T>` but since our config
// could set _any_ hash type, this boils down to `impl From<H> for ..` which is too general. // could set _any_ hash type, this boils down to `impl From<H> for ..` which is too general.
// Thus, we target our current concrete hash type. // Thus, we target our current concrete hash type.
impl <T: Config> From<crate::config::substrate::H256> for BlockNumberOrRef<T> impl<T: Config> From<crate::config::substrate::H256> for BlockNumberOrRef<T>
where where
<T::Hasher as Hasher>::Hash: From<crate::config::substrate::H256> <T::Hasher as Hasher>::Hash: From<crate::config::substrate::H256>,
{ {
fn from(hash: crate::config::substrate::H256) -> Self { fn from(hash: crate::config::substrate::H256) -> Self {
BlockNumberOrRef::BlockRef(BlockRef::from_hash(hash.into())) BlockNumberOrRef::BlockRef(BlockRef::from_hash(hash.into()))
+76
View File
@@ -0,0 +1,76 @@
use crate::backend::{BlockRef, StreamOfResults};
use crate::client::{ClientAtBlock, OnlineClient, OnlineClientAtBlock};
use crate::config::{Config, HashFor, Header};
use crate::error::{BlocksError, OnlineClientAtBlockError};
use futures::{Stream, StreamExt};
use std::pin::Pin;
use std::task::{Context, Poll};
/// A stream of blocks.
pub struct Blocks<T: Config> {
client: OnlineClient<T>,
stream: StreamOfResults<(T::Header, BlockRef<HashFor<T>>)>,
}
impl<T: Config> Blocks<T> {
pub(crate) fn from_headers_stream(
client: OnlineClient<T>,
stream: StreamOfResults<(T::Header, BlockRef<HashFor<T>>)>,
) -> Self {
Blocks { client, stream }
}
}
impl<T: Config> Stream for Blocks<T> {
type Item = Result<Block<T>, BlocksError>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let item = match self.stream.poll_next_unpin(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(None) => return Poll::Ready(None),
Poll::Ready(Some(item)) => item,
};
let res = match item {
Ok((block_header, block_ref)) => Ok(Block {
block_ref,
block_header,
client: self.client.clone(),
}),
Err(e) => Err(BlocksError::CannotGetBlockHeader(e)),
};
Poll::Ready(Some(res))
}
}
/// A block from the stream of blocks.
pub struct Block<T: Config> {
block_ref: BlockRef<HashFor<T>>,
block_header: T::Header,
client: OnlineClient<T>,
}
impl<T: Config> Block<T> {
/// The block hash
pub fn hash(&self) -> HashFor<T> {
self.block_ref.hash()
}
/// The block number.
pub fn number(&self) -> u64 {
self.block_header.number()
}
/// The block header.
pub fn header(&self) -> &T::Header {
&self.block_header
}
/// Instantiate a client at this block.
pub async fn client(
&self,
) -> Result<ClientAtBlock<OnlineClientAtBlock<T>, T>, OnlineClientAtBlockError> {
self.client.at_block(self.block_ref.clone()).await
}
}
+7 -14
View File
@@ -19,10 +19,10 @@ use codec::{Decode, Encode};
use core::fmt::Debug; use core::fmt::Debug;
use scale_decode::DecodeAsType; use scale_decode::DecodeAsType;
use scale_encode::EncodeAsType; use scale_encode::EncodeAsType;
use serde::{Serialize, de::DeserializeOwned};
use subxt_metadata::Metadata;
use std::{fmt::Display, marker::PhantomData, sync::Arc};
use scale_info_legacy::TypeRegistrySet; use scale_info_legacy::TypeRegistrySet;
use serde::{Serialize, de::DeserializeOwned};
use std::{fmt::Display, marker::PhantomData, sync::Arc};
use subxt_metadata::Metadata;
use subxt_rpcs::RpcConfig; use subxt_rpcs::RpcConfig;
pub use default_extrinsic_params::{DefaultExtrinsicParams, DefaultExtrinsicParamsBuilder}; pub use default_extrinsic_params::{DefaultExtrinsicParams, DefaultExtrinsicParamsBuilder};
@@ -72,21 +72,14 @@ pub trait Config: Clone + Debug + Sized + Send + Sync + 'static {
/// The [`crate::client::OnlineClient`] will look this up on chain if it's not available here, and then /// The [`crate::client::OnlineClient`] will look this up on chain if it's not available here, and then
/// call [`Config::set_metadata_for_spec_version`] to give the configuration the opportunity to cache it. /// call [`Config::set_metadata_for_spec_version`] to give the configuration the opportunity to cache it.
/// The [`crate::client::OfflineClient`] will error if this is not available for the required spec version. /// The [`crate::client::OfflineClient`] will error if this is not available for the required spec version.
fn metadata_for_spec_version( fn metadata_for_spec_version(&self, _spec_version: u32) -> Option<Arc<Metadata>> {
&self,
_spec_version: u32,
) -> Option<Arc<Metadata>> {
None None
} }
/// Set some metadata for a given spec version. the [`crate::client::OnlineClient`] will call this if it has /// Set some metadata for a given spec version. the [`crate::client::OnlineClient`] will call this if it has
/// to retrieve metadata from the chain, to give this the opportunity to cache it. The configuration can /// to retrieve metadata from the chain, to give this the opportunity to cache it. The configuration can
/// do nothing if it prefers. /// do nothing if it prefers.
fn set_metadata_for_spec_version( fn set_metadata_for_spec_version(&self, _spec_version: u32, _metadata: Arc<Metadata>) {}
&self,
_spec_version: u32,
_metadata: Arc<Metadata>,
) {}
/// Return legacy types (ie types to use with Runtimes that return pre-V14 metadata) for a given spec version. /// Return legacy types (ie types to use with Runtimes that return pre-V14 metadata) for a given spec version.
/// If this returns `None`, [`subxt`] will return an error if type definitions are needed to access some older /// If this returns `None`, [`subxt`] will return an error if type definitions are needed to access some older
@@ -105,10 +98,10 @@ pub trait Config: Clone + Debug + Sized + Send + Sync + 'static {
/// `RpcConfigFor<Config>` can be used anywhere which requires an implementation of [`subxt_rpcs::RpcConfig`]. /// `RpcConfigFor<Config>` can be used anywhere which requires an implementation of [`subxt_rpcs::RpcConfig`].
/// This is only needed at the type level, and so there is no way to construct this. /// This is only needed at the type level, and so there is no way to construct this.
pub struct RpcConfigFor<T> { pub struct RpcConfigFor<T> {
marker: PhantomData<T> marker: PhantomData<T>,
} }
impl <T: Config> RpcConfig for RpcConfigFor<T> { impl<T: Config> RpcConfig for RpcConfigFor<T> {
type Hash = HashFor<T>; type Hash = HashFor<T>;
type Header = T::Header; type Header = T::Header;
type AccountId = T::AccountId; type AccountId = T::AccountId;
+2 -2
View File
@@ -11,9 +11,9 @@ use crate::{
config::{Config, HashFor}, config::{Config, HashFor},
error::ExtrinsicParamsError, error::ExtrinsicParamsError,
}; };
use subxt_metadata::Metadata;
use std::sync::Arc;
use core::any::Any; use core::any::Any;
use std::sync::Arc;
use subxt_metadata::Metadata;
/// This provides access to some relevant client state in transaction extensions, /// This provides access to some relevant client state in transaction extensions,
/// and is just a combination of some of the available properties. /// and is just a combination of some of the available properties.
+4 -11
View File
@@ -7,11 +7,11 @@
use super::{Config, DefaultExtrinsicParams, DefaultExtrinsicParamsBuilder}; use super::{Config, DefaultExtrinsicParams, DefaultExtrinsicParamsBuilder};
use crate::config::substrate::{SubstrateConfig, SubstrateConfigBuilder}; use crate::config::substrate::{SubstrateConfig, SubstrateConfigBuilder};
use std::sync::Arc;
use scale_info_legacy::TypeRegistrySet; use scale_info_legacy::TypeRegistrySet;
use std::sync::Arc;
use subxt_metadata::Metadata; use subxt_metadata::Metadata;
pub use crate::config::substrate::{ SpecVersionForRange, SubstrateHeader }; pub use crate::config::substrate::{SpecVersionForRange, SubstrateHeader};
pub use crate::utils::{AccountId32, MultiAddress, MultiSignature}; pub use crate::utils::{AccountId32, MultiAddress, MultiSignature};
pub use primitive_types::{H256, U256}; pub use primitive_types::{H256, U256};
@@ -79,18 +79,11 @@ impl Config for PolkadotConfig {
self.0.spec_version_for_block_number(block_number) self.0.spec_version_for_block_number(block_number)
} }
fn metadata_for_spec_version( fn metadata_for_spec_version(&self, spec_version: u32) -> Option<Arc<Metadata>> {
&self,
spec_version: u32,
) -> Option<Arc<Metadata>> {
self.0.metadata_for_spec_version(spec_version) self.0.metadata_for_spec_version(spec_version)
} }
fn set_metadata_for_spec_version( fn set_metadata_for_spec_version(&self, spec_version: u32, metadata: Arc<Metadata>) {
&self,
spec_version: u32,
metadata: Arc<Metadata>,
) {
self.0.set_metadata_for_spec_version(spec_version, metadata) self.0.set_metadata_for_spec_version(spec_version, metadata)
} }
} }
+10 -19
View File
@@ -6,16 +6,16 @@
use super::{Config, DefaultExtrinsicParams, DefaultExtrinsicParamsBuilder, Hasher, Header}; use super::{Config, DefaultExtrinsicParams, DefaultExtrinsicParamsBuilder, Hasher, Header};
use crate::config::Hash; use crate::config::Hash;
use crate::utils::RangeMap;
pub use crate::utils::{AccountId32, MultiAddress, MultiSignature}; pub use crate::utils::{AccountId32, MultiAddress, MultiSignature};
use codec::{Decode, Encode}; use codec::{Decode, Encode};
pub use primitive_types::{H256, U256}; pub use primitive_types::{H256, U256};
use serde::{Deserialize, Serialize};
use subxt_metadata::Metadata;
use crate::utils::RangeMap;
use scale_info_legacy::{ChainTypeRegistry, TypeRegistrySet}; use scale_info_legacy::{ChainTypeRegistry, TypeRegistrySet};
use serde::{Deserialize, Serialize};
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
use std::sync::Mutex; use std::sync::Mutex;
use subxt_metadata::Metadata;
/// Construct a [`SubstrateConfig`] using this. /// Construct a [`SubstrateConfig`] using this.
pub struct SubstrateConfigBuilder { pub struct SubstrateConfigBuilder {
@@ -81,10 +81,7 @@ impl SubstrateConfigBuilder {
/// The storage hasher encoding/decoding changed during V9 metadata. By default we support the "new" version /// The storage hasher encoding/decoding changed during V9 metadata. By default we support the "new" version
/// of things. We can use this option to support the old version of things prior to a given spec version. /// of things. We can use this option to support the old version of things prior to a given spec version.
pub fn use_old_v9_hashers_before_spec_version( pub fn use_old_v9_hashers_before_spec_version(mut self, spec_version: u32) -> Self {
mut self,
spec_version: u32
) -> Self {
self.use_old_v9_hashers_before_spec_version = spec_version; self.use_old_v9_hashers_before_spec_version = spec_version;
self self
} }
@@ -96,7 +93,7 @@ impl SubstrateConfigBuilder {
legacy_types: self.legacy_types, legacy_types: self.legacy_types,
spec_version_for_block_number: self.spec_version_for_block_number, spec_version_for_block_number: self.spec_version_for_block_number,
metadata_for_spec_version: self.metadata_for_spec_version, metadata_for_spec_version: self.metadata_for_spec_version,
}) }),
} }
} }
} }
@@ -116,7 +113,7 @@ pub struct SpecVersionForRange {
/// that have not customized the block hash type). /// that have not customized the block hash type).
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct SubstrateConfig { pub struct SubstrateConfig {
inner: Arc<SubstrateConfigInner> inner: Arc<SubstrateConfigInner>,
} }
#[derive(Debug)] #[derive(Debug)]
@@ -150,15 +147,13 @@ impl Config for SubstrateConfig {
} }
fn spec_version_for_block_number(&self, block_number: u64) -> Option<u32> { fn spec_version_for_block_number(&self, block_number: u64) -> Option<u32> {
self.inner.spec_version_for_block_number self.inner
.spec_version_for_block_number
.get(block_number) .get(block_number)
.copied() .copied()
} }
fn metadata_for_spec_version( fn metadata_for_spec_version(&self, spec_version: u32) -> Option<Arc<Metadata>> {
&self,
spec_version: u32,
) -> Option<Arc<Metadata>> {
self.inner self.inner
.metadata_for_spec_version .metadata_for_spec_version
.lock() .lock()
@@ -167,11 +162,7 @@ impl Config for SubstrateConfig {
.cloned() .cloned()
} }
fn set_metadata_for_spec_version( fn set_metadata_for_spec_version(&self, spec_version: u32, metadata: Arc<Metadata>) {
&self,
spec_version: u32,
metadata: Arc<Metadata>,
) {
self.inner self.inner
.metadata_for_spec_version .metadata_for_spec_version
.lock() .lock()
+2 -2
View File
@@ -7,7 +7,7 @@
//! [`AnyOf`] to configure the set of transaction extensions which are known about //! [`AnyOf`] to configure the set of transaction extensions which are known about
//! when interacting with a chain. //! when interacting with a chain.
use super::extrinsic_params::{ ExtrinsicParams, ClientState }; use super::extrinsic_params::{ClientState, ExtrinsicParams};
use crate::config::ExtrinsicParamsEncoder; use crate::config::ExtrinsicParamsEncoder;
use crate::config::{Config, HashFor}; use crate::config::{Config, HashFor};
use crate::error::ExtrinsicParamsError; use crate::error::ExtrinsicParamsError;
@@ -16,9 +16,9 @@ use codec::{Compact, Encode};
use core::any::Any; use core::any::Any;
use core::fmt::Debug; use core::fmt::Debug;
use derive_where::derive_where; use derive_where::derive_where;
use std::collections::HashMap;
use scale_decode::DecodeAsType; use scale_decode::DecodeAsType;
use scale_info::PortableRegistry; use scale_info::PortableRegistry;
use std::collections::HashMap;
// Re-export this here; it's a bit generically named to be re-exported from ::config. // Re-export this here; it's a bit generically named to be re-exported from ::config.
pub use super::extrinsic_params::Params; pub use super::extrinsic_params::Params;
+154 -122
View File
@@ -7,8 +7,8 @@
mod dispatch_error; mod dispatch_error;
mod hex; mod hex;
use thiserror::Error as DeriveError;
use std::borrow::Cow; use std::borrow::Cow;
use thiserror::Error as DeriveError;
#[cfg(feature = "unstable-light-client")] #[cfg(feature = "unstable-light-client")]
pub use subxt_lightclient::LightClientError; pub use subxt_lightclient::LightClientError;
@@ -19,10 +19,10 @@ pub use dispatch_error::{
}; };
// Re-expose the errors we use from other crates here: // Re-expose the errors we use from other crates here:
pub use subxt_metadata::Metadata;
pub use hex::Hex; pub use hex::Hex;
pub use scale_decode::Error as DecodeError; pub use scale_decode::Error as DecodeError;
pub use scale_encode::Error as EncodeError; pub use scale_encode::Error as EncodeError;
pub use subxt_metadata::Metadata;
pub use subxt_metadata::TryFromError as MetadataTryFromError; pub use subxt_metadata::TryFromError as MetadataTryFromError;
/// A global error type. Any of the errors exposed here can convert into this /// A global error type. Any of the errors exposed here can convert into this
@@ -50,14 +50,10 @@ pub enum Error {
#[error(transparent)] #[error(transparent)]
BackendError(#[from] BackendError), BackendError(#[from] BackendError),
#[error(transparent)] #[error(transparent)]
BlockError(#[from] BlockError), BlocksError(#[from] BlocksError),
#[error(transparent)] #[error(transparent)]
AccountNonceError(#[from] AccountNonceError), AccountNonceError(#[from] AccountNonceError),
#[error(transparent)] #[error(transparent)]
RuntimeUpdaterError(#[from] RuntimeUpdaterError),
#[error(transparent)]
RuntimeUpdateeApplyError(#[from] RuntimeUpdateeApplyError),
#[error(transparent)]
RuntimeApiError(#[from] RuntimeApiError), RuntimeApiError(#[from] RuntimeApiError),
#[error(transparent)] #[error(transparent)]
EventsError(#[from] EventsError), EventsError(#[from] EventsError),
@@ -143,10 +139,12 @@ impl Error {
fn backend_error(&self) -> Option<&BackendError> { fn backend_error(&self) -> Option<&BackendError> {
match self { match self {
Error::BlockError(e) => e.backend_error(), // Many of these contain no backend error, but keep the checks next to
// the actual error types to make it harder to miss adding any, and be exhaustive
// here so new error variants are not missed as easily.
Error::BlocksError(e) => e.backend_error(),
Error::AccountNonceError(e) => e.backend_error(), Error::AccountNonceError(e) => e.backend_error(),
Error::OnlineClientError(e) => e.backend_error(), Error::OnlineClientError(e) => e.backend_error(),
Error::RuntimeUpdaterError(e) => e.backend_error(),
Error::RuntimeApiError(e) => e.backend_error(), Error::RuntimeApiError(e) => e.backend_error(),
Error::EventsError(e) => e.backend_error(), Error::EventsError(e) => e.backend_error(),
Error::ExtrinsicError(e) => e.backend_error(), Error::ExtrinsicError(e) => e.backend_error(),
@@ -155,8 +153,23 @@ impl Error {
Error::TransactionEventsError(e) => e.backend_error(), Error::TransactionEventsError(e) => e.backend_error(),
Error::TransactionFinalizedSuccessError(e) => e.backend_error(), Error::TransactionFinalizedSuccessError(e) => e.backend_error(),
Error::StorageError(e) => e.backend_error(), Error::StorageError(e) => e.backend_error(),
// Any errors that **don't** return a BackendError anywhere will return None: Error::OfflineClientAtBlockError(e) => e.backend_error(),
_ => None, Error::OnlineClientAtBlockError(e) => e.backend_error(),
Error::ExtrinsicDecodeErrorAt(e) => e.backend_error(),
Error::ConstantError(e) => e.backend_error(),
Error::CustomValueError(e) => e.backend_error(),
Error::StorageKeyError(e) => e.backend_error(),
Error::StorageValueError(e) => e.backend_error(),
Error::TransactionStatusError(e) => e.backend_error(),
Error::ModuleErrorDetailsError(e) => e.backend_error(),
Error::ModuleErrorDecodeError(e) => e.backend_error(),
Error::DispatchErrorDecodeError(e) => e.backend_error(),
// BackendError is always a BackendError:
Error::BackendError(e) => Some(e),
// Other errors come from different crates so can never contain a BackendError:
Error::OtherRpcClientError(_) => None,
Error::OtherCodecError(_) => None,
Error::Other(_) => None,
} }
} }
} }
@@ -182,6 +195,12 @@ pub enum OfflineClientAtBlockError {
}, },
} }
impl OfflineClientAtBlockError {
fn backend_error(&self) -> Option<&BackendError> {
None
}
}
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
#[non_exhaustive] #[non_exhaustive]
#[allow(missing_docs)] #[allow(missing_docs)]
@@ -219,11 +238,39 @@ impl OnlineClientError {
} }
} }
/// Errors constructing streams of blocks.
#[allow(missing_docs)]
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum BlocksError {
#[error("Cannot construct block stream: cannot get the current block: {0}")]
CannotGetCurrentBlock(OnlineClientAtBlockError),
#[error("Cannot construct block stream: cannot get block header stream: {0}")]
CannotGetBlockHeaderStream(BackendError),
#[error("Error streaming blocks: cannot get the next block header: {0}")]
CannotGetBlockHeader(BackendError),
}
impl BlocksError {
fn backend_error(&self) -> Option<&BackendError> {
match self {
BlocksError::CannotGetCurrentBlock(e) => e.backend_error(),
BlocksError::CannotGetBlockHeaderStream(e) => Some(e),
BlocksError::CannotGetBlockHeader(e) => Some(e),
}
}
}
/// Errors constructing an online client at a specific block number. /// Errors constructing an online client at a specific block number.
#[allow(missing_docs)] #[allow(missing_docs)]
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
#[non_exhaustive] #[non_exhaustive]
pub enum OnlineClientAtBlockError { pub enum OnlineClientAtBlockError {
#[error("Cannot construct OnlineClientAtBlock: cannot get the current block: {reason}")]
CannotGetCurrentBlock {
/// The error we encountered.
reason: BackendError,
},
#[error( #[error(
"Cannot construct OnlineClientAtBlock: failed to get block hash from node for block {block_number}: {reason}" "Cannot construct OnlineClientAtBlock: failed to get block hash from node for block {block_number}: {reason}"
)] )]
@@ -238,19 +285,25 @@ pub enum OnlineClientAtBlockError {
/// The block number for which a block was not found. /// The block number for which a block was not found.
block_number: u64, block_number: u64,
}, },
#[error("Cannot construct OnlineClientAtBlock: cannot get the block header for block {block_hash}: {reason}")] #[error(
"Cannot construct OnlineClientAtBlock: cannot get the block header for block {block_hash}: {reason}"
)]
CannotGetBlockHeader { CannotGetBlockHeader {
/// Block hash that we failed to fetch the header for. /// Block hash that we failed to fetch the header for.
block_hash: Hex, block_hash: Hex,
/// The error we encountered. /// The error we encountered.
reason: BackendError, reason: BackendError,
}, },
#[error("Cannot construct OnlineClientAtBlock: cannot find the block header for block {block_hash}")] #[error(
"Cannot construct OnlineClientAtBlock: cannot find the block header for block {block_hash}"
)]
BlockHeaderNotFound { BlockHeaderNotFound {
/// Block hash that we failed to find the header for. /// Block hash that we failed to find the header for.
block_hash: Hex, block_hash: Hex,
}, },
#[error("Cannot construct OnlineClientAtBlock: failed to obtain spec version for block {block_hash}: {reason}")] #[error(
"Cannot construct OnlineClientAtBlock: failed to obtain spec version for block {block_hash}: {reason}"
)]
CannotGetSpecVersion { CannotGetSpecVersion {
/// The block hash for which we failed to obtain the spec version. /// The block hash for which we failed to obtain the spec version.
block_hash: Hex, block_hash: Hex,
@@ -275,41 +328,59 @@ pub enum OnlineClientAtBlockError {
/// The error we encountered. /// The error we encountered.
reason: String, reason: String,
}, },
#[error("Cannot construct OnlineClientAtBlock: Metadata V{version} (required at block {block_hash} is not supported.")] #[error(
"Cannot construct OnlineClientAtBlock: Metadata V{version} (required at block {block_hash} is not supported."
)]
UnsupportedMetadataVersion { UnsupportedMetadataVersion {
/// The block hash that requires the unsupported version. /// The block hash that requires the unsupported version.
block_hash: Hex, block_hash: Hex,
/// The unsupported metadata version. /// The unsupported metadata version.
version: u32, version: u32,
}, },
#[error("Cannot construct OnlineClientAtBlock: No legacy types were provided but we're trying to access a block that requires them.")] #[error(
"Cannot construct OnlineClientAtBlock: No legacy types were provided but we're trying to access a block that requires them."
)]
MissingLegacyTypes, MissingLegacyTypes,
#[error("Cannot construct OnlineClientAtBlock: unable to convert legacy metadata (required at block {block_hash}): {reason}")] #[error(
"Cannot construct OnlineClientAtBlock: unable to convert legacy metadata (required at block {block_hash}): {reason}"
)]
CannotConvertLegacyMetadata { CannotConvertLegacyMetadata {
/// The block hash that requires legacy types. /// The block hash that requires legacy types.
block_hash: Hex, block_hash: Hex,
/// The metadata version. /// The metadata version.
metadata_version: u32, metadata_version: u32,
/// Reason the conversion failed. /// Reason the conversion failed.
reason: subxt_metadata::LegacyFromError reason: subxt_metadata::LegacyFromError,
}, },
#[error("Cannot construct OnlineClientAtBlock: unable to convert modern metadata (required at block {block_hash}): {reason}")] #[error(
"Cannot construct OnlineClientAtBlock: unable to convert modern metadata (required at block {block_hash}): {reason}"
)]
CannotConvertModernMetadata { CannotConvertModernMetadata {
/// The block hash that requires legacy types. /// The block hash that requires legacy types.
block_hash: Hex, block_hash: Hex,
/// The metadata version. /// The metadata version.
metadata_version: u32, metadata_version: u32,
/// Reason the conversion failed. /// Reason the conversion failed.
reason: subxt_metadata::TryFromError reason: subxt_metadata::TryFromError,
} }, // #[error(
// "Cannot construct OnlineClientAtBlock: cannot inject types from metadata: failure to parse a type found in the metadata: {parse_error}"
// )]
// CannotInjectMetadataTypes {
// /// Error parsing a type found in the metadata.
// parse_error: scale_info_legacy::lookup_name::ParseError,
// },
}
// #[error( impl OnlineClientAtBlockError {
// "Cannot construct OnlineClientAtBlock: cannot inject types from metadata: failure to parse a type found in the metadata: {parse_error}" fn backend_error(&self) -> Option<&BackendError> {
// )] match self {
// CannotInjectMetadataTypes { OnlineClientAtBlockError::CannotGetCurrentBlock { reason }
// /// Error parsing a type found in the metadata. | OnlineClientAtBlockError::CannotGetBlockHash { reason, .. }
// parse_error: scale_info_legacy::lookup_name::ParseError, | OnlineClientAtBlockError::CannotGetBlockHeader { reason, .. }
// }, | OnlineClientAtBlockError::CannotGetSpecVersion { reason, .. } => Some(reason),
_ => None,
}
}
} }
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
@@ -362,7 +433,7 @@ impl From<subxt_rpcs::Error> for BackendError {
#[allow(missing_docs)] #[allow(missing_docs)]
pub enum CombinedBackendError { pub enum CombinedBackendError {
#[error("Could not obtain the list of RPC methods to determine which backends can be used")] #[error("Could not obtain the list of RPC methods to determine which backends can be used")]
CouldNotObtainRpcMethodList(subxt_rpcs::Error) CouldNotObtainRpcMethodList(subxt_rpcs::Error),
} }
/// An RPC error. Since we are generic over the RPC client that is used, /// An RPC error. Since we are generic over the RPC client that is used,
@@ -382,49 +453,6 @@ pub enum RpcError {
SubscriptionDropped, SubscriptionDropped,
} }
/// Block error
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
#[allow(missing_docs)]
pub enum BlockError {
#[error(
"Could not find the block body with hash {block_hash} (perhaps it was on a non-finalized fork?)"
)]
BlockNotFound { block_hash: Hex },
#[error("Could not download the block header with hash {block_hash}: {reason}")]
CouldNotGetBlockHeader {
block_hash: Hex,
reason: BackendError,
},
#[error("Could not download the latest block header: {0}")]
CouldNotGetLatestBlock(BackendError),
#[error("Could not subscribe to all blocks: {0}")]
CouldNotSubscribeToAllBlocks(BackendError),
#[error("Could not subscribe to best blocks: {0}")]
CouldNotSubscribeToBestBlocks(BackendError),
#[error("Could not subscribe to finalized blocks: {0}")]
CouldNotSubscribeToFinalizedBlocks(BackendError),
#[error("Error getting account nonce at block {block_hash}")]
AccountNonceError {
block_hash: Hex,
account_id: Hex,
reason: AccountNonceError,
},
}
impl BlockError {
fn backend_error(&self) -> Option<&BackendError> {
match self {
BlockError::CouldNotGetBlockHeader { reason: e, .. }
| BlockError::CouldNotGetLatestBlock(e)
| BlockError::CouldNotSubscribeToAllBlocks(e)
| BlockError::CouldNotSubscribeToBestBlocks(e)
| BlockError::CouldNotSubscribeToFinalizedBlocks(e) => Some(e),
_ => None,
}
}
}
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
#[non_exhaustive] #[non_exhaustive]
#[allow(missing_docs)] #[allow(missing_docs)]
@@ -446,56 +474,6 @@ impl AccountNonceError {
} }
} }
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
#[allow(missing_docs)]
pub enum RuntimeUpdaterError {
#[error("Error subscribing to runtime updates: The update stream ended unexpectedly")]
UnexpectedEndOfUpdateStream,
#[error("Error subscribing to runtime updates: The finalized block stream ended unexpectedly")]
UnexpectedEndOfBlockStream,
#[error("Error subscribing to runtime updates: Can't stream runtime version: {0}")]
CannotStreamRuntimeVersion(BackendError),
#[error("Error subscribing to runtime updates: Can't get next runtime version in stream: {0}")]
CannotGetNextRuntimeVersion(BackendError),
#[error("Error subscribing to runtime updates: Cannot stream finalized blocks: {0}")]
CannotStreamFinalizedBlocks(BackendError),
#[error("Error subscribing to runtime updates: Cannot get next finalized block in stream: {0}")]
CannotGetNextFinalizedBlock(BackendError),
#[error("Cannot fetch new metadata for runtime update: {0}")]
CannotFetchNewMetadata(BackendError),
#[error(
"Error subscribing to runtime updates: Cannot find the System.LastRuntimeUpgrade storage entry"
)]
CantFindSystemLastRuntimeUpgrade,
#[error("Error subscribing to runtime updates: Cannot fetch last runtime upgrade: {0}")]
CantFetchLastRuntimeUpgrade(StorageError),
#[error("Error subscribing to runtime updates: Cannot decode last runtime upgrade: {0}")]
CannotDecodeLastRuntimeUpgrade(StorageValueError),
}
impl RuntimeUpdaterError {
fn backend_error(&self) -> Option<&BackendError> {
match self {
RuntimeUpdaterError::CannotStreamRuntimeVersion(e)
| RuntimeUpdaterError::CannotGetNextRuntimeVersion(e)
| RuntimeUpdaterError::CannotStreamFinalizedBlocks(e)
| RuntimeUpdaterError::CannotGetNextFinalizedBlock(e)
| RuntimeUpdaterError::CannotFetchNewMetadata(e) => Some(e),
_ => None,
}
}
}
/// Error that can occur during upgrade.
#[non_exhaustive]
#[derive(Debug, thiserror::Error)]
#[allow(missing_docs)]
pub enum RuntimeUpdateeApplyError {
#[error("The proposed runtime update is the same as the current version")]
SameVersion,
}
/// Error working with Runtime APIs /// Error working with Runtime APIs
#[non_exhaustive] #[non_exhaustive]
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
@@ -704,6 +682,12 @@ pub enum CustomValueError {
CouldNotDecodeCustomValue(frame_decode::custom_values::CustomValueDecodeError<u32>), CouldNotDecodeCustomValue(frame_decode::custom_values::CustomValueDecodeError<u32>),
} }
impl CustomValueError {
fn backend_error(&self) -> Option<&BackendError> {
None
}
}
/// Error working with View Functions. /// Error working with View Functions.
#[non_exhaustive] #[non_exhaustive]
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
@@ -781,6 +765,12 @@ pub enum TransactionStatusError {
Dropped(String), Dropped(String),
} }
impl TransactionStatusError {
fn backend_error(&self) -> Option<&BackendError> {
None
}
}
/// Error fetching events for a just-submitted transaction /// Error fetching events for a just-submitted transaction
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
#[non_exhaustive] #[non_exhaustive]
@@ -876,6 +866,12 @@ pub enum ModuleErrorDetailsError {
}, },
} }
impl ModuleErrorDetailsError {
fn backend_error(&self) -> Option<&BackendError> {
None
}
}
/// Error decoding the [`ModuleError`] /// Error decoding the [`ModuleError`]
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
#[non_exhaustive] #[non_exhaustive]
@@ -883,6 +879,12 @@ pub enum ModuleErrorDetailsError {
#[error("Could not decode the DispatchError::Module payload into the given type: {0}")] #[error("Could not decode the DispatchError::Module payload into the given type: {0}")]
pub struct ModuleErrorDecodeError(scale_decode::Error); pub struct ModuleErrorDecodeError(scale_decode::Error);
impl ModuleErrorDecodeError {
fn backend_error(&self) -> Option<&BackendError> {
None
}
}
/// Error decoding the [`DispatchError`] /// Error decoding the [`DispatchError`]
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
#[non_exhaustive] #[non_exhaustive]
@@ -901,6 +903,12 @@ pub enum DispatchErrorDecodeError {
}, },
} }
impl DispatchErrorDecodeError {
fn backend_error(&self) -> Option<&BackendError> {
None
}
}
/// Error working with storage. /// Error working with storage.
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
#[non_exhaustive] #[non_exhaustive]
@@ -984,6 +992,12 @@ pub enum ConstantError {
ConstantInfoError(frame_decode::constants::ConstantInfoError<'static>), ConstantInfoError(frame_decode::constants::ConstantInfoError<'static>),
} }
impl ConstantError {
fn backend_error(&self) -> Option<&BackendError> {
None
}
}
#[derive(Debug, DeriveError)] #[derive(Debug, DeriveError)]
#[non_exhaustive] #[non_exhaustive]
#[allow(missing_docs)] #[allow(missing_docs)]
@@ -1006,6 +1020,12 @@ pub enum StorageKeyError {
}, },
} }
impl StorageKeyError {
fn backend_error(&self) -> Option<&BackendError> {
None
}
}
#[derive(Debug, DeriveError)] #[derive(Debug, DeriveError)]
#[non_exhaustive] #[non_exhaustive]
#[allow(missing_docs)] #[allow(missing_docs)]
@@ -1018,6 +1038,12 @@ pub enum StorageValueError {
LeftoverBytes { bytes: Vec<u8> }, LeftoverBytes { bytes: Vec<u8> },
} }
impl StorageValueError {
fn backend_error(&self) -> Option<&BackendError> {
None
}
}
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
#[non_exhaustive] #[non_exhaustive]
#[allow(missing_docs)] #[allow(missing_docs)]
@@ -1027,6 +1053,12 @@ pub struct ExtrinsicDecodeErrorAt {
pub error: ExtrinsicDecodeErrorAtReason, pub error: ExtrinsicDecodeErrorAtReason,
} }
impl ExtrinsicDecodeErrorAt {
fn backend_error(&self) -> Option<&BackendError> {
None
}
}
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
#[non_exhaustive] #[non_exhaustive]
#[allow(missing_docs)] #[allow(missing_docs)]
+1 -1
View File
@@ -6,10 +6,10 @@
//! something fails in trying to submit/execute a transaction. //! something fails in trying to submit/execute a transaction.
use super::{DispatchErrorDecodeError, ModuleErrorDecodeError, ModuleErrorDetailsError}; use super::{DispatchErrorDecodeError, ModuleErrorDecodeError, ModuleErrorDetailsError};
use subxt_metadata::Metadata;
use core::fmt::Debug; use core::fmt::Debug;
use scale_decode::{DecodeAsType, TypeResolver, visitor::DecodeAsTypeResult}; use scale_decode::{DecodeAsType, TypeResolver, visitor::DecodeAsTypeResult};
use std::{borrow::Cow, marker::PhantomData}; use std::{borrow::Cow, marker::PhantomData};
use subxt_metadata::Metadata;
/// An error dispatching a transaction. /// An error dispatching a transaction.
#[derive(Debug, thiserror::Error, PartialEq, Eq)] #[derive(Debug, thiserror::Error, PartialEq, Eq)]
+2 -2
View File
@@ -32,11 +32,11 @@ mod only_used_in_docs_or_tests {
// #[macro_use] // #[macro_use]
// mod macros; // mod macros;
pub mod config; pub mod backend;
pub mod client; pub mod client;
pub mod config;
pub mod error; pub mod error;
pub mod utils; pub mod utils;
pub mod backend;
// pub mod book; // pub mod book;
// pub mod blocks; // pub mod blocks;
// pub mod constants; // pub mod constants;
+2 -2
View File
@@ -10,22 +10,22 @@ pub mod bits;
mod era; mod era;
mod multi_address; mod multi_address;
mod multi_signature; mod multi_signature;
mod range_map;
mod static_type; mod static_type;
mod unchecked_extrinsic; mod unchecked_extrinsic;
mod wrapper_opaque; mod wrapper_opaque;
mod yesnomaybe; mod yesnomaybe;
mod range_map;
use codec::{Compact, Decode, Encode}; use codec::{Compact, Decode, Encode};
use derive_where::derive_where; use derive_where::derive_where;
pub use range_map::{ RangeMap, RangeMapBuilder, RangeMapError };
pub use account_id::AccountId32; pub use account_id::AccountId32;
pub use account_id20::AccountId20; pub use account_id20::AccountId20;
pub use era::Era; pub use era::Era;
pub use multi_address::MultiAddress; pub use multi_address::MultiAddress;
pub use multi_signature::MultiSignature; pub use multi_signature::MultiSignature;
pub use primitive_types::{H160, H256, H512}; pub use primitive_types::{H160, H256, H512};
pub use range_map::{RangeMap, RangeMapBuilder, RangeMapError};
pub use static_type::Static; pub use static_type::Static;
pub use unchecked_extrinsic::UncheckedExtrinsic; pub use unchecked_extrinsic::UncheckedExtrinsic;
pub use wrapper_opaque::WrapperKeepOpaque; pub use wrapper_opaque::WrapperKeepOpaque;
+1 -1
View File
@@ -110,7 +110,7 @@ pub enum RangeMapError<K: Display> {
/// The range being proposed / added. /// The range being proposed / added.
proposed: (K, K), proposed: (K, K),
/// The existing range which overlaps. /// The existing range which overlaps.
existing: (K, K) existing: (K, K),
}, },
} }
+3 -3
View File
@@ -9,10 +9,10 @@
//! runtime APIs. Deriving `EncodeAsType` would lead to the inner //! runtime APIs. Deriving `EncodeAsType` would lead to the inner
//! bytes to be re-encoded (length prefixed). //! bytes to be re-encoded (length prefixed).
use core::marker::PhantomData;
use codec::{Decode, Encode};
use scale_decode::{DecodeAsType, IntoVisitor, TypeResolver, Visitor, visitor::DecodeAsTypeResult};
use super::{Encoded, Static}; use super::{Encoded, Static};
use codec::{Decode, Encode};
use core::marker::PhantomData;
use scale_decode::{DecodeAsType, IntoVisitor, TypeResolver, Visitor, visitor::DecodeAsTypeResult};
/// The unchecked extrinsic from substrate. /// The unchecked extrinsic from substrate.
#[derive(Clone, Debug, Eq, PartialEq, Encode)] #[derive(Clone, Debug, Eq, PartialEq, Encode)]