chainHead_storage: Backport queries for value types (#14551)

* chainHead/events: Add storage params and events

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead/tests: Check storage events serialization / deserialization

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead/error: Add error for invalid WaitForContinue storage call

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead/storage: Use new items params

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead/tests: Adjust storage tests to the new API

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead/events: Generalize StorageQuery by provided key

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chain_head: Add dedicated ChainHeadStorage client for queries

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead/storage: Implement queries for hashes of values

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead/tests: Check storage queries for hashes of values

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead: Improve API documentation wrt multiple entries

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead/event: Rename StorageQueue ty to queue_ty

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chianHead: Add helper to encode chainHead results as hex str

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Update client/rpc-spec-v2/src/chain_head/error.rs

Co-authored-by: Sebastian Kunert <skunert49@gmail.com>

* chainHead: Change the `queryResult` to a plain `Result`

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead: Stop producing events after the first error

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead: Change child_key to child_trie API param

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

---------

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
Co-authored-by: Sebastian Kunert <skunert49@gmail.com>
This commit is contained in:
Alexandru Vasile
2023-07-21 19:56:40 +03:00
committed by GitHub
parent 649be3aaaa
commit 1fef5ee4a4
7 changed files with 703 additions and 110 deletions
@@ -19,7 +19,7 @@
#![allow(non_snake_case)]
//! API trait of the chain head.
use crate::chain_head::event::{ChainHeadEvent, FollowEvent, NetworkConfig};
use crate::chain_head::event::{ChainHeadEvent, FollowEvent, NetworkConfig, StorageQuery};
use jsonrpsee::{core::RpcResult, proc_macros::rpc};
#[rpc(client, server)]
@@ -86,7 +86,7 @@ pub trait ChainHeadApi<Hash> {
#[method(name = "chainHead_unstable_genesisHash", blocking)]
fn chain_head_unstable_genesis_hash(&self) -> RpcResult<String>;
/// Return a storage entry at a specific block's state.
/// Returns storage entries at a specific block's state.
///
/// # Unstable
///
@@ -100,8 +100,8 @@ pub trait ChainHeadApi<Hash> {
&self,
follow_subscription: String,
hash: Hash,
key: String,
child_key: Option<String>,
items: Vec<StorageQuery<String>>,
child_trie: Option<String>,
network_config: Option<NetworkConfig>,
);
@@ -24,6 +24,7 @@ use crate::{
chain_head_follow::ChainHeadFollower,
error::Error as ChainHeadRpcError,
event::{ChainHeadEvent, ChainHeadResult, ErrorEvent, FollowEvent, NetworkConfig},
hex_string,
subscription::{SubscriptionManagement, SubscriptionManagementError},
},
SubscriptionTaskExecutor,
@@ -42,10 +43,15 @@ use sc_client_api::{
};
use sp_api::CallApiAt;
use sp_blockchain::{Error as BlockChainError, HeaderBackend, HeaderMetadata};
use sp_core::{hexdisplay::HexDisplay, storage::well_known_keys, traits::CallContext, Bytes};
use sp_core::{traits::CallContext, Bytes};
use sp_runtime::traits::Block as BlockT;
use std::{marker::PhantomData, sync::Arc, time::Duration};
use super::{
chain_head_storage::ChainHeadStorage,
event::{ChainHeadStorageEvent, StorageQuery, StorageQueryType},
};
pub(crate) const LOG_TARGET: &str = "rpc-spec-v2";
/// An API for chain head RPC calls.
@@ -74,7 +80,7 @@ impl<BE: Backend<Block>, Block: BlockT, Client> ChainHead<BE, Block, Client> {
max_pinned_blocks: usize,
max_pinned_duration: Duration,
) -> Self {
let genesis_hash = format!("0x{:?}", HexDisplay::from(&genesis_hash.as_ref()));
let genesis_hash = hex_string(&genesis_hash.as_ref());
Self {
client,
@@ -229,7 +235,7 @@ where
let event = match client.block(hash) {
Ok(Some(signed_block)) => {
let extrinsics = signed_block.block.extrinsics();
let result = format!("0x{:?}", HexDisplay::from(&extrinsics.encode()));
let result = hex_string(&extrinsics.encode());
ChainHeadEvent::Done(ChainHeadResult { result })
},
Ok(None) => {
@@ -272,7 +278,7 @@ where
self.client
.header(hash)
.map(|opt_header| opt_header.map(|h| format!("0x{:?}", HexDisplay::from(&h.encode()))))
.map(|opt_header| opt_header.map(|h| hex_string(&h.encode())))
.map_err(ChainHeadRpcError::FetchBlockHeader)
.map_err(Into::into)
}
@@ -286,14 +292,33 @@ where
mut sink: SubscriptionSink,
follow_subscription: String,
hash: Block::Hash,
key: String,
child_key: Option<String>,
items: Vec<StorageQuery<String>>,
child_trie: Option<String>,
_network_config: Option<NetworkConfig>,
) -> SubscriptionResult {
let key = StorageKey(parse_hex_param(&mut sink, key)?);
// Gain control over parameter parsing and returned error.
let items = items
.into_iter()
.map(|query| {
if query.queue_type != StorageQueryType::Value &&
query.queue_type != StorageQueryType::Hash
{
// Note: remove this once all types are implemented.
let _ = sink.reject(ChainHeadRpcError::InvalidParam(
"Storage query type not supported".into(),
));
return Err(SubscriptionEmptyError)
}
let child_key = child_key
.map(|child_key| parse_hex_param(&mut sink, child_key))
Ok(StorageQuery {
key: StorageKey(parse_hex_param(&mut sink, query.key)?),
queue_type: query.queue_type,
})
})
.collect::<Result<Vec<_>, _>>()?;
let child_trie = child_trie
.map(|child_trie| parse_hex_param(&mut sink, child_trie))
.transpose()?
.map(ChildInfo::new_default_from_vec);
@@ -304,7 +329,7 @@ where
Ok(block) => block,
Err(SubscriptionManagementError::SubscriptionAbsent) => {
// Invalid invalid subscription ID.
let _ = sink.send(&ChainHeadEvent::<String>::Disjoint);
let _ = sink.send(&ChainHeadStorageEvent::<String>::Disjoint);
return Ok(())
},
Err(SubscriptionManagementError::BlockHashAbsent) => {
@@ -313,63 +338,19 @@ where
return Ok(())
},
Err(error) => {
let _ = sink.send(&ChainHeadEvent::<String>::Error(ErrorEvent {
let _ = sink.send(&ChainHeadStorageEvent::<String>::Error(ErrorEvent {
error: error.to_string(),
}));
return Ok(())
},
};
let storage_client = ChainHeadStorage::<Client, Block, BE>::new(client);
let fut = async move {
let _block_guard = block_guard;
// The child key is provided, use the key to query the child trie.
if let Some(child_key) = child_key {
// The child key must not be prefixed with ":child_storage:" nor
// ":child_storage:default:".
if well_known_keys::is_default_child_storage_key(child_key.storage_key()) ||
well_known_keys::is_child_storage_key(child_key.storage_key())
{
let _ = sink
.send(&ChainHeadEvent::Done(ChainHeadResult { result: None::<String> }));
return
}
let res = client
.child_storage(hash, &child_key, &key)
.map(|result| {
let result =
result.map(|storage| format!("0x{:?}", HexDisplay::from(&storage.0)));
ChainHeadEvent::Done(ChainHeadResult { result })
})
.unwrap_or_else(|error| {
ChainHeadEvent::Error(ErrorEvent { error: error.to_string() })
});
let _ = sink.send(&res);
return
}
// The main key must not be prefixed with b":child_storage:" nor
// b":child_storage:default:".
if well_known_keys::is_default_child_storage_key(&key.0) ||
well_known_keys::is_child_storage_key(&key.0)
{
let _ =
sink.send(&ChainHeadEvent::Done(ChainHeadResult { result: None::<String> }));
return
}
// Main root trie storage query.
let res = client
.storage(hash, &key)
.map(|result| {
let result =
result.map(|storage| format!("0x{:?}", HexDisplay::from(&storage.0)));
ChainHeadEvent::Done(ChainHeadResult { result })
})
.unwrap_or_else(|error| {
ChainHeadEvent::Error(ErrorEvent { error: error.to_string() })
});
let _ = sink.send(&res);
storage_client.generate_events(sink, hash, items, child_trie);
};
self.executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed());
@@ -423,7 +404,7 @@ where
.executor()
.call(hash, &function, &call_parameters, CallContext::Offchain)
.map(|result| {
let result = format!("0x{:?}", HexDisplay::from(&result));
let result = hex_string(&result);
ChainHeadEvent::Done(ChainHeadResult { result })
})
.unwrap_or_else(|error| {
@@ -0,0 +1,184 @@
// This file is part of Substrate.
// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! Implementation of the `chainHead_storage` method.
use std::{marker::PhantomData, sync::Arc};
use jsonrpsee::SubscriptionSink;
use sc_client_api::{Backend, ChildInfo, StorageKey, StorageProvider};
use sp_api::BlockT;
use sp_core::storage::well_known_keys;
use super::{
event::{
ChainHeadStorageEvent, ItemsEvent, StorageQuery, StorageQueryType, StorageResult,
StorageResultType,
},
hex_string, ErrorEvent,
};
/// Generates the events of the `chainHead_storage` method.
pub struct ChainHeadStorage<Client, Block, BE> {
/// Substrate client.
client: Arc<Client>,
_phantom: PhantomData<(Block, BE)>,
}
impl<Client, Block, BE> ChainHeadStorage<Client, Block, BE> {
/// Constructs a new [`ChainHeadStorage`].
pub fn new(client: Arc<Client>) -> Self {
Self { client, _phantom: PhantomData }
}
}
/// Checks if the provided key (main or child key) is valid
/// for queries.
///
/// Keys that are identical to `:child_storage:` or `:child_storage:default:`
/// are not queryable.
fn is_key_queryable(key: &[u8]) -> bool {
!well_known_keys::is_default_child_storage_key(key) &&
!well_known_keys::is_child_storage_key(key)
}
/// The result of making a query call.
type QueryResult = Result<StorageResult<String>, ChainHeadStorageEvent<String>>;
impl<Client, Block, BE> ChainHeadStorage<Client, Block, BE>
where
Block: BlockT + 'static,
BE: Backend<Block> + 'static,
Client: StorageProvider<Block, BE> + 'static,
{
/// Fetch the value from storage.
fn query_storage_value(
&self,
hash: Block::Hash,
key: &StorageKey,
child_key: Option<&ChildInfo>,
) -> Option<QueryResult> {
let result = if let Some(child_key) = child_key {
self.client.child_storage(hash, child_key, key)
} else {
self.client.storage(hash, key)
};
result
.map(|opt| {
opt.map(|storage_data| {
QueryResult::Ok(StorageResult::<String> {
key: hex_string(&key.0),
result: StorageResultType::Value(hex_string(&storage_data.0)),
})
})
})
.unwrap_or_else(|err| {
Some(QueryResult::Err(ChainHeadStorageEvent::<String>::Error(ErrorEvent {
error: err.to_string(),
})))
})
}
/// Fetch the hash of a value from storage.
fn query_storage_hash(
&self,
hash: Block::Hash,
key: &StorageKey,
child_key: Option<&ChildInfo>,
) -> Option<QueryResult> {
let result = if let Some(child_key) = child_key {
self.client.child_storage_hash(hash, child_key, key)
} else {
self.client.storage_hash(hash, key)
};
result
.map(|opt| {
opt.map(|storage_data| {
QueryResult::Ok(StorageResult::<String> {
key: hex_string(&key.0),
result: StorageResultType::Hash(hex_string(&storage_data.as_ref())),
})
})
})
.unwrap_or_else(|err| {
Some(QueryResult::Err(ChainHeadStorageEvent::<String>::Error(ErrorEvent {
error: err.to_string(),
})))
})
}
/// Make the storage query.
fn query_storage(
&self,
hash: Block::Hash,
query: &StorageQuery<StorageKey>,
child_key: Option<&ChildInfo>,
) -> Option<QueryResult> {
if !is_key_queryable(&query.key.0) {
return None
}
match query.queue_type {
StorageQueryType::Value => self.query_storage_value(hash, &query.key, child_key),
StorageQueryType::Hash => self.query_storage_hash(hash, &query.key, child_key),
_ => None,
}
}
/// Generate the block events for the `chainHead_storage` method.
pub fn generate_events(
&self,
mut sink: SubscriptionSink,
hash: Block::Hash,
items: Vec<StorageQuery<StorageKey>>,
child_key: Option<ChildInfo>,
) {
if let Some(child_key) = child_key.as_ref() {
if !is_key_queryable(child_key.storage_key()) {
let _ = sink.send(&ChainHeadStorageEvent::<String>::Done);
return
}
}
let mut storage_results = Vec::with_capacity(items.len());
for item in items {
let Some(result) = self.query_storage(hash, &item, child_key.as_ref()) else {
continue
};
match result {
QueryResult::Ok(storage_result) => storage_results.push(storage_result),
QueryResult::Err(event) => {
let _ = sink.send(&event);
// If an error is encountered for any of the query items
// do not produce any other events.
return
},
}
}
if !storage_results.is_empty() {
let event = ChainHeadStorageEvent::Items(ItemsEvent { items: storage_results });
let _ = sink.send(&event);
}
let _ = sink.send(&ChainHeadStorageEvent::<String>::Done);
}
}
@@ -39,6 +39,9 @@ pub enum Error {
/// Invalid subscription ID provided by the RPC server.
#[error("Invalid subscription ID")]
InvalidSubscriptionID,
/// Wait-for-continue event not generated.
#[error("Wait for continue event was not generated for the subscription")]
InvalidContinue,
}
// Base code for all `chainHead` errors.
@@ -51,6 +54,8 @@ const FETCH_BLOCK_HEADER_ERROR: i32 = BASE_ERROR + 2;
const INVALID_PARAM_ERROR: i32 = BASE_ERROR + 3;
/// Invalid subscription ID.
const INVALID_SUB_ID: i32 = BASE_ERROR + 4;
/// Wait-for-continue event not generated.
const INVALID_CONTINUE: i32 = BASE_ERROR + 5;
impl From<Error> for ErrorObject<'static> {
fn from(e: Error) -> Self {
@@ -62,6 +67,7 @@ impl From<Error> for ErrorObject<'static> {
ErrorObject::owned(FETCH_BLOCK_HEADER_ERROR, msg, None::<()>),
Error::InvalidParam(_) => ErrorObject::owned(INVALID_PARAM_ERROR, msg, None::<()>),
Error::InvalidSubscriptionID => ErrorObject::owned(INVALID_SUB_ID, msg, None::<()>),
Error::InvalidContinue => ErrorObject::owned(INVALID_CONTINUE, msg, None::<()>),
}
.into()
}
@@ -241,6 +241,85 @@ pub enum ChainHeadEvent<T> {
Disjoint,
}
/// The storage item received as paramter.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct StorageQuery<Key> {
/// The provided key.
pub key: Key,
/// The type of the storage query.
#[serde(rename = "type")]
pub queue_type: StorageQueryType,
}
/// The type of the storage query.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum StorageQueryType {
/// Fetch the value of the provided key.
Value,
/// Fetch the hash of the value of the provided key.
Hash,
/// Fetch the closest descendant merkle value.
ClosestDescendantMerkleValue,
/// Fetch the values of all descendants of they provided key.
DescendantsValues,
/// Fetch the hashes of the values of all descendants of they provided key.
DescendantsHashes,
}
/// The storage result.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct StorageResult<T> {
/// The hex-encoded key of the result.
pub key: String,
/// The result of the query.
#[serde(flatten)]
pub result: StorageResultType<T>,
}
/// The type of the storage query.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum StorageResultType<T> {
/// Fetch the value of the provided key.
Value(T),
/// Fetch the hash of the value of the provided key.
Hash(T),
/// Fetch the closest descendant merkle value.
ClosestDescendantMerkleValue(T),
}
/// The event generated by storage method.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
#[serde(tag = "event")]
pub enum ChainHeadStorageEvent<T> {
/// The request produced multiple result items.
Items(ItemsEvent<T>),
/// The request produced multiple result items.
WaitForContinue,
/// The request completed successfully and all the results were provided.
Done,
/// The resources requested are inaccessible.
///
/// Resubmitting the request later might succeed.
Inaccessible,
/// An error occurred. This is definitive.
Error(ErrorEvent),
/// The provided subscription ID is stale or invalid.
Disjoint,
}
/// The request produced multiple result items.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ItemsEvent<T> {
/// The resulting items.
pub items: Vec<StorageResult<T>>,
}
#[cfg(test)]
mod tests {
use super::*;
@@ -475,4 +554,152 @@ mod tests {
let conf_dec: NetworkConfig = serde_json::from_str(exp).unwrap();
assert_eq!(conf_dec, conf);
}
#[test]
fn chain_head_storage_query() {
// Item with Value.
let item = StorageQuery { key: "0x1", queue_type: StorageQueryType::Value };
// Encode
let ser = serde_json::to_string(&item).unwrap();
let exp = r#"{"key":"0x1","type":"value"}"#;
assert_eq!(ser, exp);
// Decode
let dec: StorageQuery<&str> = serde_json::from_str(exp).unwrap();
assert_eq!(dec, item);
// Item with Hash.
let item = StorageQuery { key: "0x1", queue_type: StorageQueryType::Hash };
// Encode
let ser = serde_json::to_string(&item).unwrap();
let exp = r#"{"key":"0x1","type":"hash"}"#;
assert_eq!(ser, exp);
// Decode
let dec: StorageQuery<&str> = serde_json::from_str(exp).unwrap();
assert_eq!(dec, item);
// Item with DescendantsValues.
let item = StorageQuery { key: "0x1", queue_type: StorageQueryType::DescendantsValues };
// Encode
let ser = serde_json::to_string(&item).unwrap();
let exp = r#"{"key":"0x1","type":"descendants-values"}"#;
assert_eq!(ser, exp);
// Decode
let dec: StorageQuery<&str> = serde_json::from_str(exp).unwrap();
assert_eq!(dec, item);
// Item with DescendantsHashes.
let item = StorageQuery { key: "0x1", queue_type: StorageQueryType::DescendantsHashes };
// Encode
let ser = serde_json::to_string(&item).unwrap();
let exp = r#"{"key":"0x1","type":"descendants-hashes"}"#;
assert_eq!(ser, exp);
// Decode
let dec: StorageQuery<&str> = serde_json::from_str(exp).unwrap();
assert_eq!(dec, item);
// Item with Merkle.
let item =
StorageQuery { key: "0x1", queue_type: StorageQueryType::ClosestDescendantMerkleValue };
// Encode
let ser = serde_json::to_string(&item).unwrap();
let exp = r#"{"key":"0x1","type":"closest-descendant-merkle-value"}"#;
assert_eq!(ser, exp);
// Decode
let dec: StorageQuery<&str> = serde_json::from_str(exp).unwrap();
assert_eq!(dec, item);
}
#[test]
fn chain_head_storage_result() {
// Item with Value.
let item = StorageResult { key: "0x1".into(), result: StorageResultType::Value("res") };
// Encode
let ser = serde_json::to_string(&item).unwrap();
let exp = r#"{"key":"0x1","value":"res"}"#;
assert_eq!(ser, exp);
// Decode
let dec: StorageResult<&str> = serde_json::from_str(exp).unwrap();
assert_eq!(dec, item);
// Item with Hash.
let item = StorageResult { key: "0x1".into(), result: StorageResultType::Hash("res") };
// Encode
let ser = serde_json::to_string(&item).unwrap();
let exp = r#"{"key":"0x1","hash":"res"}"#;
assert_eq!(ser, exp);
// Decode
let dec: StorageResult<&str> = serde_json::from_str(exp).unwrap();
assert_eq!(dec, item);
// Item with DescendantsValues.
let item = StorageResult {
key: "0x1".into(),
result: StorageResultType::ClosestDescendantMerkleValue("res"),
};
// Encode
let ser = serde_json::to_string(&item).unwrap();
let exp = r#"{"key":"0x1","closest-descendant-merkle-value":"res"}"#;
assert_eq!(ser, exp);
// Decode
let dec: StorageResult<&str> = serde_json::from_str(exp).unwrap();
assert_eq!(dec, item);
}
#[test]
fn chain_head_storage_event() {
// Event with Items.
let event = ChainHeadStorageEvent::Items(ItemsEvent {
items: vec![
StorageResult { key: "0x1".into(), result: StorageResultType::Value("first") },
StorageResult { key: "0x2".into(), result: StorageResultType::Hash("second") },
],
});
// Encode
let ser = serde_json::to_string(&event).unwrap();
let exp = r#"{"event":"items","items":[{"key":"0x1","value":"first"},{"key":"0x2","hash":"second"}]}"#;
assert_eq!(ser, exp);
// Decode
let dec: ChainHeadStorageEvent<&str> = serde_json::from_str(exp).unwrap();
assert_eq!(dec, event);
// Event with WaitForContinue.
let event = ChainHeadStorageEvent::WaitForContinue;
// Encode
let ser = serde_json::to_string(&event).unwrap();
let exp = r#"{"event":"wait-for-continue"}"#;
assert_eq!(ser, exp);
// Decode
let dec: ChainHeadStorageEvent<&str> = serde_json::from_str(exp).unwrap();
assert_eq!(dec, event);
// Event with Done.
let event = ChainHeadStorageEvent::Done;
// Encode
let ser = serde_json::to_string(&event).unwrap();
let exp = r#"{"event":"done"}"#;
assert_eq!(ser, exp);
// Decode
let dec: ChainHeadStorageEvent<&str> = serde_json::from_str(exp).unwrap();
assert_eq!(dec, event);
// Event with Inaccessible.
let event = ChainHeadStorageEvent::Inaccessible;
// Encode
let ser = serde_json::to_string(&event).unwrap();
let exp = r#"{"event":"inaccessible"}"#;
assert_eq!(ser, exp);
// Decode
let dec: ChainHeadStorageEvent<&str> = serde_json::from_str(exp).unwrap();
assert_eq!(dec, event);
// Event with Inaccessible.
let event = ChainHeadStorageEvent::Error(ErrorEvent { error: "reason".into() });
// Encode
let ser = serde_json::to_string(&event).unwrap();
let exp = r#"{"event":"error","error":"reason"}"#;
assert_eq!(ser, exp);
// Decode
let dec: ChainHeadStorageEvent<&str> = serde_json::from_str(exp).unwrap();
assert_eq!(dec, event);
}
}
@@ -33,6 +33,7 @@ pub mod error;
pub mod event;
mod chain_head_follow;
mod chain_head_storage;
mod subscription;
pub use api::ChainHeadApiServer;
@@ -41,3 +42,10 @@ pub use event::{
BestBlockChanged, ChainHeadEvent, ChainHeadResult, ErrorEvent, Finalized, FollowEvent,
Initialized, NetworkConfig, NewBlock, RuntimeEvent, RuntimeVersionEvent,
};
use sp_core::hexdisplay::{AsBytesRef, HexDisplay};
/// Util function to print the results of `chianHead` as hex string
pub(crate) fn hex_string<Data: AsBytesRef>(data: &Data) -> String {
format!("0x{:?}", HexDisplay::from(data))
}
@@ -1,4 +1,7 @@
use crate::chain_head::test_utils::ChainHeadMockClient;
use crate::chain_head::{
event::{ChainHeadStorageEvent, StorageQuery, StorageQueryType, StorageResultType},
test_utils::ChainHeadMockClient,
};
use super::*;
use assert_matches::assert_matches;
@@ -6,6 +9,7 @@ use codec::{Decode, Encode};
use futures::Future;
use jsonrpsee::{
core::{error::Error, server::rpc_module::Subscription as RpcSubscription},
rpc_params,
types::{error::CallError, EmptyServerParams as EmptyParams},
RpcModule,
};
@@ -16,9 +20,9 @@ use sp_api::BlockT;
use sp_blockchain::HeaderBackend;
use sp_consensus::BlockOrigin;
use sp_core::{
hexdisplay::HexDisplay,
storage::well_known_keys::{self, CODE},
testing::TaskExecutor,
Blake2Hasher, Hasher,
};
use sp_version::RuntimeVersion;
use std::{sync::Arc, time::Duration};
@@ -288,14 +292,14 @@ async fn get_genesis() {
let genesis: String =
api.call("chainHead_unstable_genesisHash", EmptyParams::new()).await.unwrap();
assert_eq!(genesis, format!("0x{}", HexDisplay::from(&CHAIN_GENESIS)));
assert_eq!(genesis, hex_string(&CHAIN_GENESIS));
}
#[tokio::test]
async fn get_header() {
let (_client, api, _sub, sub_id, block) = setup_api().await;
let block_hash = format!("{:?}", block.header.hash());
let invalid_hash = format!("0x{:?}", HexDisplay::from(&INVALID_HASH));
let invalid_hash = hex_string(&INVALID_HASH);
// Invalid subscription ID must produce no results.
let res: Option<String> = api
@@ -324,7 +328,7 @@ async fn get_header() {
async fn get_body() {
let (mut client, api, mut block_sub, sub_id, block) = setup_api().await;
let block_hash = format!("{:?}", block.header.hash());
let invalid_hash = format!("0x{:?}", HexDisplay::from(&INVALID_HASH));
let invalid_hash = hex_string(&INVALID_HASH);
// Subscription ID is stale the disjoint event is emitted.
let mut sub = api
@@ -377,7 +381,7 @@ async fn get_body() {
let mut sub = api.subscribe("chainHead_unstable_body", [&sub_id, &block_hash]).await.unwrap();
let event: ChainHeadEvent<String> = get_next_event(&mut sub).await;
// Hex encoded scale encoded string for the vector of extrinsics.
let expected = format!("0x{:?}", HexDisplay::from(&block.extrinsics.encode()));
let expected = hex_string(&block.extrinsics.encode());
assert_matches!(event,
ChainHeadEvent::Done(done) if done.result == expected
);
@@ -387,7 +391,7 @@ async fn get_body() {
async fn call_runtime() {
let (_client, api, _sub, sub_id, block) = setup_api().await;
let block_hash = format!("{:?}", block.header.hash());
let invalid_hash = format!("0x{:?}", HexDisplay::from(&INVALID_HASH));
let invalid_hash = hex_string(&INVALID_HASH);
// Subscription ID is stale the disjoint event is emitted.
let mut sub = api
@@ -426,7 +430,7 @@ async fn call_runtime() {
let alice_id = AccountKeyring::Alice.to_account_id();
// Hex encoded scale encoded bytes representing the call parameters.
let call_parameters = format!("0x{:?}", HexDisplay::from(&alice_id.encode()));
let call_parameters = hex_string(&alice_id.encode());
let mut sub = api
.subscribe(
"chainHead_unstable_call",
@@ -495,7 +499,7 @@ async fn call_runtime_without_flag() {
// Valid runtime call on a subscription started with `with_runtime` false.
let alice_id = AccountKeyring::Alice.to_account_id();
let call_parameters = format!("0x{:?}", HexDisplay::from(&alice_id.encode()));
let call_parameters = hex_string(&alice_id.encode());
let err = api
.subscribe(
"chainHead_unstable_call",
@@ -510,23 +514,37 @@ async fn call_runtime_without_flag() {
}
#[tokio::test]
async fn get_storage() {
async fn get_storage_hash() {
let (mut client, api, mut block_sub, sub_id, block) = setup_api().await;
let block_hash = format!("{:?}", block.header.hash());
let invalid_hash = format!("0x{:?}", HexDisplay::from(&INVALID_HASH));
let key = format!("0x{:?}", HexDisplay::from(&KEY));
let invalid_hash = hex_string(&INVALID_HASH);
let key = hex_string(&KEY);
// Subscription ID is stale the disjoint event is emitted.
let mut sub = api
.subscribe("chainHead_unstable_storage", ["invalid_sub_id", &invalid_hash, &key])
.subscribe(
"chainHead_unstable_storage",
rpc_params![
"invalid_sub_id",
&invalid_hash,
vec![StorageQuery { key: key.clone(), queue_type: StorageQueryType::Hash }]
],
)
.await
.unwrap();
let event: ChainHeadEvent<String> = get_next_event(&mut sub).await;
assert_eq!(event, ChainHeadEvent::<String>::Disjoint);
let event: ChainHeadStorageEvent<String> = get_next_event(&mut sub).await;
assert_eq!(event, ChainHeadStorageEvent::<String>::Disjoint);
// Valid subscription ID with invalid block hash will error.
let err = api
.subscribe("chainHead_unstable_storage", [&sub_id, &invalid_hash, &key])
.subscribe(
"chainHead_unstable_storage",
rpc_params![
&sub_id,
&invalid_hash,
vec![StorageQuery { key: key.clone(), queue_type: StorageQueryType::Hash }]
],
)
.await
.unwrap_err();
assert_matches!(err,
@@ -535,11 +553,19 @@ async fn get_storage() {
// Valid call without storage at the key.
let mut sub = api
.subscribe("chainHead_unstable_storage", [&sub_id, &block_hash, &key])
.subscribe(
"chainHead_unstable_storage",
rpc_params![
&sub_id,
&block_hash,
vec![StorageQuery { key: key.clone(), queue_type: StorageQueryType::Hash }]
],
)
.await
.unwrap();
let event: ChainHeadEvent<Option<String>> = get_next_event(&mut sub).await;
assert_matches!(event, ChainHeadEvent::<Option<String>>::Done(done) if done.result.is_none());
let event: ChainHeadStorageEvent<String> = get_next_event(&mut sub).await;
// The `Done` event is generated directly since the key does not have any value associated.
assert_matches!(event, ChainHeadStorageEvent::Done);
// Import a new block with storage changes.
let mut builder = client.new_block(Default::default()).unwrap();
@@ -559,75 +585,236 @@ async fn get_storage() {
);
// Valid call with storage at the key.
let expected_value = Some(format!("0x{:?}", HexDisplay::from(&VALUE)));
let expected_hash = format!("{:?}", Blake2Hasher::hash(&VALUE));
let mut sub = api
.subscribe("chainHead_unstable_storage", [&sub_id, &block_hash, &key])
.subscribe(
"chainHead_unstable_storage",
rpc_params![
&sub_id,
&block_hash,
vec![StorageQuery { key: key.clone(), queue_type: StorageQueryType::Hash }]
],
)
.await
.unwrap();
let event: ChainHeadEvent<Option<String>> = get_next_event(&mut sub).await;
assert_matches!(event, ChainHeadEvent::<Option<String>>::Done(done) if done.result == expected_value);
let event: ChainHeadStorageEvent<String> = get_next_event(&mut sub).await;
assert_matches!(event, ChainHeadStorageEvent::<String>::Items(res) if res.items.len() == 1 && res.items[0].key == key && res.items[0].result == StorageResultType::Hash(expected_hash));
let event: ChainHeadStorageEvent<String> = get_next_event(&mut sub).await;
assert_matches!(event, ChainHeadStorageEvent::Done);
// Child value set in `setup_api`.
let child_info = format!("0x{:?}", HexDisplay::from(b"child"));
let child_info = hex_string(&CHILD_STORAGE_KEY);
let genesis_hash = format!("{:?}", client.genesis_hash());
let expected_value = Some(format!("0x{:?}", HexDisplay::from(&CHILD_VALUE)));
let expected_hash = format!("{:?}", Blake2Hasher::hash(&CHILD_VALUE));
println!("Expe: {:?}", expected_hash);
let mut sub = api
.subscribe("chainHead_unstable_storage", [&sub_id, &genesis_hash, &key, &child_info])
.subscribe(
"chainHead_unstable_storage",
rpc_params![
&sub_id,
&genesis_hash,
vec![StorageQuery { key: key.clone(), queue_type: StorageQueryType::Hash }],
&child_info
],
)
.await
.unwrap();
let event: ChainHeadEvent<Option<String>> = get_next_event(&mut sub).await;
assert_matches!(event, ChainHeadEvent::<Option<String>>::Done(done) if done.result == expected_value);
let event: ChainHeadStorageEvent<String> = get_next_event(&mut sub).await;
assert_matches!(event, ChainHeadStorageEvent::<String>::Items(res) if res.items.len() == 1 && res.items[0].key == key && res.items[0].result == StorageResultType::Hash(expected_hash));
let event: ChainHeadStorageEvent<String> = get_next_event(&mut sub).await;
assert_matches!(event, ChainHeadStorageEvent::Done);
}
#[tokio::test]
async fn get_storage_value() {
let (mut client, api, mut block_sub, sub_id, block) = setup_api().await;
let block_hash = format!("{:?}", block.header.hash());
let invalid_hash = hex_string(&INVALID_HASH);
let key = hex_string(&KEY);
// Subscription ID is stale the disjoint event is emitted.
let mut sub = api
.subscribe(
"chainHead_unstable_storage",
rpc_params![
"invalid_sub_id",
&invalid_hash,
vec![StorageQuery { key: key.clone(), queue_type: StorageQueryType::Value }]
],
)
.await
.unwrap();
let event: ChainHeadStorageEvent<String> = get_next_event(&mut sub).await;
assert_eq!(event, ChainHeadStorageEvent::<String>::Disjoint);
// Valid subscription ID with invalid block hash will error.
let err = api
.subscribe(
"chainHead_unstable_storage",
rpc_params![
&sub_id,
&invalid_hash,
vec![StorageQuery { key: key.clone(), queue_type: StorageQueryType::Value }]
],
)
.await
.unwrap_err();
assert_matches!(err,
Error::Call(CallError::Custom(ref err)) if err.code() == 2001 && err.message() == "Invalid block hash"
);
// Valid call without storage at the key.
let mut sub = api
.subscribe(
"chainHead_unstable_storage",
rpc_params![
&sub_id,
&block_hash,
vec![StorageQuery { key: key.clone(), queue_type: StorageQueryType::Value }]
],
)
.await
.unwrap();
let event: ChainHeadStorageEvent<String> = get_next_event(&mut sub).await;
// The `Done` event is generated directly since the key does not have any value associated.
assert_matches!(event, ChainHeadStorageEvent::Done);
// Import a new block with storage changes.
let mut builder = client.new_block(Default::default()).unwrap();
builder.push_storage_change(KEY.to_vec(), Some(VALUE.to_vec())).unwrap();
let block = builder.build().unwrap().block;
let block_hash = format!("{:?}", block.header.hash());
client.import(BlockOrigin::Own, block.clone()).await.unwrap();
// Ensure the imported block is propagated and pinned for this subscription.
assert_matches!(
get_next_event::<FollowEvent<String>>(&mut block_sub).await,
FollowEvent::NewBlock(_)
);
assert_matches!(
get_next_event::<FollowEvent<String>>(&mut block_sub).await,
FollowEvent::BestBlockChanged(_)
);
// Valid call with storage at the key.
let expected_value = hex_string(&VALUE);
let mut sub = api
.subscribe(
"chainHead_unstable_storage",
rpc_params![
&sub_id,
&block_hash,
vec![StorageQuery { key: key.clone(), queue_type: StorageQueryType::Value }]
],
)
.await
.unwrap();
let event: ChainHeadStorageEvent<String> = get_next_event(&mut sub).await;
assert_matches!(event, ChainHeadStorageEvent::<String>::Items(res) if res.items.len() == 1 && res.items[0].key == key && res.items[0].result == StorageResultType::Value(expected_value));
let event: ChainHeadStorageEvent<String> = get_next_event(&mut sub).await;
assert_matches!(event, ChainHeadStorageEvent::Done);
// Child value set in `setup_api`.
let child_info = hex_string(b"child");
let genesis_hash = format!("{:?}", client.genesis_hash());
let expected_value = hex_string(&CHILD_VALUE);
let mut sub = api
.subscribe(
"chainHead_unstable_storage",
rpc_params![
&sub_id,
&genesis_hash,
vec![StorageQuery { key: key.clone(), queue_type: StorageQueryType::Value }],
&child_info
],
)
.await
.unwrap();
let event: ChainHeadStorageEvent<String> = get_next_event(&mut sub).await;
assert_matches!(event, ChainHeadStorageEvent::<String>::Items(res) if res.items.len() == 1 && res.items[0].key == key && res.items[0].result == StorageResultType::Value(expected_value));
let event: ChainHeadStorageEvent<String> = get_next_event(&mut sub).await;
assert_matches!(event, ChainHeadStorageEvent::Done);
}
#[tokio::test]
async fn get_storage_wrong_key() {
let (mut _client, api, mut _block_sub, sub_id, block) = setup_api().await;
let block_hash = format!("{:?}", block.header.hash());
let key = format!("0x{:?}", HexDisplay::from(&KEY));
let key = hex_string(&KEY);
// Key is prefixed by CHILD_STORAGE_KEY_PREFIX.
let mut prefixed_key = well_known_keys::CHILD_STORAGE_KEY_PREFIX.to_vec();
prefixed_key.extend_from_slice(&KEY);
let prefixed_key = format!("0x{:?}", HexDisplay::from(&prefixed_key));
let prefixed_key = hex_string(&prefixed_key);
let mut sub = api
.subscribe("chainHead_unstable_storage", [&sub_id, &block_hash, &prefixed_key])
.subscribe(
"chainHead_unstable_storage",
rpc_params![
&sub_id,
&block_hash,
vec![StorageQuery { key: prefixed_key, queue_type: StorageQueryType::Value }]
],
)
.await
.unwrap();
let event: ChainHeadEvent<Option<String>> = get_next_event(&mut sub).await;
assert_matches!(event, ChainHeadEvent::<Option<String>>::Done(done) if done.result.is_none());
let event: ChainHeadStorageEvent<String> = get_next_event(&mut sub).await;
assert_matches!(event, ChainHeadStorageEvent::Done);
// Key is prefixed by DEFAULT_CHILD_STORAGE_KEY_PREFIX.
let mut prefixed_key = well_known_keys::DEFAULT_CHILD_STORAGE_KEY_PREFIX.to_vec();
prefixed_key.extend_from_slice(&KEY);
let prefixed_key = format!("0x{:?}", HexDisplay::from(&prefixed_key));
let prefixed_key = hex_string(&prefixed_key);
let mut sub = api
.subscribe("chainHead_unstable_storage", [&sub_id, &block_hash, &prefixed_key])
.subscribe(
"chainHead_unstable_storage",
rpc_params![
&sub_id,
&block_hash,
vec![StorageQuery { key: prefixed_key, queue_type: StorageQueryType::Value }]
],
)
.await
.unwrap();
let event: ChainHeadEvent<Option<String>> = get_next_event(&mut sub).await;
assert_matches!(event, ChainHeadEvent::<Option<String>>::Done(done) if done.result.is_none());
let event: ChainHeadStorageEvent<String> = get_next_event(&mut sub).await;
assert_matches!(event, ChainHeadStorageEvent::Done);
// Child key is prefixed by CHILD_STORAGE_KEY_PREFIX.
let mut prefixed_key = well_known_keys::CHILD_STORAGE_KEY_PREFIX.to_vec();
prefixed_key.extend_from_slice(b"child");
let prefixed_key = format!("0x{:?}", HexDisplay::from(&prefixed_key));
let prefixed_key = hex_string(&prefixed_key);
let mut sub = api
.subscribe("chainHead_unstable_storage", [&sub_id, &block_hash, &key, &prefixed_key])
.subscribe(
"chainHead_unstable_storage",
rpc_params![
&sub_id,
&block_hash,
vec![StorageQuery { key: key.clone(), queue_type: StorageQueryType::Value }],
&prefixed_key
],
)
.await
.unwrap();
let event: ChainHeadEvent<Option<String>> = get_next_event(&mut sub).await;
assert_matches!(event, ChainHeadEvent::<Option<String>>::Done(done) if done.result.is_none());
let event: ChainHeadStorageEvent<String> = get_next_event(&mut sub).await;
assert_matches!(event, ChainHeadStorageEvent::Done);
// Child key is prefixed by DEFAULT_CHILD_STORAGE_KEY_PREFIX.
let mut prefixed_key = well_known_keys::DEFAULT_CHILD_STORAGE_KEY_PREFIX.to_vec();
prefixed_key.extend_from_slice(b"child");
let prefixed_key = format!("0x{:?}", HexDisplay::from(&prefixed_key));
let prefixed_key = hex_string(&prefixed_key);
let mut sub = api
.subscribe("chainHead_unstable_storage", [&sub_id, &block_hash, &key, &prefixed_key])
.subscribe(
"chainHead_unstable_storage",
rpc_params![
&sub_id,
&block_hash,
vec![StorageQuery { key, queue_type: StorageQueryType::Value }],
&prefixed_key
],
)
.await
.unwrap();
let event: ChainHeadEvent<Option<String>> = get_next_event(&mut sub).await;
assert_matches!(event, ChainHeadEvent::<Option<String>>::Done(done) if done.result.is_none());
let event: ChainHeadStorageEvent<String> = get_next_event(&mut sub).await;
assert_matches!(event, ChainHeadStorageEvent::Done);
}
#[tokio::test]
@@ -848,14 +1035,14 @@ async fn follow_with_unpin() {
);
// Unpin an invalid subscription ID must return Ok(()).
let invalid_hash = format!("0x{:?}", HexDisplay::from(&INVALID_HASH));
let invalid_hash = hex_string(&INVALID_HASH);
let _res: () = api
.call("chainHead_unstable_unpin", ["invalid_sub_id", &invalid_hash])
.await
.unwrap();
// Valid subscription with invalid block hash.
let invalid_hash = format!("0x{:?}", HexDisplay::from(&INVALID_HASH));
let invalid_hash = hex_string(&INVALID_HASH);
let err = api
.call::<_, serde_json::Value>("chainHead_unstable_unpin", [&sub_id, &invalid_hash])
.await