diff --git a/subxt/src/rpc/rpc.rs b/subxt/src/rpc/rpc.rs index 7e8ddff797..1d2be6e5bd 100644 --- a/subxt/src/rpc/rpc.rs +++ b/subxt/src/rpc/rpc.rs @@ -39,7 +39,7 @@ use crate::{error::Error, utils::PhantomDataSendSync, Config, Metadata}; use super::{ rpc_params, - types::{self, ChainHeadEvent, ChainHeadStorageEvent, FollowEvent, StorageQuery}, + types::{self, FollowEvent, StorageQuery}, RpcClient, RpcClientT, Subscription, }; @@ -499,7 +499,10 @@ impl Rpc { Ok(subscription) } - /// Subscribe to `chainHead_unstable_body` to obtain events regarding the block's body. + /// Call the `chainHead_unstable_body` method and return an operation ID to obtain the block's body. + /// + /// The response events are provided on the `chainHead_follow` subscription and identified by + /// the returned operation ID. /// /// # Note /// @@ -509,17 +512,16 @@ impl Rpc { &self, subscription_id: String, hash: T::Hash, - ) -> Result>, Error> { - let subscription = self + ) -> Result { + let response = self .client - .subscribe( + .request( "chainHead_unstable_body", rpc_params![subscription_id, hash], - "chainHead_unstable_stopBody", ) .await?; - Ok(subscription) + Ok(response) } /// Get the block's body using the `chainHead_unstable_header` method. @@ -544,8 +546,10 @@ impl Rpc { Ok(header) } - /// Subscribe to `chainHead_storage` to obtain events regarding the - /// block's storage. + /// Call the `chainhead_unstable_storage` method and return an operation ID to obtain the block's storage. + /// + /// The response events are provided on the `chainHead_follow` subscription and identified by + /// the returned operation ID. /// /// # Note /// @@ -557,7 +561,7 @@ impl Rpc { hash: T::Hash, items: Vec>, child_key: Option<&[u8]>, - ) -> Result>>, Error> { + ) -> Result { let items: Vec> = items .into_iter() .map(|item| StorageQuery { @@ -566,20 +570,21 @@ impl Rpc { }) .collect(); - let subscription = self + let response = self .client - .subscribe( + .request( "chainHead_unstable_storage", rpc_params![subscription_id, hash, items, child_key.map(to_hex)], - "chainHead_unstable_stopStorage", ) .await?; - Ok(subscription) + Ok(response) } - /// Subscribe to `chainHead_call` to obtain events regarding the - /// runtime API call. + /// Call the `chainhead_unstable_storage` method and return an operation ID to obtain the runtime API result. + /// + /// The response events are provided on the `chainHead_follow` subscription and identified by + /// the returned operation ID. /// /// # Note /// @@ -591,17 +596,16 @@ impl Rpc { hash: T::Hash, function: String, call_parameters: &[u8], - ) -> Result>, Error> { - let subscription = self + ) -> Result { + let response = self .client - .subscribe( + .request( "chainHead_unstable_call", rpc_params![subscription_id, hash, function, to_hex(call_parameters)], - "chainHead_unstable_stopCall", ) .await?; - Ok(subscription) + Ok(response) } /// Unpin a block reported by the `chainHead_follow` subscription. diff --git a/subxt/src/rpc/types.rs b/subxt/src/rpc/types.rs index d5ded7d634..587199fb11 100644 --- a/subxt/src/rpc/types.rs +++ b/subxt/src/rpc/types.rs @@ -383,7 +383,7 @@ pub struct RuntimeVersionEvent { } /// The runtime event generated if the `follow` subscription -/// has set the `runtime_updates` flag. +/// has set the `with_runtime` flag. #[derive(Debug, Clone, PartialEq, Eq, Deserialize)] #[serde(rename_all = "camelCase")] #[serde(tag = "type")] @@ -400,9 +400,6 @@ pub enum RuntimeEvent { /// /// This is the first event generated by the `follow` subscription /// and is submitted only once. -/// -/// If the `runtime_updates` flag is set, then this event contains -/// the `RuntimeEvent`, otherwise the `RuntimeEvent` is not present. #[derive(Debug, Clone, PartialEq, Eq, Deserialize)] #[serde(rename_all = "camelCase")] pub struct Initialized { @@ -412,7 +409,7 @@ pub struct Initialized { /// /// # Note /// - /// This is present only if the `runtime_updates` flag is set for + /// This is present only if the `with_runtime` flag is set for /// the `follow` subscription. pub finalized_block_runtime: Option, } @@ -429,7 +426,7 @@ pub struct NewBlock { /// /// # Note /// - /// This is present only if the `runtime_updates` flag is set for + /// This is present only if the `with_runtime` flag is set for /// the `follow` subscription. pub new_runtime: Option, } @@ -452,16 +449,76 @@ pub struct Finalized { pub pruned_block_hashes: Vec, } -/// The event generated by the `chainHead_follow` method. +/// Indicate the operation id of the event. +#[derive(Debug, Clone, PartialEq, Eq, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct OperationId { + /// The operation id of the event. + pub operation_id: String, +} + +/// The response of the `chainHead_body` method. +#[derive(Debug, Clone, PartialEq, Eq, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct OperationBodyDone { + /// The operation id of the event. + pub operation_id: String, + /// Array of hexadecimal-encoded scale-encoded extrinsics found in the block. + pub value: Vec, +} + +/// The response of the `chainHead_call` method. +#[derive(Debug, Clone, PartialEq, Eq, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct OperationCallDone { + /// The operation id of the event. + pub operation_id: String, + /// Hexadecimal-encoded output of the runtime function call. + pub output: String, +} + +/// The response of the `chainHead_call` method. +#[derive(Debug, Clone, PartialEq, Eq, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct OperationStorageItems { + /// The operation id of the event. + pub operation_id: String, + /// The resulting items. + pub items: Vec, +} + +/// Indicate a problem during the operation. +#[derive(Debug, Clone, PartialEq, Eq, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct OperationError { + /// The operation id of the event. + pub operation_id: String, + /// The reason of the error. + pub error: String, +} + +/// The event generated by the `follow` method. /// -/// The events are generated in the following order: -/// 1. Initialized - generated only once to signal the -/// latest finalized block +/// The block events are generated in the following order: +/// 1. Initialized - generated only once to signal the latest finalized block /// 2. NewBlock - a new block was added. -/// 3. BestBlockChanged - indicate that the best block -/// is now the one from this event. The block was -/// announced priorly with the `NewBlock` event. +/// 3. BestBlockChanged - indicate that the best block is now the one from this event. The block was +/// announced priorly with the `NewBlock` event. /// 4. Finalized - State the finalized and pruned blocks. +/// +/// The following events are related to operations: +/// - OperationBodyDone: The response of the `chainHead_body` +/// - OperationCallDone: The response of the `chainHead_call` +/// - OperationStorageItems: Items produced by the `chianHead_storage` +/// - OperationWaitingForContinue: Generated after OperationStorageItems and requires the user to +/// call `chainHead_continue` +/// - OperationStorageDone: The `chainHead_storage` method has produced all the results +/// - OperationInaccessible: The server was unable to provide the result, retries might succeed in +/// the future +/// - OperationError: The server encountered an error, retries will not succeed +/// +/// The stop event indicates that the JSON-RPC server was unable to provide a consistent list of +/// the blocks at the head of the chain. #[derive(Debug, Clone, PartialEq, Eq, Deserialize)] #[serde(rename_all = "camelCase")] #[serde(tag = "event")] @@ -476,34 +533,99 @@ pub enum FollowEvent { BestBlockChanged(BestBlockChanged), /// A list of finalized and pruned blocks. Finalized(Finalized), + /// The response of the `chainHead_body` method. + OperationBodyDone(OperationBodyDone), + /// The response of the `chainHead_call` method. + OperationCallDone(OperationCallDone), + /// Yield one or more items found in the storage. + OperationStorageItems(OperationStorageItems), + /// Ask the user to call `chainHead_continue` to produce more events + /// regarding the operation id. + OperationWaitingForContinue(OperationId), + /// The responses of the `chainHead_storage` method have been produced. + OperationStorageDone(OperationId), + /// The RPC server was unable to provide the response of the following operation id. + /// + /// Repeating the same operation in the future might succeed. + OperationInaccessible(OperationId), + /// The RPC server encountered an error while processing an operation id. + /// + /// Repeating the same operation in the future will not succeed. + OperationError(OperationError), /// The subscription is dropped and no further events /// will be generated. Stop, } -/// The result of a chain head method. -#[derive(Debug, Clone, PartialEq, Eq, Deserialize)] +/// The storage item received as parameter. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] -pub struct ChainHeadResult { - /// Result of the method. - pub result: T, +pub struct StorageQuery { + /// The provided key. + pub key: Key, + /// The type of the storage query. + #[serde(rename = "type")] + pub query_type: StorageQueryType, } -/// The event generated by the body and call methods. +/// The type of the storage query. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub enum StorageQueryType { + /// Fetch the value of the provided key. + Value, + /// Fetch the hash of the value of the provided key. + Hash, + /// Fetch the closest descendant merkle value. + ClosestDescendantMerkleValue, + /// Fetch the values of all descendants of they provided key. + DescendantsValues, + /// Fetch the hashes of the values of all descendants of they provided key. + DescendantsHashes, +} + +/// The storage result. #[derive(Debug, Clone, PartialEq, Eq, Deserialize)] #[serde(rename_all = "camelCase")] -#[serde(tag = "event")] -pub enum ChainHeadEvent { - /// The request completed successfully. - Done(ChainHeadResult), - /// The resources requested are inaccessible. - /// - /// Resubmitting the request later might succeed. - Inaccessible(ErrorEvent), - /// An error occurred. This is definitive. - Error(ErrorEvent), - /// The provided subscription ID is stale or invalid. - Disjoint, +pub struct StorageResult { + /// The hex-encoded key of the result. + pub key: String, + /// The result of the query. + #[serde(flatten)] + pub result: StorageResultType, +} + +/// The type of the storage query. +#[derive(Debug, Clone, PartialEq, Eq, Deserialize)] +#[serde(rename_all = "camelCase")] +pub enum StorageResultType { + /// Fetch the value of the provided key. + Value(String), + /// Fetch the hash of the value of the provided key. + Hash(String), + /// Fetch the closest descendant merkle value. + ClosestDescendantMerkleValue(String), +} + +/// The method respose of `chainHead_body`, `chainHead_call` and `chainHead_storage`. +#[derive(Debug, Clone, PartialEq, Eq, Deserialize)] +#[serde(rename_all = "camelCase")] +#[serde(tag = "result")] +pub enum MethodResponse { + /// The method has started. + Started(MethodResponseStarted), + /// The RPC server cannot handle the request at the moment. + LimitReached, +} + +/// The `started` result of a method. +#[derive(Debug, Clone, PartialEq, Eq, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct MethodResponseStarted { + /// The operation id of the response. + pub operation_id: String, + /// The number of items from the back of the `chainHead_storage` that have been discarded. + pub discarded_items: Option, } /// The transaction was broadcasted to a number of peers. @@ -728,85 +850,6 @@ mod as_string { } } -/// The storage item received as paramter. -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct StorageQuery { - /// The provided key. - pub key: Key, - /// The type of the storage query. - #[serde(rename = "type")] - pub query_type: StorageQueryType, -} - -/// The type of the storage query. -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] -#[serde(rename_all = "kebab-case")] -pub enum StorageQueryType { - /// Fetch the value of the provided key. - Value, - /// Fetch the hash of the value of the provided key. - Hash, - /// Fetch the closest descendant merkle value. - ClosestDescendantMerkleValue, - /// Fetch the values of all descendants of they provided key. - DescendantsValues, - /// Fetch the hashes of the values of all descendants of they provided key. - DescendantsHashes, -} - -/// The storage result. -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct StorageResult { - /// The hex-encoded key of the result. - pub key: String, - /// The result of the query. - #[serde(flatten)] - pub result: StorageResultType, -} - -/// The type of the storage query. -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] -#[serde(rename_all = "kebab-case")] -pub enum StorageResultType { - /// Fetch the value of the provided key. - Value(T), - /// Fetch the hash of the value of the provided key. - Hash(T), - /// Fetch the closest descendant merkle value. - ClosestDescendantMerkleValue(T), -} - -/// The event generated by storage method. -#[derive(Debug, Clone, PartialEq, Deserialize)] -#[serde(rename_all = "kebab-case")] -#[serde(tag = "event")] -pub enum ChainHeadStorageEvent { - /// The request produced multiple result items. - Items(ItemsEvent), - /// The request produced multiple result items. - WaitForContinue, - /// The request completed successfully and all the results were provided. - Done, - /// The resources requested are inaccessible. - /// - /// Resubmitting the request later might succeed. - Inaccessible, - /// An error occurred. This is definitive. - Error(ErrorEvent), - /// The provided subscription ID is stale or invalid. - Disjoint, -} - -/// The request produced multiple result items. -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct ItemsEvent { - /// The resulting items. - pub items: Vec>, -} - #[cfg(test)] mod test { use super::*; diff --git a/testing/integration-tests/src/full_client/client/mod.rs b/testing/integration-tests/src/full_client/client/mod.rs index 88c9be6592..4626b040f2 100644 --- a/testing/integration-tests/src/full_client/client/mod.rs +++ b/testing/integration-tests/src/full_client/client/mod.rs @@ -9,18 +9,44 @@ use crate::{ use assert_matches::assert_matches; use codec::{Compact, Decode, Encode}; use sp_core::storage::well_known_keys; +use sp_runtime::DeserializeOwned; use subxt::{ error::{DispatchError, Error, TokenError}, - rpc::types::{ - ChainHeadEvent, ChainHeadStorageEvent, DryRunResult, DryRunResultBytes, FollowEvent, - Initialized, RuntimeEvent, RuntimeVersionEvent, StorageQuery, StorageQueryType, - StorageResultType, + rpc::{ + types::{ + DryRunResult, DryRunResultBytes, FollowEvent, Initialized, MethodResponse, + RuntimeEvent, RuntimeVersionEvent, StorageQuery, StorageQueryType, + }, + Subscription, }, utils::AccountId32, }; use subxt_metadata::Metadata; use subxt_signer::sr25519::dev; +/// Ignore block related events and obtain the next event related to an operation. +async fn next_operation_event( + sub: &mut Subscription>, +) -> FollowEvent { + // At most 5 retries. + for _ in 0..5 { + let event = sub.next().await.unwrap().unwrap(); + + match event { + // Can also return the `Stop` event for better debugging. + FollowEvent::Initialized(_) + | FollowEvent::NewBlock(_) + | FollowEvent::BestBlockChanged(_) + | FollowEvent::Finalized(_) => continue, + _ => (), + }; + + return event; + } + + panic!("Cannot find operation related event after 5 produced events"); +} + #[tokio::test] async fn insert_key() { let ctx = test_context_with("bob".to_string()).await; @@ -484,21 +510,22 @@ async fn chainhead_unstable_body() { }; let sub_id = blocks.subscription_id().unwrap().clone(); - // Subscribe to fetch the block's body. - let mut sub = api + // Fetch the block's body. + let response = api .rpc() .chainhead_unstable_body(sub_id, hash) .await .unwrap(); - let event = sub.next().await.unwrap().unwrap(); + let operation_id = match response { + MethodResponse::Started(started) => started.operation_id, + MethodResponse::LimitReached => panic!("Expected started response"), + }; - // Expected block's extrinsics scale encoded and hex encoded. - let body = api.rpc().block(Some(hash)).await.unwrap().unwrap(); - let extrinsics: Vec> = body.block.extrinsics.into_iter().map(|ext| ext.0).collect(); - let expected = format!("0x{}", hex::encode(extrinsics.encode())); - - assert_matches!(event, - ChainHeadEvent::Done(done) if done.result == expected + // Response propagated to `chainHead_follow`. + let event = next_operation_event(&mut blocks).await; + assert_matches!( + event, + FollowEvent::OperationBodyDone(done) if done.operation_id == operation_id ); } @@ -549,24 +576,29 @@ async fn chainhead_unstable_storage() { key: addr_bytes.as_slice(), query_type: StorageQueryType::Value, }]; - let mut sub = api + + // Fetch storage. + let response = api .rpc() .chainhead_unstable_storage(sub_id, hash, items, None) .await .unwrap(); - let event = sub.next().await.unwrap().unwrap(); - - match event { - ChainHeadStorageEvent::>::Items(event) => { - assert_eq!(event.items.len(), 1); - assert_eq!(event.items[0].key, format!("0x{}", hex::encode(addr_bytes))); - assert_matches!(&event.items[0].result, StorageResultType::Value(value) if value.is_some()); - } - _ => panic!("unexpected ChainHeadStorageEvent"), + let operation_id = match response { + MethodResponse::Started(started) => started.operation_id, + MethodResponse::LimitReached => panic!("Expected started response"), }; - let event = sub.next().await.unwrap().unwrap(); - assert_matches!(event, ChainHeadStorageEvent::>::Done); + // Response propagated to `chainHead_follow`. + let event = next_operation_event(&mut blocks).await; + assert_matches!( + event, + FollowEvent::OperationStorageItems(res) if res.operation_id == operation_id && + res.items.len() == 1 && + res.items[0].key == format!("0x{}", hex::encode(addr_bytes)) + ); + + let event = next_operation_event(&mut blocks).await; + assert_matches!(event, FollowEvent::OperationStorageDone(res) if res.operation_id == operation_id); } #[tokio::test] @@ -583,7 +615,8 @@ async fn chainhead_unstable_call() { let sub_id = blocks.subscription_id().unwrap().clone(); let alice_id = dev::alice().public_key().to_account_id(); - let mut sub = api + // Runtime API call. + let response = api .rpc() .chainhead_unstable_call( sub_id, @@ -593,9 +626,17 @@ async fn chainhead_unstable_call() { ) .await .unwrap(); - let event = sub.next().await.unwrap().unwrap(); + let operation_id = match response { + MethodResponse::Started(started) => started.operation_id, + MethodResponse::LimitReached => panic!("Expected started response"), + }; - assert_matches!(event, ChainHeadEvent::::Done(_)); + // Response propagated to `chainHead_follow`. + let event = next_operation_event(&mut blocks).await; + assert_matches!( + event, + FollowEvent::OperationCallDone(res) if res.operation_id == operation_id + ); } #[tokio::test] diff --git a/testing/integration-tests/src/light_client/mod.rs b/testing/integration-tests/src/light_client/mod.rs index 42e3b7f510..8d446e140d 100644 --- a/testing/integration-tests/src/light_client/mod.rs +++ b/testing/integration-tests/src/light_client/mod.rs @@ -31,7 +31,7 @@ use crate::utils::node_runtime; use codec::{Compact, Encode}; use futures::StreamExt; use subxt::{ - client::{LightClient, LightClientBuilder, OfflineClientT, OnlineClientT}, + client::{LightClient, LightClientBuilder, OnlineClientT}, config::PolkadotConfig, rpc::types::FollowEvent, };