Add archive RPCs to subxt-rpcs (#1940)

* Add archive RPCs to subxt-rpcs

* Add tests for archive RPCs

* ".unwrap_*" to ".as_*"

* clippy

* Add proper unsub method for archive_unstable_storage
This commit is contained in:
James Wilson
2025-03-05 15:23:50 +00:00
committed by GitHub
parent e59eef21b4
commit 23c0651c57
5 changed files with 511 additions and 11 deletions
+265 -9
View File
@@ -179,7 +179,7 @@ impl<T: RpcConfig> ChainHeadRpcMethods<T> {
Ok(response)
}
/// Call the `chainHead_v1_storage` method and return an operation ID to obtain the runtime API result.
/// Call the `chainHead_v1_call` method and return an operation ID to obtain the runtime API result.
///
/// The response events are provided on the `chainHead_follow` subscription and identified by
/// the returned operation ID.
@@ -224,20 +224,16 @@ impl<T: RpcConfig> ChainHeadRpcMethods<T> {
/// Return the genesis hash.
pub async fn chainspec_v1_genesis_hash(&self) -> Result<T::Hash, Error> {
let hash = self
.client
self.client
.request("chainSpec_v1_genesisHash", rpc_params![])
.await?;
Ok(hash)
.await
}
/// Return a string containing the human-readable name of the chain.
pub async fn chainspec_v1_chain_name(&self) -> Result<String, Error> {
let hash = self
.client
self.client
.request("chainSpec_v1_chainName", rpc_params![])
.await?;
Ok(hash)
.await
}
/// Returns the JSON payload found in the chain specification under the key properties.
@@ -295,6 +291,132 @@ impl<T: RpcConfig> ChainHeadRpcMethods<T> {
.request("transaction_v1_stop", rpc_params![operation_id])
.await
}
/// Fetch the block body (ie the extrinsics in the block) given its hash.
///
/// Returns an array of the hexadecimal-encoded scale-encoded extrinsics found in the block,
/// or `None` if the block wasn't found.
pub async fn archive_unstable_body(
&self,
block_hash: T::Hash,
) -> Result<Option<Vec<Bytes>>, Error> {
self.client
.request("archive_unstable_body", rpc_params![block_hash])
.await
}
/// Call the `archive_unstable_call` method and return the response.
pub async fn archive_unstable_call(
&self,
block_hash: T::Hash,
function: &str,
call_parameters: &[u8],
) -> Result<ArchiveCallResult, Error> {
use serde::de::Error as _;
// We deserialize to this intermediate shape, since
// we can't have a boolean tag to denote variants.
#[derive(Deserialize)]
struct Response {
success: bool,
value: Option<Bytes>,
error: Option<String>,
// This was accidentally used instead of value in Substrate,
// so to support those impls we try it here if needed:
result: Option<Bytes>,
}
let res: Response = self
.client
.request(
"archive_unstable_call",
rpc_params![block_hash, function, to_hex(call_parameters)],
)
.await?;
let value = res.value.or(res.result);
match (res.success, value, res.error) {
(true, Some(value), _) => Ok(ArchiveCallResult::Success(value)),
(false, _, err) => Ok(ArchiveCallResult::Error(err.unwrap_or(String::new()))),
(true, None, _) => {
let m = "archive_unstable_call: 'success: true' response should have `value: 0x1234` alongside it";
Err(Error::Deserialization(serde_json::Error::custom(m)))
}
}
}
/// Return the finalized block height of the chain.
pub async fn archive_unstable_finalized_height(&self) -> Result<usize, Error> {
self.client
.request("archive_unstable_finalizedHeight", rpc_params![])
.await
}
/// Return the genesis hash.
pub async fn archive_unstable_genesis_hash(&self) -> Result<T::Hash, Error> {
self.client
.request("archive_unstable_genesisHash", rpc_params![])
.await
}
/// Given a block height, return the hashes of the zero or more blocks at that height.
/// For blocks older than the latest finalized block, only one entry will be returned. For blocks
/// newer than the latest finalized block, it's possible to have 0, 1 or multiple blocks at
/// that height given that forks could occur.
pub async fn archive_unstable_hash_by_height(
&self,
height: usize,
) -> Result<Vec<T::Hash>, Error> {
self.client
.request("archive_unstable_hashByHeight", rpc_params![height])
.await
}
/// Fetch the header for a block with the given hash, or `None` if no block with that hash exists.
pub async fn archive_unstable_header(
&self,
block_hash: T::Hash,
) -> Result<Option<T::Header>, Error> {
let maybe_encoded_header: Option<Bytes> = self
.client
.request("archive_unstable_header", rpc_params![block_hash])
.await?;
let Some(encoded_header) = maybe_encoded_header else {
return Ok(None);
};
let header =
<T::Header as codec::Decode>::decode(&mut &*encoded_header.0).map_err(Error::Decode)?;
Ok(Some(header))
}
/// Query the node storage and return a subscription which streams corresponding storage events back.
pub async fn archive_unstable_storage(
&self,
block_hash: T::Hash,
items: impl IntoIterator<Item = StorageQuery<&[u8]>>,
child_key: Option<&[u8]>,
) -> Result<ArchiveStorageSubscription<T::Hash>, Error> {
let items: Vec<StorageQuery<String>> = items
.into_iter()
.map(|item| StorageQuery {
key: to_hex(item.key),
query_type: item.query_type,
})
.collect();
let sub = self
.client
.subscribe(
"archive_unstable_storage",
rpc_params![block_hash, items, child_key.map(to_hex)],
"archive_unstable_stopStorage",
)
.await?;
Ok(ArchiveStorageSubscription { sub, done: false })
}
}
/// This represents events generated by the `follow` method.
@@ -754,6 +876,140 @@ pub struct TransactionBlockDetails<Hash> {
pub index: u64,
}
/// The response from calling `archive_call`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ArchiveCallResult {
/// The bytes returned from successfully making a call
Success(Bytes),
/// An error returned if the call was not successful.
Error(String),
}
impl ArchiveCallResult {
/// Return the bytes on success, or `None` if not an [`ArchiveCallResult::Success`].
pub fn as_success(self) -> Option<Bytes> {
match self {
ArchiveCallResult::Success(bytes) => Some(bytes),
_ => None,
}
}
/// Return the error message on call failure, or `None` if not an [`ArchiveCallResult::Error`].
pub fn as_error(self) -> Option<String> {
match self {
ArchiveCallResult::Success(_) => None,
ArchiveCallResult::Error(e) => Some(e),
}
}
}
/// A subscription which returns follow events, and ends when a Stop event occurs.
pub struct ArchiveStorageSubscription<Hash> {
sub: RpcSubscription<ArchiveStorageEvent<Hash>>,
done: bool,
}
impl<Hash: BlockHash> ArchiveStorageSubscription<Hash> {
/// Fetch the next item in the stream.
pub async fn next(&mut self) -> Option<<Self as Stream>::Item> {
<Self as StreamExt>::next(self).await
}
/// Fetch the subscription ID for the stream.
pub fn subscription_id(&self) -> Option<&str> {
self.sub.subscription_id()
}
}
impl<Hash: BlockHash> Stream for ArchiveStorageSubscription<Hash> {
type Item = <RpcSubscription<ArchiveStorageEvent<Hash>> as Stream>::Item;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
if self.done {
return Poll::Ready(None);
}
let res = self.sub.poll_next_unpin(cx);
if let Poll::Ready(Some(Ok(ArchiveStorageEvent::Done | ArchiveStorageEvent::Error(..)))) =
&res
{
// No more events will occur after "done" or "error" events.
self.done = true;
}
res
}
}
/// Responses returned from [`ArchiveStorageSubscription`].
#[derive(Debug, Deserialize)]
#[serde(tag = "event")]
pub enum ArchiveStorageEvent<Hash> {
/// A storage response for one of the requested items.
#[serde(rename = "storage")]
Item(ArchiveStorageEventItem<Hash>),
/// A human-readable error indicating what went wrong. No more storage events
/// will be emitted after this.
#[serde(rename = "storageError")]
Error(ArchiveStorageEventError),
/// No more storage events will be emitted after this.
#[serde(rename = "storageDone")]
Done,
}
impl<Hash> ArchiveStorageEvent<Hash> {
/// Return a storage item or `None` if not an [`ArchiveStorageEvent::Item`].
pub fn as_item(self) -> Option<ArchiveStorageEventItem<Hash>> {
match self {
ArchiveStorageEvent::Item(item) => Some(item),
_ => None,
}
}
/// Return a storage error or `None` if not an [`ArchiveStorageEvent::Error`].
pub fn as_error(self) -> Option<ArchiveStorageEventError> {
match self {
ArchiveStorageEvent::Error(e) => Some(e),
_ => None,
}
}
/// Is this an [`ArchiveStorageEvent::Done`].
pub fn is_done(self) -> bool {
matches!(self, ArchiveStorageEvent::Done)
}
}
/// Something went wrong during the [`ChainHeadRpcMethods::archive_unstable_storage()`] subscription.
#[derive(Debug, Clone, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ArchiveStorageEventError {
/// The human readable error message indicating what went wrong.
pub error: String,
}
/// A storage item returned from the [`ChainHeadRpcMethods::archive_unstable_storage()`] subscription.
#[derive(Debug, Clone, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ArchiveStorageEventItem<Hash> {
/// String containing the hexadecimal-encoded key of the storage entry.
pub key: Bytes,
/// String containing the hexadecimal-encoded value of the storage entry.
/// Returned when the request type is [`StorageQueryType::Value`] or [`StorageQueryType::DescendantsValues`].
pub value: Option<Bytes>,
/// String containing the hexadecimal-encoded hash of the storage entry.
/// Returned when the request type is [`StorageQueryType::Hash`] or [`StorageQueryType::DescendantsHashes`].
pub hash: Option<Hash>,
/// String containing the hexadecimal-encoded Merkle value of the closest descendant of key (including branch nodes).
/// Returned when the request type is [`StorageQueryType::ClosestDescendantMerkleValue`].
pub closest_descendant_merkle_value: Option<Bytes>,
/// String containing the hexadecimal-encoded key of the child trie of the "default" namespace if the storage entry
/// is part of a child trie. If the storage entry is part of the main trie, this field is not present.
pub child_trie_key: Option<Bytes>,
}
/// Hex-serialized shim for `Vec<u8>`.
#[derive(PartialEq, Eq, Clone, Serialize, Deserialize, Hash, PartialOrd, Ord, Debug)]
pub struct Bytes(#[serde(with = "impl_serde::serialize")] pub Vec<u8>);