// Copyright 2019-2025 Parity Technologies (UK) Ltd. // This file is dual-licensed as Apache-2.0 or GPL-3.0. // see LICENSE for license details. //! This module exposes a backend trait for Subxt which allows us to get and set //! the necessary information (probably from a JSON-RPC API, but that's up to the //! implementation). pub mod chain_head; pub mod legacy; pub mod utils; use crate::config::{Config, HashFor}; use crate::error::BackendError; use async_trait::async_trait; use codec::{Decode, Encode}; use futures::{Stream, StreamExt}; use std::pin::Pin; use std::sync::Arc; use pezkuwi_subxt_core::client::RuntimeVersion; use pezkuwi_subxt_metadata::Metadata; /// Some re-exports from the [`pezkuwi_subxt_rpcs`] crate, also accessible in full via [`crate::ext::pezkuwi_subxt_rpcs`]. pub mod rpc { pub use pezkuwi_subxt_rpcs::client::{RawRpcFuture, RawRpcSubscription, RawValue, RpcParams}; pub use pezkuwi_subxt_rpcs::{RpcClient, RpcClientT, rpc_params}; crate::macros::cfg_reconnecting_rpc_client! { /// An RPC client that automatically reconnects. /// /// # Example /// /// ```rust,no_run,standalone_crate /// use std::time::Duration; /// use futures::StreamExt; /// use subxt::backend::rpc::reconnecting_rpc_client::{RpcClient, ExponentialBackoff}; /// use subxt::{OnlineClient, PolkadotConfig}; /// /// #[tokio::main] /// async fn main() { /// let rpc = RpcClient::builder() /// .retry_policy(ExponentialBackoff::from_millis(100).max_delay(Duration::from_secs(10))) /// .build("ws://localhost:9944".to_string()) /// .await /// .unwrap(); /// /// let subxt_client: OnlineClient = OnlineClient::from_rpc_client(rpc.clone()).await.unwrap(); /// let mut blocks_sub = subxt_client.blocks().subscribe_finalized().await.unwrap(); /// /// while let Some(block) = blocks_sub.next().await { /// let block = match block { /// Ok(b) => b, /// Err(e) => { /// if e.is_disconnected_will_reconnect() { /// println!("The RPC connection was lost and we may have missed a few blocks"); /// continue; /// } else { /// panic!("Error: {}", e); /// } /// } /// }; /// println!("Block #{} ({})", block.number(), block.hash()); /// } /// } /// ``` pub use pezkuwi_subxt_rpcs::client::reconnecting_rpc_client; } } /// Prevent the backend trait being implemented externally. #[doc(hidden)] pub(crate) mod sealed { pub trait Sealed {} } /// This trait exposes the interface that Subxt will use to communicate with /// a backend. Its goal is to be as minimal as possible. #[async_trait] pub trait Backend: sealed::Sealed + Send + Sync + 'static { /// Fetch values from storage. async fn storage_fetch_values( &self, keys: Vec>, at: HashFor, ) -> Result, BackendError>; /// Fetch keys underneath the given key from storage. async fn storage_fetch_descendant_keys( &self, key: Vec, at: HashFor, ) -> Result>, BackendError>; /// Fetch values underneath the given key from storage. async fn storage_fetch_descendant_values( &self, key: Vec, at: HashFor, ) -> Result, BackendError>; /// Fetch the genesis hash async fn genesis_hash(&self) -> Result, BackendError>; /// Get a block header async fn block_header(&self, at: HashFor) -> Result, BackendError>; /// Return the extrinsics found in the block. Each extrinsic is represented /// by a vector of bytes which has _not_ been SCALE decoded (in other words, the /// first bytes in the vector will decode to the compact encoded length of the extrinsic) async fn block_body(&self, at: HashFor) -> Result>>, BackendError>; /// Get the most recent finalized block hash. /// Note: needed only in blocks client for finalized block stream; can prolly be removed. async fn latest_finalized_block_ref(&self) -> Result>, BackendError>; /// Get information about the current runtime. async fn current_runtime_version(&self) -> Result; /// A stream of all new runtime versions as they occur. async fn stream_runtime_version(&self) -> Result, BackendError>; /// A stream of all new block headers as they arrive. async fn stream_all_block_headers( &self, hasher: T::Hasher, ) -> Result>)>, BackendError>; /// A stream of best block headers. async fn stream_best_block_headers( &self, hasher: T::Hasher, ) -> Result>)>, BackendError>; /// A stream of finalized block headers. async fn stream_finalized_block_headers( &self, hasher: T::Hasher, ) -> Result>)>, BackendError>; /// Submit a transaction. This will return a stream of events about it. async fn submit_transaction( &self, bytes: &[u8], ) -> Result>>, BackendError>; /// Make a call to some runtime API. async fn call( &self, method: &str, call_parameters: Option<&[u8]>, at: HashFor, ) -> Result, BackendError>; } /// helpful utility methods derived from those provided on [`Backend`] #[async_trait] pub trait BackendExt: Backend { /// Fetch a single value from storage. async fn storage_fetch_value( &self, key: Vec, at: HashFor, ) -> Result>, BackendError> { self.storage_fetch_values(vec![key], at) .await? .next() .await .transpose() .map(|o| o.map(|s| s.value)) } /// The same as a [`Backend::call()`], but it will also attempt to decode the /// result into the given type, which is a fairly common operation. async fn call_decoding( &self, method: &str, call_parameters: Option<&[u8]>, at: HashFor, ) -> Result { let bytes = self.call(method, call_parameters, at).await?; let res = D::decode(&mut &*bytes).map_err(BackendError::CouldNotScaleDecodeRuntimeResponse)?; Ok(res) } /// Return the metadata at some version. async fn metadata_at_version( &self, version: u32, at: HashFor, ) -> Result { let param = version.encode(); let opaque: Option = self .call_decoding("Metadata_metadata_at_version", Some(¶m), at) .await?; let Some(opaque) = opaque else { return Err(BackendError::MetadataVersionNotFound(version)); }; let metadata: Metadata = Decode::decode(&mut &opaque.0[..]).map_err(BackendError::CouldNotDecodeMetadata)?; Ok(metadata) } /// Return V14 metadata from the legacy `Metadata_metadata` call. async fn legacy_metadata(&self, at: HashFor) -> Result { let opaque: frame_metadata::OpaqueMetadata = self.call_decoding("Metadata_metadata", None, at).await?; let metadata: Metadata = Decode::decode(&mut &opaque.0[..]).map_err(BackendError::CouldNotDecodeMetadata)?; Ok(metadata) } } #[async_trait] impl + ?Sized, T: Config> BackendExt for B {} /// An opaque struct which, while alive, indicates that some references to a block /// still exist. This gives the backend the opportunity to keep the corresponding block /// details around for a while if it likes and is able to. No guarantees can be made about /// how long the corresponding details might be available for, but if no references to a block /// exist, then the backend is free to discard any details for it. #[derive(Clone)] pub struct BlockRef { hash: H, // We keep this around so that when it is dropped, it has the // opportunity to tell the backend. _pointer: Option>, } impl From for BlockRef { fn from(value: H) -> Self { BlockRef::from_hash(value) } } impl PartialEq for BlockRef { fn eq(&self, other: &Self) -> bool { self.hash == other.hash } } impl Eq for BlockRef {} // Manual implementation to work around https://github.com/mcarton/rust-derivative/issues/115. impl PartialOrd for BlockRef { fn partial_cmp(&self, other: &Self) -> Option { self.hash.partial_cmp(&other.hash) } } impl Ord for BlockRef { fn cmp(&self, other: &Self) -> std::cmp::Ordering { self.hash.cmp(&other.hash) } } impl std::fmt::Debug for BlockRef { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_tuple("BlockRef").field(&self.hash).finish() } } impl std::hash::Hash for BlockRef { fn hash(&self, state: &mut Hasher) { self.hash.hash(state); } } impl BlockRef { /// A [`BlockRef`] that doesn't reference a given block, but does have an associated hash. /// This is used in the legacy backend, which has no notion of pinning blocks. pub fn from_hash(hash: H) -> Self { Self { hash, _pointer: None, } } /// Construct a [`BlockRef`] from an instance of the underlying trait. It's expected /// that the [`Backend`] implementation will call this if it wants to track which blocks /// are potentially in use. pub fn new(hash: H, inner: P) -> Self { Self { hash, _pointer: Some(Arc::new(inner)), } } /// Return the hash of the referenced block. pub fn hash(&self) -> H where H: Copy, { self.hash } } /// A trait that a [`Backend`] can implement to know when some block /// can be unpinned: when this is dropped, there are no remaining references /// to the block that it's associated with. pub trait BlockRefT: Send + Sync + 'static {} /// A stream of some item. pub struct StreamOf(Pin + Send + 'static>>); impl Stream for StreamOf { type Item = T; fn poll_next( mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { self.0.poll_next_unpin(cx) } } impl std::fmt::Debug for StreamOf { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_tuple("StreamOf").field(&"").finish() } } impl StreamOf { /// Construct a new stream. pub fn new(inner: Pin + Send + 'static>>) -> Self { StreamOf(inner) } /// Returns the next item in the stream. This is just a wrapper around /// [`StreamExt::next()`] so that you can avoid the extra import. pub async fn next(&mut self) -> Option { StreamExt::next(self).await } } /// A stream of [`Result`]. pub type StreamOfResults = StreamOf>; /// The status of the transaction. /// /// If the status is [`TransactionStatus::InFinalizedBlock`], [`TransactionStatus::Error`], /// [`TransactionStatus::Invalid`] or [`TransactionStatus::Dropped`], then no future /// events will be emitted. #[derive(Debug, Clone, PartialEq, Eq)] pub enum TransactionStatus { /// Transaction is part of the future queue. Validated, /// The transaction has been broadcast to other nodes. Broadcasted, /// Transaction is no longer in a best block. NoLongerInBestBlock, /// Transaction has been included in block with given hash. InBestBlock { /// Block hash the transaction is in. hash: BlockRef, }, /// Transaction has been finalized by a finality-gadget, e.g GRANDPA InFinalizedBlock { /// Block hash the transaction is in. hash: BlockRef, }, /// Something went wrong in the node. Error { /// Human readable message; what went wrong. message: String, }, /// Transaction is invalid (bad nonce, signature etc). Invalid { /// Human readable message; why was it invalid. message: String, }, /// The transaction was dropped. Dropped { /// Human readable message; why was it dropped. message: String, }, } /// A response from calls like [`Backend::storage_fetch_values`] or /// [`Backend::storage_fetch_descendant_values`]. #[derive(serde::Serialize, serde::Deserialize, Clone, PartialEq, Debug)] pub struct StorageResponse { /// The key. pub key: Vec, /// The associated value. pub value: Vec, } #[cfg(test)] mod test { use super::*; use crate::backend::StorageResponse; use core::convert::Infallible; use futures::StreamExt; use primitive_types::H256; use rpc::RpcClientT; use std::collections::{HashMap, VecDeque}; use pezkuwi_subxt_core::{Config, config::DefaultExtrinsicParams}; use pezkuwi_subxt_rpcs::client::{ MockRpcClient, mock_rpc_client::{Json, MockRpcClientBuilder}, }; fn random_hash() -> H256 { H256::random() } fn disconnected_will_reconnect() -> pezkuwi_subxt_rpcs::Error { pezkuwi_subxt_rpcs::Error::DisconnectedWillReconnect("..".into()) } fn storage_response>, V: Into>>(key: K, value: V) -> StorageResponse where Vec: From, { StorageResponse { key: key.into(), value: value.into(), } } // Define dummy config enum Conf {} impl Config for Conf { type AccountId = crate::utils::AccountId32; type Address = crate::utils::MultiAddress; type Signature = crate::utils::MultiSignature; type Hasher = crate::config::substrate::BlakeTwo256; type Header = crate::config::substrate::SubstrateHeader; type ExtrinsicParams = DefaultExtrinsicParams; type AssetId = u32; } mod legacy { use super::*; use crate::{ backend::legacy::{LegacyBackend, rpc_methods::RuntimeVersion}, error::RpcError, }; use crate::backend::Backend; fn client_runtime_version(num: u32) -> crate::client::RuntimeVersion { crate::client::RuntimeVersion { spec_version: num, transaction_version: num, } } fn runtime_version(num: u32) -> RuntimeVersion { RuntimeVersion { spec_version: num, transaction_version: num, other: HashMap::new(), } } #[tokio::test] async fn storage_fetch_values() { // Map from storage key to responses, given out in order, when that key is requested. let mut values: HashMap<&str, VecDeque<_>> = HashMap::from_iter([ ( "ID1", VecDeque::from_iter([ Err(disconnected_will_reconnect()), Ok(Json(hex::encode("Data1"))), ]), ), ( "ID2", VecDeque::from_iter([ Err(disconnected_will_reconnect()), Ok(Json(hex::encode("Data2"))), ]), ), ("ID3", VecDeque::from_iter([Ok(Json(hex::encode("Data3")))])), ]); let rpc_client = MockRpcClient::builder() .method_handler("state_getStorage", move |params| { // Decode the storage key as first item from sequence of params: let params = params.map(|p| p.get().to_string()); let rpc_params = jsonrpsee::types::Params::new(params.as_deref()); let key: sp_core::Bytes = rpc_params.sequence().next().unwrap(); let key = std::str::from_utf8(&key.0).unwrap(); // Fetch the response to use from our map, popping it from the front. let values = values.get_mut(key).unwrap(); let value = values.pop_front().unwrap(); async move { value } }) .build(); // Test let backend: LegacyBackend = LegacyBackend::builder().build(rpc_client); let response = backend .storage_fetch_values( ["ID1".into(), "ID2".into(), "ID3".into()].into(), random_hash(), ) .await .unwrap(); let response = response .map(|x| x.unwrap()) .collect::>() .await; let expected = vec![ storage_response("ID1", "Data1"), storage_response("ID2", "Data2"), storage_response("ID3", "Data3"), ]; assert_eq!(expected, response) } #[tokio::test] async fn storage_fetch_value() { let rpc_client = MockRpcClient::builder() .method_handler_once("state_getStorage", async move |_params| { // Return "disconnected" error on first call Err::(disconnected_will_reconnect()) }) .method_handler_once("state_getStorage", async move |_param| { // Return some hex encoded storage value on the next one Json(hex::encode("Data1")) }) .build(); // Test let backend: LegacyBackend = LegacyBackend::builder().build(rpc_client); let response = backend .storage_fetch_value("ID1".into(), random_hash()) .await .unwrap(); let response = response.unwrap(); assert_eq!("Data1".to_owned(), String::from_utf8(response).unwrap()) } /// This test should cover the logic of the following methods: /// - `genesis_hash` /// - `block_header` /// - `block_body` /// - `latest_finalized_block` /// - `current_runtime_version` /// - `current_runtime_version` /// - `call` /// The test covers them because they follow the simple pattern of: /// ```rust,no_run,standalone_crate /// async fn THE_THING(&self) -> Result, BackendError> { /// retry(|| ).await /// } /// ``` #[tokio::test] async fn simple_fetch() { let hash = random_hash(); let rpc_client = MockRpcClient::builder() .method_handler_once("chain_getBlockHash", async move |_params| { // Return "disconnected" error on first call Err::(disconnected_will_reconnect()) }) .method_handler_once("chain_getBlockHash", async move |_params| { // Return the blockhash on next call Json(hash) }) .build(); // Test let backend: LegacyBackend = LegacyBackend::builder().build(rpc_client); let response = backend.genesis_hash().await.unwrap(); assert_eq!(hash, response) } /// This test should cover the logic of the following methods: /// - `stream_runtime_version` /// - `stream_all_block_headers` /// - `stream_best_block_headers` /// The test covers them because they follow the simple pattern of: /// ```rust,no_run,standalone_crate /// async fn stream_the_thing( /// &self, /// ) -> Result>)>, BackendError> { /// let methods = self.methods.clone(); /// let retry_sub = retry_stream(move || { /// let methods = methods.clone(); /// Box::pin(async move { /// methods.do_the_thing().await? /// }); /// Ok(StreamOf(Box::pin(sub))) /// }) /// }) /// .await?; /// Ok(retry_sub) /// } /// ``` #[tokio::test] async fn stream_simple() { // Each time the subscription is called, it will pop the first set // of values from this and return them one after the other. let mut data = VecDeque::from_iter([ vec![ Ok(Json(runtime_version(0))), Err(disconnected_will_reconnect()), Ok(Json(runtime_version(1))), ], vec![ Err(disconnected_will_reconnect()), Ok(Json(runtime_version(2))), Ok(Json(runtime_version(3))), ], vec![ Ok(Json(runtime_version(4))), Ok(Json(runtime_version(5))), Err(pezkuwi_subxt_rpcs::Error::Client("..".into())), ], ]); let rpc_client = MockRpcClient::builder() .subscription_handler("state_subscribeRuntimeVersion", move |_params, _unsub| { let res = data.pop_front().unwrap(); async move { res } }) .build(); // Test let backend: LegacyBackend = LegacyBackend::builder().build(rpc_client); let mut results = backend.stream_runtime_version().await.unwrap(); assert_eq!( results.next().await.unwrap().unwrap(), client_runtime_version(0) ); assert_eq!( results.next().await.unwrap().unwrap(), client_runtime_version(4) ); assert_eq!( results.next().await.unwrap().unwrap(), client_runtime_version(5) ); assert!(matches!( results.next().await.unwrap(), Err(BackendError::Rpc(RpcError::ClientError( pezkuwi_subxt_rpcs::Error::Client(_) ))) )); assert!(results.next().await.is_none()); } } mod unstable_backend { use pezkuwi_subxt_rpcs::methods::chain_head::{ self, Bytes, Initialized, MethodResponse, MethodResponseStarted, OperationError, OperationId, OperationStorageItems, RuntimeSpec, RuntimeVersionEvent, }; use tokio::select; use super::chain_head::*; use super::*; fn build_backend( rpc_client: impl RpcClientT, ) -> (ChainHeadBackend, ChainHeadBackendDriver) { let (backend, driver): (ChainHeadBackend, _) = ChainHeadBackend::builder().build(rpc_client); (backend, driver) } fn build_backend_spawn_background(rpc_client: impl RpcClientT) -> ChainHeadBackend { ChainHeadBackend::builder().build_with_background_driver(rpc_client) } fn runtime_spec() -> RuntimeSpec { let spec = serde_json::json!({ "specName": "westend", "implName": "parity-westend", "specVersion": 9122, "implVersion": 0, "transactionVersion": 7, "apis": { "0xdf6acb689907609b": 3, "0x37e397fc7c91f5e4": 1, "0x40fe3ad401f8959a": 5, "0xd2bc9897eed08f15": 3, "0xf78b278be53f454c": 2, "0xaf2c0297a23e6d3d": 1, "0x49eaaf1b548a0cb0": 1, "0x91d5df18b0d2cf58": 1, "0xed99c5acb25eedf5": 3, "0xcbca25e39f142387": 2, "0x687ad44ad37f03c2": 1, "0xab3c0572291feb8b": 1, "0xbc9d89904f5b923f": 1, "0x37c8bb1350a9a2a8": 1 } }); serde_json::from_value(spec).expect("Mock runtime spec should be the right shape") } type FollowEvent = chain_head::FollowEvent>; /// Build a mock client which can handle `chainHead_v1_follow` subscriptions. /// Messages from the provided receiver are sent to the latest active subscription. fn mock_client_builder( recv: tokio::sync::mpsc::UnboundedReceiver, ) -> MockRpcClientBuilder { mock_client_builder_with_ids(recv, 0..) } fn mock_client_builder_with_ids( recv: tokio::sync::mpsc::UnboundedReceiver, ids: I, ) -> MockRpcClientBuilder where I: IntoIterator + Send, I::IntoIter: Send + Sync + 'static, { use pezkuwi_subxt_rpcs::client::mock_rpc_client::AndThen; use pezkuwi_subxt_rpcs::{Error, UserError}; let recv = Arc::new(tokio::sync::Mutex::new(recv)); let mut ids = ids.into_iter(); MockRpcClient::builder().subscription_handler( "chainHead_v1_follow", move |_params, _unsub| { let recv = recv.clone(); let id = ids.next(); // For each new follow subscription, we take messages from `recv` and pipe them to the output // for the subscription (after an Initialized event). if the output is dropped/closed, we stop pulling // messages from `recv`, waiting for a new chainHEad_v1_follow subscription. let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); tokio::spawn(async move { let mut recv_guard = recv.lock().await; loop { select! { // Channel closed, so stop pulling from `recv`. _ = tx.closed() => { break }, // Relay messages from `recv` unless some error sending. Some(msg) = recv_guard.recv() => { if tx.send(Json(msg)).is_err() { break } } } } }); async move { if let Some(id) = id { let follow_event = FollowEvent::Initialized(Initialized::> { finalized_block_hashes: vec![random_hash()], finalized_block_runtime: Some(chain_head::RuntimeEvent::Valid( RuntimeVersionEvent { spec: runtime_spec(), }, )), }); let res = AndThen( // First send an initialized event with new ID (vec![Json(follow_event)], subscription_id(id)), // Next, send any events provided via the recv channel rx, ); Ok(res) } else { // Ran out of subscription IDs; return an error. Err(Error::User(UserError::method_not_found())) } } }, ) } fn subscription_id(id: usize) -> String { format!("chainHeadFollowSubscriptionId{id}") } fn response_started(id: &str) -> MethodResponse { MethodResponse::Started(MethodResponseStarted { operation_id: id.to_owned(), discarded_items: None, }) } fn operation_error(id: &str) -> FollowEvent { FollowEvent::OperationError(OperationError { operation_id: id.to_owned(), error: "error".to_owned(), }) } fn limit_reached() -> MethodResponse { MethodResponse::LimitReached } fn storage_done(id: &str) -> FollowEvent { FollowEvent::OperationStorageDone(OperationId { operation_id: id.to_owned(), }) } fn storage_result(key: &str, value: &str) -> chain_head::StorageResult { chain_head::StorageResult { key: Bytes(key.to_owned().into()), result: chain_head::StorageResultType::Value(Bytes(value.to_owned().into())), } } fn storage_items(id: &str, items: &[chain_head::StorageResult]) -> FollowEvent { FollowEvent::OperationStorageItems(OperationStorageItems { operation_id: id.to_owned(), items: VecDeque::from(items.to_owned()), }) } fn operation_continue(id: &str) -> FollowEvent { FollowEvent::OperationWaitingForContinue(OperationId { operation_id: id.to_owned(), }) } fn follow_event_stop() -> FollowEvent { FollowEvent::Stop } #[tokio::test] async fn storage_fetch_values_returns_stream_with_single_error() { let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); let rpc_client = mock_client_builder(rx) .method_handler_once("chainHead_v1_storage", move |_params| { tokio::spawn(async move { // Wait a little and then send an error response on the // chainHead_follow subscription: tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; tx.send(operation_error("Id1")).unwrap(); }); async move { Json(response_started("Id1")) } }) .build(); let backend = build_backend_spawn_background(rpc_client); // Test // This request should encounter an error. let mut response = backend .storage_fetch_values( ["ID1".into(), "ID2".into(), "ID3".into()].into(), random_hash(), ) .await .unwrap(); assert!( response .next() .await .unwrap() .is_err_and(|e| matches!(e, BackendError::Other(e) if e == "error")) ); assert!(response.next().await.is_none()); } /// Tests that the method will retry on failed query #[tokio::test] async fn storage_fetch_values_retry_query() { let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); let rpc_client = mock_client_builder(rx) .method_handler_once("chainHead_v1_storage", async move |_params| { // First call; return DisconnectedWillReconnect Err::(disconnected_will_reconnect()) }) .method_handler_once("chainHead_v1_storage", async move |_params| { // Otherwise, return that we'll start sending a response, and spawn // task to send the relevant response via chainHead_follow. tokio::spawn(async move { tx.send(storage_items( "Id1", &[ storage_result("ID1", "Data1"), storage_result("ID2", "Data2"), storage_result("ID3", "Data3"), ], )) .unwrap(); tx.send(storage_done("Id1")).unwrap(); }); Ok(Json(response_started("Id1"))) }) .build(); // Despite DisconnectedWillReconnect we try again transparently // and get the data we asked for. let backend = build_backend_spawn_background(rpc_client); let response = backend .storage_fetch_values( ["ID1".into(), "ID2".into(), "ID3".into()].into(), random_hash(), ) .await .unwrap(); let response = response .map(|x| x.unwrap()) .collect::>() .await; assert_eq!( vec![ storage_response("ID1", "Data1"), storage_response("ID2", "Data2"), storage_response("ID3", "Data3"), ], response ) } #[tokio::test] async fn storage_fetch_values_retry_chainhead_continue() { let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); let tx2 = tx.clone(); let rpc_client = mock_client_builder(rx) .method_handler_once("chainHead_v1_storage", async move |_params| { // First call; return DisconnectedWillReconnect Err::(disconnected_will_reconnect()) }) .method_handler_once("chainHead_v1_storage", async move |_params| { // Next call, return a storage item and then a "waiting for continue". tokio::spawn(async move { tx.send(storage_items("Id1", &[storage_result("ID1", "Data1")])) .unwrap(); tx.send(operation_continue("Id1")).unwrap(); }); Ok(Json(response_started("Id1"))) }) .method_handler_once("chainHead_v1_continue", async move |_params| { // First call; return DisconnectedWillReconnect Err::(disconnected_will_reconnect()) }) .method_handler_once("chainHead_v1_continue", async move |_params| { // Next call; acknowledge the "continue" and return remaining storage items. tokio::spawn(async move { tx2.send(storage_items("Id1", &[storage_result("ID2", "Data2")])) .unwrap(); tx2.send(storage_items("Id1", &[storage_result("ID3", "Data3")])) .unwrap(); tx2.send(storage_done("Id1")).unwrap(); }); Ok(Json(())) }) .build(); let backend = build_backend_spawn_background(rpc_client); // We should success, transparently handling `continue`s and `DisconnectWillReconnects`. let response = backend .storage_fetch_values( ["ID1".into(), "ID2".into(), "ID3".into()].into(), random_hash(), ) .await .unwrap(); let response = response .map(|x| x.unwrap()) .collect::>() .await; assert_eq!( vec![ storage_response("ID1", "Data1"), storage_response("ID2", "Data2"), storage_response("ID3", "Data3"), ], response ) } #[tokio::test] async fn simple_fetch() { let hash = random_hash(); let (_tx, rx) = tokio::sync::mpsc::unbounded_channel(); let rpc_client = mock_client_builder(rx) .method_handler_once("chainSpec_v1_genesisHash", async move |_params| { // First call, return disconnected error. Err::(disconnected_will_reconnect()) }) .method_handler_once("chainSpec_v1_genesisHash", async move |_params| { // Next call, return the hash. Ok(Json(hash)) }) .build(); // Test // This request should encounter an error on `request` and do a retry. let backend = build_backend_spawn_background(rpc_client); let response_hash = backend.genesis_hash().await.unwrap(); assert_eq!(hash, response_hash) } // Check that the backend will resubscribe on Stop, and handle a change in subscription ID. // see https://github.com/paritytech/subxt/issues/1567 #[tokio::test] async fn stale_subscription_id_failure() { let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); let rpc_client = mock_client_builder_with_ids(rx, [1, 2]) .method_handler("chainHead_v1_storage", move |params| { // Decode the follow subscription ID which is the first param. let this_sub_id = { let params = params.as_ref().map(|p| p.get()); let rpc_params = jsonrpsee::types::Params::new(params); rpc_params.sequence().next::().unwrap() }; // While it's equal to `subscription_id(1)`, it means we are seeing the first // chainHead_follow subscription ID. error until we see an updated ID. let is_wrong_sub_id = this_sub_id == subscription_id(1); async move { if is_wrong_sub_id { Json(limit_reached()) } else { Json(response_started("some_id")) } } }) .build(); let (backend, mut driver): (ChainHeadBackend, _) = build_backend(rpc_client); // Send a "FollowEvent::Stop" via chainhead_follow, and advance the driver just enough // that this message has been processed. tx.send(follow_event_stop()).unwrap(); let _ = driver.next().await.unwrap(); // If we make a storage call at this point, we'll still be passing the "old" subscription // ID, because the driver hasn't advanced enough to start a new chainhead_follow subscription, // and will therefore fail with a "limit reached" response (to emulate what would happen if // the chainHead_v1_storage call was made with the wrong subscription ID). let response = backend .storage_fetch_values(["ID1".into()].into(), random_hash()) .await; assert!(matches!(response, Err(e) if e.is_rpc_limit_reached())); // Advance the driver until a new chainHead_follow subscription has been started up. let _ = driver.next().await.unwrap(); let _ = driver.next().await.unwrap(); let _ = driver.next().await.unwrap(); // Now, the ChainHeadBackend will use a new subscription ID and work. (If the driver // advanced in the background automatically, this would happen automatically for us). let response = backend .storage_fetch_values(["ID1".into()].into(), random_hash()) .await; assert!(response.is_ok()); } } }