diff --git a/rpcs/src/methods/chain_head.rs b/rpcs/src/methods/chain_head.rs index 6b7c958172..6bebe73803 100644 --- a/rpcs/src/methods/chain_head.rs +++ b/rpcs/src/methods/chain_head.rs @@ -179,7 +179,7 @@ impl ChainHeadRpcMethods { Ok(response) } - /// Call the `chainHead_v1_storage` method and return an operation ID to obtain the runtime API result. + /// Call the `chainHead_v1_call` method and return an operation ID to obtain the runtime API result. /// /// The response events are provided on the `chainHead_follow` subscription and identified by /// the returned operation ID. @@ -224,20 +224,16 @@ impl ChainHeadRpcMethods { /// Return the genesis hash. pub async fn chainspec_v1_genesis_hash(&self) -> Result { - let hash = self - .client + self.client .request("chainSpec_v1_genesisHash", rpc_params![]) - .await?; - Ok(hash) + .await } /// Return a string containing the human-readable name of the chain. pub async fn chainspec_v1_chain_name(&self) -> Result { - let hash = self - .client + self.client .request("chainSpec_v1_chainName", rpc_params![]) - .await?; - Ok(hash) + .await } /// Returns the JSON payload found in the chain specification under the key properties. @@ -295,6 +291,132 @@ impl ChainHeadRpcMethods { .request("transaction_v1_stop", rpc_params![operation_id]) .await } + + /// Fetch the block body (ie the extrinsics in the block) given its hash. + /// + /// Returns an array of the hexadecimal-encoded scale-encoded extrinsics found in the block, + /// or `None` if the block wasn't found. + pub async fn archive_unstable_body( + &self, + block_hash: T::Hash, + ) -> Result>, Error> { + self.client + .request("archive_unstable_body", rpc_params![block_hash]) + .await + } + + /// Call the `archive_unstable_call` method and return the response. + pub async fn archive_unstable_call( + &self, + block_hash: T::Hash, + function: &str, + call_parameters: &[u8], + ) -> Result { + use serde::de::Error as _; + + // We deserialize to this intermediate shape, since + // we can't have a boolean tag to denote variants. + #[derive(Deserialize)] + struct Response { + success: bool, + value: Option, + error: Option, + // This was accidentally used instead of value in Substrate, + // so to support those impls we try it here if needed: + result: Option, + } + + let res: Response = self + .client + .request( + "archive_unstable_call", + rpc_params![block_hash, function, to_hex(call_parameters)], + ) + .await?; + + let value = res.value.or(res.result); + match (res.success, value, res.error) { + (true, Some(value), _) => Ok(ArchiveCallResult::Success(value)), + (false, _, err) => Ok(ArchiveCallResult::Error(err.unwrap_or(String::new()))), + (true, None, _) => { + let m = "archive_unstable_call: 'success: true' response should have `value: 0x1234` alongside it"; + Err(Error::Deserialization(serde_json::Error::custom(m))) + } + } + } + + /// Return the finalized block height of the chain. + pub async fn archive_unstable_finalized_height(&self) -> Result { + self.client + .request("archive_unstable_finalizedHeight", rpc_params![]) + .await + } + + /// Return the genesis hash. + pub async fn archive_unstable_genesis_hash(&self) -> Result { + self.client + .request("archive_unstable_genesisHash", rpc_params![]) + .await + } + + /// Given a block height, return the hashes of the zero or more blocks at that height. + /// For blocks older than the latest finalized block, only one entry will be returned. For blocks + /// newer than the latest finalized block, it's possible to have 0, 1 or multiple blocks at + /// that height given that forks could occur. + pub async fn archive_unstable_hash_by_height( + &self, + height: usize, + ) -> Result, Error> { + self.client + .request("archive_unstable_hashByHeight", rpc_params![height]) + .await + } + + /// Fetch the header for a block with the given hash, or `None` if no block with that hash exists. + pub async fn archive_unstable_header( + &self, + block_hash: T::Hash, + ) -> Result, Error> { + let maybe_encoded_header: Option = self + .client + .request("archive_unstable_header", rpc_params![block_hash]) + .await?; + + let Some(encoded_header) = maybe_encoded_header else { + return Ok(None); + }; + + let header = + ::decode(&mut &*encoded_header.0).map_err(Error::Decode)?; + Ok(Some(header)) + } + + /// Query the node storage and return a subscription which streams corresponding storage events back. + pub async fn archive_unstable_storage( + &self, + block_hash: T::Hash, + items: impl IntoIterator>, + child_key: Option<&[u8]>, + ) -> Result, Error> { + let items: Vec> = items + .into_iter() + .map(|item| StorageQuery { + key: to_hex(item.key), + query_type: item.query_type, + }) + .collect(); + + let sub = self + .client + .subscribe( + "archive_unstable_storage", + rpc_params![block_hash, items, child_key.map(to_hex)], + "archive_unstable_stopStorage", + ) + .await?; + + Ok(ArchiveStorageSubscription { sub, done: false }) + } } /// This represents events generated by the `follow` method. @@ -754,6 +876,140 @@ pub struct TransactionBlockDetails { pub index: u64, } +/// The response from calling `archive_call`. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum ArchiveCallResult { + /// The bytes returned from successfully making a call + Success(Bytes), + /// An error returned if the call was not successful. + Error(String), +} + +impl ArchiveCallResult { + /// Return the bytes on success, or `None` if not an [`ArchiveCallResult::Success`]. + pub fn as_success(self) -> Option { + match self { + ArchiveCallResult::Success(bytes) => Some(bytes), + _ => None, + } + } + + /// Return the error message on call failure, or `None` if not an [`ArchiveCallResult::Error`]. + pub fn as_error(self) -> Option { + match self { + ArchiveCallResult::Success(_) => None, + ArchiveCallResult::Error(e) => Some(e), + } + } +} + +/// A subscription which returns follow events, and ends when a Stop event occurs. +pub struct ArchiveStorageSubscription { + sub: RpcSubscription>, + done: bool, +} + +impl ArchiveStorageSubscription { + /// Fetch the next item in the stream. + pub async fn next(&mut self) -> Option<::Item> { + ::next(self).await + } + /// Fetch the subscription ID for the stream. + pub fn subscription_id(&self) -> Option<&str> { + self.sub.subscription_id() + } +} + +impl Stream for ArchiveStorageSubscription { + type Item = > as Stream>::Item; + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + if self.done { + return Poll::Ready(None); + } + + let res = self.sub.poll_next_unpin(cx); + + if let Poll::Ready(Some(Ok(ArchiveStorageEvent::Done | ArchiveStorageEvent::Error(..)))) = + &res + { + // No more events will occur after "done" or "error" events. + self.done = true; + } + + res + } +} + +/// Responses returned from [`ArchiveStorageSubscription`]. +#[derive(Debug, Deserialize)] +#[serde(tag = "event")] +pub enum ArchiveStorageEvent { + /// A storage response for one of the requested items. + #[serde(rename = "storage")] + Item(ArchiveStorageEventItem), + /// A human-readable error indicating what went wrong. No more storage events + /// will be emitted after this. + #[serde(rename = "storageError")] + Error(ArchiveStorageEventError), + /// No more storage events will be emitted after this. + #[serde(rename = "storageDone")] + Done, +} + +impl ArchiveStorageEvent { + /// Return a storage item or `None` if not an [`ArchiveStorageEvent::Item`]. + pub fn as_item(self) -> Option> { + match self { + ArchiveStorageEvent::Item(item) => Some(item), + _ => None, + } + } + + /// Return a storage error or `None` if not an [`ArchiveStorageEvent::Error`]. + pub fn as_error(self) -> Option { + match self { + ArchiveStorageEvent::Error(e) => Some(e), + _ => None, + } + } + + /// Is this an [`ArchiveStorageEvent::Done`]. + pub fn is_done(self) -> bool { + matches!(self, ArchiveStorageEvent::Done) + } +} + +/// Something went wrong during the [`ChainHeadRpcMethods::archive_unstable_storage()`] subscription. +#[derive(Debug, Clone, PartialEq, Eq, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct ArchiveStorageEventError { + /// The human readable error message indicating what went wrong. + pub error: String, +} + +/// A storage item returned from the [`ChainHeadRpcMethods::archive_unstable_storage()`] subscription. +#[derive(Debug, Clone, PartialEq, Eq, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct ArchiveStorageEventItem { + /// String containing the hexadecimal-encoded key of the storage entry. + pub key: Bytes, + /// String containing the hexadecimal-encoded value of the storage entry. + /// Returned when the request type is [`StorageQueryType::Value`] or [`StorageQueryType::DescendantsValues`]. + pub value: Option, + /// String containing the hexadecimal-encoded hash of the storage entry. + /// Returned when the request type is [`StorageQueryType::Hash`] or [`StorageQueryType::DescendantsHashes`]. + pub hash: Option, + /// String containing the hexadecimal-encoded Merkle value of the closest descendant of key (including branch nodes). + /// Returned when the request type is [`StorageQueryType::ClosestDescendantMerkleValue`]. + pub closest_descendant_merkle_value: Option, + /// String containing the hexadecimal-encoded key of the child trie of the "default" namespace if the storage entry + /// is part of a child trie. If the storage entry is part of the main trie, this field is not present. + pub child_trie_key: Option, +} + /// Hex-serialized shim for `Vec`. #[derive(PartialEq, Eq, Clone, Serialize, Deserialize, Hash, PartialOrd, Ord, Debug)] pub struct Bytes(#[serde(with = "impl_serde::serialize")] pub Vec); diff --git a/testing/integration-tests/src/full_client/client/archive_rpcs.rs b/testing/integration-tests/src/full_client/client/archive_rpcs.rs new file mode 100644 index 0000000000..6e77f244cd --- /dev/null +++ b/testing/integration-tests/src/full_client/client/archive_rpcs.rs @@ -0,0 +1,239 @@ +// Copyright 2019-2025 Parity Technologies (UK) Ltd. +// This file is dual-licensed as Apache-2.0 or GPL-3.0. +// see LICENSE for license details. + +//! Just sanity checking some of the new RPC methods to try and +//! catch differences as the implementations evolve. + +use crate::{ + subxt_test, test_context, + utils::{node_runtime, TestNodeProcess}, +}; +use codec::Encode; +use futures::{Stream, StreamExt}; +use subxt::{ + blocks::Block, + client::OnlineClient, + config::{Config, Hasher}, + utils::AccountId32, + SubstrateConfig, +}; +use subxt_rpcs::methods::chain_head::{ + ArchiveStorageEventItem, Bytes, FollowEvent, Initialized, MethodResponse, RuntimeEvent, + RuntimeVersionEvent, StorageQuery, StorageQueryType, +}; + +use subxt_signer::sr25519::dev; + +async fn fetch_finalized_blocks( + ctx: &TestNodeProcess, + n: usize, +) -> impl Stream>> { + ctx.client() + .blocks() + .subscribe_finalized() + .await + .expect("issue subscribing to finalized in fetch_finalized_blocks") + .take(n) + .map(|r| r.expect("issue fetching block in fetch_finalized_blocks")) +} + +#[subxt_test] +async fn archive_unstable_body() { + let ctx = test_context().await; + let rpc = ctx.chainhead_rpc_methods().await; + let mut blocks = fetch_finalized_blocks(&ctx, 3).await; + + while let Some(block) = blocks.next().await { + let subxt_block_bodies = block + .extrinsics() + .await + .unwrap() + .iter() + .map(|e| e.bytes().to_vec()); + let archive_block_bodies = rpc + .archive_unstable_body(block.hash()) + .await + .unwrap() + .into_iter() + .flatten() + .map(|e| e.0); + + // chainHead and archive methods should return same block bodies + for (a, b) in subxt_block_bodies.zip(archive_block_bodies) { + assert_eq!(a, b); + } + } +} + +#[subxt_test] +async fn archive_unstable_call() { + let ctx = test_context().await; + let rpc = ctx.chainhead_rpc_methods().await; + let mut blocks = fetch_finalized_blocks(&ctx, 3).await; + + while let Some(block) = blocks.next().await { + let subxt_metadata_versions = block + .runtime_api() + .await + .unwrap() + .call(node_runtime::apis().metadata().metadata_versions()) + .await + .unwrap() + .encode(); + let archive_metadata_versions = rpc + .archive_unstable_call(block.hash(), "Metadata_metadata_versions", &[]) + .await + .unwrap() + .as_success() + .unwrap() + .0; + + assert_eq!(subxt_metadata_versions, archive_metadata_versions); + } +} + +#[subxt_test] +async fn archive_unstable_finalized_height() { + let ctx = test_context().await; + let rpc = ctx.chainhead_rpc_methods().await; + let mut blocks = fetch_finalized_blocks(&ctx, 3).await; + + while let Some(block) = blocks.next().await { + let subxt_block_height = block.number() as usize; + let archive_block_height = rpc.archive_unstable_finalized_height().await.unwrap(); + + // Note: may be prone to race if call is super slow for some reason, since a new + // block may have been finalized since subxt reported it. + assert_eq!(subxt_block_height, archive_block_height); + } +} + +#[subxt_test] +async fn archive_unstable_genesis_hash() { + let ctx = test_context().await; + let rpc = ctx.chainhead_rpc_methods().await; + + let chain_head_genesis_hash = rpc.chainspec_v1_genesis_hash().await.unwrap(); + let archive_genesis_hash = rpc.archive_unstable_genesis_hash().await.unwrap(); + + assert_eq!(chain_head_genesis_hash, archive_genesis_hash); +} + +#[subxt_test] +async fn archive_unstable_hash_by_height() { + let ctx = test_context().await; + let rpc = ctx.chainhead_rpc_methods().await; + let mut blocks = fetch_finalized_blocks(&ctx, 3).await; + + while let Some(block) = blocks.next().await { + let subxt_block_height = block.number() as usize; + let subxt_block_hash = block.hash(); + + let archive_block_hash = rpc + .archive_unstable_hash_by_height(subxt_block_height) + .await + .unwrap(); + + // Should only ever be 1 finalized block hash. + assert_eq!(archive_block_hash.len(), 1); + assert_eq!(subxt_block_hash, archive_block_hash[0]); + } +} + +#[subxt_test] +async fn archive_unstable_header() { + let ctx = test_context().await; + let rpc = ctx.chainhead_rpc_methods().await; + let mut blocks = fetch_finalized_blocks(&ctx, 3).await; + + while let Some(block) = blocks.next().await { + let block_hash = block.hash(); + + let subxt_block_header = block.header(); + let archive_block_header = rpc + .archive_unstable_header(block_hash) + .await + .unwrap() + .unwrap(); + + assert_eq!(subxt_block_header, &archive_block_header); + } +} + +#[subxt_test] +async fn archive_unstable_storage() { + let ctx = test_context().await; + let rpc = ctx.chainhead_rpc_methods().await; + let api = ctx.client(); + let mut blocks = fetch_finalized_blocks(&ctx, 3).await; + + while let Some(block) = blocks.next().await { + let block_hash = block.hash(); + let account_info_addr = { + let alice: AccountId32 = dev::alice().public_key().into(); + let addr = node_runtime::storage().system().account(alice); + api.storage().address_bytes(&addr).unwrap() + }; + + // Fetch raw value using Subxt to compare against + let subxt_account_info = api + .storage() + .at(block.reference()) + .fetch_raw(account_info_addr.clone()) + .await + .unwrap() + .unwrap(); + + // Construct archive query; ask for item then hash of item. + let storage_query = vec![ + StorageQuery { + key: account_info_addr.as_slice(), + query_type: StorageQueryType::Value, + }, + StorageQuery { + key: account_info_addr.as_slice(), + query_type: StorageQueryType::Hash, + }, + ]; + + let mut res = rpc + .archive_unstable_storage(block_hash, storage_query, None) + .await + .unwrap(); + + // Expect item back first in archive response + let query_item = res.next().await.unwrap().unwrap().as_item().unwrap(); + + assert_eq!( + query_item, + ArchiveStorageEventItem { + key: Bytes(account_info_addr.clone()), + value: Some(Bytes(subxt_account_info.clone())), + hash: None, + closest_descendant_merkle_value: None, + child_trie_key: None + } + ); + + // Expect item hash back next + let query_item_hash = res.next().await.unwrap().unwrap().as_item().unwrap(); + + assert_eq!( + query_item_hash, + ArchiveStorageEventItem { + key: Bytes(account_info_addr), + value: None, + hash: Some(::Hasher::hash( + &subxt_account_info + )), + closest_descendant_merkle_value: None, + child_trie_key: None + } + ); + + // Expect nothing else back after + assert!(res.next().await.unwrap().unwrap().is_done()); + assert!(res.next().await.is_none()); + } +} diff --git a/testing/integration-tests/src/full_client/client/unstable_rpcs.rs b/testing/integration-tests/src/full_client/client/chain_head_rpcs.rs similarity index 100% rename from testing/integration-tests/src/full_client/client/unstable_rpcs.rs rename to testing/integration-tests/src/full_client/client/chain_head_rpcs.rs diff --git a/testing/integration-tests/src/full_client/client/mod.rs b/testing/integration-tests/src/full_client/client/mod.rs index 62aa124893..197f6086c6 100644 --- a/testing/integration-tests/src/full_client/client/mod.rs +++ b/testing/integration-tests/src/full_client/client/mod.rs @@ -18,10 +18,12 @@ use subxt::{ }; use subxt_signer::sr25519::dev; +#[cfg(fullclient)] +mod archive_rpcs; #[cfg(fullclient)] mod legacy_rpcs; -mod unstable_rpcs; +mod chain_head_rpcs; #[cfg(fullclient)] #[subxt_test] diff --git a/testing/substrate-runner/src/lib.rs b/testing/substrate-runner/src/lib.rs index e2d9a2a022..e3506c1842 100644 --- a/testing/substrate-runner/src/lib.rs +++ b/testing/substrate-runner/src/lib.rs @@ -130,7 +130,10 @@ impl SubstrateNodeBuilder { .stdout(process::Stdio::piped()) .stderr(process::Stdio::piped()) .arg("--dev") - .arg("--port=0"); + .arg("--port=0") + // To test archive_* RPC-v2 methods we need the node in archive mode: + .arg("--blocks-pruning=archive-canonical") + .arg("--state-pruning=archive-canonical"); for (key, val) in custom_flags { let arg = match val {