rpc: backpressured RPC server (bump jsonrpsee 0.20) (#1313)

This is a rather big change in jsonrpsee, the major things in this bump
are:
- Server backpressure (the subscription impls are modified to deal with
that)
- Allow custom error types / return types (remove jsonrpsee::core::Error
and jsonrpee::core::CallError)
- Bug fixes (graceful shutdown in particular not used by substrate
anyway)
   - Less dependencies for the clients in particular
   - Return type requires Clone in method call responses
   - Moved to tokio channels
   - Async subscription API (not used in this PR)

Major changes in this PR:
- The subscriptions are now bounded and if subscription can't keep up
with the server it is dropped
- CLI: add parameter to configure the jsonrpc server bounded message
buffer (default is 64)
- Add our own subscription helper to deal with the unbounded streams in
substrate

The most important things in this PR to review is the added helpers
functions in `substrate/client/rpc/src/utils.rs` and the rest is pretty
much chore.

Regarding the "bounded buffer limit" it may cause the server to handle
the JSON-RPC calls
slower than before.

The message size limit is bounded by "--rpc-response-size" thus "by
default 10MB * 64 = 640MB"
but the subscription message size is not covered by this limit and could
be capped as well.

Hopefully the last release prior to 1.0, sorry in advance for a big PR

Previous attempt: https://github.com/paritytech/substrate/pull/13992

Resolves https://github.com/paritytech/polkadot-sdk/issues/748, resolves
https://github.com/paritytech/polkadot-sdk/issues/627
This commit is contained in:
Niklas Adolfsson
2024-01-23 09:55:13 +01:00
committed by GitHub
parent 76c37c930b
commit e16ef0861f
117 changed files with 1245 additions and 1090 deletions
@@ -18,10 +18,7 @@
//! Error helpers for `archive` RPC module.
use jsonrpsee::{
core::Error as RpcError,
types::error::{CallError, ErrorObject},
};
use jsonrpsee::types::error::ErrorObject;
/// ChainHead RPC errors.
#[derive(Debug, thiserror::Error)]
@@ -58,9 +55,3 @@ impl From<Error> for ErrorObject<'static> {
.into()
}
}
impl From<Error> for RpcError {
fn from(e: Error) -> Self {
CallError::Custom(e.into()).into()
}
}
@@ -29,10 +29,8 @@ use super::{archive::Archive, *};
use assert_matches::assert_matches;
use codec::{Decode, Encode};
use jsonrpsee::{
core::error::Error,
rpc_params,
types::{error::CallError, EmptyServerParams as EmptyParams},
RpcModule,
core::{EmptyServerParams as EmptyParams, Error},
rpc_params, RpcModule,
};
use sc_block_builder::BlockBuilderBuilder;
use sc_client_api::ChildInfo;
@@ -289,7 +287,7 @@ async fn archive_call() {
)
.await
.unwrap_err();
assert_matches!(err, Error::Call(CallError::Custom(ref err)) if err.code() == 3001 && err.message().contains("Invalid parameter"));
assert_matches!(err, Error::Call(err) if err.code() == 3001 && err.message().contains("Invalid parameter"));
// Pass an invalid parameters that cannot be decode.
let err = api
@@ -300,7 +298,7 @@ async fn archive_call() {
)
.await
.unwrap_err();
assert_matches!(err, Error::Call(CallError::Custom(ref err)) if err.code() == 3001 && err.message().contains("Invalid parameter"));
assert_matches!(err, Error::Call(err) if err.code() == 3001 && err.message().contains("Invalid parameter"));
// Invalid hash.
let result: MethodResult = api
@@ -20,10 +20,13 @@
//! API trait of the chain head.
use crate::{
chain_head::event::{FollowEvent, MethodResponse},
chain_head::{
error::Error,
event::{FollowEvent, MethodResponse},
},
common::events::StorageQuery,
};
use jsonrpsee::{core::RpcResult, proc_macros::rpc};
use jsonrpsee::proc_macros::rpc;
use sp_rpc::list::ListOrValue;
#[rpc(client, server)]
@@ -56,7 +59,7 @@ pub trait ChainHeadApi<Hash> {
&self,
follow_subscription: String,
hash: Hash,
) -> RpcResult<MethodResponse>;
) -> Result<MethodResponse, Error>;
/// Retrieves the header of a pinned block.
///
@@ -75,7 +78,7 @@ pub trait ChainHeadApi<Hash> {
&self,
follow_subscription: String,
hash: Hash,
) -> RpcResult<Option<String>>;
) -> Result<Option<String>, Error>;
/// Returns storage entries at a specific block's state.
///
@@ -89,7 +92,7 @@ pub trait ChainHeadApi<Hash> {
hash: Hash,
items: Vec<StorageQuery<String>>,
child_trie: Option<String>,
) -> RpcResult<MethodResponse>;
) -> Result<MethodResponse, Error>;
/// Call into the Runtime API at a specified block's state.
///
@@ -103,7 +106,7 @@ pub trait ChainHeadApi<Hash> {
hash: Hash,
function: String,
call_parameters: String,
) -> RpcResult<MethodResponse>;
) -> Result<MethodResponse, Error>;
/// Unpin a block or multiple blocks reported by the `follow` method.
///
@@ -120,7 +123,7 @@ pub trait ChainHeadApi<Hash> {
&self,
follow_subscription: String,
hash_or_hashes: ListOrValue<Hash>,
) -> RpcResult<()>;
) -> Result<(), Error>;
/// Resumes a storage fetch started with `chainHead_storage` after it has generated an
/// `operationWaitingForContinue` event.
@@ -133,7 +136,7 @@ pub trait ChainHeadApi<Hash> {
&self,
follow_subscription: String,
operation_id: String,
) -> RpcResult<()>;
) -> Result<(), Error>;
/// Stops an operation started with chainHead_unstable_body, chainHead_unstable_call, or
/// chainHead_unstable_storage. If the operation was still in progress, this interrupts it. If
@@ -147,5 +150,5 @@ pub trait ChainHeadApi<Hash> {
&self,
follow_subscription: String,
operation_id: String,
) -> RpcResult<()>;
) -> Result<(), Error>;
}
@@ -36,15 +36,14 @@ use crate::{
use codec::Encode;
use futures::future::FutureExt;
use jsonrpsee::{
core::{async_trait, RpcResult},
types::{SubscriptionEmptyError, SubscriptionId, SubscriptionResult},
SubscriptionSink,
core::async_trait, types::SubscriptionId, PendingSubscriptionSink, SubscriptionSink,
};
use log::debug;
use sc_client_api::{
Backend, BlockBackend, BlockchainEvents, CallExecutor, ChildInfo, ExecutorProvider, StorageKey,
StorageProvider,
};
use sc_rpc::utils::to_sub_message;
use sp_api::CallApiAt;
use sp_blockchain::{Error as BlockChainError, HeaderBackend, HeaderMetadata};
use sp_core::{traits::CallContext, Bytes};
@@ -136,27 +135,13 @@ impl<BE: Backend<Block>, Block: BlockT, Client> ChainHead<BE, Block, Client> {
_phantom: PhantomData,
}
}
}
/// Accept the subscription and return the subscription ID on success.
fn accept_subscription(
&self,
sink: &mut SubscriptionSink,
) -> Result<String, SubscriptionEmptyError> {
// The subscription must be accepted before it can provide a valid subscription ID.
sink.accept()?;
let Some(sub_id) = sink.subscription_id() else {
// This can only happen if the subscription was not accepted.
return Err(SubscriptionEmptyError)
};
// Get the string representation for the subscription.
let sub_id = match sub_id {
SubscriptionId::Num(num) => num.to_string(),
SubscriptionId::Str(id) => id.into_owned().into(),
};
Ok(sub_id)
/// Helper to convert the `subscription ID` to a string.
pub fn read_subscription_id_as_string(sink: &SubscriptionSink) -> String {
match sink.subscription_id() {
SubscriptionId::Num(n) => n.to_string(),
SubscriptionId::Str(s) => s.into_owned().into(),
}
}
@@ -190,35 +175,28 @@ where
+ StorageProvider<Block, BE>
+ 'static,
{
fn chain_head_unstable_follow(
&self,
mut sink: SubscriptionSink,
with_runtime: bool,
) -> SubscriptionResult {
let sub_id = match self.accept_subscription(&mut sink) {
Ok(sub_id) => sub_id,
Err(err) => {
sink.close(ChainHeadRpcError::InternalError(
"Cannot generate subscription ID".into(),
));
return Err(err)
},
};
// Keep track of the subscription.
let Some(sub_data) = self.subscriptions.insert_subscription(sub_id.clone(), with_runtime)
else {
// Inserting the subscription can only fail if the JsonRPSee
// generated a duplicate subscription ID.
debug!(target: LOG_TARGET, "[follow][id={:?}] Subscription already accepted", sub_id);
let _ = sink.send(&FollowEvent::<Block::Hash>::Stop);
return Ok(())
};
debug!(target: LOG_TARGET, "[follow][id={:?}] Subscription accepted", sub_id);
fn chain_head_unstable_follow(&self, pending: PendingSubscriptionSink, with_runtime: bool) {
let subscriptions = self.subscriptions.clone();
let backend = self.backend.clone();
let client = self.client.clone();
let fut = async move {
let Ok(sink) = pending.accept().await else { return };
let sub_id = read_subscription_id_as_string(&sink);
// Keep track of the subscription.
let Some(sub_data) = subscriptions.insert_subscription(sub_id.clone(), with_runtime)
else {
// Inserting the subscription can only fail if the JsonRPSee
// generated a duplicate subscription ID.
debug!(target: LOG_TARGET, "[follow][id={:?}] Subscription already accepted", sub_id);
let msg = to_sub_message(&sink, &FollowEvent::<String>::Stop);
let _ = sink.send(msg).await;
return
};
debug!(target: LOG_TARGET, "[follow][id={:?}] Subscription accepted", sub_id);
let mut chain_head_follow = ChainHeadFollower::new(
client,
backend,
@@ -234,14 +212,13 @@ where
};
self.executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed());
Ok(())
}
fn chain_head_unstable_body(
&self,
follow_subscription: String,
hash: Block::Hash,
) -> RpcResult<MethodResponse> {
) -> Result<MethodResponse, ChainHeadRpcError> {
let mut block_guard = match self.subscriptions.lock_block(&follow_subscription, hash, 1) {
Ok(block) => block,
Err(SubscriptionManagementError::SubscriptionAbsent) |
@@ -293,7 +270,7 @@ where
&self,
follow_subscription: String,
hash: Block::Hash,
) -> RpcResult<Option<String>> {
) -> Result<Option<String>, ChainHeadRpcError> {
let _block_guard = match self.subscriptions.lock_block(&follow_subscription, hash, 1) {
Ok(block) => block,
Err(SubscriptionManagementError::SubscriptionAbsent) |
@@ -309,7 +286,6 @@ where
.header(hash)
.map(|opt_header| opt_header.map(|h| hex_string(&h.encode())))
.map_err(|err| ChainHeadRpcError::InternalError(err.to_string()))
.map_err(Into::into)
}
fn chain_head_unstable_storage(
@@ -318,7 +294,7 @@ where
hash: Block::Hash,
items: Vec<StorageQuery<String>>,
child_trie: Option<String>,
) -> RpcResult<MethodResponse> {
) -> Result<MethodResponse, ChainHeadRpcError> {
// Gain control over parameter parsing and returned error.
let items = items
.into_iter()
@@ -376,7 +352,7 @@ where
hash: Block::Hash,
function: String,
call_parameters: String,
) -> RpcResult<MethodResponse> {
) -> Result<MethodResponse, ChainHeadRpcError> {
let call_parameters = Bytes::from(parse_hex_param(call_parameters)?);
let mut block_guard = match self.subscriptions.lock_block(&follow_subscription, hash, 1) {
@@ -420,14 +396,17 @@ where
});
let _ = block_guard.response_sender().unbounded_send(event);
Ok(MethodResponse::Started(MethodResponseStarted { operation_id, discarded_items: None }))
Ok(MethodResponse::Started(MethodResponseStarted {
operation_id: operation_id.clone(),
discarded_items: None,
}))
}
fn chain_head_unstable_unpin(
&self,
follow_subscription: String,
hash_or_hashes: ListOrValue<Block::Hash>,
) -> RpcResult<()> {
) -> Result<(), ChainHeadRpcError> {
let result = match hash_or_hashes {
ListOrValue::Value(hash) =>
self.subscriptions.unpin_blocks(&follow_subscription, [hash]),
@@ -443,9 +422,9 @@ where
},
Err(SubscriptionManagementError::BlockHashAbsent) => {
// Block is not part of the subscription.
Err(ChainHeadRpcError::InvalidBlock.into())
Err(ChainHeadRpcError::InvalidBlock)
},
Err(_) => Err(ChainHeadRpcError::InvalidBlock.into()),
Err(_) => Err(ChainHeadRpcError::InvalidBlock),
}
}
@@ -453,7 +432,7 @@ where
&self,
follow_subscription: String,
operation_id: String,
) -> RpcResult<()> {
) -> Result<(), ChainHeadRpcError> {
let Some(operation) = self.subscriptions.get_operation(&follow_subscription, &operation_id)
else {
return Ok(())
@@ -471,7 +450,7 @@ where
&self,
follow_subscription: String,
operation_id: String,
) -> RpcResult<()> {
) -> Result<(), ChainHeadRpcError> {
let Some(operation) = self.subscriptions.get_operation(&follow_subscription, &operation_id)
else {
return Ok(())
@@ -24,7 +24,7 @@ use crate::chain_head::{
BestBlockChanged, Finalized, FollowEvent, Initialized, NewBlock, RuntimeEvent,
RuntimeVersionEvent,
},
subscription::{InsertedSubscriptionData, SubscriptionManagement, SubscriptionManagementError},
subscription::{SubscriptionManagement, SubscriptionManagementError},
};
use futures::{
channel::oneshot,
@@ -36,6 +36,7 @@ use log::{debug, error};
use sc_client_api::{
Backend, BlockBackend, BlockImportNotification, BlockchainEvents, FinalityNotification,
};
use sc_rpc::utils::to_sub_message;
use sp_api::CallApiAt;
use sp_blockchain::{
Backend as BlockChainBackend, Error as BlockChainError, HeaderBackend, HeaderMetadata, Info,
@@ -43,6 +44,8 @@ use sp_blockchain::{
use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor};
use std::{collections::HashSet, sync::Arc};
use super::subscription::InsertedSubscriptionData;
/// Generates the events of the `chainHead_follow` method.
pub struct ChainHeadFollower<BE: Backend<Block>, Block: BlockT, Client> {
/// Substrate client.
@@ -500,7 +503,7 @@ where
startup_point: &StartupPoint<Block>,
mut stream: EventStream,
mut to_ignore: HashSet<Block::Hash>,
mut sink: SubscriptionSink,
sink: SubscriptionSink,
rx_stop: oneshot::Receiver<()>,
) where
EventStream: Stream<Item = NotificationType<Block>> + Unpin,
@@ -529,35 +532,23 @@ where
self.sub_id,
err
);
let _ = sink.send(&FollowEvent::<String>::Stop);
let msg = to_sub_message(&sink, &FollowEvent::<String>::Stop);
let _ = sink.send(msg).await;
return
},
};
for event in events {
let result = sink.send(&event);
// Migration note: the new version of jsonrpsee returns Result<(), DisconnectError>
// The logic from `Err(err)` should be moved when building the new
// `SubscriptionMessage`.
// For now, jsonrpsee returns:
// Ok(true): message sent
// Ok(false): client disconnected or subscription closed
// Err(err): serder serialization error of the event
if let Err(err) = result {
let msg = to_sub_message(&sink, &event);
if let Err(err) = sink.send(msg).await {
// Failed to submit event.
debug!(
target: LOG_TARGET,
"[follow][id={:?}] Failed to send event {:?}", self.sub_id, err
);
let _ = sink.send(&FollowEvent::<String>::Stop);
return
}
if let Ok(false) = result {
// Client disconnected or subscription was closed.
let msg = to_sub_message(&sink, &FollowEvent::<String>::Stop);
let _ = sink.send(msg).await;
return
}
}
@@ -568,13 +559,14 @@ where
// If we got here either the substrate streams have closed
// or the `Stop` receiver was triggered.
let _ = sink.send(&FollowEvent::<String>::Stop);
let msg = to_sub_message(&sink, &FollowEvent::<String>::Stop);
let _ = sink.send(msg).await;
}
/// Generate the block events for the `chainHead_follow` method.
pub async fn generate_events(
&mut self,
mut sink: SubscriptionSink,
sink: SubscriptionSink,
sub_data: InsertedSubscriptionData<Block>,
) {
// Register for the new block and finalized notifications.
@@ -602,7 +594,8 @@ where
self.sub_id,
err
);
let _ = sink.send(&FollowEvent::<Block::Hash>::Stop);
let msg = to_sub_message(&sink, &FollowEvent::<String>::Stop);
let _ = sink.send(msg).await;
return
},
};
@@ -18,10 +18,7 @@
//! Error helpers for `chainHead` RPC module.
use jsonrpsee::{
core::Error as RpcError,
types::error::{CallError, ErrorObject},
};
use jsonrpsee::types::error::ErrorObject;
/// ChainHead RPC errors.
#[derive(Debug, thiserror::Error)]
@@ -81,9 +78,3 @@ impl From<Error> for ErrorObject<'static> {
}
}
}
impl From<Error> for RpcError {
fn from(e: Error) -> Self {
CallError::Custom(e.into()).into()
}
}
@@ -27,10 +27,8 @@ use assert_matches::assert_matches;
use codec::{Decode, Encode};
use futures::Future;
use jsonrpsee::{
core::{error::Error, server::rpc_module::Subscription as RpcSubscription},
rpc_params,
types::error::CallError,
RpcModule,
core::{error::Error, server::Subscription as RpcSubscription},
rpc_params, RpcModule,
};
use sc_block_builder::BlockBuilderBuilder;
use sc_client_api::ChildInfo;
@@ -120,7 +118,7 @@ async fn setup_api() -> (
)
.into_rpc();
let mut sub = api.subscribe("chainHead_unstable_follow", [true]).await.unwrap();
let mut sub = api.subscribe_unbounded("chainHead_unstable_follow", [true]).await.unwrap();
let sub_id = sub.subscription_id();
let sub_id = serde_json::to_string(&sub_id).unwrap();
@@ -171,7 +169,7 @@ async fn follow_subscription_produces_blocks() {
.into_rpc();
let finalized_hash = client.info().finalized_hash;
let mut sub = api.subscribe("chainHead_unstable_follow", [false]).await.unwrap();
let mut sub = api.subscribe_unbounded("chainHead_unstable_follow", [false]).await.unwrap();
// Initialized must always be reported first.
let event: FollowEvent<String> = get_next_event(&mut sub).await;
@@ -239,7 +237,7 @@ async fn follow_with_runtime() {
.into_rpc();
let finalized_hash = client.info().finalized_hash;
let mut sub = api.subscribe("chainHead_unstable_follow", [true]).await.unwrap();
let mut sub = api.subscribe_unbounded("chainHead_unstable_follow", [true]).await.unwrap();
// Initialized must always be reported first.
let event: FollowEvent<String> = get_next_event(&mut sub).await;
@@ -361,7 +359,7 @@ async fn get_header() {
.await
.unwrap_err();
assert_matches!(err,
Error::Call(CallError::Custom(ref err)) if err.code() == super::error::rpc_spec_v2::INVALID_BLOCK_ERROR && err.message() == "Invalid block hash"
Error::Call(err) if err.code() == super::error::rpc_spec_v2::INVALID_BLOCK_ERROR && err.message() == "Invalid block hash"
);
// Obtain the valid header.
@@ -390,7 +388,7 @@ async fn get_body() {
.await
.unwrap_err();
assert_matches!(err,
Error::Call(CallError::Custom(ref err)) if err.code() == super::error::rpc_spec_v2::INVALID_BLOCK_ERROR && err.message() == "Invalid block hash"
Error::Call(err) if err.code() == super::error::rpc_spec_v2::INVALID_BLOCK_ERROR && err.message() == "Invalid block hash"
);
// Valid call.
@@ -475,7 +473,7 @@ async fn call_runtime() {
.await
.unwrap_err();
assert_matches!(err,
Error::Call(CallError::Custom(ref err)) if err.code() == super::error::rpc_spec_v2::INVALID_BLOCK_ERROR && err.message() == "Invalid block hash"
Error::Call(err) if err.code() == super::error::rpc_spec_v2::INVALID_BLOCK_ERROR && err.message() == "Invalid block hash"
);
// Pass an invalid parameters that cannot be decode.
@@ -488,7 +486,7 @@ async fn call_runtime() {
.await
.unwrap_err();
assert_matches!(err,
Error::Call(CallError::Custom(ref err)) if err.code() == super::error::json_rpc_spec::INVALID_PARAM_ERROR && err.message().contains("Invalid parameter")
Error::Call(err) if err.code() == super::error::json_rpc_spec::INVALID_PARAM_ERROR && err.message().contains("Invalid parameter")
);
// Valid call.
@@ -550,7 +548,7 @@ async fn call_runtime_without_flag() {
)
.into_rpc();
let mut sub = api.subscribe("chainHead_unstable_follow", [false]).await.unwrap();
let mut sub = api.subscribe_unbounded("chainHead_unstable_follow", [false]).await.unwrap();
let sub_id = sub.subscription_id();
let sub_id = serde_json::to_string(&sub_id).unwrap();
@@ -591,7 +589,7 @@ async fn call_runtime_without_flag() {
.unwrap_err();
assert_matches!(err,
Error::Call(CallError::Custom(ref err)) if err.code() == super::error::rpc_spec_v2::INVALID_RUNTIME_CALL && err.message().contains("subscription was started with `withRuntime` set to `false`")
Error::Call(err) if err.code() == super::error::rpc_spec_v2::INVALID_RUNTIME_CALL && err.message().contains("subscription was started with `withRuntime` set to `false`")
);
}
@@ -629,7 +627,7 @@ async fn get_storage_hash() {
.await
.unwrap_err();
assert_matches!(err,
Error::Call(CallError::Custom(ref err)) if err.code() == super::error::rpc_spec_v2::INVALID_BLOCK_ERROR && err.message() == "Invalid block hash"
Error::Call(err) if err.code() == super::error::rpc_spec_v2::INVALID_BLOCK_ERROR && err.message() == "Invalid block hash"
);
// Valid call without storage at the key.
@@ -897,7 +895,7 @@ async fn get_storage_value() {
.await
.unwrap_err();
assert_matches!(err,
Error::Call(CallError::Custom(ref err)) if err.code() == super::error::rpc_spec_v2::INVALID_BLOCK_ERROR && err.message() == "Invalid block hash"
Error::Call(err) if err.code() == super::error::rpc_spec_v2::INVALID_BLOCK_ERROR && err.message() == "Invalid block hash"
);
// Valid call without storage at the key.
@@ -1209,11 +1207,12 @@ async fn separate_operation_ids_for_subscriptions() {
.into_rpc();
// Create two separate subscriptions.
let mut sub_first = api.subscribe("chainHead_unstable_follow", [true]).await.unwrap();
let mut sub_first = api.subscribe_unbounded("chainHead_unstable_follow", [true]).await.unwrap();
let sub_id_first = sub_first.subscription_id();
let sub_id_first = serde_json::to_string(&sub_id_first).unwrap();
let mut sub_second = api.subscribe("chainHead_unstable_follow", [true]).await.unwrap();
let mut sub_second =
api.subscribe_unbounded("chainHead_unstable_follow", [true]).await.unwrap();
let sub_id_second = sub_second.subscription_id();
let sub_id_second = serde_json::to_string(&sub_id_second).unwrap();
@@ -1341,7 +1340,7 @@ async fn follow_generates_initial_blocks() {
let block_2_f_hash = block_2_f.header.hash();
client.import(BlockOrigin::Own, block_2_f.clone()).await.unwrap();
let mut sub = api.subscribe("chainHead_unstable_follow", [false]).await.unwrap();
let mut sub = api.subscribe_unbounded("chainHead_unstable_follow", [false]).await.unwrap();
// Initialized must always be reported first.
let event: FollowEvent<String> = get_next_event(&mut sub).await;
@@ -1450,7 +1449,7 @@ async fn follow_exceeding_pinned_blocks() {
)
.into_rpc();
let mut sub = api.subscribe("chainHead_unstable_follow", [false]).await.unwrap();
let mut sub = api.subscribe_unbounded("chainHead_unstable_follow", [false]).await.unwrap();
let block = BlockBuilderBuilder::new(&*client)
.on_parent_block(client.chain_info().genesis_hash)
@@ -1526,7 +1525,7 @@ async fn follow_with_unpin() {
)
.into_rpc();
let mut sub = api.subscribe("chainHead_unstable_follow", [false]).await.unwrap();
let mut sub = api.subscribe_unbounded("chainHead_unstable_follow", [false]).await.unwrap();
let sub_id = sub.subscription_id();
let sub_id = serde_json::to_string(&sub_id).unwrap();
@@ -1572,7 +1571,7 @@ async fn follow_with_unpin() {
.await
.unwrap_err();
assert_matches!(err,
Error::Call(CallError::Custom(ref err)) if err.code() == super::error::rpc_spec_v2::INVALID_BLOCK_ERROR && err.message() == "Invalid block hash"
Error::Call(err) if err.code() == super::error::rpc_spec_v2::INVALID_BLOCK_ERROR && err.message() == "Invalid block hash"
);
// To not exceed the number of pinned blocks, we need to unpin before the next import.
@@ -1637,7 +1636,7 @@ async fn follow_with_multiple_unpin_hashes() {
)
.into_rpc();
let mut sub = api.subscribe("chainHead_unstable_follow", [false]).await.unwrap();
let mut sub = api.subscribe_unbounded("chainHead_unstable_follow", [false]).await.unwrap();
let sub_id = sub.subscription_id();
let sub_id = serde_json::to_string(&sub_id).unwrap();
@@ -1721,7 +1720,7 @@ async fn follow_with_multiple_unpin_hashes() {
.await
.unwrap_err();
assert_matches!(err,
Error::Call(CallError::Custom(ref err)) if err.code() == super::error::rpc_spec_v2::INVALID_BLOCK_ERROR && err.message() == "Invalid block hash"
Error::Call(err) if err.code() == super::error::rpc_spec_v2::INVALID_BLOCK_ERROR && err.message() == "Invalid block hash"
);
let _res: () = api
@@ -1738,7 +1737,7 @@ async fn follow_with_multiple_unpin_hashes() {
.await
.unwrap_err();
assert_matches!(err,
Error::Call(CallError::Custom(ref err)) if err.code() == super::error::rpc_spec_v2::INVALID_BLOCK_ERROR && err.message() == "Invalid block hash"
Error::Call(err) if err.code() == super::error::rpc_spec_v2::INVALID_BLOCK_ERROR && err.message() == "Invalid block hash"
);
// Unpin multiple blocks.
@@ -1756,7 +1755,7 @@ async fn follow_with_multiple_unpin_hashes() {
.await
.unwrap_err();
assert_matches!(err,
Error::Call(CallError::Custom(ref err)) if err.code() == super::error::rpc_spec_v2::INVALID_BLOCK_ERROR && err.message() == "Invalid block hash"
Error::Call(err) if err.code() == super::error::rpc_spec_v2::INVALID_BLOCK_ERROR && err.message() == "Invalid block hash"
);
let err = api
@@ -1767,7 +1766,7 @@ async fn follow_with_multiple_unpin_hashes() {
.await
.unwrap_err();
assert_matches!(err,
Error::Call(CallError::Custom(ref err)) if err.code() == super::error::rpc_spec_v2::INVALID_BLOCK_ERROR && err.message() == "Invalid block hash"
Error::Call(err) if err.code() == super::error::rpc_spec_v2::INVALID_BLOCK_ERROR && err.message() == "Invalid block hash"
);
}
@@ -1791,7 +1790,7 @@ async fn follow_prune_best_block() {
.into_rpc();
let finalized_hash = client.info().finalized_hash;
let mut sub = api.subscribe("chainHead_unstable_follow", [false]).await.unwrap();
let mut sub = api.subscribe_unbounded("chainHead_unstable_follow", [false]).await.unwrap();
// Initialized must always be reported first.
let event: FollowEvent<String> = get_next_event(&mut sub).await;
@@ -2051,7 +2050,7 @@ async fn follow_forks_pruned_block() {
// Block 2_f and 3_f are not pruned, pruning happens at height (N - 1).
client.finalize_block(block_3_hash, None).unwrap();
let mut sub = api.subscribe("chainHead_unstable_follow", [false]).await.unwrap();
let mut sub = api.subscribe_unbounded("chainHead_unstable_follow", [false]).await.unwrap();
// Initialized must always be reported first.
let event: FollowEvent<String> = get_next_event(&mut sub).await;
@@ -2205,7 +2204,7 @@ async fn follow_report_multiple_pruned_block() {
let block_3_f = block_builder.build().unwrap().block;
let block_3_f_hash = block_3_f.hash();
client.import(BlockOrigin::Own, block_3_f.clone()).await.unwrap();
let mut sub = api.subscribe("chainHead_unstable_follow", [false]).await.unwrap();
let mut sub = api.subscribe_unbounded("chainHead_unstable_follow", [false]).await.unwrap();
// Initialized must always be reported first.
let event: FollowEvent<String> = get_next_event(&mut sub).await;
@@ -2388,7 +2387,7 @@ async fn pin_block_references() {
}
}
let mut sub = api.subscribe("chainHead_unstable_follow", [false]).await.unwrap();
let mut sub = api.subscribe_unbounded("chainHead_unstable_follow", [false]).await.unwrap();
let sub_id = sub.subscription_id();
let sub_id = serde_json::to_string(&sub_id).unwrap();
@@ -2520,7 +2519,7 @@ async fn follow_finalized_before_new_block() {
let block_1_hash = block_1.header.hash();
client.import(BlockOrigin::Own, block_1.clone()).await.unwrap();
let mut sub = api.subscribe("chainHead_unstable_follow", [false]).await.unwrap();
let mut sub = api.subscribe_unbounded("chainHead_unstable_follow", [false]).await.unwrap();
// Trigger the `FinalizedNotification` for block 1 before the `BlockImportNotification`, and
// expect for the `chainHead` to generate `NewBlock`, `BestBlock` and `Finalized` events.
@@ -2622,7 +2621,7 @@ async fn ensure_operation_limits_works() {
)
.into_rpc();
let mut sub = api.subscribe("chainHead_unstable_follow", [true]).await.unwrap();
let mut sub = api.subscribe_unbounded("chainHead_unstable_follow", [true]).await.unwrap();
let sub_id = sub.subscription_id();
let sub_id = serde_json::to_string(&sub_id).unwrap();
@@ -2726,7 +2725,7 @@ async fn check_continue_operation() {
)
.into_rpc();
let mut sub = api.subscribe("chainHead_unstable_follow", [true]).await.unwrap();
let mut sub = api.subscribe_unbounded("chainHead_unstable_follow", [true]).await.unwrap();
let sub_id = sub.subscription_id();
let sub_id = serde_json::to_string(&sub_id).unwrap();
@@ -2908,7 +2907,7 @@ async fn stop_storage_operation() {
)
.into_rpc();
let mut sub = api.subscribe("chainHead_unstable_follow", [true]).await.unwrap();
let mut sub = api.subscribe_unbounded("chainHead_unstable_follow", [true]).await.unwrap();
let sub_id = sub.subscription_id();
let sub_id = serde_json::to_string(&sub_id).unwrap();
@@ -17,7 +17,7 @@
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use super::*;
use jsonrpsee::{types::EmptyServerParams as EmptyParams, RpcModule};
use jsonrpsee::{core::EmptyServerParams as EmptyParams, RpcModule};
use sc_chain_spec::Properties;
const CHAIN_NAME: &'static str = "TEST_CHAIN_NAME";
+1 -1
View File
@@ -37,7 +37,7 @@ pub mod transaction;
pub type SubscriptionTaskExecutor = std::sync::Arc<dyn sp_core::traits::SpawnNamed>;
/// The result of an RPC method.
#[derive(Debug, Deserialize, Serialize, PartialEq)]
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
#[serde(untagged)]
pub enum MethodResult {
/// Method generated a result.
@@ -29,27 +29,21 @@ use crate::{
},
SubscriptionTaskExecutor,
};
use jsonrpsee::{
core::async_trait,
types::{
error::{CallError, ErrorObject},
SubscriptionResult,
},
SubscriptionSink,
};
use jsonrpsee::{core::async_trait, types::error::ErrorObject, PendingSubscriptionSink};
use sc_transaction_pool_api::{
error::IntoPoolError, BlockHash, TransactionFor, TransactionPool, TransactionSource,
TransactionStatus,
};
use std::sync::Arc;
use sc_rpc::utils::pipe_from_stream;
use sp_api::ProvideRuntimeApi;
use sp_blockchain::HeaderBackend;
use sp_core::Bytes;
use sp_runtime::traits::Block as BlockT;
use codec::Decode;
use futures::{FutureExt, StreamExt, TryFutureExt};
use futures::{StreamExt, TryFutureExt};
/// An API for transaction RPC calls.
pub struct Transaction<Pool, Client> {
@@ -90,52 +84,53 @@ where
<Pool::Block as BlockT>::Hash: Unpin,
Client: HeaderBackend<Pool::Block> + ProvideRuntimeApi<Pool::Block> + Send + Sync + 'static,
{
fn submit_and_watch(&self, mut sink: SubscriptionSink, xt: Bytes) -> SubscriptionResult {
// This is the only place where the RPC server can return an error for this
// subscription. Other defects must be signaled as events to the sink.
let decoded_extrinsic = match TransactionFor::<Pool>::decode(&mut &xt[..]) {
Ok(decoded_extrinsic) => decoded_extrinsic,
Err(e) => {
let err = CallError::Custom(ErrorObject::owned(
BAD_FORMAT,
format!("Extrinsic has invalid format: {}", e),
None::<()>,
));
let _ = sink.reject(err);
return Ok(())
},
};
let best_block_hash = self.client.info().best_hash;
let submit = self
.pool
.submit_and_watch(best_block_hash, TX_SOURCE, decoded_extrinsic)
.map_err(|e| {
e.into_pool_error()
.map(Error::from)
.unwrap_or_else(|e| Error::Verification(Box::new(e)))
});
fn submit_and_watch(&self, pending: PendingSubscriptionSink, xt: Bytes) {
let client = self.client.clone();
let pool = self.pool.clone();
let fut = async move {
// This is the only place where the RPC server can return an error for this
// subscription. Other defects must be signaled as events to the sink.
let decoded_extrinsic = match TransactionFor::<Pool>::decode(&mut &xt[..]) {
Ok(decoded_extrinsic) => decoded_extrinsic,
Err(e) => {
let err = ErrorObject::owned(
BAD_FORMAT,
format!("Extrinsic has invalid format: {}", e),
None::<()>,
);
let _ = pending.reject(err).await;
return
},
};
let best_block_hash = client.info().best_hash;
let submit = pool
.submit_and_watch(best_block_hash, TX_SOURCE, decoded_extrinsic)
.map_err(|e| {
e.into_pool_error()
.map(Error::from)
.unwrap_or_else(|e| Error::Verification(Box::new(e)))
});
match submit.await {
Ok(stream) => {
let mut state = TransactionState::new();
let stream =
stream.filter_map(|event| async move { state.handle_event(event) });
sink.pipe_from_stream(stream.boxed()).await;
stream.filter_map(move |event| async move { state.handle_event(event) });
pipe_from_stream(pending, stream.boxed()).await;
},
Err(err) => {
// We have not created an `Watcher` for the tx. Make sure the
// error is still propagated as an event.
let event: TransactionEvent<<Pool::Block as BlockT>::Hash> = err.into();
sink.pipe_from_stream(futures::stream::once(async { event }).boxed()).await;
pipe_from_stream(pending, futures::stream::once(async { event }).boxed()).await;
},
};
};
self.executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed());
Ok(())
sc_rpc::utils::spawn_subscription_task(&self.executor, fut);
}
}