chainHead: Produce method responses on chainHead_follow (#14692)

* chainHead/api: Make storage/body/call pure RPC methods

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead: Add mpsc channel between RPC methods

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead/subscriptions: Extract mpsc::Sender via BlockGuard

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead/subscriptions: Generate and provide the method operation ID

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead: Generate `chainHead_body` response

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead: Generate `chainHead_call` response

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead: Generate `chainHead_storage` responses

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead: Propagate responses of methods to chainHead_follow

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead/tests: Adjust `chainHead_body` responses

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead/tests: Adjust `chainHead_call` responses

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead/tests: Adjust `chainHead_call` responses

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead/tests: Ensure unique operation IDs across methods

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead/events: Remove old method events

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead: Return `InvalidBlock` error if pinning fails

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead: Wrap subscription IDs

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead/tests: Ensure separate operation IDs across subscriptions

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

---------

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
Co-authored-by: parity-processbot <>
This commit is contained in:
Alexandru Vasile
2023-08-08 21:13:52 +03:00
committed by GitHub
parent 7afff714e4
commit 4849b6e865
10 changed files with 663 additions and 494 deletions
+1 -1
View File
@@ -24,6 +24,7 @@ sp-api = { version = "4.0.0-dev", path = "../../primitives/api" }
sp-blockchain = { version = "4.0.0-dev", path = "../../primitives/blockchain" }
sp-version = { version = "22.0.0", path = "../../primitives/version" }
sc-client-api = { version = "4.0.0-dev", path = "../api" }
sc-utils = { version = "4.0.0-dev", path = "../utils" }
codec = { package = "parity-scale-codec", version = "3.6.1" }
thiserror = "1.0"
serde = "1.0"
@@ -44,6 +45,5 @@ sp-consensus = { version = "0.10.0-dev", path = "../../primitives/consensus/comm
sp-maybe-compressed-blob = { version = "4.1.0-dev", path = "../../primitives/maybe-compressed-blob" }
sc-block-builder = { version = "0.10.0-dev", path = "../block-builder" }
sc-service = { version = "0.10.0-dev", features = ["test-helpers"], path = "../service" }
sc-utils = { version = "4.0.0-dev", path = "../utils" }
assert_matches = "1.3.0"
pretty_assertions = "1.2.1"
@@ -19,7 +19,7 @@
#![allow(non_snake_case)]
//! API trait of the chain head.
use crate::chain_head::event::{ChainHeadEvent, FollowEvent, StorageQuery};
use crate::chain_head::event::{FollowEvent, MethodResponse, StorageQuery};
use jsonrpsee::{core::RpcResult, proc_macros::rpc};
#[rpc(client, server)]
@@ -47,12 +47,12 @@ pub trait ChainHeadApi<Hash> {
/// # Unstable
///
/// This method is unstable and subject to change in the future.
#[subscription(
name = "chainHead_unstable_body",
unsubscribe = "chainHead_unstable_stopBody",
item = ChainHeadEvent<String>,
)]
fn chain_head_unstable_body(&self, follow_subscription: String, hash: Hash);
#[method(name = "chainHead_unstable_body", blocking)]
fn chain_head_unstable_body(
&self,
follow_subscription: String,
hash: Hash,
) -> RpcResult<MethodResponse>;
/// Retrieves the header of a pinned block.
///
@@ -86,36 +86,28 @@ pub trait ChainHeadApi<Hash> {
/// # Unstable
///
/// This method is unstable and subject to change in the future.
#[subscription(
name = "chainHead_unstable_storage",
unsubscribe = "chainHead_unstable_stopStorage",
item = ChainHeadEvent<String>,
)]
#[method(name = "chainHead_unstable_storage", blocking)]
fn chain_head_unstable_storage(
&self,
follow_subscription: String,
hash: Hash,
items: Vec<StorageQuery<String>>,
child_trie: Option<String>,
);
) -> RpcResult<MethodResponse>;
/// Call into the Runtime API at a specified block's state.
///
/// # Unstable
///
/// This method is unstable and subject to change in the future.
#[subscription(
name = "chainHead_unstable_call",
unsubscribe = "chainHead_unstable_stopCall",
item = ChainHeadEvent<String>,
)]
#[method(name = "chainHead_unstable_call", blocking)]
fn chain_head_unstable_call(
&self,
follow_subscription: String,
hash: Hash,
function: String,
call_parameters: String,
);
) -> RpcResult<MethodResponse>;
/// Unpin a block reported by the `follow` method.
///
@@ -18,12 +18,16 @@
//! API implementation for `chainHead`.
use super::{
chain_head_storage::ChainHeadStorage,
event::{MethodResponseStarted, OperationBodyDone, OperationCallDone},
};
use crate::{
chain_head::{
api::ChainHeadApiServer,
chain_head_follow::ChainHeadFollower,
error::Error as ChainHeadRpcError,
event::{ChainHeadEvent, ChainHeadResult, ErrorEvent, FollowEvent},
event::{FollowEvent, MethodResponse, OperationError, StorageQuery, StorageQueryType},
hex_string,
subscription::{SubscriptionManagement, SubscriptionManagementError},
},
@@ -47,11 +51,6 @@ use sp_core::{traits::CallContext, Bytes};
use sp_runtime::traits::Block as BlockT;
use std::{marker::PhantomData, sync::Arc, time::Duration};
use super::{
chain_head_storage::ChainHeadStorage,
event::{ChainHeadStorageEvent, StorageQuery, StorageQueryType},
};
pub(crate) const LOG_TARGET: &str = "rpc-spec-v2";
/// An API for chain head RPC calls.
@@ -81,7 +80,6 @@ impl<BE: Backend<Block>, Block: BlockT, Client> ChainHead<BE, Block, Client> {
max_pinned_duration: Duration,
) -> Self {
let genesis_hash = hex_string(&genesis_hash.as_ref());
Self {
client,
backend: backend.clone(),
@@ -121,11 +119,8 @@ impl<BE: Backend<Block>, Block: BlockT, Client> ChainHead<BE, Block, Client> {
/// Parse hex-encoded string parameter as raw bytes.
///
/// If the parsing fails, the subscription is rejected.
fn parse_hex_param(
sink: &mut SubscriptionSink,
param: String,
) -> Result<Vec<u8>, SubscriptionEmptyError> {
/// If the parsing fails, returns an error propagated to the RPC method.
fn parse_hex_param(param: String) -> Result<Vec<u8>, ChainHeadRpcError> {
// Methods can accept empty parameters.
if param.is_empty() {
return Ok(Default::default())
@@ -133,10 +128,7 @@ fn parse_hex_param(
match array_bytes::hex2bytes(&param) {
Ok(bytes) => Ok(bytes),
Err(_) => {
let _ = sink.reject(ChainHeadRpcError::InvalidParam(param));
Err(SubscriptionEmptyError)
},
Err(_) => Err(ChainHeadRpcError::InvalidParam(param)),
}
}
@@ -168,7 +160,7 @@ where
},
};
// Keep track of the subscription.
let Some(rx_stop) = self.subscriptions.insert_subscription(sub_id.clone(), with_runtime)
let Some(sub_data) = self.subscriptions.insert_subscription(sub_id.clone(), with_runtime)
else {
// Inserting the subscription can only fail if the JsonRPSee
// generated a duplicate subscription ID.
@@ -190,7 +182,7 @@ where
sub_id.clone(),
);
chain_head_follow.generate_events(sink, rx_stop).await;
chain_head_follow.generate_events(sink, sub_data).await;
subscriptions.remove_subscription(&sub_id);
debug!(target: LOG_TARGET, "[follow][id={:?}] Subscription removed", sub_id);
@@ -202,59 +194,57 @@ where
fn chain_head_unstable_body(
&self,
mut sink: SubscriptionSink,
follow_subscription: String,
hash: Block::Hash,
) -> SubscriptionResult {
let client = self.client.clone();
let subscriptions = self.subscriptions.clone();
let block_guard = match subscriptions.lock_block(&follow_subscription, hash) {
) -> RpcResult<MethodResponse> {
let block_guard = match self.subscriptions.lock_block(&follow_subscription, hash) {
Ok(block) => block,
Err(SubscriptionManagementError::SubscriptionAbsent) => {
// Invalid invalid subscription ID.
let _ = sink.send(&ChainHeadEvent::<String>::Disjoint);
return Ok(())
return Ok(MethodResponse::LimitReached)
},
Err(SubscriptionManagementError::BlockHashAbsent) => {
// Block is not part of the subscription.
let _ = sink.reject(ChainHeadRpcError::InvalidBlock);
return Ok(())
},
Err(error) => {
let _ = sink.send(&ChainHeadEvent::<String>::Error(ErrorEvent {
error: error.to_string(),
}));
return Ok(())
return Err(ChainHeadRpcError::InvalidBlock.into())
},
Err(_) => return Err(ChainHeadRpcError::InvalidBlock.into()),
};
let fut = async move {
let _block_guard = block_guard;
let event = match client.block(hash) {
Ok(Some(signed_block)) => {
let extrinsics = signed_block.block.extrinsics();
let result = hex_string(&extrinsics.encode());
ChainHeadEvent::Done(ChainHeadResult { result })
},
Ok(None) => {
// The block's body was pruned. This subscription ID has become invalid.
debug!(
target: LOG_TARGET,
"[body][id={:?}] Stopping subscription because hash={:?} was pruned",
&follow_subscription,
hash
);
subscriptions.remove_subscription(&follow_subscription);
ChainHeadEvent::<String>::Disjoint
},
Err(error) => ChainHeadEvent::Error(ErrorEvent { error: error.to_string() }),
};
let _ = sink.send(&event);
let event = match self.client.block(hash) {
Ok(Some(signed_block)) => {
let extrinsics = signed_block
.block
.extrinsics()
.iter()
.map(|extrinsic| hex_string(&extrinsic.encode()))
.collect();
FollowEvent::<Block::Hash>::OperationBodyDone(OperationBodyDone {
operation_id: block_guard.operation_id(),
value: extrinsics,
})
},
Ok(None) => {
// The block's body was pruned. This subscription ID has become invalid.
debug!(
target: LOG_TARGET,
"[body][id={:?}] Stopping subscription because hash={:?} was pruned",
&follow_subscription,
hash
);
self.subscriptions.remove_subscription(&follow_subscription);
return Err(ChainHeadRpcError::InvalidBlock.into())
},
Err(error) => FollowEvent::<Block::Hash>::OperationError(OperationError {
operation_id: block_guard.operation_id(),
error: error.to_string(),
}),
};
self.executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed());
Ok(())
let _ = block_guard.response_sender().unbounded_send(event);
Ok(MethodResponse::Started(MethodResponseStarted {
operation_id: block_guard.operation_id(),
discarded_items: None,
}))
}
fn chain_head_unstable_header(
@@ -288,128 +278,113 @@ where
fn chain_head_unstable_storage(
&self,
mut sink: SubscriptionSink,
follow_subscription: String,
hash: Block::Hash,
items: Vec<StorageQuery<String>>,
child_trie: Option<String>,
) -> SubscriptionResult {
) -> RpcResult<MethodResponse> {
// Gain control over parameter parsing and returned error.
let items = items
.into_iter()
.map(|query| {
if query.query_type == StorageQueryType::ClosestDescendantMerkleValue {
// Note: remove this once all types are implemented.
let _ = sink.reject(ChainHeadRpcError::InvalidParam(
return Err(ChainHeadRpcError::InvalidParam(
"Storage query type not supported".into(),
));
return Err(SubscriptionEmptyError)
))
}
Ok(StorageQuery {
key: StorageKey(parse_hex_param(&mut sink, query.key)?),
key: StorageKey(parse_hex_param(query.key)?),
query_type: query.query_type,
})
})
.collect::<Result<Vec<_>, _>>()?;
let child_trie = child_trie
.map(|child_trie| parse_hex_param(&mut sink, child_trie))
.map(|child_trie| parse_hex_param(child_trie))
.transpose()?
.map(ChildInfo::new_default_from_vec);
let client = self.client.clone();
let subscriptions = self.subscriptions.clone();
let block_guard = match subscriptions.lock_block(&follow_subscription, hash) {
let block_guard = match self.subscriptions.lock_block(&follow_subscription, hash) {
Ok(block) => block,
Err(SubscriptionManagementError::SubscriptionAbsent) => {
// Invalid invalid subscription ID.
let _ = sink.send(&ChainHeadStorageEvent::Disjoint);
return Ok(())
return Ok(MethodResponse::LimitReached)
},
Err(SubscriptionManagementError::BlockHashAbsent) => {
// Block is not part of the subscription.
let _ = sink.reject(ChainHeadRpcError::InvalidBlock);
return Ok(())
},
Err(error) => {
let _ = sink
.send(&ChainHeadStorageEvent::Error(ErrorEvent { error: error.to_string() }));
return Ok(())
return Err(ChainHeadRpcError::InvalidBlock.into())
},
Err(_) => return Err(ChainHeadRpcError::InvalidBlock.into()),
};
let storage_client = ChainHeadStorage::<Client, Block, BE>::new(client);
let storage_client = ChainHeadStorage::<Client, Block, BE>::new(self.client.clone());
let operation_id = block_guard.operation_id();
let fut = async move {
let _block_guard = block_guard;
storage_client.generate_events(sink, hash, items, child_trie);
storage_client.generate_events(block_guard, hash, items, child_trie);
};
self.executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed());
Ok(())
self.executor
.spawn_blocking("substrate-rpc-subscription", Some("rpc"), fut.boxed());
Ok(MethodResponse::Started(MethodResponseStarted {
operation_id,
discarded_items: Some(0),
}))
}
fn chain_head_unstable_call(
&self,
mut sink: SubscriptionSink,
follow_subscription: String,
hash: Block::Hash,
function: String,
call_parameters: String,
) -> SubscriptionResult {
let call_parameters = Bytes::from(parse_hex_param(&mut sink, call_parameters)?);
) -> RpcResult<MethodResponse> {
let call_parameters = Bytes::from(parse_hex_param(call_parameters)?);
let client = self.client.clone();
let subscriptions = self.subscriptions.clone();
let block_guard = match subscriptions.lock_block(&follow_subscription, hash) {
let block_guard = match self.subscriptions.lock_block(&follow_subscription, hash) {
Ok(block) => block,
Err(SubscriptionManagementError::SubscriptionAbsent) => {
// Invalid invalid subscription ID.
let _ = sink.send(&ChainHeadEvent::<String>::Disjoint);
return Ok(())
return Ok(MethodResponse::LimitReached)
},
Err(SubscriptionManagementError::BlockHashAbsent) => {
// Block is not part of the subscription.
let _ = sink.reject(ChainHeadRpcError::InvalidBlock);
return Ok(())
},
Err(error) => {
let _ = sink.send(&ChainHeadEvent::<String>::Error(ErrorEvent {
error: error.to_string(),
}));
return Ok(())
return Err(ChainHeadRpcError::InvalidBlock.into())
},
Err(_) => return Err(ChainHeadRpcError::InvalidBlock.into()),
};
let fut = async move {
// Reject subscription if with_runtime is false.
if !block_guard.has_runtime() {
let _ = sink.reject(ChainHeadRpcError::InvalidParam(
"The runtime updates flag must be set".into(),
));
return
}
// Reject subscription if with_runtime is false.
if !block_guard.has_runtime() {
return Err(ChainHeadRpcError::InvalidParam(
"The runtime updates flag must be set".to_string(),
)
.into())
}
let res = client
.executor()
.call(hash, &function, &call_parameters, CallContext::Offchain)
.map(|result| {
let result = hex_string(&result);
ChainHeadEvent::Done(ChainHeadResult { result })
let event = self
.client
.executor()
.call(hash, &function, &call_parameters, CallContext::Offchain)
.map(|result| {
FollowEvent::<Block::Hash>::OperationCallDone(OperationCallDone {
operation_id: block_guard.operation_id(),
output: hex_string(&result),
})
.unwrap_or_else(|error| {
ChainHeadEvent::Error(ErrorEvent { error: error.to_string() })
});
})
.unwrap_or_else(|error| {
FollowEvent::<Block::Hash>::OperationError(OperationError {
operation_id: block_guard.operation_id(),
error: error.to_string(),
})
});
let _ = sink.send(&res);
};
self.executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed());
Ok(())
let _ = block_guard.response_sender().unbounded_send(event);
Ok(MethodResponse::Started(MethodResponseStarted {
operation_id: block_guard.operation_id(),
discarded_items: None,
}))
}
fn chain_head_unstable_unpin(
@@ -24,7 +24,7 @@ use crate::chain_head::{
BestBlockChanged, Finalized, FollowEvent, Initialized, NewBlock, RuntimeEvent,
RuntimeVersionEvent,
},
subscription::{SubscriptionManagement, SubscriptionManagementError},
subscription::{InsertedSubscriptionData, SubscriptionManagement, SubscriptionManagementError},
};
use futures::{
channel::oneshot,
@@ -80,6 +80,8 @@ enum NotificationType<Block: BlockT> {
NewBlock(BlockImportNotification<Block>),
/// The finalized block notification obtained from `finality_notification_stream`.
Finalized(FinalityNotification<Block>),
/// The response of `chainHead` method calls.
MethodResponse(FollowEvent<Block::Hash>),
}
/// The initial blocks that should be reported or ignored by the chainHead.
@@ -515,6 +517,7 @@ where
self.handle_import_blocks(notification, &startup_point),
NotificationType::Finalized(notification) =>
self.handle_finalized_blocks(notification, &mut to_ignore, &startup_point),
NotificationType::MethodResponse(notification) => Ok(vec![notification]),
};
let events = match events {
@@ -572,7 +575,7 @@ where
pub async fn generate_events(
&mut self,
mut sink: SubscriptionSink,
rx_stop: oneshot::Receiver<()>,
sub_data: InsertedSubscriptionData<Block>,
) {
// Register for the new block and finalized notifications.
let stream_import = self
@@ -585,6 +588,10 @@ where
.finality_notification_stream()
.map(|notification| NotificationType::Finalized(notification));
let stream_responses = sub_data
.response_receiver
.map(|response| NotificationType::MethodResponse(response));
let startup_point = StartupPoint::from(self.client.info());
let (initial_events, pruned_forks) = match self.generate_init_events(&startup_point) {
Ok(blocks) => blocks,
@@ -602,9 +609,10 @@ where
let initial = NotificationType::InitialEvents(initial_events);
let merged = tokio_stream::StreamExt::merge(stream_import, stream_finalized);
let merged = tokio_stream::StreamExt::merge(merged, stream_responses);
let stream = stream::once(futures::future::ready(initial)).chain(merged);
self.submit_events(&startup_point, stream.boxed(), pruned_forks, sink, rx_stop)
self.submit_events(&startup_point, stream.boxed(), pruned_forks, sink, sub_data.rx_stop)
.await;
}
}
@@ -20,17 +20,21 @@
use std::{marker::PhantomData, sync::Arc};
use jsonrpsee::SubscriptionSink;
use sc_client_api::{Backend, ChildInfo, StorageKey, StorageProvider};
use sc_utils::mpsc::TracingUnboundedSender;
use sp_api::BlockT;
use sp_core::storage::well_known_keys;
use crate::chain_head::event::OperationStorageItems;
use super::{
event::{
ChainHeadStorageEvent, ItemsEvent, StorageQuery, StorageQueryType, StorageResult,
OperationError, OperationId, StorageQuery, StorageQueryType, StorageResult,
StorageResultType,
},
hex_string, ErrorEvent,
hex_string,
subscription::BlockGuard,
FollowEvent,
};
/// The maximum number of items the `chainHead_storage` can return
@@ -70,10 +74,10 @@ fn is_key_queryable(key: &[u8]) -> bool {
}
/// The result of making a query call.
type QueryResult = Result<Option<StorageResult>, ChainHeadStorageEvent>;
type QueryResult = Result<Option<StorageResult>, String>;
/// The result of iterating over keys.
type QueryIterResult = Result<Vec<StorageResult>, ChainHeadStorageEvent>;
type QueryIterResult = Result<Vec<StorageResult>, String>;
impl<Client, Block, BE> ChainHeadStorage<Client, Block, BE>
where
@@ -101,11 +105,7 @@ where
result: StorageResultType::Value(hex_string(&storage_data.0)),
}))
})
.unwrap_or_else(|err| {
QueryResult::Err(ChainHeadStorageEvent::Error(ErrorEvent {
error: err.to_string(),
}))
})
.unwrap_or_else(|error| QueryResult::Err(error.to_string()))
}
/// Fetch the hash of a value from storage.
@@ -128,11 +128,7 @@ where
result: StorageResultType::Hash(hex_string(&storage_data.as_ref())),
}))
})
.unwrap_or_else(|err| {
QueryResult::Err(ChainHeadStorageEvent::Error(ErrorEvent {
error: err.to_string(),
}))
})
.unwrap_or_else(|error| QueryResult::Err(error.to_string()))
}
/// Handle iterating over (key, value) or (key, hash) pairs.
@@ -148,7 +144,7 @@ where
} else {
self.client.storage_keys(hash, Some(key), None)
}
.map_err(|err| ChainHeadStorageEvent::Error(ErrorEvent { error: err.to_string() }))?;
.map_err(|error| error.to_string())?;
let mut ret = Vec::with_capacity(MAX_ITER_ITEMS);
let mut keys_iter = keys_iter.take(MAX_ITER_ITEMS);
@@ -169,14 +165,31 @@ where
/// Generate the block events for the `chainHead_storage` method.
pub fn generate_events(
&self,
mut sink: SubscriptionSink,
block_guard: BlockGuard<Block, BE>,
hash: Block::Hash,
items: Vec<StorageQuery<StorageKey>>,
child_key: Option<ChildInfo>,
) {
/// Build and send the opaque error back to the `chainHead_follow` method.
fn send_error<Block: BlockT>(
sender: &TracingUnboundedSender<FollowEvent<Block::Hash>>,
operation_id: String,
error: String,
) {
let _ =
sender.unbounded_send(FollowEvent::<Block::Hash>::OperationError(OperationError {
operation_id,
error,
}));
}
let sender = block_guard.response_sender();
if let Some(child_key) = child_key.as_ref() {
if !is_key_queryable(child_key.storage_key()) {
let _ = sink.send(&ChainHeadStorageEvent::Done);
let _ = sender.unbounded_send(FollowEvent::<Block::Hash>::OperationStorageDone(
OperationId { operation_id: block_guard.operation_id() },
));
return
}
}
@@ -192,8 +205,8 @@ where
match self.query_storage_value(hash, &item.key, child_key.as_ref()) {
Ok(Some(value)) => storage_results.push(value),
Ok(None) => continue,
Err(err) => {
let _ = sink.send(&err);
Err(error) => {
send_error::<Block>(&sender, block_guard.operation_id(), error);
return
},
}
@@ -202,8 +215,8 @@ where
match self.query_storage_hash(hash, &item.key, child_key.as_ref()) {
Ok(Some(value)) => storage_results.push(value),
Ok(None) => continue,
Err(err) => {
let _ = sink.send(&err);
Err(error) => {
send_error::<Block>(&sender, block_guard.operation_id(), error);
return
},
},
@@ -214,8 +227,8 @@ where
IterQueryType::Value,
) {
Ok(values) => storage_results.extend(values),
Err(err) => {
let _ = sink.send(&err);
Err(error) => {
send_error::<Block>(&sender, block_guard.operation_id(), error);
return
},
},
@@ -226,8 +239,8 @@ where
IterQueryType::Hash,
) {
Ok(values) => storage_results.extend(values),
Err(err) => {
let _ = sink.send(&err);
Err(error) => {
send_error::<Block>(&sender, block_guard.operation_id(), error);
return
},
},
@@ -236,10 +249,17 @@ where
}
if !storage_results.is_empty() {
let event = ChainHeadStorageEvent::Items(ItemsEvent { items: storage_results });
let _ = sink.send(&event);
let _ = sender.unbounded_send(FollowEvent::<Block::Hash>::OperationStorageItems(
OperationStorageItems {
operation_id: block_guard.operation_id(),
items: storage_results,
},
));
}
let _ = sink.send(&ChainHeadStorageEvent::Done);
let _ =
sender.unbounded_send(FollowEvent::<Block::Hash>::OperationStorageDone(OperationId {
operation_id: block_guard.operation_id(),
}));
}
}
@@ -276,31 +276,6 @@ pub enum FollowEvent<Hash> {
Stop,
}
/// The result of a chain head method.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ChainHeadResult<T> {
/// Result of the method.
pub result: T,
}
/// The event generated by the body / call / storage methods.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
#[serde(tag = "event")]
pub enum ChainHeadEvent<T> {
/// The request completed successfully.
Done(ChainHeadResult<T>),
/// The resources requested are inaccessible.
///
/// Resubmitting the request later might succeed.
Inaccessible(ErrorEvent),
/// An error occurred. This is definitive.
Error(ErrorEvent),
/// The provided subscription ID is stale or invalid.
Disjoint,
}
/// The storage item received as paramter.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
@@ -351,35 +326,6 @@ pub enum StorageResultType {
ClosestDescendantMerkleValue(String),
}
/// The event generated by storage method.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
#[serde(tag = "event")]
pub enum ChainHeadStorageEvent {
/// The request produced multiple result items.
Items(ItemsEvent),
/// The request produced multiple result items.
WaitForContinue,
/// The request completed successfully and all the results were provided.
Done,
/// The resources requested are inaccessible.
///
/// Resubmitting the request later might succeed.
Inaccessible,
/// An error occurred. This is definitive.
Error(ErrorEvent),
/// The provided subscription ID is stale or invalid.
Disjoint,
}
/// The request produced multiple result items.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ItemsEvent {
/// The resulting items.
pub items: Vec<StorageResult>,
}
/// The method respose of `chainHead_body`, `chainHead_call` and `chainHead_storage`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
@@ -714,56 +660,6 @@ mod tests {
assert_eq!(event_dec, event);
}
#[test]
fn chain_head_done_event() {
let event: ChainHeadEvent<String> =
ChainHeadEvent::Done(ChainHeadResult { result: "A".into() });
let ser = serde_json::to_string(&event).unwrap();
let exp = r#"{"event":"done","result":"A"}"#;
assert_eq!(ser, exp);
let event_dec: ChainHeadEvent<String> = serde_json::from_str(exp).unwrap();
assert_eq!(event_dec, event);
}
#[test]
fn chain_head_inaccessible_event() {
let event: ChainHeadEvent<String> =
ChainHeadEvent::Inaccessible(ErrorEvent { error: "A".into() });
let ser = serde_json::to_string(&event).unwrap();
let exp = r#"{"event":"inaccessible","error":"A"}"#;
assert_eq!(ser, exp);
let event_dec: ChainHeadEvent<String> = serde_json::from_str(exp).unwrap();
assert_eq!(event_dec, event);
}
#[test]
fn chain_head_error_event() {
let event: ChainHeadEvent<String> = ChainHeadEvent::Error(ErrorEvent { error: "A".into() });
let ser = serde_json::to_string(&event).unwrap();
let exp = r#"{"event":"error","error":"A"}"#;
assert_eq!(ser, exp);
let event_dec: ChainHeadEvent<String> = serde_json::from_str(exp).unwrap();
assert_eq!(event_dec, event);
}
#[test]
fn chain_head_disjoint_event() {
let event: ChainHeadEvent<String> = ChainHeadEvent::Disjoint;
let ser = serde_json::to_string(&event).unwrap();
let exp = r#"{"event":"disjoint"}"#;
assert_eq!(ser, exp);
let event_dec: ChainHeadEvent<String> = serde_json::from_str(exp).unwrap();
assert_eq!(event_dec, event);
}
#[test]
fn chain_head_storage_query() {
// Item with Value.
@@ -855,68 +751,4 @@ mod tests {
let dec: StorageResult = serde_json::from_str(exp).unwrap();
assert_eq!(dec, item);
}
#[test]
fn chain_head_storage_event() {
// Event with Items.
let event = ChainHeadStorageEvent::Items(ItemsEvent {
items: vec![
StorageResult {
key: "0x1".into(),
result: StorageResultType::Value("first".into()),
},
StorageResult {
key: "0x2".into(),
result: StorageResultType::Hash("second".into()),
},
],
});
// Encode
let ser = serde_json::to_string(&event).unwrap();
let exp = r#"{"event":"items","items":[{"key":"0x1","value":"first"},{"key":"0x2","hash":"second"}]}"#;
assert_eq!(ser, exp);
// Decode
let dec: ChainHeadStorageEvent = serde_json::from_str(exp).unwrap();
assert_eq!(dec, event);
// Event with WaitForContinue.
let event = ChainHeadStorageEvent::WaitForContinue;
// Encode
let ser = serde_json::to_string(&event).unwrap();
let exp = r#"{"event":"wait-for-continue"}"#;
assert_eq!(ser, exp);
// Decode
let dec: ChainHeadStorageEvent = serde_json::from_str(exp).unwrap();
assert_eq!(dec, event);
// Event with Done.
let event = ChainHeadStorageEvent::Done;
// Encode
let ser = serde_json::to_string(&event).unwrap();
let exp = r#"{"event":"done"}"#;
assert_eq!(ser, exp);
// Decode
let dec: ChainHeadStorageEvent = serde_json::from_str(exp).unwrap();
assert_eq!(dec, event);
// Event with Inaccessible.
let event = ChainHeadStorageEvent::Inaccessible;
// Encode
let ser = serde_json::to_string(&event).unwrap();
let exp = r#"{"event":"inaccessible"}"#;
assert_eq!(ser, exp);
// Decode
let dec: ChainHeadStorageEvent = serde_json::from_str(exp).unwrap();
assert_eq!(dec, event);
// Event with Inaccessible.
let event = ChainHeadStorageEvent::Error(ErrorEvent { error: "reason".into() });
// Encode
let ser = serde_json::to_string(&event).unwrap();
let exp = r#"{"event":"error","error":"reason"}"#;
assert_eq!(ser, exp);
// Decode
let dec: ChainHeadStorageEvent = serde_json::from_str(exp).unwrap();
assert_eq!(dec, event);
}
}
@@ -39,8 +39,8 @@ mod subscription;
pub use api::ChainHeadApiServer;
pub use chain_head::ChainHead;
pub use event::{
BestBlockChanged, ChainHeadEvent, ChainHeadResult, ErrorEvent, Finalized, FollowEvent,
Initialized, NewBlock, RuntimeEvent, RuntimeVersionEvent,
BestBlockChanged, ErrorEvent, Finalized, FollowEvent, Initialized, NewBlock, RuntimeEvent,
RuntimeVersionEvent,
};
use sp_core::hexdisplay::{AsBytesRef, HexDisplay};
@@ -18,6 +18,7 @@
use futures::channel::oneshot;
use sc_client_api::Backend;
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
use sp_runtime::traits::Block as BlockT;
use std::{
collections::{hash_map::Entry, HashMap},
@@ -25,7 +26,10 @@ use std::{
time::{Duration, Instant},
};
use crate::chain_head::subscription::SubscriptionManagementError;
use crate::chain_head::{subscription::SubscriptionManagementError, FollowEvent};
/// The queue size after which the `sc_utils::mpsc::tracing_unbounded` would produce warnings.
const QUEUE_SIZE_WARNING: usize = 512;
/// The state machine of a block of a single subscription ID.
///
@@ -116,6 +120,12 @@ struct SubscriptionState<Block: BlockT> {
with_runtime: bool,
/// Signals the "Stop" event.
tx_stop: Option<oneshot::Sender<()>>,
/// The sender of message responses to the `chainHead_follow` events.
///
/// This object is cloned between methods.
response_sender: TracingUnboundedSender<FollowEvent<Block::Hash>>,
/// The next operation ID.
next_operation_id: usize,
/// Track the block hashes available for this subscription.
///
/// This implementation assumes:
@@ -227,6 +237,13 @@ impl<Block: BlockT> SubscriptionState<Block> {
}
timestamp
}
/// Generate the next operation ID for this subscription.
fn next_operation_id(&mut self) -> usize {
let op_id = self.next_operation_id;
self.next_operation_id = self.next_operation_id.wrapping_add(1);
op_id
}
}
/// Keeps a specific block pinned while the handle is alive.
@@ -235,6 +252,8 @@ impl<Block: BlockT> SubscriptionState<Block> {
pub struct BlockGuard<Block: BlockT, BE: Backend<Block>> {
hash: Block::Hash,
with_runtime: bool,
response_sender: TracingUnboundedSender<FollowEvent<Block::Hash>>,
operation_id: String,
backend: Arc<BE>,
}
@@ -251,19 +270,37 @@ impl<Block: BlockT, BE: Backend<Block>> BlockGuard<Block, BE> {
fn new(
hash: Block::Hash,
with_runtime: bool,
response_sender: TracingUnboundedSender<FollowEvent<Block::Hash>>,
operation_id: usize,
backend: Arc<BE>,
) -> Result<Self, SubscriptionManagementError> {
backend
.pin_block(hash)
.map_err(|err| SubscriptionManagementError::Custom(err.to_string()))?;
Ok(Self { hash, with_runtime, backend })
Ok(Self {
hash,
with_runtime,
response_sender,
operation_id: operation_id.to_string(),
backend,
})
}
/// The `with_runtime` flag of the subscription.
pub fn has_runtime(&self) -> bool {
self.with_runtime
}
/// Send message responses from the `chainHead` methods to `chainHead_follow`.
pub fn response_sender(&self) -> TracingUnboundedSender<FollowEvent<Block::Hash>> {
self.response_sender.clone()
}
/// The operation ID of this method.
pub fn operation_id(&self) -> String {
self.operation_id.clone()
}
}
impl<Block: BlockT, BE: Backend<Block>> Drop for BlockGuard<Block, BE> {
@@ -272,6 +309,15 @@ impl<Block: BlockT, BE: Backend<Block>> Drop for BlockGuard<Block, BE> {
}
}
/// The data propagated back to the `chainHead_follow` method after
/// the subscription is successfully inserted.
pub struct InsertedSubscriptionData<Block: BlockT> {
/// Signal that the subscription must stop.
pub rx_stop: oneshot::Receiver<()>,
/// Receive message responses from the `chainHead` methods.
pub response_receiver: TracingUnboundedReceiver<FollowEvent<Block::Hash>>,
}
pub struct SubscriptionsInner<Block: BlockT, BE: Backend<Block>> {
/// Reference count the block hashes across all subscriptions.
///
@@ -311,16 +357,21 @@ impl<Block: BlockT, BE: Backend<Block>> SubscriptionsInner<Block, BE> {
&mut self,
sub_id: String,
with_runtime: bool,
) -> Option<oneshot::Receiver<()>> {
) -> Option<InsertedSubscriptionData<Block>> {
if let Entry::Vacant(entry) = self.subs.entry(sub_id) {
let (tx_stop, rx_stop) = oneshot::channel();
let (response_sender, response_receiver) =
tracing_unbounded("chain-head-method-responses", QUEUE_SIZE_WARNING);
let state = SubscriptionState::<Block> {
with_runtime,
tx_stop: Some(tx_stop),
response_sender,
next_operation_id: 0,
blocks: Default::default(),
};
entry.insert(state);
Some(rx_stop)
Some(InsertedSubscriptionData { rx_stop, response_receiver })
} else {
None
}
@@ -491,7 +542,7 @@ impl<Block: BlockT, BE: Backend<Block>> SubscriptionsInner<Block, BE> {
sub_id: &str,
hash: Block::Hash,
) -> Result<BlockGuard<Block, BE>, SubscriptionManagementError> {
let Some(sub) = self.subs.get(sub_id) else {
let Some(sub) = self.subs.get_mut(sub_id) else {
return Err(SubscriptionManagementError::SubscriptionAbsent)
};
@@ -499,7 +550,14 @@ impl<Block: BlockT, BE: Backend<Block>> SubscriptionsInner<Block, BE> {
return Err(SubscriptionManagementError::BlockHashAbsent)
}
BlockGuard::new(hash, sub.with_runtime, self.backend.clone())
let operation_id = sub.next_operation_id();
BlockGuard::new(
hash,
sub.with_runtime,
sub.response_sender.clone(),
operation_id,
self.backend.clone(),
)
}
}
@@ -604,9 +662,13 @@ mod tests {
#[test]
fn sub_state_register_twice() {
let (response_sender, _response_receiver) =
tracing_unbounded("test-chain-head-method-responses", QUEUE_SIZE_WARNING);
let mut sub_state = SubscriptionState::<Block> {
with_runtime: false,
tx_stop: None,
response_sender,
next_operation_id: 0,
blocks: Default::default(),
};
@@ -629,9 +691,13 @@ mod tests {
#[test]
fn sub_state_register_unregister() {
let (response_sender, _response_receiver) =
tracing_unbounded("test-chain-head-method-responses", QUEUE_SIZE_WARNING);
let mut sub_state = SubscriptionState::<Block> {
with_runtime: false,
tx_stop: None,
response_sender,
next_operation_id: 0,
blocks: Default::default(),
};
@@ -921,17 +987,17 @@ mod tests {
let id = "abc".to_string();
let mut rx_stop = subs.insert_subscription(id.clone(), true).unwrap();
let mut sub_data = subs.insert_subscription(id.clone(), true).unwrap();
// Check the stop signal was not received.
let res = rx_stop.try_recv().unwrap();
let res = sub_data.rx_stop.try_recv().unwrap();
assert!(res.is_none());
let sub = subs.subs.get_mut(&id).unwrap();
sub.stop();
// Check the signal was received.
let res = rx_stop.try_recv().unwrap();
let res = sub_data.rx_stop.try_recv().unwrap();
assert!(res.is_some());
}
}
@@ -16,7 +16,6 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use futures::channel::oneshot;
use parking_lot::RwLock;
use sc_client_api::Backend;
use sp_runtime::traits::Block as BlockT;
@@ -25,9 +24,9 @@ use std::{sync::Arc, time::Duration};
mod error;
mod inner;
use self::inner::SubscriptionsInner;
pub use error::SubscriptionManagementError;
pub use inner::BlockGuard;
use inner::SubscriptionsInner;
pub use inner::{BlockGuard, InsertedSubscriptionData};
/// Manage block pinning / unpinning for subscription IDs.
pub struct SubscriptionManagement<Block: BlockT, BE: Backend<Block>> {
@@ -61,7 +60,7 @@ impl<Block: BlockT, BE: Backend<Block>> SubscriptionManagement<Block, BE> {
&self,
sub_id: String,
runtime_updates: bool,
) -> Option<oneshot::Receiver<()>> {
) -> Option<InsertedSubscriptionData<Block>> {
let mut inner = self.inner.write();
inner.insert_subscription(sub_id, runtime_updates)
}
@@ -1,5 +1,5 @@
use crate::chain_head::{
event::{ChainHeadStorageEvent, StorageQuery, StorageQueryType, StorageResultType},
event::{MethodResponse, StorageQuery, StorageQueryType, StorageResultType},
test_utils::ChainHeadMockClient,
};
@@ -25,7 +25,7 @@ use sp_core::{
Blake2Hasher, Hasher,
};
use sp_version::RuntimeVersion;
use std::{sync::Arc, time::Duration};
use std::{collections::HashSet, sync::Arc, time::Duration};
use substrate_test_runtime::Transfer;
use substrate_test_runtime_client::{
prelude::*, runtime, runtime::RuntimeApi, Backend, BlockBuilderExt, Client,
@@ -330,29 +330,34 @@ async fn get_body() {
let block_hash = format!("{:?}", block.header.hash());
let invalid_hash = hex_string(&INVALID_HASH);
// Subscription ID is stale the disjoint event is emitted.
let mut sub = api
.subscribe("chainHead_unstable_body", ["invalid_sub_id", &invalid_hash])
// Subscription ID is invalid.
let response: MethodResponse = api
.call("chainHead_unstable_body", ["invalid_sub_id", &invalid_hash])
.await
.unwrap();
let event: ChainHeadEvent<String> = get_next_event(&mut sub).await;
assert_eq!(event, ChainHeadEvent::<String>::Disjoint);
assert_matches!(response, MethodResponse::LimitReached);
// Valid subscription ID with invalid block hash will error.
// Block hash is invalid.
let err = api
.subscribe("chainHead_unstable_body", [&sub_id, &invalid_hash])
.call::<_, serde_json::Value>("chainHead_unstable_body", [&sub_id, &invalid_hash])
.await
.unwrap_err();
assert_matches!(err,
Error::Call(CallError::Custom(ref err)) if err.code() == 2001 && err.message() == "Invalid block hash"
);
// Obtain valid the body (list of extrinsics).
let mut sub = api.subscribe("chainHead_unstable_body", [&sub_id, &block_hash]).await.unwrap();
let event: ChainHeadEvent<String> = get_next_event(&mut sub).await;
// Block contains no extrinsics.
assert_matches!(event,
ChainHeadEvent::Done(done) if done.result == "0x00"
// Valid call.
let response: MethodResponse =
api.call("chainHead_unstable_body", [&sub_id, &block_hash]).await.unwrap();
let operation_id = match response {
MethodResponse::Started(started) => started.operation_id,
MethodResponse::LimitReached => panic!("Expected started response"),
};
// Response propagated to `chainHead_follow`.
assert_matches!(
get_next_event::<FollowEvent<String>>(&mut block_sub).await,
FollowEvent::OperationBodyDone(done) if done.operation_id == operation_id && done.value.is_empty()
);
// Import a block with extrinsics.
@@ -378,35 +383,41 @@ async fn get_body() {
FollowEvent::BestBlockChanged(_)
);
let mut sub = api.subscribe("chainHead_unstable_body", [&sub_id, &block_hash]).await.unwrap();
let event: ChainHeadEvent<String> = get_next_event(&mut sub).await;
// Hex encoded scale encoded string for the vector of extrinsics.
let expected = hex_string(&block.extrinsics.encode());
assert_matches!(event,
ChainHeadEvent::Done(done) if done.result == expected
// Valid call to a block with extrinsics.
let response: MethodResponse =
api.call("chainHead_unstable_body", [&sub_id, &block_hash]).await.unwrap();
let operation_id = match response {
MethodResponse::Started(started) => started.operation_id,
MethodResponse::LimitReached => panic!("Expected started response"),
};
// Response propagated to `chainHead_follow`.
let expected_tx = hex_string(&block.extrinsics[0].encode());
assert_matches!(
get_next_event::<FollowEvent<String>>(&mut block_sub).await,
FollowEvent::OperationBodyDone(done) if done.operation_id == operation_id && done.value == vec![expected_tx]
);
}
#[tokio::test]
async fn call_runtime() {
let (_client, api, _sub, sub_id, block) = setup_api().await;
let (_client, api, mut block_sub, sub_id, block) = setup_api().await;
let block_hash = format!("{:?}", block.header.hash());
let invalid_hash = hex_string(&INVALID_HASH);
// Subscription ID is stale the disjoint event is emitted.
let mut sub = api
.subscribe(
// Subscription ID is invalid.
let response: MethodResponse = api
.call(
"chainHead_unstable_call",
["invalid_sub_id", &block_hash, "BabeApi_current_epoch", "0x00"],
)
.await
.unwrap();
let event: ChainHeadEvent<String> = get_next_event(&mut sub).await;
assert_eq!(event, ChainHeadEvent::<String>::Disjoint);
assert_matches!(response, MethodResponse::LimitReached);
// Valid subscription ID with invalid block hash will error.
// Block hash is invalid.
let err = api
.subscribe(
.call::<_, serde_json::Value>(
"chainHead_unstable_call",
[&sub_id, &invalid_hash, "BabeApi_current_epoch", "0x00"],
)
@@ -418,8 +429,9 @@ async fn call_runtime() {
// Pass an invalid parameters that cannot be decode.
let err = api
.subscribe(
.call::<_, serde_json::Value>(
"chainHead_unstable_call",
// 0x0 is invalid.
[&sub_id, &block_hash, "BabeApi_current_epoch", "0x0"],
)
.await
@@ -428,34 +440,43 @@ async fn call_runtime() {
Error::Call(CallError::Custom(ref err)) if err.code() == 2003 && err.message().contains("Invalid parameter")
);
// Valid call.
let alice_id = AccountKeyring::Alice.to_account_id();
// Hex encoded scale encoded bytes representing the call parameters.
let call_parameters = hex_string(&alice_id.encode());
let mut sub = api
.subscribe(
let response: MethodResponse = api
.call(
"chainHead_unstable_call",
[&sub_id, &block_hash, "AccountNonceApi_account_nonce", &call_parameters],
)
.await
.unwrap();
let operation_id = match response {
MethodResponse::Started(started) => started.operation_id,
MethodResponse::LimitReached => panic!("Expected started response"),
};
// Response propagated to `chainHead_follow`.
assert_matches!(
get_next_event::<ChainHeadEvent<String>>(&mut sub).await,
ChainHeadEvent::Done(done) if done.result == "0x0000000000000000"
get_next_event::<FollowEvent<String>>(&mut block_sub).await,
FollowEvent::OperationCallDone(done) if done.operation_id == operation_id && done.output == "0x0000000000000000"
);
// The `current_epoch` takes no parameters and not draining the input buffer
// will cause the execution to fail.
let mut sub = api
.subscribe(
"chainHead_unstable_call",
[&sub_id, &block_hash, "BabeApi_current_epoch", "0x00"],
)
let response: MethodResponse = api
.call("chainHead_unstable_call", [&sub_id, &block_hash, "BabeApi_current_epoch", "0x00"])
.await
.unwrap();
let operation_id = match response {
MethodResponse::Started(started) => started.operation_id,
MethodResponse::LimitReached => panic!("Expected started response"),
};
// Error propagated to `chainHead_follow`.
assert_matches!(
get_next_event::<ChainHeadEvent<String>>(&mut sub).await,
ChainHeadEvent::Error(event) if event.error.contains("Execution failed")
get_next_event::<FollowEvent<String>>(&mut block_sub).await,
FollowEvent::OperationError(error) if error.operation_id == operation_id && error.error.contains("Execution failed")
);
}
@@ -501,7 +522,7 @@ async fn call_runtime_without_flag() {
let alice_id = AccountKeyring::Alice.to_account_id();
let call_parameters = hex_string(&alice_id.encode());
let err = api
.subscribe(
.call::<_, serde_json::Value>(
"chainHead_unstable_call",
[&sub_id, &block_hash, "AccountNonceApi_account_nonce", &call_parameters],
)
@@ -520,9 +541,9 @@ async fn get_storage_hash() {
let invalid_hash = hex_string(&INVALID_HASH);
let key = hex_string(&KEY);
// Subscription ID is stale the disjoint event is emitted.
let mut sub = api
.subscribe(
// Subscription ID is invalid.
let response: MethodResponse = api
.call(
"chainHead_unstable_storage",
rpc_params![
"invalid_sub_id",
@@ -532,12 +553,11 @@ async fn get_storage_hash() {
)
.await
.unwrap();
let event: ChainHeadStorageEvent = get_next_event(&mut sub).await;
assert_eq!(event, ChainHeadStorageEvent::Disjoint);
assert_matches!(response, MethodResponse::LimitReached);
// Valid subscription ID with invalid block hash will error.
// Block hash is invalid.
let err = api
.subscribe(
.call::<_, serde_json::Value>(
"chainHead_unstable_storage",
rpc_params![
&sub_id,
@@ -552,8 +572,8 @@ async fn get_storage_hash() {
);
// Valid call without storage at the key.
let mut sub = api
.subscribe(
let response: MethodResponse = api
.call(
"chainHead_unstable_storage",
rpc_params![
&sub_id,
@@ -563,9 +583,15 @@ async fn get_storage_hash() {
)
.await
.unwrap();
let event: ChainHeadStorageEvent = get_next_event(&mut sub).await;
let operation_id = match response {
MethodResponse::Started(started) => started.operation_id,
MethodResponse::LimitReached => panic!("Expected started response"),
};
// The `Done` event is generated directly since the key does not have any value associated.
assert_matches!(event, ChainHeadStorageEvent::Done);
assert_matches!(
get_next_event::<FollowEvent<String>>(&mut block_sub).await,
FollowEvent::OperationStorageDone(done) if done.operation_id == operation_id
);
// Import a new block with storage changes.
let mut builder = client.new_block(Default::default()).unwrap();
@@ -585,9 +611,8 @@ async fn get_storage_hash() {
);
// Valid call with storage at the key.
let expected_hash = format!("{:?}", Blake2Hasher::hash(&VALUE));
let mut sub = api
.subscribe(
let response: MethodResponse = api
.call(
"chainHead_unstable_storage",
rpc_params![
&sub_id,
@@ -597,17 +622,30 @@ async fn get_storage_hash() {
)
.await
.unwrap();
let event: ChainHeadStorageEvent = get_next_event(&mut sub).await;
assert_matches!(event, ChainHeadStorageEvent::Items(res) if res.items.len() == 1 && res.items[0].key == key && res.items[0].result == StorageResultType::Hash(expected_hash));
let event: ChainHeadStorageEvent = get_next_event(&mut sub).await;
assert_matches!(event, ChainHeadStorageEvent::Done);
let operation_id = match response {
MethodResponse::Started(started) => started.operation_id,
MethodResponse::LimitReached => panic!("Expected started response"),
};
let expected_hash = format!("{:?}", Blake2Hasher::hash(&VALUE));
assert_matches!(
get_next_event::<FollowEvent<String>>(&mut block_sub).await,
FollowEvent::OperationStorageItems(res) if res.operation_id == operation_id &&
res.items.len() == 1 &&
res.items[0].key == key && res.items[0].result == StorageResultType::Hash(expected_hash)
);
assert_matches!(
get_next_event::<FollowEvent<String>>(&mut block_sub).await,
FollowEvent::OperationStorageDone(done) if done.operation_id == operation_id
);
// Child value set in `setup_api`.
let child_info = hex_string(&CHILD_STORAGE_KEY);
let genesis_hash = format!("{:?}", client.genesis_hash());
let expected_hash = format!("{:?}", Blake2Hasher::hash(&CHILD_VALUE));
let mut sub = api
.subscribe(
// Valid call with storage at the key.
let response: MethodResponse = api
.call(
"chainHead_unstable_storage",
rpc_params![
&sub_id,
@@ -618,10 +656,22 @@ async fn get_storage_hash() {
)
.await
.unwrap();
let event: ChainHeadStorageEvent = get_next_event(&mut sub).await;
assert_matches!(event, ChainHeadStorageEvent::Items(res) if res.items.len() == 1 && res.items[0].key == key && res.items[0].result == StorageResultType::Hash(expected_hash));
let event: ChainHeadStorageEvent = get_next_event(&mut sub).await;
assert_matches!(event, ChainHeadStorageEvent::Done);
let operation_id = match response {
MethodResponse::Started(started) => started.operation_id,
MethodResponse::LimitReached => panic!("Expected started response"),
};
let expected_hash = format!("{:?}", Blake2Hasher::hash(&CHILD_VALUE));
assert_matches!(
get_next_event::<FollowEvent<String>>(&mut block_sub).await,
FollowEvent::OperationStorageItems(res) if res.operation_id == operation_id &&
res.items.len() == 1 &&
res.items[0].key == key && res.items[0].result == StorageResultType::Hash(expected_hash)
);
assert_matches!(
get_next_event::<FollowEvent<String>>(&mut block_sub).await,
FollowEvent::OperationStorageDone(done) if done.operation_id == operation_id
);
}
#[tokio::test]
@@ -647,10 +697,8 @@ async fn get_storage_multi_query_iter() {
);
// Valid call with storage at the key.
let expected_hash = format!("{:?}", Blake2Hasher::hash(&VALUE));
let expected_value = hex_string(&VALUE);
let mut sub = api
.subscribe(
let response: MethodResponse = api
.call(
"chainHead_unstable_storage",
rpc_params![
&sub_id,
@@ -669,22 +717,34 @@ async fn get_storage_multi_query_iter() {
)
.await
.unwrap();
let event: ChainHeadStorageEvent = get_next_event(&mut sub).await;
assert_matches!(event, ChainHeadStorageEvent::Items(res) if res.items.len() == 2 &&
res.items[0].key == key &&
res.items[1].key == key &&
res.items[0].result == StorageResultType::Hash(expected_hash) &&
res.items[1].result == StorageResultType::Value(expected_value));
let event: ChainHeadStorageEvent = get_next_event(&mut sub).await;
assert_matches!(event, ChainHeadStorageEvent::Done);
let operation_id = match response {
MethodResponse::Started(started) => started.operation_id,
MethodResponse::LimitReached => panic!("Expected started response"),
};
let expected_hash = format!("{:?}", Blake2Hasher::hash(&VALUE));
let expected_value = hex_string(&VALUE);
assert_matches!(
get_next_event::<FollowEvent<String>>(&mut block_sub).await,
FollowEvent::OperationStorageItems(res) if res.operation_id == operation_id &&
res.items.len() == 2 &&
res.items[0].key == key &&
res.items[1].key == key &&
res.items[0].result == StorageResultType::Hash(expected_hash) &&
res.items[1].result == StorageResultType::Value(expected_value)
);
assert_matches!(
get_next_event::<FollowEvent<String>>(&mut block_sub).await,
FollowEvent::OperationStorageDone(done) if done.operation_id == operation_id
);
// Child value set in `setup_api`.
let child_info = hex_string(&CHILD_STORAGE_KEY);
let genesis_hash = format!("{:?}", client.genesis_hash());
let expected_hash = format!("{:?}", Blake2Hasher::hash(&CHILD_VALUE));
let expected_value = hex_string(&CHILD_VALUE);
let mut sub = api
.subscribe(
let response: MethodResponse = api
.call(
"chainHead_unstable_storage",
rpc_params![
&sub_id,
@@ -704,14 +764,24 @@ async fn get_storage_multi_query_iter() {
)
.await
.unwrap();
let event: ChainHeadStorageEvent = get_next_event(&mut sub).await;
assert_matches!(event, ChainHeadStorageEvent::Items(res) if res.items.len() == 2 &&
res.items[0].key == key &&
res.items[1].key == key &&
res.items[0].result == StorageResultType::Hash(expected_hash) &&
res.items[1].result == StorageResultType::Value(expected_value));
let event: ChainHeadStorageEvent = get_next_event(&mut sub).await;
assert_matches!(event, ChainHeadStorageEvent::Done);
let operation_id = match response {
MethodResponse::Started(started) => started.operation_id,
MethodResponse::LimitReached => panic!("Expected started response"),
};
assert_matches!(
get_next_event::<FollowEvent<String>>(&mut block_sub).await,
FollowEvent::OperationStorageItems(res) if res.operation_id == operation_id &&
res.items.len() == 2 &&
res.items[0].key == key &&
res.items[1].key == key &&
res.items[0].result == StorageResultType::Hash(expected_hash) &&
res.items[1].result == StorageResultType::Value(expected_value)
);
assert_matches!(
get_next_event::<FollowEvent<String>>(&mut block_sub).await,
FollowEvent::OperationStorageDone(done) if done.operation_id == operation_id
);
}
#[tokio::test]
@@ -721,9 +791,9 @@ async fn get_storage_value() {
let invalid_hash = hex_string(&INVALID_HASH);
let key = hex_string(&KEY);
// Subscription ID is stale the disjoint event is emitted.
let mut sub = api
.subscribe(
// Subscription ID is invalid.
let response: MethodResponse = api
.call(
"chainHead_unstable_storage",
rpc_params![
"invalid_sub_id",
@@ -733,12 +803,11 @@ async fn get_storage_value() {
)
.await
.unwrap();
let event: ChainHeadStorageEvent = get_next_event(&mut sub).await;
assert_eq!(event, ChainHeadStorageEvent::Disjoint);
assert_matches!(response, MethodResponse::LimitReached);
// Valid subscription ID with invalid block hash will error.
// Block hash is invalid.
let err = api
.subscribe(
.call::<_, serde_json::Value>(
"chainHead_unstable_storage",
rpc_params![
&sub_id,
@@ -753,8 +822,8 @@ async fn get_storage_value() {
);
// Valid call without storage at the key.
let mut sub = api
.subscribe(
let response: MethodResponse = api
.call(
"chainHead_unstable_storage",
rpc_params![
&sub_id,
@@ -764,9 +833,15 @@ async fn get_storage_value() {
)
.await
.unwrap();
let event: ChainHeadStorageEvent = get_next_event(&mut sub).await;
let operation_id = match response {
MethodResponse::Started(started) => started.operation_id,
MethodResponse::LimitReached => panic!("Expected started response"),
};
// The `Done` event is generated directly since the key does not have any value associated.
assert_matches!(event, ChainHeadStorageEvent::Done);
assert_matches!(
get_next_event::<FollowEvent<String>>(&mut block_sub).await,
FollowEvent::OperationStorageDone(done) if done.operation_id == operation_id
);
// Import a new block with storage changes.
let mut builder = client.new_block(Default::default()).unwrap();
@@ -786,9 +861,8 @@ async fn get_storage_value() {
);
// Valid call with storage at the key.
let expected_value = hex_string(&VALUE);
let mut sub = api
.subscribe(
let response: MethodResponse = api
.call(
"chainHead_unstable_storage",
rpc_params![
&sub_id,
@@ -798,17 +872,29 @@ async fn get_storage_value() {
)
.await
.unwrap();
let event: ChainHeadStorageEvent = get_next_event(&mut sub).await;
assert_matches!(event, ChainHeadStorageEvent::Items(res) if res.items.len() == 1 && res.items[0].key == key && res.items[0].result == StorageResultType::Value(expected_value));
let event: ChainHeadStorageEvent = get_next_event(&mut sub).await;
assert_matches!(event, ChainHeadStorageEvent::Done);
let operation_id = match response {
MethodResponse::Started(started) => started.operation_id,
MethodResponse::LimitReached => panic!("Expected started response"),
};
let expected_value = hex_string(&VALUE);
assert_matches!(
get_next_event::<FollowEvent<String>>(&mut block_sub).await,
FollowEvent::OperationStorageItems(res) if res.operation_id == operation_id &&
res.items.len() == 1 &&
res.items[0].key == key && res.items[0].result == StorageResultType::Value(expected_value)
);
assert_matches!(
get_next_event::<FollowEvent<String>>(&mut block_sub).await,
FollowEvent::OperationStorageDone(done) if done.operation_id == operation_id
);
// Child value set in `setup_api`.
let child_info = hex_string(b"child");
let child_info = hex_string(&CHILD_STORAGE_KEY);
let genesis_hash = format!("{:?}", client.genesis_hash());
let expected_value = hex_string(&CHILD_VALUE);
let mut sub = api
.subscribe(
let response: MethodResponse = api
.call(
"chainHead_unstable_storage",
rpc_params![
&sub_id,
@@ -819,15 +905,28 @@ async fn get_storage_value() {
)
.await
.unwrap();
let event: ChainHeadStorageEvent = get_next_event(&mut sub).await;
assert_matches!(event, ChainHeadStorageEvent::Items(res) if res.items.len() == 1 && res.items[0].key == key && res.items[0].result == StorageResultType::Value(expected_value));
let event: ChainHeadStorageEvent = get_next_event(&mut sub).await;
assert_matches!(event, ChainHeadStorageEvent::Done);
let operation_id = match response {
MethodResponse::Started(started) => started.operation_id,
MethodResponse::LimitReached => panic!("Expected started response"),
};
let expected_value = hex_string(&CHILD_VALUE);
assert_matches!(
get_next_event::<FollowEvent<String>>(&mut block_sub).await,
FollowEvent::OperationStorageItems(res) if res.operation_id == operation_id &&
res.items.len() == 1 &&
res.items[0].key == key && res.items[0].result == StorageResultType::Value(expected_value)
);
assert_matches!(
get_next_event::<FollowEvent<String>>(&mut block_sub).await,
FollowEvent::OperationStorageDone(done) if done.operation_id == operation_id
);
}
#[tokio::test]
async fn get_storage_wrong_key() {
let (mut _client, api, mut _block_sub, sub_id, block) = setup_api().await;
async fn get_storage_non_queryable_key() {
let (mut _client, api, mut block_sub, sub_id, block) = setup_api().await;
let block_hash = format!("{:?}", block.header.hash());
let key = hex_string(&KEY);
@@ -835,8 +934,9 @@ async fn get_storage_wrong_key() {
let mut prefixed_key = well_known_keys::CHILD_STORAGE_KEY_PREFIX.to_vec();
prefixed_key.extend_from_slice(&KEY);
let prefixed_key = hex_string(&prefixed_key);
let mut sub = api
.subscribe(
let response: MethodResponse = api
.call(
"chainHead_unstable_storage",
rpc_params![
&sub_id,
@@ -846,15 +946,22 @@ async fn get_storage_wrong_key() {
)
.await
.unwrap();
let event: ChainHeadStorageEvent = get_next_event(&mut sub).await;
assert_matches!(event, ChainHeadStorageEvent::Done);
let operation_id = match response {
MethodResponse::Started(started) => started.operation_id,
MethodResponse::LimitReached => panic!("Expected started response"),
};
// The `Done` event is generated directly since the key is not queryable.
assert_matches!(
get_next_event::<FollowEvent<String>>(&mut block_sub).await,
FollowEvent::OperationStorageDone(done) if done.operation_id == operation_id
);
// Key is prefixed by DEFAULT_CHILD_STORAGE_KEY_PREFIX.
let mut prefixed_key = well_known_keys::DEFAULT_CHILD_STORAGE_KEY_PREFIX.to_vec();
prefixed_key.extend_from_slice(&KEY);
let prefixed_key = hex_string(&prefixed_key);
let mut sub = api
.subscribe(
let response: MethodResponse = api
.call(
"chainHead_unstable_storage",
rpc_params![
&sub_id,
@@ -864,15 +971,22 @@ async fn get_storage_wrong_key() {
)
.await
.unwrap();
let event: ChainHeadStorageEvent = get_next_event(&mut sub).await;
assert_matches!(event, ChainHeadStorageEvent::Done);
let operation_id = match response {
MethodResponse::Started(started) => started.operation_id,
MethodResponse::LimitReached => panic!("Expected started response"),
};
// The `Done` event is generated directly since the key is not queryable.
assert_matches!(
get_next_event::<FollowEvent<String>>(&mut block_sub).await,
FollowEvent::OperationStorageDone(done) if done.operation_id == operation_id
);
// Child key is prefixed by CHILD_STORAGE_KEY_PREFIX.
let mut prefixed_key = well_known_keys::CHILD_STORAGE_KEY_PREFIX.to_vec();
prefixed_key.extend_from_slice(b"child");
prefixed_key.extend_from_slice(CHILD_STORAGE_KEY);
let prefixed_key = hex_string(&prefixed_key);
let mut sub = api
.subscribe(
let response: MethodResponse = api
.call(
"chainHead_unstable_storage",
rpc_params![
&sub_id,
@@ -883,15 +997,22 @@ async fn get_storage_wrong_key() {
)
.await
.unwrap();
let event: ChainHeadStorageEvent = get_next_event(&mut sub).await;
assert_matches!(event, ChainHeadStorageEvent::Done);
let operation_id = match response {
MethodResponse::Started(started) => started.operation_id,
MethodResponse::LimitReached => panic!("Expected started response"),
};
// The `Done` event is generated directly since the key is not queryable.
assert_matches!(
get_next_event::<FollowEvent<String>>(&mut block_sub).await,
FollowEvent::OperationStorageDone(done) if done.operation_id == operation_id
);
// Child key is prefixed by DEFAULT_CHILD_STORAGE_KEY_PREFIX.
let mut prefixed_key = well_known_keys::DEFAULT_CHILD_STORAGE_KEY_PREFIX.to_vec();
prefixed_key.extend_from_slice(b"child");
prefixed_key.extend_from_slice(CHILD_STORAGE_KEY);
let prefixed_key = hex_string(&prefixed_key);
let mut sub = api
.subscribe(
let response: MethodResponse = api
.call(
"chainHead_unstable_storage",
rpc_params![
&sub_id,
@@ -902,8 +1023,164 @@ async fn get_storage_wrong_key() {
)
.await
.unwrap();
let event: ChainHeadStorageEvent = get_next_event(&mut sub).await;
assert_matches!(event, ChainHeadStorageEvent::Done);
let operation_id = match response {
MethodResponse::Started(started) => started.operation_id,
MethodResponse::LimitReached => panic!("Expected started response"),
};
// The `Done` event is generated directly since the key is not queryable.
assert_matches!(
get_next_event::<FollowEvent<String>>(&mut block_sub).await,
FollowEvent::OperationStorageDone(done) if done.operation_id == operation_id
);
}
#[tokio::test]
async fn unique_operation_ids() {
let (mut _client, api, mut block_sub, sub_id, block) = setup_api().await;
let block_hash = format!("{:?}", block.header.hash());
let mut op_ids = HashSet::new();
// Ensure that operation IDs are unique for multiple method calls.
for _ in 0..5 {
// Valid `chainHead_unstable_body` call.
let response: MethodResponse =
api.call("chainHead_unstable_body", [&sub_id, &block_hash]).await.unwrap();
let operation_id = match response {
MethodResponse::Started(started) => started.operation_id,
MethodResponse::LimitReached => panic!("Expected started response"),
};
assert_matches!(
get_next_event::<FollowEvent<String>>(&mut block_sub).await,
FollowEvent::OperationBodyDone(done) if done.operation_id == operation_id && done.value.is_empty()
);
// Ensure uniqueness.
assert!(op_ids.insert(operation_id));
// Valid `chainHead_unstable_storage` call.
let key = hex_string(&KEY);
let response: MethodResponse = api
.call(
"chainHead_unstable_storage",
rpc_params![
&sub_id,
&block_hash,
vec![StorageQuery { key: key.clone(), query_type: StorageQueryType::Value }]
],
)
.await
.unwrap();
let operation_id = match response {
MethodResponse::Started(started) => started.operation_id,
MethodResponse::LimitReached => panic!("Expected started response"),
};
// The `Done` event is generated directly since the key does not have any value associated.
assert_matches!(
get_next_event::<FollowEvent<String>>(&mut block_sub).await,
FollowEvent::OperationStorageDone(done) if done.operation_id == operation_id
);
// Ensure uniqueness.
assert!(op_ids.insert(operation_id));
// Valid `chainHead_unstable_call` call.
let alice_id = AccountKeyring::Alice.to_account_id();
let call_parameters = hex_string(&alice_id.encode());
let response: MethodResponse = api
.call(
"chainHead_unstable_call",
[&sub_id, &block_hash, "AccountNonceApi_account_nonce", &call_parameters],
)
.await
.unwrap();
let operation_id = match response {
MethodResponse::Started(started) => started.operation_id,
MethodResponse::LimitReached => panic!("Expected started response"),
};
// Response propagated to `chainHead_follow`.
assert_matches!(
get_next_event::<FollowEvent<String>>(&mut block_sub).await,
FollowEvent::OperationCallDone(done) if done.operation_id == operation_id && done.output == "0x0000000000000000"
);
// Ensure uniqueness.
assert!(op_ids.insert(operation_id));
}
}
#[tokio::test]
async fn separate_operation_ids_for_subscriptions() {
let builder = TestClientBuilder::new();
let backend = builder.backend();
let mut client = Arc::new(builder.build());
let api = ChainHead::new(
client.clone(),
backend,
Arc::new(TaskExecutor::default()),
CHAIN_GENESIS,
MAX_PINNED_BLOCKS,
Duration::from_secs(MAX_PINNED_SECS),
)
.into_rpc();
// Create two separate subscriptions.
let mut sub_first = api.subscribe("chainHead_unstable_follow", [true]).await.unwrap();
let sub_id_first = sub_first.subscription_id();
let sub_id_first = serde_json::to_string(&sub_id_first).unwrap();
let mut sub_second = api.subscribe("chainHead_unstable_follow", [true]).await.unwrap();
let sub_id_second = sub_second.subscription_id();
let sub_id_second = serde_json::to_string(&sub_id_second).unwrap();
let block = client.new_block(Default::default()).unwrap().build().unwrap().block;
client.import(BlockOrigin::Own, block.clone()).await.unwrap();
let block_hash = format!("{:?}", block.header.hash());
// Ensure the imported block is propagated and pinned.
assert_matches!(
get_next_event::<FollowEvent<String>>(&mut sub_first).await,
FollowEvent::Initialized(_)
);
assert_matches!(
get_next_event::<FollowEvent<String>>(&mut sub_first).await,
FollowEvent::NewBlock(_)
);
assert_matches!(
get_next_event::<FollowEvent<String>>(&mut sub_first).await,
FollowEvent::BestBlockChanged(_)
);
assert_matches!(
get_next_event::<FollowEvent<String>>(&mut sub_second).await,
FollowEvent::Initialized(_)
);
assert_matches!(
get_next_event::<FollowEvent<String>>(&mut sub_second).await,
FollowEvent::NewBlock(_)
);
assert_matches!(
get_next_event::<FollowEvent<String>>(&mut sub_second).await,
FollowEvent::BestBlockChanged(_)
);
// Each `chainHead_follow` subscription receives a separate operation ID.
let response: MethodResponse =
api.call("chainHead_unstable_body", [&sub_id_first, &block_hash]).await.unwrap();
let operation_id: String = match response {
MethodResponse::Started(started) => started.operation_id,
MethodResponse::LimitReached => panic!("Expected started response"),
};
assert_eq!(operation_id, "0");
let response: MethodResponse = api
.call("chainHead_unstable_body", [&sub_id_second, &block_hash])
.await
.unwrap();
let operation_id_second: String = match response {
MethodResponse::Started(started) => started.operation_id,
MethodResponse::LimitReached => panic!("Expected started response"),
};
// The second subscription does not increment the operation ID of the first one.
assert_eq!(operation_id_second, "0");
}
#[tokio::test]