mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-07-05 10:57:24 +00:00
archive: Implement archive_unstable_storage (#1846)
This PR implements the `archive_unstable_storage` method that offers support for: - fetching values - fetching hashes - iterating over keys and values - iterating over keys and hashes - fetching merkle values from the trie-db A common component dedicated to RPC-V2 storage queries is created to bridge the gap between `chainHead/storage` and `archive/storage`. Query pagination is supported by `paginationStartKey`, similar to the old APIs. Similarly to the `chainHead/storage`, the `archive/storage` method accepts a maximum number of queried items. The design builds upon: https://github.com/paritytech/json-rpc-interface-spec/pull/94. Closes https://github.com/paritytech/polkadot-sdk/issues/1512. cc @paritytech/subxt-team --------- Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io> Co-authored-by: Niklas Adolfsson <niklasadolfsson1@gmail.com>
This commit is contained in:
@@ -19,7 +19,10 @@
|
||||
#![allow(non_snake_case)]
|
||||
|
||||
//! API trait of the chain head.
|
||||
use crate::chain_head::event::{FollowEvent, MethodResponse, StorageQuery};
|
||||
use crate::{
|
||||
chain_head::event::{FollowEvent, MethodResponse},
|
||||
common::events::StorageQuery,
|
||||
};
|
||||
use jsonrpsee::{core::RpcResult, proc_macros::rpc};
|
||||
use sp_rpc::list::ListOrValue;
|
||||
|
||||
|
||||
@@ -27,11 +27,11 @@ use crate::{
|
||||
api::ChainHeadApiServer,
|
||||
chain_head_follow::ChainHeadFollower,
|
||||
error::Error as ChainHeadRpcError,
|
||||
event::{FollowEvent, MethodResponse, OperationError, StorageQuery},
|
||||
hex_string,
|
||||
event::{FollowEvent, MethodResponse, OperationError},
|
||||
subscription::{SubscriptionManagement, SubscriptionManagementError},
|
||||
},
|
||||
SubscriptionTaskExecutor,
|
||||
common::events::StorageQuery,
|
||||
hex_string, SubscriptionTaskExecutor,
|
||||
};
|
||||
use codec::Encode;
|
||||
use futures::future::FutureExt;
|
||||
|
||||
@@ -22,33 +22,24 @@ use std::{collections::VecDeque, marker::PhantomData, sync::Arc};
|
||||
|
||||
use sc_client_api::{Backend, ChildInfo, StorageKey, StorageProvider};
|
||||
use sc_utils::mpsc::TracingUnboundedSender;
|
||||
use sp_core::storage::well_known_keys;
|
||||
use sp_runtime::traits::Block as BlockT;
|
||||
|
||||
use crate::chain_head::event::OperationStorageItems;
|
||||
|
||||
use super::{
|
||||
event::{
|
||||
OperationError, OperationId, StorageQuery, StorageQueryType, StorageResult,
|
||||
StorageResultType,
|
||||
use crate::{
|
||||
chain_head::{
|
||||
event::{OperationError, OperationId, OperationStorageItems},
|
||||
subscription::BlockGuard,
|
||||
FollowEvent,
|
||||
},
|
||||
common::{
|
||||
events::{StorageQuery, StorageQueryType},
|
||||
storage::{IterQueryType, QueryIter, QueryIterResult, Storage},
|
||||
},
|
||||
hex_string,
|
||||
subscription::BlockGuard,
|
||||
FollowEvent,
|
||||
};
|
||||
|
||||
/// The query type of an interation.
|
||||
enum IterQueryType {
|
||||
/// Iterating over (key, value) pairs.
|
||||
Value,
|
||||
/// Iterating over (key, hash) pairs.
|
||||
Hash,
|
||||
}
|
||||
|
||||
/// Generates the events of the `chainHead_storage` method.
|
||||
pub struct ChainHeadStorage<Client, Block, BE> {
|
||||
/// Substrate client.
|
||||
client: Arc<Client>,
|
||||
/// Storage client.
|
||||
client: Storage<Client, Block, BE>,
|
||||
/// Queue of operations that may require pagination.
|
||||
iter_operations: VecDeque<QueryIter>,
|
||||
/// The maximum number of items reported by the `chainHead_storage` before
|
||||
@@ -61,7 +52,7 @@ impl<Client, Block, BE> ChainHeadStorage<Client, Block, BE> {
|
||||
/// Constructs a new [`ChainHeadStorage`].
|
||||
pub fn new(client: Arc<Client>, operation_max_storage_items: usize) -> Self {
|
||||
Self {
|
||||
client,
|
||||
client: Storage::new(client),
|
||||
iter_operations: VecDeque::new(),
|
||||
operation_max_storage_items,
|
||||
_phandom: PhantomData,
|
||||
@@ -69,163 +60,12 @@ impl<Client, Block, BE> ChainHeadStorage<Client, Block, BE> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Query to iterate over storage.
|
||||
struct QueryIter {
|
||||
/// The key from which the iteration was started.
|
||||
query_key: StorageKey,
|
||||
/// The key after which pagination should resume.
|
||||
pagination_start_key: Option<StorageKey>,
|
||||
/// The type of the query (either value or hash).
|
||||
ty: IterQueryType,
|
||||
}
|
||||
|
||||
/// Checks if the provided key (main or child key) is valid
|
||||
/// for queries.
|
||||
///
|
||||
/// Keys that are identical to `:child_storage:` or `:child_storage:default:`
|
||||
/// are not queryable.
|
||||
fn is_key_queryable(key: &[u8]) -> bool {
|
||||
!well_known_keys::is_default_child_storage_key(key) &&
|
||||
!well_known_keys::is_child_storage_key(key)
|
||||
}
|
||||
|
||||
/// The result of making a query call.
|
||||
type QueryResult = Result<Option<StorageResult>, String>;
|
||||
|
||||
/// The result of iterating over keys.
|
||||
type QueryIterResult = Result<(Vec<StorageResult>, Option<QueryIter>), String>;
|
||||
|
||||
impl<Client, Block, BE> ChainHeadStorage<Client, Block, BE>
|
||||
where
|
||||
Block: BlockT + 'static,
|
||||
BE: Backend<Block> + 'static,
|
||||
Client: StorageProvider<Block, BE> + 'static,
|
||||
{
|
||||
/// Fetch the value from storage.
|
||||
fn query_storage_value(
|
||||
&self,
|
||||
hash: Block::Hash,
|
||||
key: &StorageKey,
|
||||
child_key: Option<&ChildInfo>,
|
||||
) -> QueryResult {
|
||||
let result = if let Some(child_key) = child_key {
|
||||
self.client.child_storage(hash, child_key, key)
|
||||
} else {
|
||||
self.client.storage(hash, key)
|
||||
};
|
||||
|
||||
result
|
||||
.map(|opt| {
|
||||
QueryResult::Ok(opt.map(|storage_data| StorageResult {
|
||||
key: hex_string(&key.0),
|
||||
result: StorageResultType::Value(hex_string(&storage_data.0)),
|
||||
}))
|
||||
})
|
||||
.unwrap_or_else(|error| QueryResult::Err(error.to_string()))
|
||||
}
|
||||
|
||||
/// Fetch the hash of a value from storage.
|
||||
fn query_storage_hash(
|
||||
&self,
|
||||
hash: Block::Hash,
|
||||
key: &StorageKey,
|
||||
child_key: Option<&ChildInfo>,
|
||||
) -> QueryResult {
|
||||
let result = if let Some(child_key) = child_key {
|
||||
self.client.child_storage_hash(hash, child_key, key)
|
||||
} else {
|
||||
self.client.storage_hash(hash, key)
|
||||
};
|
||||
|
||||
result
|
||||
.map(|opt| {
|
||||
QueryResult::Ok(opt.map(|storage_data| StorageResult {
|
||||
key: hex_string(&key.0),
|
||||
result: StorageResultType::Hash(hex_string(&storage_data.as_ref())),
|
||||
}))
|
||||
})
|
||||
.unwrap_or_else(|error| QueryResult::Err(error.to_string()))
|
||||
}
|
||||
|
||||
/// Fetch the closest merkle value.
|
||||
fn query_storage_merkle_value(
|
||||
&self,
|
||||
hash: Block::Hash,
|
||||
key: &StorageKey,
|
||||
child_key: Option<&ChildInfo>,
|
||||
) -> QueryResult {
|
||||
let result = if let Some(child_key) = child_key {
|
||||
self.client.child_closest_merkle_value(hash, child_key, key)
|
||||
} else {
|
||||
self.client.closest_merkle_value(hash, key)
|
||||
};
|
||||
|
||||
result
|
||||
.map(|opt| {
|
||||
QueryResult::Ok(opt.map(|storage_data| {
|
||||
let result = match &storage_data {
|
||||
sc_client_api::MerkleValue::Node(data) => hex_string(&data.as_slice()),
|
||||
sc_client_api::MerkleValue::Hash(hash) => hex_string(&hash.as_ref()),
|
||||
};
|
||||
|
||||
StorageResult {
|
||||
key: hex_string(&key.0),
|
||||
result: StorageResultType::ClosestDescendantMerkleValue(result),
|
||||
}
|
||||
}))
|
||||
})
|
||||
.unwrap_or_else(|error| QueryResult::Err(error.to_string()))
|
||||
}
|
||||
|
||||
/// Iterate over at most `operation_max_storage_items` keys.
|
||||
///
|
||||
/// Returns the storage result with a potential next key to resume iteration.
|
||||
fn query_storage_iter_pagination(
|
||||
&self,
|
||||
query: QueryIter,
|
||||
hash: Block::Hash,
|
||||
child_key: Option<&ChildInfo>,
|
||||
) -> QueryIterResult {
|
||||
let QueryIter { ty, query_key, pagination_start_key } = query;
|
||||
|
||||
let mut keys_iter = if let Some(child_key) = child_key {
|
||||
self.client.child_storage_keys(
|
||||
hash,
|
||||
child_key.to_owned(),
|
||||
Some(&query_key),
|
||||
pagination_start_key.as_ref(),
|
||||
)
|
||||
} else {
|
||||
self.client.storage_keys(hash, Some(&query_key), pagination_start_key.as_ref())
|
||||
}
|
||||
.map_err(|err| err.to_string())?;
|
||||
|
||||
let mut ret = Vec::with_capacity(self.operation_max_storage_items);
|
||||
let mut next_pagination_key = None;
|
||||
for _ in 0..self.operation_max_storage_items {
|
||||
let Some(key) = keys_iter.next() else { break };
|
||||
|
||||
next_pagination_key = Some(key.clone());
|
||||
|
||||
let result = match ty {
|
||||
IterQueryType::Value => self.query_storage_value(hash, &key, child_key),
|
||||
IterQueryType::Hash => self.query_storage_hash(hash, &key, child_key),
|
||||
}?;
|
||||
|
||||
if let Some(value) = result {
|
||||
ret.push(value);
|
||||
}
|
||||
}
|
||||
|
||||
// Save the next key if any to continue the iteration.
|
||||
let maybe_next_query = keys_iter.next().map(|_| QueryIter {
|
||||
ty,
|
||||
query_key,
|
||||
pagination_start_key: next_pagination_key,
|
||||
});
|
||||
Ok((ret, maybe_next_query))
|
||||
}
|
||||
|
||||
/// Iterate over (key, hash) and (key, value) generating the `WaitingForContinue` event if
|
||||
/// necessary.
|
||||
async fn generate_storage_iter_events(
|
||||
@@ -242,7 +82,12 @@ where
|
||||
return
|
||||
}
|
||||
|
||||
let result = self.query_storage_iter_pagination(query, hash, child_key.as_ref());
|
||||
let result = self.client.query_iter_pagination(
|
||||
query,
|
||||
hash,
|
||||
child_key.as_ref(),
|
||||
self.operation_max_storage_items,
|
||||
);
|
||||
let (events, maybe_next_query) = match result {
|
||||
QueryIterResult::Ok(result) => result,
|
||||
QueryIterResult::Err(error) => {
|
||||
@@ -294,24 +139,11 @@ where
|
||||
let sender = block_guard.response_sender();
|
||||
let operation = block_guard.operation();
|
||||
|
||||
if let Some(child_key) = child_key.as_ref() {
|
||||
if !is_key_queryable(child_key.storage_key()) {
|
||||
let _ = sender.unbounded_send(FollowEvent::<Block::Hash>::OperationStorageDone(
|
||||
OperationId { operation_id: operation.operation_id() },
|
||||
));
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
let mut storage_results = Vec::with_capacity(items.len());
|
||||
for item in items {
|
||||
if !is_key_queryable(&item.key.0) {
|
||||
continue
|
||||
}
|
||||
|
||||
match item.query_type {
|
||||
StorageQueryType::Value => {
|
||||
match self.query_storage_value(hash, &item.key, child_key.as_ref()) {
|
||||
match self.client.query_value(hash, &item.key, child_key.as_ref()) {
|
||||
Ok(Some(value)) => storage_results.push(value),
|
||||
Ok(None) => continue,
|
||||
Err(error) => {
|
||||
@@ -321,7 +153,7 @@ where
|
||||
}
|
||||
},
|
||||
StorageQueryType::Hash =>
|
||||
match self.query_storage_hash(hash, &item.key, child_key.as_ref()) {
|
||||
match self.client.query_hash(hash, &item.key, child_key.as_ref()) {
|
||||
Ok(Some(value)) => storage_results.push(value),
|
||||
Ok(None) => continue,
|
||||
Err(error) => {
|
||||
@@ -330,7 +162,7 @@ where
|
||||
},
|
||||
},
|
||||
StorageQueryType::ClosestDescendantMerkleValue =>
|
||||
match self.query_storage_merkle_value(hash, &item.key, child_key.as_ref()) {
|
||||
match self.client.query_merkle_value(hash, &item.key, child_key.as_ref()) {
|
||||
Ok(Some(value)) => storage_results.push(value),
|
||||
Ok(None) => continue,
|
||||
Err(error) => {
|
||||
|
||||
@@ -23,6 +23,8 @@ use sp_api::ApiError;
|
||||
use sp_version::RuntimeVersion;
|
||||
use std::collections::BTreeMap;
|
||||
|
||||
use crate::common::events::StorageResult;
|
||||
|
||||
/// The operation could not be processed due to an error.
|
||||
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
@@ -313,56 +315,6 @@ pub enum FollowEvent<Hash> {
|
||||
Stop,
|
||||
}
|
||||
|
||||
/// The storage item received as paramter.
|
||||
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct StorageQuery<Key> {
|
||||
/// The provided key.
|
||||
pub key: Key,
|
||||
/// The type of the storage query.
|
||||
#[serde(rename = "type")]
|
||||
pub query_type: StorageQueryType,
|
||||
}
|
||||
|
||||
/// The type of the storage query.
|
||||
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub enum StorageQueryType {
|
||||
/// Fetch the value of the provided key.
|
||||
Value,
|
||||
/// Fetch the hash of the value of the provided key.
|
||||
Hash,
|
||||
/// Fetch the closest descendant merkle value.
|
||||
ClosestDescendantMerkleValue,
|
||||
/// Fetch the values of all descendants of they provided key.
|
||||
DescendantsValues,
|
||||
/// Fetch the hashes of the values of all descendants of they provided key.
|
||||
DescendantsHashes,
|
||||
}
|
||||
|
||||
/// The storage result.
|
||||
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct StorageResult {
|
||||
/// The hex-encoded key of the result.
|
||||
pub key: String,
|
||||
/// The result of the query.
|
||||
#[serde(flatten)]
|
||||
pub result: StorageResultType,
|
||||
}
|
||||
|
||||
/// The type of the storage query.
|
||||
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub enum StorageResultType {
|
||||
/// Fetch the value of the provided key.
|
||||
Value(String),
|
||||
/// Fetch the hash of the value of the provided key.
|
||||
Hash(String),
|
||||
/// Fetch the closest descendant merkle value.
|
||||
ClosestDescendantMerkleValue(String),
|
||||
}
|
||||
|
||||
/// The method respose of `chainHead_body`, `chainHead_call` and `chainHead_storage`.
|
||||
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
@@ -388,6 +340,8 @@ pub struct MethodResponseStarted {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::common::events::StorageResultType;
|
||||
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
@@ -697,96 +651,4 @@ mod tests {
|
||||
let event_dec: MethodResponse = serde_json::from_str(exp).unwrap();
|
||||
assert_eq!(event_dec, event);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn chain_head_storage_query() {
|
||||
// Item with Value.
|
||||
let item = StorageQuery { key: "0x1", query_type: StorageQueryType::Value };
|
||||
// Encode
|
||||
let ser = serde_json::to_string(&item).unwrap();
|
||||
let exp = r#"{"key":"0x1","type":"value"}"#;
|
||||
assert_eq!(ser, exp);
|
||||
// Decode
|
||||
let dec: StorageQuery<&str> = serde_json::from_str(exp).unwrap();
|
||||
assert_eq!(dec, item);
|
||||
|
||||
// Item with Hash.
|
||||
let item = StorageQuery { key: "0x1", query_type: StorageQueryType::Hash };
|
||||
// Encode
|
||||
let ser = serde_json::to_string(&item).unwrap();
|
||||
let exp = r#"{"key":"0x1","type":"hash"}"#;
|
||||
assert_eq!(ser, exp);
|
||||
// Decode
|
||||
let dec: StorageQuery<&str> = serde_json::from_str(exp).unwrap();
|
||||
assert_eq!(dec, item);
|
||||
|
||||
// Item with DescendantsValues.
|
||||
let item = StorageQuery { key: "0x1", query_type: StorageQueryType::DescendantsValues };
|
||||
// Encode
|
||||
let ser = serde_json::to_string(&item).unwrap();
|
||||
let exp = r#"{"key":"0x1","type":"descendantsValues"}"#;
|
||||
assert_eq!(ser, exp);
|
||||
// Decode
|
||||
let dec: StorageQuery<&str> = serde_json::from_str(exp).unwrap();
|
||||
assert_eq!(dec, item);
|
||||
|
||||
// Item with DescendantsHashes.
|
||||
let item = StorageQuery { key: "0x1", query_type: StorageQueryType::DescendantsHashes };
|
||||
// Encode
|
||||
let ser = serde_json::to_string(&item).unwrap();
|
||||
let exp = r#"{"key":"0x1","type":"descendantsHashes"}"#;
|
||||
assert_eq!(ser, exp);
|
||||
// Decode
|
||||
let dec: StorageQuery<&str> = serde_json::from_str(exp).unwrap();
|
||||
assert_eq!(dec, item);
|
||||
|
||||
// Item with Merkle.
|
||||
let item =
|
||||
StorageQuery { key: "0x1", query_type: StorageQueryType::ClosestDescendantMerkleValue };
|
||||
// Encode
|
||||
let ser = serde_json::to_string(&item).unwrap();
|
||||
let exp = r#"{"key":"0x1","type":"closestDescendantMerkleValue"}"#;
|
||||
assert_eq!(ser, exp);
|
||||
// Decode
|
||||
let dec: StorageQuery<&str> = serde_json::from_str(exp).unwrap();
|
||||
assert_eq!(dec, item);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn chain_head_storage_result() {
|
||||
// Item with Value.
|
||||
let item =
|
||||
StorageResult { key: "0x1".into(), result: StorageResultType::Value("res".into()) };
|
||||
// Encode
|
||||
let ser = serde_json::to_string(&item).unwrap();
|
||||
let exp = r#"{"key":"0x1","value":"res"}"#;
|
||||
assert_eq!(ser, exp);
|
||||
// Decode
|
||||
let dec: StorageResult = serde_json::from_str(exp).unwrap();
|
||||
assert_eq!(dec, item);
|
||||
|
||||
// Item with Hash.
|
||||
let item =
|
||||
StorageResult { key: "0x1".into(), result: StorageResultType::Hash("res".into()) };
|
||||
// Encode
|
||||
let ser = serde_json::to_string(&item).unwrap();
|
||||
let exp = r#"{"key":"0x1","hash":"res"}"#;
|
||||
assert_eq!(ser, exp);
|
||||
// Decode
|
||||
let dec: StorageResult = serde_json::from_str(exp).unwrap();
|
||||
assert_eq!(dec, item);
|
||||
|
||||
// Item with DescendantsValues.
|
||||
let item = StorageResult {
|
||||
key: "0x1".into(),
|
||||
result: StorageResultType::ClosestDescendantMerkleValue("res".into()),
|
||||
};
|
||||
// Encode
|
||||
let ser = serde_json::to_string(&item).unwrap();
|
||||
let exp = r#"{"key":"0x1","closestDescendantMerkleValue":"res"}"#;
|
||||
assert_eq!(ser, exp);
|
||||
// Decode
|
||||
let dec: StorageResult = serde_json::from_str(exp).unwrap();
|
||||
assert_eq!(dec, item);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -42,10 +42,3 @@ pub use event::{
|
||||
BestBlockChanged, ErrorEvent, Finalized, FollowEvent, Initialized, NewBlock, RuntimeEvent,
|
||||
RuntimeVersionEvent,
|
||||
};
|
||||
|
||||
use sp_core::hexdisplay::{AsBytesRef, HexDisplay};
|
||||
|
||||
/// Util function to print the results of `chianHead` as hex string
|
||||
pub(crate) fn hex_string<Data: AsBytesRef>(data: &Data) -> String {
|
||||
format!("0x{:?}", HexDisplay::from(data))
|
||||
}
|
||||
|
||||
@@ -16,9 +16,10 @@
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
|
||||
use crate::chain_head::{
|
||||
event::{MethodResponse, StorageQuery, StorageQueryType, StorageResultType},
|
||||
test_utils::ChainHeadMockClient,
|
||||
use crate::{
|
||||
chain_head::{event::MethodResponse, test_utils::ChainHeadMockClient},
|
||||
common::events::{StorageQuery, StorageQueryType, StorageResultType},
|
||||
hex_string,
|
||||
};
|
||||
|
||||
use super::*;
|
||||
|
||||
Reference in New Issue
Block a user