rpc: Update jsonrpsee v0.15.1 (#11939)

* Bump jsonrpsee to v0.15.1

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Update cargo.lock

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc-servers: Adjust RpcMiddleware to WS and HTTP traits

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc/author: Use `SubscriptionSink`

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc/chain: Use `SubscriptionSink`

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc/state:  Use `SubscriptionSink`

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc/finality-grandpa: Use `SubscriptionSink`

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc/beefy: Use `SubscriptionSink`

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* client: Extract RPC string result from queries

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Apply rust-fmt

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Fix warnings

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Fix testing

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc/tests: Remove trailing comma

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc: Use `SubscriptionResult` for implementations

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc: Remove comment

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc: Delegate middleware calls to `RpcMiddleware`

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc: Remove comment

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Revert Cargo.lock

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Update Cargo.lock with minimal changes

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc: Update imports for `SubscriptionResult`

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Apply cargo fmt

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc/tests: Submit raw json requests to validate DenyUnsafe

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
This commit is contained in:
Alexandru Vasile
2022-08-02 20:00:18 +03:00
committed by GitHub
parent 103f770e75
commit 63f847c24f
32 changed files with 201 additions and 136 deletions
+1 -1
View File
@@ -11,7 +11,7 @@ homepage = "https://substrate.io"
[dependencies]
codec = { package = "parity-scale-codec", version = "3.0.0", features = ["derive"] }
futures = "0.3.21"
jsonrpsee = { version = "0.14.0", features = ["server", "macros"] }
jsonrpsee = { version = "0.15.1", features = ["server", "macros"] }
log = "0.4"
parking_lot = "0.12.0"
serde = { version = "1.0.136", features = ["derive"] }
+10 -11
View File
@@ -30,8 +30,8 @@ use futures::{task::SpawnError, FutureExt, StreamExt};
use jsonrpsee::{
core::{async_trait, Error as JsonRpseeError, RpcResult},
proc_macros::rpc,
types::{error::CallError, ErrorObject},
PendingSubscription,
types::{error::CallError, ErrorObject, SubscriptionResult},
SubscriptionSink,
};
use log::warn;
@@ -135,19 +135,18 @@ impl<Block> BeefyApiServer<notification::EncodedSignedCommitment, Block::Hash> f
where
Block: BlockT,
{
fn subscribe_justifications(&self, pending: PendingSubscription) {
fn subscribe_justifications(&self, mut sink: SubscriptionSink) -> SubscriptionResult {
let stream = self
.signed_commitment_stream
.subscribe()
.map(|sc| notification::EncodedSignedCommitment::new::<Block>(sc));
let fut = async move {
if let Some(mut sink) = pending.accept() {
sink.pipe_from_stream(stream).await;
}
sink.pipe_from_stream(stream).await;
};
self.executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed());
Ok(())
}
async fn latest_finalized(&self) -> RpcResult<Block::Hash> {
@@ -197,9 +196,9 @@ mod tests {
let (rpc, _) = setup_io_handler();
let request = r#"{"jsonrpc":"2.0","method":"beefy_getFinalizedHead","params":[],"id":1}"#;
let expected_response = r#"{"jsonrpc":"2.0","error":{"code":1,"message":"BEEFY RPC endpoint not ready"},"id":1}"#.to_string();
let (result, _) = rpc.raw_json_request(&request).await.unwrap();
let (response, _) = rpc.raw_json_request(&request).await.unwrap();
assert_eq!(expected_response, result,);
assert_eq!(expected_response, response.result);
}
#[tokio::test]
@@ -229,8 +228,8 @@ mod tests {
let deadline = std::time::Instant::now() + std::time::Duration::from_secs(2);
while std::time::Instant::now() < deadline {
let (response, _) = io.raw_json_request(request).await.expect("RPC requests work");
if response != not_ready {
assert_eq!(response, expected);
if response.result != not_ready {
assert_eq!(response.result, expected);
// Success
return
}
@@ -260,7 +259,7 @@ mod tests {
.unwrap();
let expected = r#"{"jsonrpc":"2.0","result":false,"id":1}"#;
assert_eq!(response, expected);
assert_eq!(response.result, expected);
}
fn create_commitment() -> BeefySignedCommitment<Block> {
@@ -13,7 +13,7 @@ readme = "README.md"
targets = ["x86_64-unknown-linux-gnu"]
[dependencies]
jsonrpsee = { version = "0.14.0", features = ["server", "macros"] }
jsonrpsee = { version = "0.15.1", features = ["server", "macros"] }
futures = "0.3.21"
serde = { version = "1.0.136", features = ["derive"] }
thiserror = "1.0"
@@ -262,7 +262,7 @@ mod tests {
let (response, _) = api.raw_json_request(request).await.unwrap();
let expected = r#"{"jsonrpc":"2.0","result":{"5GrwvaEF5zXb26Fz9rcQpDWS57CtERHpNehXCPcNoHGKutQY":{"primary":[0],"secondary":[1,2,4],"secondary_vrf":[]}},"id":1}"#;
assert_eq!(&response, expected);
assert_eq!(&response.result, expected);
}
#[tokio::test]
@@ -274,6 +274,6 @@ mod tests {
let (response, _) = api.raw_json_request(request).await.unwrap();
let expected = r#"{"jsonrpc":"2.0","error":{"code":-32601,"message":"RPC call is unsafe to be called externally"},"id":1}"#;
assert_eq!(&response, expected);
assert_eq!(&response.result, expected);
}
}
@@ -13,7 +13,7 @@ readme = "README.md"
targets = ["x86_64-unknown-linux-gnu"]
[dependencies]
jsonrpsee = { version = "0.14.0", features = ["server", "macros"] }
jsonrpsee = { version = "0.15.1", features = ["server", "macros"] }
assert_matches = "1.3.0"
async-trait = "0.1.50"
codec = { package = "parity-scale-codec", version = "3.0.0" }
@@ -12,7 +12,7 @@ homepage = "https://substrate.io"
[dependencies]
finality-grandpa = { version = "0.16.0", features = ["derive-codec"] }
futures = "0.3.16"
jsonrpsee = { version = "0.14.0", features = ["server", "macros"] }
jsonrpsee = { version = "0.15.1", features = ["server", "macros"] }
log = "0.4.8"
parity-scale-codec = { version = "3.0.0", features = ["derive"] }
serde = { version = "1.0.105", features = ["derive"] }
@@ -26,7 +26,8 @@ use std::sync::Arc;
use jsonrpsee::{
core::{async_trait, RpcResult},
proc_macros::rpc,
PendingSubscription,
types::SubscriptionResult,
SubscriptionSink,
};
mod error;
@@ -102,7 +103,7 @@ where
ReportedRoundStates::from(&self.authority_set, &self.voter_state).map_err(Into::into)
}
fn subscribe_justifications(&self, pending: PendingSubscription) {
fn subscribe_justifications(&self, mut sink: SubscriptionSink) -> SubscriptionResult {
let stream = self.justification_stream.subscribe().map(
|x: sc_finality_grandpa::GrandpaJustification<Block>| {
JustificationNotification::from(x)
@@ -110,12 +111,11 @@ where
);
let fut = async move {
if let Some(mut sink) = pending.accept() {
sink.pipe_from_stream(stream).await;
}
sink.pipe_from_stream(stream).await;
};
self.executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed());
Ok(())
}
async fn prove_finality(
@@ -283,9 +283,9 @@ mod tests {
let (rpc, _) = setup_io_handler(EmptyVoterState);
let expected_response = r#"{"jsonrpc":"2.0","error":{"code":1,"message":"GRANDPA RPC endpoint not ready"},"id":0}"#.to_string();
let request = r#"{"jsonrpc":"2.0","method":"grandpa_roundState","params":[],"id":0}"#;
let (result, _) = rpc.raw_json_request(&request).await.unwrap();
let (response, _) = rpc.raw_json_request(&request).await.unwrap();
assert_eq!(expected_response, result,);
assert_eq!(expected_response, response.result);
}
#[tokio::test]
@@ -306,8 +306,8 @@ mod tests {
},\"id\":0}".to_string();
let request = r#"{"jsonrpc":"2.0","method":"grandpa_roundState","params":[],"id":0}"#;
let (result, _) = rpc.raw_json_request(&request).await.unwrap();
assert_eq!(expected_response, result);
let (response, _) = rpc.raw_json_request(&request).await.unwrap();
assert_eq!(expected_response, response.result);
}
#[tokio::test]
@@ -328,7 +328,7 @@ mod tests {
.unwrap();
let expected = r#"{"jsonrpc":"2.0","result":false,"id":1}"#;
assert_eq!(response, expected);
assert_eq!(response.result, expected);
}
fn create_justification() -> GrandpaJustification<Block> {
+1 -1
View File
@@ -28,4 +28,4 @@ sp-rpc = { version = "6.0.0", path = "../../primitives/rpc" }
sp-runtime = { version = "6.0.0", path = "../../primitives/runtime" }
sp-tracing = { version = "5.0.0", path = "../../primitives/tracing" }
sp-version = { version = "5.0.0", path = "../../primitives/version" }
jsonrpsee = { version = "0.14.0", features = ["server", "macros"] }
jsonrpsee = { version = "0.15.1", features = ["server", "macros"] }
+1 -1
View File
@@ -14,7 +14,7 @@ targets = ["x86_64-unknown-linux-gnu"]
[dependencies]
futures = "0.3.21"
jsonrpsee = { version = "0.14.0", features = ["server"] }
jsonrpsee = { version = "0.15.1", features = ["server"] }
log = "0.4.17"
serde_json = "1.0.79"
tokio = { version = "1.17.0", features = ["parking_lot"] }
+64 -15
View File
@@ -18,11 +18,12 @@
//! RPC middlware to collect prometheus metrics on RPC calls.
use jsonrpsee::core::middleware::Middleware;
use jsonrpsee::core::middleware::{Headers, HttpMiddleware, MethodKind, Params, WsMiddleware};
use prometheus_endpoint::{
register, Counter, CounterVec, HistogramOpts, HistogramVec, Opts, PrometheusError, Registry,
U64,
};
use std::net::SocketAddr;
/// Metrics for RPC middleware storing information about the number of requests started/completed,
/// calls started/completed and their timings.
@@ -134,30 +135,33 @@ impl RpcMiddleware {
pub fn new(metrics: RpcMetrics, transport_label: &'static str) -> Self {
Self { metrics, transport_label }
}
}
impl Middleware for RpcMiddleware {
type Instant = std::time::Instant;
fn on_connect(&self) {
self.metrics.ws_sessions_opened.as_ref().map(|counter| counter.inc());
}
fn on_request(&self) -> Self::Instant {
/// Called when a new JSON-RPC request comes to the server.
fn on_request(&self) -> std::time::Instant {
let now = std::time::Instant::now();
self.metrics.requests_started.with_label_values(&[self.transport_label]).inc();
now
}
fn on_call(&self, name: &str) {
log::trace!(target: "rpc_metrics", "[{}] on_call name={}", self.transport_label, name);
/// Called on each JSON-RPC method call, batch requests will trigger `on_call` multiple times.
fn on_call(&self, name: &str, params: Params, kind: MethodKind) {
log::trace!(
target: "rpc_metrics",
"[{}] on_call name={} params={:?} kind={}",
self.transport_label,
name,
params,
kind,
);
self.metrics
.calls_started
.with_label_values(&[self.transport_label, name])
.inc();
}
fn on_result(&self, name: &str, success: bool, started_at: Self::Instant) {
/// Called on each JSON-RPC method completion, batch requests will trigger `on_result` multiple
/// times.
fn on_result(&self, name: &str, success: bool, started_at: std::time::Instant) {
let micros = started_at.elapsed().as_micros();
log::debug!(
target: "rpc_metrics",
@@ -183,12 +187,57 @@ impl Middleware for RpcMiddleware {
.inc();
}
fn on_response(&self, started_at: Self::Instant) {
/// Called once the JSON-RPC request is finished and response is sent to the output buffer.
fn on_response(&self, _result: &str, started_at: std::time::Instant) {
log::trace!(target: "rpc_metrics", "[{}] on_response started_at={:?}", self.transport_label, started_at);
self.metrics.requests_finished.with_label_values(&[self.transport_label]).inc();
}
}
fn on_disconnect(&self) {
impl WsMiddleware for RpcMiddleware {
type Instant = std::time::Instant;
fn on_connect(&self, _remote_addr: SocketAddr, _headers: &Headers) {
self.metrics.ws_sessions_opened.as_ref().map(|counter| counter.inc());
}
fn on_request(&self) -> Self::Instant {
self.on_request()
}
fn on_call(&self, name: &str, params: Params, kind: MethodKind) {
self.on_call(name, params, kind)
}
fn on_result(&self, name: &str, success: bool, started_at: Self::Instant) {
self.on_result(name, success, started_at)
}
fn on_response(&self, _result: &str, started_at: Self::Instant) {
self.on_response(_result, started_at)
}
fn on_disconnect(&self, _remote_addr: SocketAddr) {
self.metrics.ws_sessions_closed.as_ref().map(|counter| counter.inc());
}
}
impl HttpMiddleware for RpcMiddleware {
type Instant = std::time::Instant;
fn on_request(&self, _remote_addr: SocketAddr, _headers: &Headers) -> Self::Instant {
self.on_request()
}
fn on_call(&self, name: &str, params: Params, kind: MethodKind) {
self.on_call(name, params, kind)
}
fn on_result(&self, name: &str, success: bool, started_at: Self::Instant) {
self.on_result(name, success, started_at)
}
fn on_response(&self, _result: &str, started_at: Self::Instant) {
self.on_response(_result, started_at)
}
}
+1 -1
View File
@@ -16,7 +16,7 @@ targets = ["x86_64-unknown-linux-gnu"]
codec = { package = "parity-scale-codec", version = "3.0.0" }
futures = "0.3.21"
hash-db = { version = "0.15.2", default-features = false }
jsonrpsee = { version = "0.14.0", features = ["server"] }
jsonrpsee = { version = "0.15.1", features = ["server"] }
lazy_static = { version = "1.4.0", optional = true }
log = "0.4.17"
parking_lot = "0.12.0"
+7 -10
View File
@@ -29,7 +29,8 @@ use codec::{Decode, Encode};
use futures::{FutureExt, TryFutureExt};
use jsonrpsee::{
core::{async_trait, Error as JsonRpseeError, RpcResult},
PendingSubscription,
types::SubscriptionResult,
SubscriptionSink,
};
use sc_rpc_api::DenyUnsafe;
use sc_transaction_pool_api::{
@@ -176,13 +177,13 @@ where
.collect())
}
fn watch_extrinsic(&self, pending: PendingSubscription, xt: Bytes) {
fn watch_extrinsic(&self, mut sink: SubscriptionSink, xt: Bytes) -> SubscriptionResult {
let best_block_hash = self.client.info().best_hash;
let dxt = match TransactionFor::<P>::decode(&mut &xt[..]).map_err(|e| Error::from(e)) {
Ok(dxt) => dxt,
Err(e) => {
pending.reject(JsonRpseeError::from(e));
return
let _ = sink.reject(JsonRpseeError::from(e));
return Ok(())
},
};
@@ -199,19 +200,15 @@ where
let stream = match submit.await {
Ok(stream) => stream,
Err(err) => {
pending.reject(JsonRpseeError::from(err));
let _ = sink.reject(JsonRpseeError::from(err));
return
},
};
let mut sink = match pending.accept() {
Some(sink) => sink,
_ => return,
};
sink.pipe_from_stream(stream).await;
};
self.executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed());
Ok(())
}
}
+6 -8
View File
@@ -26,7 +26,7 @@ use futures::{
future::{self, FutureExt},
stream::{self, Stream, StreamExt},
};
use jsonrpsee::PendingSubscription;
use jsonrpsee::SubscriptionSink;
use sc_client_api::{BlockBackend, BlockchainEvents};
use sp_blockchain::HeaderBackend;
use sp_runtime::{
@@ -69,7 +69,7 @@ where
self.client.block(&BlockId::Hash(self.unwrap_or_best(hash))).map_err(client_err)
}
fn subscribe_all_heads(&self, sink: PendingSubscription) {
fn subscribe_all_heads(&self, sink: SubscriptionSink) {
subscribe_headers(
&self.client,
&self.executor,
@@ -83,7 +83,7 @@ where
)
}
fn subscribe_new_heads(&self, sink: PendingSubscription) {
fn subscribe_new_heads(&self, sink: SubscriptionSink) {
subscribe_headers(
&self.client,
&self.executor,
@@ -98,7 +98,7 @@ where
)
}
fn subscribe_finalized_heads(&self, sink: PendingSubscription) {
fn subscribe_finalized_heads(&self, sink: SubscriptionSink) {
subscribe_headers(
&self.client,
&self.executor,
@@ -117,7 +117,7 @@ where
fn subscribe_headers<Block, Client, F, G, S>(
client: &Arc<Client>,
executor: &SubscriptionTaskExecutor,
pending: PendingSubscription,
mut sink: SubscriptionSink,
best_block_hash: G,
stream: F,
) where
@@ -143,9 +143,7 @@ fn subscribe_headers<Block, Client, F, G, S>(
let stream = stream::iter(maybe_header).chain(stream());
let fut = async move {
if let Some(mut sink) = pending.accept() {
sink.pipe_from_stream(stream).await;
}
sink.pipe_from_stream(stream).await;
};
executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed());
+13 -10
View File
@@ -27,7 +27,7 @@ use std::sync::Arc;
use crate::SubscriptionTaskExecutor;
use jsonrpsee::{core::RpcResult, PendingSubscription};
use jsonrpsee::{core::RpcResult, types::SubscriptionResult, SubscriptionSink};
use sc_client_api::BlockchainEvents;
use sp_rpc::{list::ListOrValue, number::NumberOrHex};
use sp_runtime::{
@@ -95,13 +95,13 @@ where
}
/// All new head subscription
fn subscribe_all_heads(&self, sink: PendingSubscription);
fn subscribe_all_heads(&self, sink: SubscriptionSink);
/// New best head subscription
fn subscribe_new_heads(&self, sink: PendingSubscription);
fn subscribe_new_heads(&self, sink: SubscriptionSink);
/// Finalized head subscription
fn subscribe_finalized_heads(&self, sink: PendingSubscription);
fn subscribe_finalized_heads(&self, sink: SubscriptionSink);
}
/// Create new state API that works on full node.
@@ -160,16 +160,19 @@ where
self.backend.finalized_head().map_err(Into::into)
}
fn subscribe_all_heads(&self, sink: PendingSubscription) {
self.backend.subscribe_all_heads(sink)
fn subscribe_all_heads(&self, sink: SubscriptionSink) -> SubscriptionResult {
self.backend.subscribe_all_heads(sink);
Ok(())
}
fn subscribe_new_heads(&self, sink: PendingSubscription) {
self.backend.subscribe_new_heads(sink)
fn subscribe_new_heads(&self, sink: SubscriptionSink) -> SubscriptionResult {
self.backend.subscribe_new_heads(sink);
Ok(())
}
fn subscribe_finalized_heads(&self, sink: PendingSubscription) {
self.backend.subscribe_finalized_heads(sink)
fn subscribe_finalized_heads(&self, sink: SubscriptionSink) -> SubscriptionResult {
self.backend.subscribe_finalized_heads(sink);
Ok(())
}
}
+13 -6
View File
@@ -17,8 +17,6 @@
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use super::*;
use assert_matches::assert_matches;
use jsonrpsee::{core::Error as JsonRpseeError, types::error::CallError};
use sc_block_builder::BlockBuilderProvider;
use sp_blockchain::HeaderBackend;
use sp_consensus::BlockOrigin;
@@ -61,9 +59,18 @@ async fn deny_unsafe_works() {
let block = client.new_block(Default::default()).unwrap().build().unwrap().block;
client.import(BlockOrigin::Own, block).await.unwrap();
assert_matches!(
api.call::<_, Option<BlockStats>>("dev_getBlockStats", [client.info().best_hash])
.await,
Err(JsonRpseeError::Call(CallError::Custom(err))) if err.message().contains("RPC call is unsafe to be called externally")
let best_hash = client.info().best_hash;
let best_hash_param =
serde_json::to_string(&best_hash).expect("To string must always succeed for block hashes");
let request = format!(
"{{\"jsonrpc\":\"2.0\",\"method\":\"dev_getBlockStats\",\"params\":[{}],\"id\":1}}",
best_hash_param
);
let (resp, _) = api.raw_json_request(&request).await.expect("Raw calls should succeed");
assert_eq!(
resp.result,
r#"{"jsonrpc":"2.0","error":{"code":-32601,"message":"RPC call is unsafe to be called externally"},"id":1}"#
);
}
+15 -8
View File
@@ -29,7 +29,8 @@ use crate::SubscriptionTaskExecutor;
use jsonrpsee::{
core::{Error as JsonRpseeError, RpcResult},
ws_server::PendingSubscription,
types::SubscriptionResult,
ws_server::SubscriptionSink,
};
use sc_rpc_api::{state::ReadProof, DenyUnsafe};
@@ -155,10 +156,10 @@ where
) -> Result<sp_rpc::tracing::TraceBlockResponse, Error>;
/// New runtime version subscription
fn subscribe_runtime_version(&self, sink: PendingSubscription);
fn subscribe_runtime_version(&self, sink: SubscriptionSink);
/// New storage subscription
fn subscribe_storage(&self, sink: PendingSubscription, keys: Option<Vec<StorageKey>>);
fn subscribe_storage(&self, sink: SubscriptionSink, keys: Option<Vec<StorageKey>>);
}
/// Create new state API that works on full node.
@@ -318,19 +319,25 @@ where
.map_err(Into::into)
}
fn subscribe_runtime_version(&self, sink: PendingSubscription) {
self.backend.subscribe_runtime_version(sink)
fn subscribe_runtime_version(&self, sink: SubscriptionSink) -> SubscriptionResult {
self.backend.subscribe_runtime_version(sink);
Ok(())
}
fn subscribe_storage(&self, sink: PendingSubscription, keys: Option<Vec<StorageKey>>) {
fn subscribe_storage(
&self,
mut sink: SubscriptionSink,
keys: Option<Vec<StorageKey>>,
) -> SubscriptionResult {
if keys.is_none() {
if let Err(err) = self.deny_unsafe.check_if_safe() {
let _ = sink.reject(JsonRpseeError::from(err));
return
return Ok(())
}
}
self.backend.subscribe_storage(sink, keys)
self.backend.subscribe_storage(sink, keys);
Ok(())
}
}
+7 -11
View File
@@ -28,7 +28,7 @@ use super::{
use crate::SubscriptionTaskExecutor;
use futures::{future, stream, FutureExt, StreamExt};
use jsonrpsee::{core::Error as JsonRpseeError, PendingSubscription};
use jsonrpsee::{core::Error as JsonRpseeError, SubscriptionSink};
use sc_client_api::{
Backend, BlockBackend, BlockchainEvents, CallExecutor, ExecutorProvider, ProofProvider,
StorageProvider,
@@ -357,7 +357,7 @@ where
.map_err(client_err)
}
fn subscribe_runtime_version(&self, pending: PendingSubscription) {
fn subscribe_runtime_version(&self, mut sink: SubscriptionSink) {
let client = self.client.clone();
let initial = match self
@@ -369,7 +369,7 @@ where
{
Ok(initial) => initial,
Err(e) => {
pending.reject(JsonRpseeError::from(e));
let _ = sink.reject(JsonRpseeError::from(e));
return
},
};
@@ -397,19 +397,17 @@ where
let stream = futures::stream::once(future::ready(initial)).chain(version_stream);
let fut = async move {
if let Some(mut sink) = pending.accept() {
sink.pipe_from_stream(stream).await;
}
sink.pipe_from_stream(stream).await;
};
self.executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed());
}
fn subscribe_storage(&self, pending: PendingSubscription, keys: Option<Vec<StorageKey>>) {
fn subscribe_storage(&self, mut sink: SubscriptionSink, keys: Option<Vec<StorageKey>>) {
let stream = match self.client.storage_changes_notification_stream(keys.as_deref(), None) {
Ok(stream) => stream,
Err(blockchain_err) => {
pending.reject(JsonRpseeError::from(Error::Client(Box::new(blockchain_err))));
let _ = sink.reject(JsonRpseeError::from(Error::Client(Box::new(blockchain_err))));
return
},
};
@@ -442,9 +440,7 @@ where
.filter(|storage| future::ready(!storage.changes.is_empty()));
let fut = async move {
if let Some(mut sink) = pending.accept() {
sink.pipe_from_stream(stream).await;
}
sink.pipe_from_stream(stream).await;
};
self.executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed());
+1 -1
View File
@@ -22,7 +22,7 @@ wasmtime = ["sc-executor/wasmtime"]
test-helpers = []
[dependencies]
jsonrpsee = { version = "0.14.0", features = ["server"] }
jsonrpsee = { version = "0.15.1", features = ["server"] }
thiserror = "1.0.30"
futures = "0.3.21"
rand = "0.7.3"
+4 -1
View File
@@ -101,7 +101,10 @@ impl RpcHandlers {
&self,
json_query: &str,
) -> Result<(String, mpsc::UnboundedReceiver<String>), JsonRpseeError> {
self.0.raw_json_request(json_query).await
self.0
.raw_json_request(json_query)
.await
.map(|(method_res, recv)| (method_res.result, recv))
}
/// Provides access to the underlying `RpcModule`
+1 -1
View File
@@ -14,7 +14,7 @@ targets = ["x86_64-unknown-linux-gnu"]
[dependencies]
codec = { package = "parity-scale-codec", version = "3.0.0" }
jsonrpsee = { version = "0.14.0", features = ["server", "macros"] }
jsonrpsee = { version = "0.15.1", features = ["server", "macros"] }
serde = { version = "1.0.136", features = ["derive"] }
serde_json = "1.0.79"
thiserror = "1.0.30"