diff --git a/subxt/src/rpc/rpc.rs b/subxt/src/rpc/rpc.rs index fee29799d4..befe75ea78 100644 --- a/subxt/src/rpc/rpc.rs +++ b/subxt/src/rpc/rpc.rs @@ -41,7 +41,11 @@ use super::{ rpc_params, - types, + types::{ + self, + ChainHeadEvent, + FollowEvent, + }, RpcClient, RpcClientT, Subscription, @@ -465,6 +469,163 @@ impl Rpc { self.client.request("system_dryRun", params).await?; Ok(types::decode_dry_run_result(&mut &*result_bytes.0)?) } + + /// Subscribe to `chainHead_unstable_follow` to obtain all reported blocks by the chain. + /// + /// The subscription ID can be used to make queries for the + /// block's body ([`chainhead_unstable_body`](Rpc::chainhead_unstable_follow)), + /// block's header ([`chainhead_unstable_header`](Rpc::chainhead_unstable_header)), + /// block's storage ([`chainhead_unstable_storage`](Rpc::chainhead_unstable_storage)) and submitting + /// runtime API calls at this block ([`chainhead_unstable_call`](Rpc::chainhead_unstable_call)). + /// + /// # Note + /// + /// When the user is no longer interested in a block, the user is responsible + /// for calling the [`chainhead_unstable_unpin`](Rpc::chainhead_unstable_unpin) method. + /// Failure to do so will result in the subscription being stopped by generating the `Stop` event. + pub async fn chainhead_unstable_follow( + &self, + runtime_updates: bool, + ) -> Result>, Error> { + let subscription = self + .client + .subscribe( + "chainHead_unstable_follow", + rpc_params![runtime_updates], + "chainHead_unstable_unfollow", + ) + .await?; + + Ok(subscription) + } + + /// Subscribe to `chainHead_unstable_body` to obtain events regarding the block's body. + /// + /// # Note + /// + /// The subscription ID is obtained from an open subscription created by + /// [`chainhead_unstable_follow`](Rpc::chainhead_unstable_follow). + pub async fn chainhead_unstable_body( + &self, + subscription_id: String, + hash: T::Hash, + ) -> Result>, Error> { + let subscription = self + .client + .subscribe( + "chainHead_unstable_body", + rpc_params![subscription_id, hash], + "chainHead_unstable_stopBody", + ) + .await?; + + Ok(subscription) + } + + /// Get the block's body using the `chainHead_unstable_header` method. + /// + /// # Note + /// + /// The subscription ID is obtained from an open subscription created by + /// [`chainhead_unstable_follow`](Rpc::chainhead_unstable_follow). + pub async fn chainhead_unstable_header( + &self, + subscription_id: String, + hash: T::Hash, + ) -> Result, Error> { + let header = self + .client + .request( + "chainHead_unstable_header", + rpc_params![subscription_id, hash], + ) + .await?; + + Ok(header) + } + + /// Subscribe to `chainHead_storage` to obtain events regarding the + /// block's storage. + /// + /// # Note + /// + /// The subscription ID is obtained from an open subscription created by + /// [`chainhead_unstable_follow`](Rpc::chainhead_unstable_follow). + pub async fn chainhead_unstable_storage( + &self, + subscription_id: String, + hash: T::Hash, + key: &[u8], + child_key: Option<&[u8]>, + ) -> Result>>, Error> { + let subscription = self + .client + .subscribe( + "chainHead_unstable_storage", + rpc_params![subscription_id, hash, to_hex(key), child_key.map(to_hex)], + "chainHead_unstable_stopStorage", + ) + .await?; + + Ok(subscription) + } + + /// Subscribe to `chainHead_call` to obtain events regarding the + /// runtime API call. + /// + /// # Note + /// + /// The subscription ID is obtained from an open subscription created by + /// [`chainhead_unstable_follow`](Rpc::chainhead_unstable_follow). + pub async fn chainhead_unstable_call( + &self, + subscription_id: String, + hash: T::Hash, + function: String, + call_parameters: &[u8], + ) -> Result>, Error> { + let subscription = self + .client + .subscribe( + "chainHead_unstable_call", + rpc_params![subscription_id, hash, function, to_hex(call_parameters)], + "chainHead_unstable_stopCall", + ) + .await?; + + Ok(subscription) + } + + /// Unpin a block reported by the `chainHead_follow` subscription. + /// + /// # Note + /// + /// The subscription ID is obtained from an open subscription created by + /// [`chainhead_unstable_follow`](Rpc::chainhead_unstable_follow). + pub async fn chainhead_unstable_unpin( + &self, + subscription_id: String, + hash: T::Hash, + ) -> Result<(), Error> { + self.client + .request( + "chainHead_unstable_unpin", + rpc_params![subscription_id, hash], + ) + .await?; + + Ok(()) + } + + /// Get genesis hash obtained from the `chainHead_genesisHash` method. + pub async fn chainhead_unstable_genesishash(&self) -> Result { + let hash = self + .client + .request("chainHead_unstable_genesisHash", rpc_params![]) + .await?; + + Ok(hash) + } } fn to_hex(bytes: impl AsRef<[u8]>) -> String { diff --git a/subxt/src/rpc/types.rs b/subxt/src/rpc/types.rs index c68b17a08f..9772363671 100644 --- a/subxt/src/rpc/types.rs +++ b/subxt/src/rpc/types.rs @@ -398,6 +398,390 @@ pub struct Health { pub should_have_peers: bool, } +/// The operation could not be processed due to an error. +#[derive(Debug, Clone, PartialEq, Eq, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct ErrorEvent { + /// Reason of the error. + pub error: String, +} + +/// The runtime specification of the current block. +/// +/// This event is generated for: +/// - the first announced block by the follow subscription +/// - blocks that suffered a change in runtime compared with their parents +#[derive(Debug, Clone, PartialEq, Eq, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct RuntimeVersionEvent { + /// The runtime version. + pub spec: RuntimeVersion, +} + +/// The runtime event generated if the `follow` subscription +/// has set the `runtime_updates` flag. +#[derive(Debug, Clone, PartialEq, Eq, Deserialize)] +#[serde(rename_all = "camelCase")] +#[serde(tag = "type")] +pub enum RuntimeEvent { + /// The runtime version of this block. + Valid(RuntimeVersionEvent), + /// The runtime could not be obtained due to an error. + Invalid(ErrorEvent), +} + +/// Contain information about the latest finalized block. +/// +/// # Note +/// +/// This is the first event generated by the `follow` subscription +/// and is submitted only once. +/// +/// If the `runtime_updates` flag is set, then this event contains +/// the `RuntimeEvent`, otherwise the `RuntimeEvent` is not present. +#[derive(Debug, Clone, PartialEq, Eq, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct Initialized { + /// The hash of the latest finalized block. + pub finalized_block_hash: Hash, + /// The runtime version of the finalized block. + /// + /// # Note + /// + /// This is present only if the `runtime_updates` flag is set for + /// the `follow` subscription. + pub finalized_block_runtime: Option, +} + +/// Indicate a new non-finalized block. +#[derive(Debug, Clone, PartialEq, Eq, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct NewBlock { + /// The hash of the new block. + pub block_hash: Hash, + /// The parent hash of the new block. + pub parent_block_hash: Hash, + /// The runtime version of the new block. + /// + /// # Note + /// + /// This is present only if the `runtime_updates` flag is set for + /// the `follow` subscription. + pub new_runtime: Option, +} + +/// Indicate the block hash of the new best block. +#[derive(Debug, Clone, PartialEq, Eq, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct BestBlockChanged { + /// The block hash of the new best block. + pub best_block_hash: Hash, +} + +/// Indicate the finalized and pruned block hashes. +#[derive(Debug, Clone, PartialEq, Eq, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct Finalized { + /// Block hashes that are finalized. + pub finalized_block_hashes: Vec, + /// Block hashes that are pruned (removed). + pub pruned_block_hashes: Vec, +} + +/// The event generated by the `chainHead_follow` method. +/// +/// The events are generated in the following order: +/// 1. Initialized - generated only once to signal the +/// latest finalized block +/// 2. NewBlock - a new block was added. +/// 3. BestBlockChanged - indicate that the best block +/// is now the one from this event. The block was +/// announced priorly with the `NewBlock` event. +/// 4. Finalized - State the finalized and pruned blocks. +#[derive(Debug, Clone, PartialEq, Eq, Deserialize)] +#[serde(rename_all = "camelCase")] +#[serde(tag = "event")] +pub enum FollowEvent { + /// The latest finalized block. + /// + /// This event is generated only once. + Initialized(Initialized), + /// A new non-finalized block was added. + NewBlock(NewBlock), + /// The best block of the chain. + BestBlockChanged(BestBlockChanged), + /// A list of finalized and pruned blocks. + Finalized(Finalized), + /// The subscription is dropped and no further events + /// will be generated. + Stop, +} + +/// The result of a chain head method. +#[derive(Debug, Clone, PartialEq, Eq, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct ChainHeadResult { + /// Result of the method. + pub result: T, +} + +/// The event generated by the body / call / storage methods. +#[derive(Debug, Clone, PartialEq, Eq, Deserialize)] +#[serde(rename_all = "camelCase")] +#[serde(tag = "event")] +pub enum ChainHeadEvent { + /// The request completed successfully. + Done(ChainHeadResult), + /// The resources requested are inaccessible. + /// + /// Resubmitting the request later might succeed. + Inaccessible(ErrorEvent), + /// An error occurred. This is definitive. + Error(ErrorEvent), + /// The provided subscription ID is stale or invalid. + Disjoint, +} + +/// The transaction was broadcasted to a number of peers. +/// +/// # Note +/// +/// The RPC does not guarantee that the peers have received the +/// transaction. +/// +/// When the number of peers is zero, the event guarantees that +/// shutting down the local node will lead to the transaction +/// not being included in the chain. +#[derive(Debug, Clone, PartialEq, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct TransactionBroadcasted { + /// The number of peers the transaction was broadcasted to. + #[serde(with = "as_string")] + pub num_peers: usize, +} + +/// The transaction was included in a block of the chain. +#[derive(Debug, Clone, PartialEq, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct TransactionBlock { + /// The hash of the block the transaction was included into. + pub hash: Hash, + /// The index (zero-based) of the transaction within the body of the block. + #[serde(with = "as_string")] + pub index: usize, +} + +/// The transaction could not be processed due to an error. +#[derive(Debug, Clone, PartialEq, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct TransactionError { + /// Reason of the error. + pub error: String, +} + +/// The transaction was dropped because of exceeding limits. +#[derive(Debug, Clone, PartialEq, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct TransactionDropped { + /// True if the transaction was broadcasted to other peers and + /// may still be included in the block. + pub broadcasted: bool, + /// Reason of the event. + pub error: String, +} + +/// Possible transaction status events. +/// +/// The status events can be grouped based on their kinds as: +/// +/// 1. Runtime validated the transaction: +/// - `Validated` +/// +/// 2. Inside the `Ready` queue: +/// - `Broadcast` +/// +/// 3. Leaving the pool: +/// - `BestChainBlockIncluded` +/// - `Invalid` +/// +/// 4. Block finalized: +/// - `Finalized` +/// +/// 5. At any time: +/// - `Dropped` +/// - `Error` +/// +/// The subscription's stream is considered finished whenever the following events are +/// received: `Finalized`, `Error`, `Invalid` or `Dropped`. However, the user is allowed +/// to unsubscribe at any moment. +#[derive(Debug, Clone, PartialEq, Deserialize)] +// We need to manually specify the trait bounds for the `Hash` trait to ensure `into` and +// `from` still work. +#[serde(bound(deserialize = "Hash: Deserialize<'de> + Clone"))] +#[serde(from = "TransactionEventIR")] +pub enum TransactionEvent { + /// The transaction was validated by the runtime. + Validated, + /// The transaction was broadcasted to a number of peers. + Broadcasted(TransactionBroadcasted), + /// The transaction was included in a best block of the chain. + /// + /// # Note + /// + /// This may contain `None` if the block is no longer a best + /// block of the chain. + BestChainBlockIncluded(Option>), + /// The transaction was included in a finalized block. + Finalized(TransactionBlock), + /// The transaction could not be processed due to an error. + Error(TransactionError), + /// The transaction is marked as invalid. + Invalid(TransactionError), + /// The client was not capable of keeping track of this transaction. + Dropped(TransactionDropped), +} + +/// Intermediate representation (IR) for the transaction events +/// that handles block events only. +/// +/// The block events require a JSON compatible interpretation similar to: +/// +/// ```json +/// { event: "EVENT", block: { hash: "0xFF", index: 0 } } +/// ``` +/// +/// This IR is introduced to circumvent that the block events need to +/// be serialized/deserialized with "tag" and "content", while other +/// events only require "tag". +#[derive(Debug, Clone, PartialEq, Deserialize)] +#[serde(rename_all = "camelCase")] +#[serde(tag = "event", content = "block")] +enum TransactionEventBlockIR { + /// The transaction was included in the best block of the chain. + BestChainBlockIncluded(Option>), + /// The transaction was included in a finalized block of the chain. + Finalized(TransactionBlock), +} + +/// Intermediate representation (IR) for the transaction events +/// that handles non-block events only. +/// +/// The non-block events require a JSON compatible interpretation similar to: +/// +/// ```json +/// { event: "EVENT", num_peers: 0 } +/// ``` +/// +/// This IR is introduced to circumvent that the block events need to +/// be serialized/deserialized with "tag" and "content", while other +/// events only require "tag". +#[derive(Debug, Clone, PartialEq, Deserialize)] +#[serde(rename_all = "camelCase")] +#[serde(tag = "event")] +enum TransactionEventNonBlockIR { + Validated, + Broadcasted(TransactionBroadcasted), + Error(TransactionError), + Invalid(TransactionError), + Dropped(TransactionDropped), +} + +/// Intermediate representation (IR) used for serialization/deserialization of the +/// [`TransactionEvent`] in a JSON compatible format. +/// +/// Serde cannot mix `#[serde(tag = "event")]` with `#[serde(tag = "event", content = "block")]` +/// for specific enum variants. Therefore, this IR is introduced to circumvent this +/// restriction, while exposing a simplified [`TransactionEvent`] for users of the +/// rust ecosystem. +#[derive(Debug, Clone, PartialEq, Deserialize)] +#[serde(bound(deserialize = "Hash: Deserialize<'de>"))] +#[serde(rename_all = "camelCase")] +#[serde(untagged)] +enum TransactionEventIR { + Block(TransactionEventBlockIR), + NonBlock(TransactionEventNonBlockIR), +} + +impl From> for TransactionEventIR { + fn from(value: TransactionEvent) -> Self { + match value { + TransactionEvent::Validated => { + TransactionEventIR::NonBlock(TransactionEventNonBlockIR::Validated) + } + TransactionEvent::Broadcasted(event) => { + TransactionEventIR::NonBlock(TransactionEventNonBlockIR::Broadcasted( + event, + )) + } + TransactionEvent::BestChainBlockIncluded(event) => { + TransactionEventIR::Block( + TransactionEventBlockIR::BestChainBlockIncluded(event), + ) + } + TransactionEvent::Finalized(event) => { + TransactionEventIR::Block(TransactionEventBlockIR::Finalized(event)) + } + TransactionEvent::Error(event) => { + TransactionEventIR::NonBlock(TransactionEventNonBlockIR::Error(event)) + } + TransactionEvent::Invalid(event) => { + TransactionEventIR::NonBlock(TransactionEventNonBlockIR::Invalid(event)) + } + TransactionEvent::Dropped(event) => { + TransactionEventIR::NonBlock(TransactionEventNonBlockIR::Dropped(event)) + } + } + } +} + +impl From> for TransactionEvent { + fn from(value: TransactionEventIR) -> Self { + match value { + TransactionEventIR::NonBlock(status) => { + match status { + TransactionEventNonBlockIR::Validated => TransactionEvent::Validated, + TransactionEventNonBlockIR::Broadcasted(event) => { + TransactionEvent::Broadcasted(event) + } + TransactionEventNonBlockIR::Error(event) => { + TransactionEvent::Error(event) + } + TransactionEventNonBlockIR::Invalid(event) => { + TransactionEvent::Invalid(event) + } + TransactionEventNonBlockIR::Dropped(event) => { + TransactionEvent::Dropped(event) + } + } + } + TransactionEventIR::Block(block) => { + match block { + TransactionEventBlockIR::Finalized(event) => { + TransactionEvent::Finalized(event) + } + TransactionEventBlockIR::BestChainBlockIncluded(event) => { + TransactionEvent::BestChainBlockIncluded(event) + } + } + } + } + } +} + +/// Serialize and deserialize helper as string. +mod as_string { + use super::*; + use serde::Deserializer; + + pub fn deserialize<'de, D: Deserializer<'de>>( + deserializer: D, + ) -> Result { + String::deserialize(deserializer)? + .parse() + .map_err(|e| serde::de::Error::custom(format!("Parsing failed: {}", e))) + } +} + #[cfg(test)] mod test { use super::*; diff --git a/testing/integration-tests/src/client/mod.rs b/testing/integration-tests/src/client/mod.rs index ad18d439e4..666bbad952 100644 --- a/testing/integration-tests/src/client/mod.rs +++ b/testing/integration-tests/src/client/mod.rs @@ -11,9 +11,11 @@ use crate::{ wait_for_blocks, }, }; +use assert_matches::assert_matches; use codec::{ Compact, Decode, + Encode, }; use frame_metadata::RuntimeMetadataPrefixed; use sp_core::{ @@ -24,7 +26,15 @@ use sp_core::{ use sp_keyring::AccountKeyring; use subxt::{ error::DispatchError, - rpc::types::DryRunError, + rpc::types::{ + ChainHeadEvent, + DryRunError, + FollowEvent, + Initialized, + RuntimeEvent, + RuntimeVersionEvent, + }, + utils::AccountId32, }; #[tokio::test] @@ -281,3 +291,177 @@ async fn rpc_state_call() { let metadata = metadata.runtime_metadata(); assert_eq!(&metadata_call, metadata); } + +#[tokio::test] +async fn chainhead_unstable_follow() { + let ctx = test_context().await; + let api = ctx.client(); + + // Check subscription with runtime updates set on false. + let mut blocks = api.rpc().chainhead_unstable_follow(false).await.unwrap(); + let event = blocks.next().await.unwrap().unwrap(); + // The initialized event should contain the finalized block hash. + let finalized_block_hash = api.rpc().finalized_head().await.unwrap(); + assert_eq!( + event, + FollowEvent::Initialized(Initialized { + finalized_block_hash, + finalized_block_runtime: None, + }) + ); + + // Expect subscription to produce runtime versions. + let mut blocks = api.rpc().chainhead_unstable_follow(true).await.unwrap(); + let event = blocks.next().await.unwrap().unwrap(); + // The initialized event should contain the finalized block hash. + let finalized_block_hash = api.rpc().finalized_head().await.unwrap(); + let runtime_version = ctx.client().runtime_version(); + + assert_matches!( + event, + FollowEvent::Initialized(init) => { + assert_eq!(init.finalized_block_hash, finalized_block_hash); + assert_eq!(init.finalized_block_runtime, Some(RuntimeEvent::Valid(RuntimeVersionEvent { + spec: runtime_version, + }))); + } + ); +} + +#[tokio::test] +async fn chainhead_unstable_body() { + let ctx = test_context().await; + let api = ctx.client(); + + let mut blocks = api.rpc().chainhead_unstable_follow(false).await.unwrap(); + let event = blocks.next().await.unwrap().unwrap(); + let hash = match event { + FollowEvent::Initialized(init) => init.finalized_block_hash, + _ => panic!("Unexpected event"), + }; + let sub_id = blocks.subscription_id().unwrap().clone(); + + // Subscribe to fetch the block's body. + let mut sub = api + .rpc() + .chainhead_unstable_body(sub_id, hash) + .await + .unwrap(); + let event = sub.next().await.unwrap().unwrap(); + + // Expected block's extrinsics scale encoded and hex encoded. + let body = api.rpc().block(Some(hash)).await.unwrap().unwrap(); + let extrinsics: Vec> = + body.block.extrinsics.into_iter().map(|ext| ext.0).collect(); + let expected = format!("0x{}", hex::encode(extrinsics.encode())); + + assert_matches!(event, + ChainHeadEvent::Done(done) if done.result == expected + ); +} + +#[tokio::test] +async fn chainhead_unstable_header() { + let ctx = test_context().await; + let api = ctx.client(); + + let mut blocks = api.rpc().chainhead_unstable_follow(false).await.unwrap(); + let event = blocks.next().await.unwrap().unwrap(); + let hash = match event { + FollowEvent::Initialized(init) => init.finalized_block_hash, + _ => panic!("Unexpected event"), + }; + let sub_id = blocks.subscription_id().unwrap().clone(); + + let header = api.rpc().header(Some(hash)).await.unwrap().unwrap(); + let expected = format!("0x{}", hex::encode(header.encode())); + + let header = api + .rpc() + .chainhead_unstable_header(sub_id, hash) + .await + .unwrap() + .unwrap(); + + assert_eq!(header, expected); +} + +#[tokio::test] +async fn chainhead_unstable_storage() { + let ctx = test_context().await; + let api = ctx.client(); + + let mut blocks = api.rpc().chainhead_unstable_follow(false).await.unwrap(); + let event = blocks.next().await.unwrap().unwrap(); + let hash = match event { + FollowEvent::Initialized(init) => init.finalized_block_hash, + _ => panic!("Unexpected event"), + }; + let sub_id = blocks.subscription_id().unwrap().clone(); + + let alice: AccountId32 = AccountKeyring::Alice.to_account_id().into(); + let addr = node_runtime::storage().system().account(alice).to_bytes(); + let mut sub = api + .rpc() + .chainhead_unstable_storage(sub_id, hash, &addr, None) + .await + .unwrap(); + let event = sub.next().await.unwrap().unwrap(); + + assert_matches!(event, ChainHeadEvent::>::Done(done) if done.result.is_some()); +} + +#[tokio::test] +async fn chainhead_unstable_call() { + let ctx = test_context().await; + let api = ctx.client(); + + let mut blocks = api.rpc().chainhead_unstable_follow(true).await.unwrap(); + let event = blocks.next().await.unwrap().unwrap(); + let hash = match event { + FollowEvent::Initialized(init) => init.finalized_block_hash, + _ => panic!("Unexpected event"), + }; + let sub_id = blocks.subscription_id().unwrap().clone(); + + let alice_id = AccountKeyring::Alice.to_account_id(); + let mut sub = api + .rpc() + .chainhead_unstable_call( + sub_id, + hash, + "AccountNonceApi_account_nonce".into(), + &alice_id.encode(), + ) + .await + .unwrap(); + let event = sub.next().await.unwrap().unwrap(); + + assert_matches!(event, ChainHeadEvent::::Done(_)); +} + +#[tokio::test] +async fn chainhead_unstable_unpin() { + let ctx = test_context().await; + let api = ctx.client(); + + let mut blocks = api.rpc().chainhead_unstable_follow(true).await.unwrap(); + let event = blocks.next().await.unwrap().unwrap(); + let hash = match event { + FollowEvent::Initialized(init) => init.finalized_block_hash, + _ => panic!("Unexpected event"), + }; + let sub_id = blocks.subscription_id().unwrap().clone(); + + assert!(api + .rpc() + .chainhead_unstable_unpin(sub_id.clone(), hash) + .await + .is_ok()); + // The block was already unpinned. + assert!(api + .rpc() + .chainhead_unstable_unpin(sub_id, hash) + .await + .is_err()); +}