chainHead: Propagate results on the chainHead_follow (#1116)

* rpc/types: Update chainHead events

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc: Sync chainHead methods with spec

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* testing: Adjust chainHead tests

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* tests: Ignore block related events to avoid flaky tests

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* tests: Adjust clippy

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* test: Remove unused OfflineClientT

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Update subxt/src/rpc/types.rs

Co-authored-by: Niklas Adolfsson <niklasadolfsson1@gmail.com>

* Update subxt/src/rpc/types.rs

Co-authored-by: Niklas Adolfsson <niklasadolfsson1@gmail.com>

* Update subxt/src/rpc/types.rs

Co-authored-by: Niklas Adolfsson <niklasadolfsson1@gmail.com>

* Update subxt/src/rpc/types.rs

Co-authored-by: Niklas Adolfsson <niklasadolfsson1@gmail.com>

* types: Remove serde flags for serialization

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

---------

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
Co-authored-by: Niklas Adolfsson <niklasadolfsson1@gmail.com>
This commit is contained in:
Alexandru Vasile
2023-08-09 17:40:39 +03:00
committed by GitHub
parent d02afcdf07
commit b97acc5df6
4 changed files with 249 additions and 161 deletions
+25 -21
View File
@@ -39,7 +39,7 @@ use crate::{error::Error, utils::PhantomDataSendSync, Config, Metadata};
use super::{
rpc_params,
types::{self, ChainHeadEvent, ChainHeadStorageEvent, FollowEvent, StorageQuery},
types::{self, FollowEvent, StorageQuery},
RpcClient, RpcClientT, Subscription,
};
@@ -499,7 +499,10 @@ impl<T: Config> Rpc<T> {
Ok(subscription)
}
/// Subscribe to `chainHead_unstable_body` to obtain events regarding the block's body.
/// Call the `chainHead_unstable_body` method and return an operation ID to obtain the block's body.
///
/// The response events are provided on the `chainHead_follow` subscription and identified by
/// the returned operation ID.
///
/// # Note
///
@@ -509,17 +512,16 @@ impl<T: Config> Rpc<T> {
&self,
subscription_id: String,
hash: T::Hash,
) -> Result<Subscription<ChainHeadEvent<String>>, Error> {
let subscription = self
) -> Result<types::MethodResponse, Error> {
let response = self
.client
.subscribe(
.request(
"chainHead_unstable_body",
rpc_params![subscription_id, hash],
"chainHead_unstable_stopBody",
)
.await?;
Ok(subscription)
Ok(response)
}
/// Get the block's body using the `chainHead_unstable_header` method.
@@ -544,8 +546,10 @@ impl<T: Config> Rpc<T> {
Ok(header)
}
/// Subscribe to `chainHead_storage` to obtain events regarding the
/// block's storage.
/// Call the `chainhead_unstable_storage` method and return an operation ID to obtain the block's storage.
///
/// The response events are provided on the `chainHead_follow` subscription and identified by
/// the returned operation ID.
///
/// # Note
///
@@ -557,7 +561,7 @@ impl<T: Config> Rpc<T> {
hash: T::Hash,
items: Vec<StorageQuery<&[u8]>>,
child_key: Option<&[u8]>,
) -> Result<Subscription<ChainHeadStorageEvent<Option<String>>>, Error> {
) -> Result<types::MethodResponse, Error> {
let items: Vec<StorageQuery<String>> = items
.into_iter()
.map(|item| StorageQuery {
@@ -566,20 +570,21 @@ impl<T: Config> Rpc<T> {
})
.collect();
let subscription = self
let response = self
.client
.subscribe(
.request(
"chainHead_unstable_storage",
rpc_params![subscription_id, hash, items, child_key.map(to_hex)],
"chainHead_unstable_stopStorage",
)
.await?;
Ok(subscription)
Ok(response)
}
/// Subscribe to `chainHead_call` to obtain events regarding the
/// runtime API call.
/// Call the `chainhead_unstable_storage` method and return an operation ID to obtain the runtime API result.
///
/// The response events are provided on the `chainHead_follow` subscription and identified by
/// the returned operation ID.
///
/// # Note
///
@@ -591,17 +596,16 @@ impl<T: Config> Rpc<T> {
hash: T::Hash,
function: String,
call_parameters: &[u8],
) -> Result<Subscription<ChainHeadEvent<String>>, Error> {
let subscription = self
) -> Result<types::MethodResponse, Error> {
let response = self
.client
.subscribe(
.request(
"chainHead_unstable_call",
rpc_params![subscription_id, hash, function, to_hex(call_parameters)],
"chainHead_unstable_stopCall",
)
.await?;
Ok(subscription)
Ok(response)
}
/// Unpin a block reported by the `chainHead_follow` subscription.
+153 -110
View File
@@ -383,7 +383,7 @@ pub struct RuntimeVersionEvent {
}
/// The runtime event generated if the `follow` subscription
/// has set the `runtime_updates` flag.
/// has set the `with_runtime` flag.
#[derive(Debug, Clone, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "camelCase")]
#[serde(tag = "type")]
@@ -400,9 +400,6 @@ pub enum RuntimeEvent {
///
/// This is the first event generated by the `follow` subscription
/// and is submitted only once.
///
/// If the `runtime_updates` flag is set, then this event contains
/// the `RuntimeEvent`, otherwise the `RuntimeEvent` is not present.
#[derive(Debug, Clone, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Initialized<Hash> {
@@ -412,7 +409,7 @@ pub struct Initialized<Hash> {
///
/// # Note
///
/// This is present only if the `runtime_updates` flag is set for
/// This is present only if the `with_runtime` flag is set for
/// the `follow` subscription.
pub finalized_block_runtime: Option<RuntimeEvent>,
}
@@ -429,7 +426,7 @@ pub struct NewBlock<Hash> {
///
/// # Note
///
/// This is present only if the `runtime_updates` flag is set for
/// This is present only if the `with_runtime` flag is set for
/// the `follow` subscription.
pub new_runtime: Option<RuntimeEvent>,
}
@@ -452,16 +449,76 @@ pub struct Finalized<Hash> {
pub pruned_block_hashes: Vec<Hash>,
}
/// The event generated by the `chainHead_follow` method.
/// Indicate the operation id of the event.
#[derive(Debug, Clone, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct OperationId {
/// The operation id of the event.
pub operation_id: String,
}
/// The response of the `chainHead_body` method.
#[derive(Debug, Clone, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct OperationBodyDone {
/// The operation id of the event.
pub operation_id: String,
/// Array of hexadecimal-encoded scale-encoded extrinsics found in the block.
pub value: Vec<String>,
}
/// The response of the `chainHead_call` method.
#[derive(Debug, Clone, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct OperationCallDone {
/// The operation id of the event.
pub operation_id: String,
/// Hexadecimal-encoded output of the runtime function call.
pub output: String,
}
/// The response of the `chainHead_call` method.
#[derive(Debug, Clone, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct OperationStorageItems {
/// The operation id of the event.
pub operation_id: String,
/// The resulting items.
pub items: Vec<StorageResult>,
}
/// Indicate a problem during the operation.
#[derive(Debug, Clone, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct OperationError {
/// The operation id of the event.
pub operation_id: String,
/// The reason of the error.
pub error: String,
}
/// The event generated by the `follow` method.
///
/// The events are generated in the following order:
/// 1. Initialized - generated only once to signal the
/// latest finalized block
/// The block events are generated in the following order:
/// 1. Initialized - generated only once to signal the latest finalized block
/// 2. NewBlock - a new block was added.
/// 3. BestBlockChanged - indicate that the best block
/// is now the one from this event. The block was
/// announced priorly with the `NewBlock` event.
/// 3. BestBlockChanged - indicate that the best block is now the one from this event. The block was
/// announced priorly with the `NewBlock` event.
/// 4. Finalized - State the finalized and pruned blocks.
///
/// The following events are related to operations:
/// - OperationBodyDone: The response of the `chainHead_body`
/// - OperationCallDone: The response of the `chainHead_call`
/// - OperationStorageItems: Items produced by the `chianHead_storage`
/// - OperationWaitingForContinue: Generated after OperationStorageItems and requires the user to
/// call `chainHead_continue`
/// - OperationStorageDone: The `chainHead_storage` method has produced all the results
/// - OperationInaccessible: The server was unable to provide the result, retries might succeed in
/// the future
/// - OperationError: The server encountered an error, retries will not succeed
///
/// The stop event indicates that the JSON-RPC server was unable to provide a consistent list of
/// the blocks at the head of the chain.
#[derive(Debug, Clone, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "camelCase")]
#[serde(tag = "event")]
@@ -476,34 +533,99 @@ pub enum FollowEvent<Hash> {
BestBlockChanged(BestBlockChanged<Hash>),
/// A list of finalized and pruned blocks.
Finalized(Finalized<Hash>),
/// The response of the `chainHead_body` method.
OperationBodyDone(OperationBodyDone),
/// The response of the `chainHead_call` method.
OperationCallDone(OperationCallDone),
/// Yield one or more items found in the storage.
OperationStorageItems(OperationStorageItems),
/// Ask the user to call `chainHead_continue` to produce more events
/// regarding the operation id.
OperationWaitingForContinue(OperationId),
/// The responses of the `chainHead_storage` method have been produced.
OperationStorageDone(OperationId),
/// The RPC server was unable to provide the response of the following operation id.
///
/// Repeating the same operation in the future might succeed.
OperationInaccessible(OperationId),
/// The RPC server encountered an error while processing an operation id.
///
/// Repeating the same operation in the future will not succeed.
OperationError(OperationError),
/// The subscription is dropped and no further events
/// will be generated.
Stop,
}
/// The result of a chain head method.
#[derive(Debug, Clone, PartialEq, Eq, Deserialize)]
/// The storage item received as parameter.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ChainHeadResult<T> {
/// Result of the method.
pub result: T,
pub struct StorageQuery<Key> {
/// The provided key.
pub key: Key,
/// The type of the storage query.
#[serde(rename = "type")]
pub query_type: StorageQueryType,
}
/// The event generated by the body and call methods.
/// The type of the storage query.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum StorageQueryType {
/// Fetch the value of the provided key.
Value,
/// Fetch the hash of the value of the provided key.
Hash,
/// Fetch the closest descendant merkle value.
ClosestDescendantMerkleValue,
/// Fetch the values of all descendants of they provided key.
DescendantsValues,
/// Fetch the hashes of the values of all descendants of they provided key.
DescendantsHashes,
}
/// The storage result.
#[derive(Debug, Clone, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "camelCase")]
#[serde(tag = "event")]
pub enum ChainHeadEvent<T> {
/// The request completed successfully.
Done(ChainHeadResult<T>),
/// The resources requested are inaccessible.
///
/// Resubmitting the request later might succeed.
Inaccessible(ErrorEvent),
/// An error occurred. This is definitive.
Error(ErrorEvent),
/// The provided subscription ID is stale or invalid.
Disjoint,
pub struct StorageResult {
/// The hex-encoded key of the result.
pub key: String,
/// The result of the query.
#[serde(flatten)]
pub result: StorageResultType,
}
/// The type of the storage query.
#[derive(Debug, Clone, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum StorageResultType {
/// Fetch the value of the provided key.
Value(String),
/// Fetch the hash of the value of the provided key.
Hash(String),
/// Fetch the closest descendant merkle value.
ClosestDescendantMerkleValue(String),
}
/// The method respose of `chainHead_body`, `chainHead_call` and `chainHead_storage`.
#[derive(Debug, Clone, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "camelCase")]
#[serde(tag = "result")]
pub enum MethodResponse {
/// The method has started.
Started(MethodResponseStarted),
/// The RPC server cannot handle the request at the moment.
LimitReached,
}
/// The `started` result of a method.
#[derive(Debug, Clone, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct MethodResponseStarted {
/// The operation id of the response.
pub operation_id: String,
/// The number of items from the back of the `chainHead_storage` that have been discarded.
pub discarded_items: Option<usize>,
}
/// The transaction was broadcasted to a number of peers.
@@ -728,85 +850,6 @@ mod as_string {
}
}
/// The storage item received as paramter.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct StorageQuery<Key> {
/// The provided key.
pub key: Key,
/// The type of the storage query.
#[serde(rename = "type")]
pub query_type: StorageQueryType,
}
/// The type of the storage query.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum StorageQueryType {
/// Fetch the value of the provided key.
Value,
/// Fetch the hash of the value of the provided key.
Hash,
/// Fetch the closest descendant merkle value.
ClosestDescendantMerkleValue,
/// Fetch the values of all descendants of they provided key.
DescendantsValues,
/// Fetch the hashes of the values of all descendants of they provided key.
DescendantsHashes,
}
/// The storage result.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct StorageResult<T> {
/// The hex-encoded key of the result.
pub key: String,
/// The result of the query.
#[serde(flatten)]
pub result: StorageResultType<T>,
}
/// The type of the storage query.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum StorageResultType<T> {
/// Fetch the value of the provided key.
Value(T),
/// Fetch the hash of the value of the provided key.
Hash(T),
/// Fetch the closest descendant merkle value.
ClosestDescendantMerkleValue(T),
}
/// The event generated by storage method.
#[derive(Debug, Clone, PartialEq, Deserialize)]
#[serde(rename_all = "kebab-case")]
#[serde(tag = "event")]
pub enum ChainHeadStorageEvent<T> {
/// The request produced multiple result items.
Items(ItemsEvent<T>),
/// The request produced multiple result items.
WaitForContinue,
/// The request completed successfully and all the results were provided.
Done,
/// The resources requested are inaccessible.
///
/// Resubmitting the request later might succeed.
Inaccessible,
/// An error occurred. This is definitive.
Error(ErrorEvent),
/// The provided subscription ID is stale or invalid.
Disjoint,
}
/// The request produced multiple result items.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ItemsEvent<T> {
/// The resulting items.
pub items: Vec<StorageResult<T>>,
}
#[cfg(test)]
mod test {
use super::*;
@@ -9,18 +9,44 @@ use crate::{
use assert_matches::assert_matches;
use codec::{Compact, Decode, Encode};
use sp_core::storage::well_known_keys;
use sp_runtime::DeserializeOwned;
use subxt::{
error::{DispatchError, Error, TokenError},
rpc::types::{
ChainHeadEvent, ChainHeadStorageEvent, DryRunResult, DryRunResultBytes, FollowEvent,
Initialized, RuntimeEvent, RuntimeVersionEvent, StorageQuery, StorageQueryType,
StorageResultType,
rpc::{
types::{
DryRunResult, DryRunResultBytes, FollowEvent, Initialized, MethodResponse,
RuntimeEvent, RuntimeVersionEvent, StorageQuery, StorageQueryType,
},
Subscription,
},
utils::AccountId32,
};
use subxt_metadata::Metadata;
use subxt_signer::sr25519::dev;
/// Ignore block related events and obtain the next event related to an operation.
async fn next_operation_event<T: DeserializeOwned>(
sub: &mut Subscription<FollowEvent<T>>,
) -> FollowEvent<T> {
// At most 5 retries.
for _ in 0..5 {
let event = sub.next().await.unwrap().unwrap();
match event {
// Can also return the `Stop` event for better debugging.
FollowEvent::Initialized(_)
| FollowEvent::NewBlock(_)
| FollowEvent::BestBlockChanged(_)
| FollowEvent::Finalized(_) => continue,
_ => (),
};
return event;
}
panic!("Cannot find operation related event after 5 produced events");
}
#[tokio::test]
async fn insert_key() {
let ctx = test_context_with("bob".to_string()).await;
@@ -484,21 +510,22 @@ async fn chainhead_unstable_body() {
};
let sub_id = blocks.subscription_id().unwrap().clone();
// Subscribe to fetch the block's body.
let mut sub = api
// Fetch the block's body.
let response = api
.rpc()
.chainhead_unstable_body(sub_id, hash)
.await
.unwrap();
let event = sub.next().await.unwrap().unwrap();
let operation_id = match response {
MethodResponse::Started(started) => started.operation_id,
MethodResponse::LimitReached => panic!("Expected started response"),
};
// Expected block's extrinsics scale encoded and hex encoded.
let body = api.rpc().block(Some(hash)).await.unwrap().unwrap();
let extrinsics: Vec<Vec<u8>> = body.block.extrinsics.into_iter().map(|ext| ext.0).collect();
let expected = format!("0x{}", hex::encode(extrinsics.encode()));
assert_matches!(event,
ChainHeadEvent::Done(done) if done.result == expected
// Response propagated to `chainHead_follow`.
let event = next_operation_event(&mut blocks).await;
assert_matches!(
event,
FollowEvent::OperationBodyDone(done) if done.operation_id == operation_id
);
}
@@ -549,24 +576,29 @@ async fn chainhead_unstable_storage() {
key: addr_bytes.as_slice(),
query_type: StorageQueryType::Value,
}];
let mut sub = api
// Fetch storage.
let response = api
.rpc()
.chainhead_unstable_storage(sub_id, hash, items, None)
.await
.unwrap();
let event = sub.next().await.unwrap().unwrap();
match event {
ChainHeadStorageEvent::<Option<String>>::Items(event) => {
assert_eq!(event.items.len(), 1);
assert_eq!(event.items[0].key, format!("0x{}", hex::encode(addr_bytes)));
assert_matches!(&event.items[0].result, StorageResultType::Value(value) if value.is_some());
}
_ => panic!("unexpected ChainHeadStorageEvent"),
let operation_id = match response {
MethodResponse::Started(started) => started.operation_id,
MethodResponse::LimitReached => panic!("Expected started response"),
};
let event = sub.next().await.unwrap().unwrap();
assert_matches!(event, ChainHeadStorageEvent::<Option<String>>::Done);
// Response propagated to `chainHead_follow`.
let event = next_operation_event(&mut blocks).await;
assert_matches!(
event,
FollowEvent::OperationStorageItems(res) if res.operation_id == operation_id &&
res.items.len() == 1 &&
res.items[0].key == format!("0x{}", hex::encode(addr_bytes))
);
let event = next_operation_event(&mut blocks).await;
assert_matches!(event, FollowEvent::OperationStorageDone(res) if res.operation_id == operation_id);
}
#[tokio::test]
@@ -583,7 +615,8 @@ async fn chainhead_unstable_call() {
let sub_id = blocks.subscription_id().unwrap().clone();
let alice_id = dev::alice().public_key().to_account_id();
let mut sub = api
// Runtime API call.
let response = api
.rpc()
.chainhead_unstable_call(
sub_id,
@@ -593,9 +626,17 @@ async fn chainhead_unstable_call() {
)
.await
.unwrap();
let event = sub.next().await.unwrap().unwrap();
let operation_id = match response {
MethodResponse::Started(started) => started.operation_id,
MethodResponse::LimitReached => panic!("Expected started response"),
};
assert_matches!(event, ChainHeadEvent::<String>::Done(_));
// Response propagated to `chainHead_follow`.
let event = next_operation_event(&mut blocks).await;
assert_matches!(
event,
FollowEvent::OperationCallDone(res) if res.operation_id == operation_id
);
}
#[tokio::test]
@@ -31,7 +31,7 @@ use crate::utils::node_runtime;
use codec::{Compact, Encode};
use futures::StreamExt;
use subxt::{
client::{LightClient, LightClientBuilder, OfflineClientT, OnlineClientT},
client::{LightClient, LightClientBuilder, OnlineClientT},
config::PolkadotConfig,
rpc::types::FollowEvent,
};