chainHead_storage: Iterate over keys (#14628)

* chainHead: Iterate over key,values and key,hashes

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead/tests: Multi query with iteration over keys

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead/events: Fix typo in StorageQuery

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead: Take 10 from key iterator

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

---------

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
Co-authored-by: parity-processbot <>
This commit is contained in:
Alexandru Vasile
2023-07-25 18:43:04 +03:00
committed by GitHub
parent 59d8b86450
commit 00787a10e9
4 changed files with 217 additions and 67 deletions
@@ -300,9 +300,7 @@ where
let items = items
.into_iter()
.map(|query| {
if query.queue_type != StorageQueryType::Value &&
query.queue_type != StorageQueryType::Hash
{
if query.query_type == StorageQueryType::ClosestDescendantMerkleValue {
// Note: remove this once all types are implemented.
let _ = sink.reject(ChainHeadRpcError::InvalidParam(
"Storage query type not supported".into(),
@@ -312,7 +310,7 @@ where
Ok(StorageQuery {
key: StorageKey(parse_hex_param(&mut sink, query.key)?),
queue_type: query.queue_type,
query_type: query.query_type,
})
})
.collect::<Result<Vec<_>, _>>()?;
@@ -33,6 +33,18 @@ use super::{
hex_string, ErrorEvent,
};
/// The maximum number of items the `chainHead_storage` can return
/// before paginations is required.
const MAX_ITER_ITEMS: usize = 10;
/// The query type of an interation.
enum IterQueryType {
/// Iterating over (key, value) pairs.
Value,
/// Iterating over (key, hash) pairs.
Hash,
}
/// Generates the events of the `chainHead_storage` method.
pub struct ChainHeadStorage<Client, Block, BE> {
/// Substrate client.
@@ -58,7 +70,10 @@ fn is_key_queryable(key: &[u8]) -> bool {
}
/// The result of making a query call.
type QueryResult = Result<StorageResult<String>, ChainHeadStorageEvent<String>>;
type QueryResult = Result<Option<StorageResult<String>>, ChainHeadStorageEvent<String>>;
/// The result of iterating over keys.
type QueryIterResult = Result<Vec<StorageResult<String>>, ChainHeadStorageEvent<String>>;
impl<Client, Block, BE> ChainHeadStorage<Client, Block, BE>
where
@@ -72,7 +87,7 @@ where
hash: Block::Hash,
key: &StorageKey,
child_key: Option<&ChildInfo>,
) -> Option<QueryResult> {
) -> QueryResult {
let result = if let Some(child_key) = child_key {
self.client.child_storage(hash, child_key, key)
} else {
@@ -81,17 +96,15 @@ where
result
.map(|opt| {
opt.map(|storage_data| {
QueryResult::Ok(StorageResult::<String> {
key: hex_string(&key.0),
result: StorageResultType::Value(hex_string(&storage_data.0)),
})
})
QueryResult::Ok(opt.map(|storage_data| StorageResult::<String> {
key: hex_string(&key.0),
result: StorageResultType::Value(hex_string(&storage_data.0)),
}))
})
.unwrap_or_else(|err| {
Some(QueryResult::Err(ChainHeadStorageEvent::<String>::Error(ErrorEvent {
QueryResult::Err(ChainHeadStorageEvent::<String>::Error(ErrorEvent {
error: err.to_string(),
})))
}))
})
}
@@ -101,7 +114,7 @@ where
hash: Block::Hash,
key: &StorageKey,
child_key: Option<&ChildInfo>,
) -> Option<QueryResult> {
) -> QueryResult {
let result = if let Some(child_key) = child_key {
self.client.child_storage_hash(hash, child_key, key)
} else {
@@ -110,36 +123,49 @@ where
result
.map(|opt| {
opt.map(|storage_data| {
QueryResult::Ok(StorageResult::<String> {
key: hex_string(&key.0),
result: StorageResultType::Hash(hex_string(&storage_data.as_ref())),
})
})
QueryResult::Ok(opt.map(|storage_data| StorageResult::<String> {
key: hex_string(&key.0),
result: StorageResultType::Hash(hex_string(&storage_data.as_ref())),
}))
})
.unwrap_or_else(|err| {
Some(QueryResult::Err(ChainHeadStorageEvent::<String>::Error(ErrorEvent {
QueryResult::Err(ChainHeadStorageEvent::<String>::Error(ErrorEvent {
error: err.to_string(),
})))
}))
})
}
/// Make the storage query.
fn query_storage(
/// Handle iterating over (key, value) or (key, hash) pairs.
fn query_storage_iter(
&self,
hash: Block::Hash,
query: &StorageQuery<StorageKey>,
key: &StorageKey,
child_key: Option<&ChildInfo>,
) -> Option<QueryResult> {
if !is_key_queryable(&query.key.0) {
return None
ty: IterQueryType,
) -> QueryIterResult {
let keys_iter = if let Some(child_key) = child_key {
self.client.child_storage_keys(hash, child_key.to_owned(), Some(key), None)
} else {
self.client.storage_keys(hash, Some(key), None)
}
.map_err(|err| {
ChainHeadStorageEvent::<String>::Error(ErrorEvent { error: err.to_string() })
})?;
let mut ret = Vec::with_capacity(MAX_ITER_ITEMS);
let mut keys_iter = keys_iter.take(MAX_ITER_ITEMS);
while let Some(key) = keys_iter.next() {
let result = match ty {
IterQueryType::Value => self.query_storage_value(hash, &key, child_key),
IterQueryType::Hash => self.query_storage_hash(hash, &key, child_key),
}?;
if let Some(result) = result {
ret.push(result);
}
}
match query.queue_type {
StorageQueryType::Value => self.query_storage_value(hash, &query.key, child_key),
StorageQueryType::Hash => self.query_storage_hash(hash, &query.key, child_key),
_ => None,
}
QueryIterResult::Ok(ret)
}
/// Generate the block events for the `chainHead_storage` method.
@@ -159,19 +185,56 @@ where
let mut storage_results = Vec::with_capacity(items.len());
for item in items {
let Some(result) = self.query_storage(hash, &item, child_key.as_ref()) else {
continue
};
match result {
QueryResult::Ok(storage_result) => storage_results.push(storage_result),
QueryResult::Err(event) => {
let _ = sink.send(&event);
// If an error is encountered for any of the query items
// do not produce any other events.
return
},
if !is_key_queryable(&item.key.0) {
continue
}
match item.query_type {
StorageQueryType::Value => {
match self.query_storage_value(hash, &item.key, child_key.as_ref()) {
Ok(Some(value)) => storage_results.push(value),
Ok(None) => continue,
Err(err) => {
let _ = sink.send(&err);
return
},
}
},
StorageQueryType::Hash =>
match self.query_storage_hash(hash, &item.key, child_key.as_ref()) {
Ok(Some(value)) => storage_results.push(value),
Ok(None) => continue,
Err(err) => {
let _ = sink.send(&err);
return
},
},
StorageQueryType::DescendantsValues => match self.query_storage_iter(
hash,
&item.key,
child_key.as_ref(),
IterQueryType::Value,
) {
Ok(values) => storage_results.extend(values),
Err(err) => {
let _ = sink.send(&err);
return
},
},
StorageQueryType::DescendantsHashes => match self.query_storage_iter(
hash,
&item.key,
child_key.as_ref(),
IterQueryType::Hash,
) {
Ok(values) => storage_results.extend(values),
Err(err) => {
let _ = sink.send(&err);
return
},
},
_ => continue,
};
}
if !storage_results.is_empty() {
@@ -249,7 +249,7 @@ pub struct StorageQuery<Key> {
pub key: Key,
/// The type of the storage query.
#[serde(rename = "type")]
pub queue_type: StorageQueryType,
pub query_type: StorageQueryType,
}
/// The type of the storage query.
@@ -558,7 +558,7 @@ mod tests {
#[test]
fn chain_head_storage_query() {
// Item with Value.
let item = StorageQuery { key: "0x1", queue_type: StorageQueryType::Value };
let item = StorageQuery { key: "0x1", query_type: StorageQueryType::Value };
// Encode
let ser = serde_json::to_string(&item).unwrap();
let exp = r#"{"key":"0x1","type":"value"}"#;
@@ -568,7 +568,7 @@ mod tests {
assert_eq!(dec, item);
// Item with Hash.
let item = StorageQuery { key: "0x1", queue_type: StorageQueryType::Hash };
let item = StorageQuery { key: "0x1", query_type: StorageQueryType::Hash };
// Encode
let ser = serde_json::to_string(&item).unwrap();
let exp = r#"{"key":"0x1","type":"hash"}"#;
@@ -578,7 +578,7 @@ mod tests {
assert_eq!(dec, item);
// Item with DescendantsValues.
let item = StorageQuery { key: "0x1", queue_type: StorageQueryType::DescendantsValues };
let item = StorageQuery { key: "0x1", query_type: StorageQueryType::DescendantsValues };
// Encode
let ser = serde_json::to_string(&item).unwrap();
let exp = r#"{"key":"0x1","type":"descendants-values"}"#;
@@ -588,7 +588,7 @@ mod tests {
assert_eq!(dec, item);
// Item with DescendantsHashes.
let item = StorageQuery { key: "0x1", queue_type: StorageQueryType::DescendantsHashes };
let item = StorageQuery { key: "0x1", query_type: StorageQueryType::DescendantsHashes };
// Encode
let ser = serde_json::to_string(&item).unwrap();
let exp = r#"{"key":"0x1","type":"descendants-hashes"}"#;
@@ -599,7 +599,7 @@ mod tests {
// Item with Merkle.
let item =
StorageQuery { key: "0x1", queue_type: StorageQueryType::ClosestDescendantMerkleValue };
StorageQuery { key: "0x1", query_type: StorageQueryType::ClosestDescendantMerkleValue };
// Encode
let ser = serde_json::to_string(&item).unwrap();
let exp = r#"{"key":"0x1","type":"closest-descendant-merkle-value"}"#;
@@ -527,7 +527,7 @@ async fn get_storage_hash() {
rpc_params![
"invalid_sub_id",
&invalid_hash,
vec![StorageQuery { key: key.clone(), queue_type: StorageQueryType::Hash }]
vec![StorageQuery { key: key.clone(), query_type: StorageQueryType::Hash }]
],
)
.await
@@ -542,7 +542,7 @@ async fn get_storage_hash() {
rpc_params![
&sub_id,
&invalid_hash,
vec![StorageQuery { key: key.clone(), queue_type: StorageQueryType::Hash }]
vec![StorageQuery { key: key.clone(), query_type: StorageQueryType::Hash }]
],
)
.await
@@ -558,7 +558,7 @@ async fn get_storage_hash() {
rpc_params![
&sub_id,
&block_hash,
vec![StorageQuery { key: key.clone(), queue_type: StorageQueryType::Hash }]
vec![StorageQuery { key: key.clone(), query_type: StorageQueryType::Hash }]
],
)
.await
@@ -592,7 +592,7 @@ async fn get_storage_hash() {
rpc_params![
&sub_id,
&block_hash,
vec![StorageQuery { key: key.clone(), queue_type: StorageQueryType::Hash }]
vec![StorageQuery { key: key.clone(), query_type: StorageQueryType::Hash }]
],
)
.await
@@ -606,14 +606,13 @@ async fn get_storage_hash() {
let child_info = hex_string(&CHILD_STORAGE_KEY);
let genesis_hash = format!("{:?}", client.genesis_hash());
let expected_hash = format!("{:?}", Blake2Hasher::hash(&CHILD_VALUE));
println!("Expe: {:?}", expected_hash);
let mut sub = api
.subscribe(
"chainHead_unstable_storage",
rpc_params![
&sub_id,
&genesis_hash,
vec![StorageQuery { key: key.clone(), queue_type: StorageQueryType::Hash }],
vec![StorageQuery { key: key.clone(), query_type: StorageQueryType::Hash }],
&child_info
],
)
@@ -625,6 +624,96 @@ async fn get_storage_hash() {
assert_matches!(event, ChainHeadStorageEvent::Done);
}
#[tokio::test]
async fn get_storage_multi_query_iter() {
let (mut client, api, mut block_sub, sub_id, _) = setup_api().await;
let key = hex_string(&KEY);
// Import a new block with storage changes.
let mut builder = client.new_block(Default::default()).unwrap();
builder.push_storage_change(KEY.to_vec(), Some(VALUE.to_vec())).unwrap();
let block = builder.build().unwrap().block;
let block_hash = format!("{:?}", block.header.hash());
client.import(BlockOrigin::Own, block.clone()).await.unwrap();
// Ensure the imported block is propagated and pinned for this subscription.
assert_matches!(
get_next_event::<FollowEvent<String>>(&mut block_sub).await,
FollowEvent::NewBlock(_)
);
assert_matches!(
get_next_event::<FollowEvent<String>>(&mut block_sub).await,
FollowEvent::BestBlockChanged(_)
);
// Valid call with storage at the key.
let expected_hash = format!("{:?}", Blake2Hasher::hash(&VALUE));
let expected_value = hex_string(&VALUE);
let mut sub = api
.subscribe(
"chainHead_unstable_storage",
rpc_params![
&sub_id,
&block_hash,
vec![
StorageQuery {
key: key.clone(),
query_type: StorageQueryType::DescendantsHashes
},
StorageQuery {
key: key.clone(),
query_type: StorageQueryType::DescendantsValues
}
]
],
)
.await
.unwrap();
let event: ChainHeadStorageEvent<String> = get_next_event(&mut sub).await;
assert_matches!(event, ChainHeadStorageEvent::<String>::Items(res) if res.items.len() == 2 &&
res.items[0].key == key &&
res.items[1].key == key &&
res.items[0].result == StorageResultType::Hash(expected_hash) &&
res.items[1].result == StorageResultType::Value(expected_value));
let event: ChainHeadStorageEvent<String> = get_next_event(&mut sub).await;
assert_matches!(event, ChainHeadStorageEvent::Done);
// Child value set in `setup_api`.
let child_info = hex_string(&CHILD_STORAGE_KEY);
let genesis_hash = format!("{:?}", client.genesis_hash());
let expected_hash = format!("{:?}", Blake2Hasher::hash(&CHILD_VALUE));
let expected_value = hex_string(&CHILD_VALUE);
let mut sub = api
.subscribe(
"chainHead_unstable_storage",
rpc_params![
&sub_id,
&genesis_hash,
vec![
StorageQuery {
key: key.clone(),
query_type: StorageQueryType::DescendantsHashes
},
StorageQuery {
key: key.clone(),
query_type: StorageQueryType::DescendantsValues
}
],
&child_info
],
)
.await
.unwrap();
let event: ChainHeadStorageEvent<String> = get_next_event(&mut sub).await;
assert_matches!(event, ChainHeadStorageEvent::<String>::Items(res) if res.items.len() == 2 &&
res.items[0].key == key &&
res.items[1].key == key &&
res.items[0].result == StorageResultType::Hash(expected_hash) &&
res.items[1].result == StorageResultType::Value(expected_value));
let event: ChainHeadStorageEvent<String> = get_next_event(&mut sub).await;
assert_matches!(event, ChainHeadStorageEvent::Done);
}
#[tokio::test]
async fn get_storage_value() {
let (mut client, api, mut block_sub, sub_id, block) = setup_api().await;
@@ -639,7 +728,7 @@ async fn get_storage_value() {
rpc_params![
"invalid_sub_id",
&invalid_hash,
vec![StorageQuery { key: key.clone(), queue_type: StorageQueryType::Value }]
vec![StorageQuery { key: key.clone(), query_type: StorageQueryType::Value }]
],
)
.await
@@ -654,7 +743,7 @@ async fn get_storage_value() {
rpc_params![
&sub_id,
&invalid_hash,
vec![StorageQuery { key: key.clone(), queue_type: StorageQueryType::Value }]
vec![StorageQuery { key: key.clone(), query_type: StorageQueryType::Value }]
],
)
.await
@@ -670,7 +759,7 @@ async fn get_storage_value() {
rpc_params![
&sub_id,
&block_hash,
vec![StorageQuery { key: key.clone(), queue_type: StorageQueryType::Value }]
vec![StorageQuery { key: key.clone(), query_type: StorageQueryType::Value }]
],
)
.await
@@ -704,7 +793,7 @@ async fn get_storage_value() {
rpc_params![
&sub_id,
&block_hash,
vec![StorageQuery { key: key.clone(), queue_type: StorageQueryType::Value }]
vec![StorageQuery { key: key.clone(), query_type: StorageQueryType::Value }]
],
)
.await
@@ -724,7 +813,7 @@ async fn get_storage_value() {
rpc_params![
&sub_id,
&genesis_hash,
vec![StorageQuery { key: key.clone(), queue_type: StorageQueryType::Value }],
vec![StorageQuery { key: key.clone(), query_type: StorageQueryType::Value }],
&child_info
],
)
@@ -752,7 +841,7 @@ async fn get_storage_wrong_key() {
rpc_params![
&sub_id,
&block_hash,
vec![StorageQuery { key: prefixed_key, queue_type: StorageQueryType::Value }]
vec![StorageQuery { key: prefixed_key, query_type: StorageQueryType::Value }]
],
)
.await
@@ -770,7 +859,7 @@ async fn get_storage_wrong_key() {
rpc_params![
&sub_id,
&block_hash,
vec![StorageQuery { key: prefixed_key, queue_type: StorageQueryType::Value }]
vec![StorageQuery { key: prefixed_key, query_type: StorageQueryType::Value }]
],
)
.await
@@ -788,7 +877,7 @@ async fn get_storage_wrong_key() {
rpc_params![
&sub_id,
&block_hash,
vec![StorageQuery { key: key.clone(), queue_type: StorageQueryType::Value }],
vec![StorageQuery { key: key.clone(), query_type: StorageQueryType::Value }],
&prefixed_key
],
)
@@ -807,7 +896,7 @@ async fn get_storage_wrong_key() {
rpc_params![
&sub_id,
&block_hash,
vec![StorageQuery { key, queue_type: StorageQueryType::Value }],
vec![StorageQuery { key, query_type: StorageQueryType::Value }],
&prefixed_key
],
)