rpc: bump jsonrpsee v0.22 and fix race in rpc v2 chain_head (#3230)

Close #2992 

Breaking changes:
- rpc server grafana metric `substrate_rpc_requests_started` is removed
(not possible to implement anymore)
- rpc server grafana metric `substrate_rpc_requests_finished` is removed
(not possible to implement anymore)
- rpc server ws ping/pong not ACK:ed within 30 seconds more than three
times then the connection will be closed

Added
- rpc server grafana metric `substrate_rpc_sessions_time` is added to
get the duration for each websocket session
This commit is contained in:
Niklas Adolfsson
2024-02-14 23:18:22 +01:00
committed by GitHub
parent 7e7c488ba8
commit c7c4fe0184
53 changed files with 777 additions and 468 deletions
@@ -16,7 +16,7 @@ workspace = true
targets = ["x86_64-unknown-linux-gnu"]
[dependencies]
jsonrpsee = { version = "0.20.3", features = ["client-core", "macros", "server"] }
jsonrpsee = { version = "0.22", features = ["client-core", "macros", "server"] }
futures = "0.3.21"
serde = { version = "1.0.195", features = ["derive"] }
thiserror = "1.0"
@@ -260,7 +260,7 @@ mod tests {
let (response, _) = api.raw_json_request(request, 1).await.unwrap();
let expected = r#"{"jsonrpc":"2.0","result":{"5GrwvaEF5zXb26Fz9rcQpDWS57CtERHpNehXCPcNoHGKutQY":{"primary":[0],"secondary":[1,2,4],"secondary_vrf":[]}},"id":1}"#;
assert_eq!(&response.result, expected);
assert_eq!(response, expected);
}
#[tokio::test]
@@ -272,6 +272,6 @@ mod tests {
let (response, _) = api.raw_json_request(request, 1).await.unwrap();
let expected = r#"{"jsonrpc":"2.0","error":{"code":-32601,"message":"RPC call is unsafe to be called externally"},"id":1}"#;
assert_eq!(&response.result, expected);
assert_eq!(response, expected);
}
}
@@ -14,7 +14,7 @@ workspace = true
[dependencies]
codec = { package = "parity-scale-codec", version = "3.6.1", features = ["derive"] }
futures = "0.3.21"
jsonrpsee = { version = "0.20.3", features = ["client-core", "macros", "server"] }
jsonrpsee = { version = "0.22", features = ["client-core", "macros", "server"] }
log = { workspace = true, default-features = true }
parking_lot = "0.12.1"
serde = { version = "1.0.195", features = ["derive"] }
@@ -184,10 +184,10 @@ mod tests {
async fn uninitialized_rpc_handler() {
let (rpc, _) = setup_io_handler();
let request = r#"{"jsonrpc":"2.0","method":"beefy_getFinalizedHead","params":[],"id":1}"#;
let expected_response = r#"{"jsonrpc":"2.0","error":{"code":1,"message":"BEEFY RPC endpoint not ready"},"id":1}"#.to_string();
let expected_response = r#"{"jsonrpc":"2.0","error":{"code":1,"message":"BEEFY RPC endpoint not ready"},"id":1}"#;
let (response, _) = rpc.raw_json_request(&request, 1).await.unwrap();
assert_eq!(expected_response, response.result);
assert_eq!(expected_response, response);
}
#[tokio::test]
@@ -205,20 +205,18 @@ mod tests {
\"jsonrpc\":\"2.0\",\
\"result\":\"0x2f0039e93a27221fcf657fb877a1d4f60307106113e885096cb44a461cd0afbf\",\
\"id\":1\
}"
.to_string();
}";
let not_ready = "{\
\"jsonrpc\":\"2.0\",\
\"error\":{\"code\":1,\"message\":\"BEEFY RPC endpoint not ready\"},\
\"id\":1\
}"
.to_string();
}";
let deadline = std::time::Instant::now() + std::time::Duration::from_secs(2);
while std::time::Instant::now() < deadline {
let (response, _) = io.raw_json_request(request, 1).await.expect("RPC requests work");
if response.result != not_ready {
assert_eq!(response.result, expected);
if response != not_ready {
assert_eq!(response, expected);
// Success
return
}
@@ -249,7 +247,7 @@ mod tests {
.unwrap();
let expected = r#"{"jsonrpc":"2.0","result":false,"id":1}"#;
assert_eq!(response.result, expected);
assert_eq!(response, expected);
}
fn create_finality_proof() -> BeefyVersionedFinalityProof<Block> {
@@ -15,7 +15,7 @@ workspace = true
[dependencies]
finality-grandpa = { version = "0.16.2", features = ["derive-codec"] }
futures = "0.3.16"
jsonrpsee = { version = "0.20.3", features = ["client-core", "macros", "server"] }
jsonrpsee = { version = "0.22", features = ["client-core", "macros", "server"] }
log = { workspace = true, default-features = true }
parity-scale-codec = { version = "3.6.1", features = ["derive"] }
serde = { version = "1.0.195", features = ["derive"] }
@@ -273,7 +273,7 @@ mod tests {
let request = r#"{"jsonrpc":"2.0","method":"grandpa_roundState","params":[],"id":0}"#;
let (response, _) = rpc.raw_json_request(&request, 1).await.unwrap();
assert_eq!(expected_response, response.result);
assert_eq!(expected_response, response);
}
#[tokio::test]
@@ -295,7 +295,7 @@ mod tests {
let request = r#"{"jsonrpc":"2.0","method":"grandpa_roundState","params":[],"id":0}"#;
let (response, _) = rpc.raw_json_request(&request, 1).await.unwrap();
assert_eq!(expected_response, response.result);
assert_eq!(expected_response, response);
}
#[tokio::test]
@@ -317,7 +317,7 @@ mod tests {
.unwrap();
let expected = r#"{"jsonrpc":"2.0","result":false,"id":1}"#;
assert_eq!(response.result, expected);
assert_eq!(response, expected);
}
fn create_justification() -> GrandpaJustification<Block> {
@@ -16,7 +16,7 @@ workspace = true
targets = ["x86_64-unknown-linux-gnu"]
[dependencies]
jsonrpsee = { version = "0.20.3", features = ["client-core", "macros", "server"] }
jsonrpsee = { version = "0.22", features = ["client-core", "macros", "server"] }
assert_matches = "1.3.0"
async-trait = "0.1.74"
codec = { package = "parity-scale-codec", version = "3.6.1" }
@@ -16,7 +16,7 @@ targets = ["x86_64-unknown-linux-gnu"]
[dependencies]
codec = { package = "parity-scale-codec", version = "3.6.1" }
jsonrpsee = { version = "0.20.3", features = ["client-core", "macros", "server"] }
jsonrpsee = { version = "0.22", features = ["client-core", "macros", "server"] }
serde = { version = "1.0.195", features = ["derive"] }
sp-api = { path = "../../../primitives/api" }
sp-blockchain = { path = "../../../primitives/blockchain" }
+1 -1
View File
@@ -28,4 +28,4 @@ sp-core = { path = "../../primitives/core" }
sp-rpc = { path = "../../primitives/rpc" }
sp-runtime = { path = "../../primitives/runtime" }
sp-version = { path = "../../primitives/version" }
jsonrpsee = { version = "0.20.3", features = ["client-core", "macros", "server"] }
jsonrpsee = { version = "0.22", features = ["client-core", "macros", "server"] }
+4 -1
View File
@@ -16,7 +16,7 @@ workspace = true
targets = ["x86_64-unknown-linux-gnu"]
[dependencies]
jsonrpsee = { version = "0.20.3", features = ["server"] }
jsonrpsee = { version = "0.22", features = ["server"] }
log = { workspace = true, default-features = true }
serde_json = "1.0.111"
tokio = { version = "1.22.0", features = ["parking_lot"] }
@@ -24,3 +24,6 @@ prometheus-endpoint = { package = "substrate-prometheus-endpoint", path = "../..
tower-http = { version = "0.4.0", features = ["cors"] }
tower = { version = "0.4.13", features = ["util"] }
http = "0.2.8"
hyper = "0.14.27"
futures = "0.3.29"
pin-project = "1.1.3"
+85 -16
View File
@@ -22,21 +22,32 @@
pub mod middleware;
use std::{error::Error as StdError, net::SocketAddr, time::Duration};
use std::{convert::Infallible, error::Error as StdError, net::SocketAddr, time::Duration};
use http::header::HeaderValue;
use hyper::{
server::conn::AddrStream,
service::{make_service_fn, service_fn},
};
use jsonrpsee::{
server::middleware::{HostFilterLayer, ProxyGetRequestLayer},
RpcModule,
server::{
middleware::{
http::{HostFilterLayer, ProxyGetRequestLayer},
rpc::RpcServiceBuilder,
},
stop_channel, ws, PingConfig, StopHandle, TowerServiceBuilder,
},
Methods, RpcModule,
};
use tokio::net::TcpListener;
use tower::Service;
use tower_http::cors::{AllowOrigin, CorsLayer};
pub use crate::middleware::RpcMetrics;
pub use jsonrpsee::core::{
id_providers::{RandomIntegerIdProvider, RandomStringIdProvider},
traits::IdProvider,
};
pub use middleware::{MetricsLayer, RpcMetrics};
const MEGABYTE: u32 = 1024 * 1024;
@@ -92,7 +103,7 @@ pub async fn start_server<M: Send + Sync + 'static>(
let local_addr = std_listener.local_addr().ok();
let host_filter = hosts_filtering(cors.is_some(), local_addr);
let middleware = tower::ServiceBuilder::new()
let http_middleware = tower::ServiceBuilder::new()
.option_layer(host_filter)
// Proxy `GET /health` requests to internal `system_health` method.
.layer(ProxyGetRequestLayer::new("/health", "system_health")?)
@@ -103,10 +114,15 @@ pub async fn start_server<M: Send + Sync + 'static>(
.max_response_body_size(max_payload_out_mb.saturating_mul(MEGABYTE))
.max_connections(max_connections)
.max_subscriptions_per_connection(max_subs_per_conn)
.ping_interval(Duration::from_secs(30))
.set_middleware(middleware)
.enable_ws_ping(
PingConfig::new()
.ping_interval(Duration::from_secs(30))
.inactive_limit(Duration::from_secs(60))
.max_failures(3),
)
.set_http_middleware(http_middleware)
.set_message_buffer_capacity(message_buffer_capacity)
.custom_tokio_runtime(tokio_handle);
.custom_tokio_runtime(tokio_handle.clone());
if let Some(provider) = id_provider {
builder = builder.set_id_provider(provider);
@@ -114,22 +130,66 @@ pub async fn start_server<M: Send + Sync + 'static>(
builder = builder.set_id_provider(RandomStringIdProvider::new(16));
};
let rpc_api = build_rpc_api(rpc_api);
let handle = if let Some(metrics) = metrics {
let server = builder.set_logger(metrics).build_from_tcp(std_listener)?;
server.start(rpc_api)
} else {
let server = builder.build_from_tcp(std_listener)?;
server.start(rpc_api)
let (stop_handle, server_handle) = stop_channel();
let cfg = PerConnection {
methods: build_rpc_api(rpc_api).into(),
service_builder: builder.to_service_builder(),
metrics,
tokio_handle,
stop_handle: stop_handle.clone(),
};
let make_service = make_service_fn(move |_conn: &AddrStream| {
let cfg = cfg.clone();
async move {
let cfg = cfg.clone();
Ok::<_, Infallible>(service_fn(move |req| {
let PerConnection { service_builder, metrics, tokio_handle, stop_handle, methods } =
cfg.clone();
let is_websocket = ws::is_upgrade_request(&req);
let transport_label = if is_websocket { "ws" } else { "http" };
let metrics = metrics.map(|m| MetricsLayer::new(m, transport_label));
let rpc_middleware = RpcServiceBuilder::new().option_layer(metrics.clone());
let mut svc =
service_builder.set_rpc_middleware(rpc_middleware).build(methods, stop_handle);
async move {
if is_websocket {
let on_disconnect = svc.on_session_closed();
// Spawn a task to handle when the connection is closed.
tokio_handle.spawn(async move {
let now = std::time::Instant::now();
metrics.as_ref().map(|m| m.ws_connect());
on_disconnect.await;
metrics.as_ref().map(|m| m.ws_disconnect(now));
});
}
svc.call(req).await
}
}))
}
});
let server = hyper::Server::from_tcp(std_listener)?.serve(make_service);
tokio::spawn(async move {
let graceful = server.with_graceful_shutdown(async move { stop_handle.shutdown().await });
let _ = graceful.await;
});
log::info!(
"Running JSON-RPC server: addr={}, allowed origins={}",
local_addr.map_or_else(|| "unknown".to_string(), |a| a.to_string()),
format_cors(cors)
);
Ok(handle)
Ok(server_handle)
}
fn hosts_filtering(enabled: bool, addr: Option<SocketAddr>) -> Option<HostFilterLayer> {
@@ -185,3 +245,12 @@ fn format_cors(maybe_cors: Option<&Vec<String>>) -> String {
format!("{:?}", ["*"])
}
}
#[derive(Clone)]
struct PerConnection<RpcMiddleware, HttpMiddleware> {
methods: Methods,
stop_handle: StopHandle,
metrics: Option<RpcMetrics>,
tokio_handle: tokio::runtime::Handle,
service_builder: TowerServiceBuilder<RpcMiddleware, HttpMiddleware>,
}
@@ -1,226 +0,0 @@
// This file is part of Substrate.
// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! RPC middleware to collect prometheus metrics on RPC calls.
use jsonrpsee::server::logger::{
HttpRequest, Logger, MethodKind, Params, SuccessOrError, TransportProtocol,
};
use prometheus_endpoint::{
register, Counter, CounterVec, HistogramOpts, HistogramVec, Opts, PrometheusError, Registry,
U64,
};
use std::net::SocketAddr;
/// Histogram time buckets in microseconds.
const HISTOGRAM_BUCKETS: [f64; 11] = [
5.0,
25.0,
100.0,
500.0,
1_000.0,
2_500.0,
10_000.0,
25_000.0,
100_000.0,
1_000_000.0,
10_000_000.0,
];
/// Metrics for RPC middleware storing information about the number of requests started/completed,
/// calls started/completed and their timings.
#[derive(Debug, Clone)]
pub struct RpcMetrics {
/// Number of RPC requests received since the server started.
requests_started: CounterVec<U64>,
/// Number of RPC requests completed since the server started.
requests_finished: CounterVec<U64>,
/// Histogram over RPC execution times.
calls_time: HistogramVec,
/// Number of calls started.
calls_started: CounterVec<U64>,
/// Number of calls completed.
calls_finished: CounterVec<U64>,
/// Number of Websocket sessions opened.
ws_sessions_opened: Option<Counter<U64>>,
/// Number of Websocket sessions closed.
ws_sessions_closed: Option<Counter<U64>>,
}
impl RpcMetrics {
/// Create an instance of metrics
pub fn new(metrics_registry: Option<&Registry>) -> Result<Option<Self>, PrometheusError> {
if let Some(metrics_registry) = metrics_registry {
Ok(Some(Self {
requests_started: register(
CounterVec::new(
Opts::new(
"substrate_rpc_requests_started",
"Number of RPC requests (not calls) received by the server.",
),
&["protocol"],
)?,
metrics_registry,
)?,
requests_finished: register(
CounterVec::new(
Opts::new(
"substrate_rpc_requests_finished",
"Number of RPC requests (not calls) processed by the server.",
),
&["protocol"],
)?,
metrics_registry,
)?,
calls_time: register(
HistogramVec::new(
HistogramOpts::new(
"substrate_rpc_calls_time",
"Total time [μs] of processed RPC calls",
)
.buckets(HISTOGRAM_BUCKETS.to_vec()),
&["protocol", "method"],
)?,
metrics_registry,
)?,
calls_started: register(
CounterVec::new(
Opts::new(
"substrate_rpc_calls_started",
"Number of received RPC calls (unique un-batched requests)",
),
&["protocol", "method"],
)?,
metrics_registry,
)?,
calls_finished: register(
CounterVec::new(
Opts::new(
"substrate_rpc_calls_finished",
"Number of processed RPC calls (unique un-batched requests)",
),
&["protocol", "method", "is_error"],
)?,
metrics_registry,
)?,
ws_sessions_opened: register(
Counter::new(
"substrate_rpc_sessions_opened",
"Number of persistent RPC sessions opened",
)?,
metrics_registry,
)?
.into(),
ws_sessions_closed: register(
Counter::new(
"substrate_rpc_sessions_closed",
"Number of persistent RPC sessions closed",
)?,
metrics_registry,
)?
.into(),
}))
} else {
Ok(None)
}
}
}
impl Logger for RpcMetrics {
type Instant = std::time::Instant;
fn on_connect(
&self,
_remote_addr: SocketAddr,
_request: &HttpRequest,
transport: TransportProtocol,
) {
if let TransportProtocol::WebSocket = transport {
self.ws_sessions_opened.as_ref().map(|counter| counter.inc());
}
}
fn on_request(&self, transport: TransportProtocol) -> Self::Instant {
let transport_label = transport_label_str(transport);
let now = std::time::Instant::now();
self.requests_started.with_label_values(&[transport_label]).inc();
now
}
fn on_call(&self, name: &str, params: Params, kind: MethodKind, transport: TransportProtocol) {
let transport_label = transport_label_str(transport);
log::trace!(
target: "rpc_metrics",
"[{}] on_call name={} params={:?} kind={}",
transport_label,
name,
params,
kind,
);
self.calls_started.with_label_values(&[transport_label, name]).inc();
}
fn on_result(
&self,
name: &str,
success_or_error: SuccessOrError,
started_at: Self::Instant,
transport: TransportProtocol,
) {
let transport_label = transport_label_str(transport);
let micros = started_at.elapsed().as_micros();
log::debug!(
target: "rpc_metrics",
"[{}] {} call took {} μs",
transport_label,
name,
micros,
);
self.calls_time.with_label_values(&[transport_label, name]).observe(micros as _);
self.calls_finished
.with_label_values(&[
transport_label,
name,
// the label "is_error", so `success` should be regarded as false
// and vice-versa to be registrered correctly.
if success_or_error.is_success() { "false" } else { "true" },
])
.inc();
}
fn on_response(&self, result: &str, started_at: Self::Instant, transport: TransportProtocol) {
let transport_label = transport_label_str(transport);
log::trace!(target: "rpc_metrics", "[{}] on_response started_at={:?}", transport_label, started_at);
log::trace!(target: "rpc_metrics::extra", "[{}] result={:?}", transport_label, result);
self.requests_finished.with_label_values(&[transport_label]).inc();
}
fn on_disconnect(&self, _remote_addr: SocketAddr, transport: TransportProtocol) {
if let TransportProtocol::WebSocket = transport {
self.ws_sessions_closed.as_ref().map(|counter| counter.inc());
}
}
}
fn transport_label_str(t: TransportProtocol) -> &'static str {
match t {
TransportProtocol::Http => "http",
TransportProtocol::WebSocket => "ws",
}
}
@@ -0,0 +1,281 @@
// This file is part of Substrate.
// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! RPC middleware to collect prometheus metrics on RPC calls.
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
time::Instant,
};
use jsonrpsee::{server::middleware::rpc::RpcServiceT, types::Request, MethodResponse};
use pin_project::pin_project;
use prometheus_endpoint::{
register, Counter, CounterVec, HistogramOpts, HistogramVec, Opts, PrometheusError, Registry,
U64,
};
/// Histogram time buckets in microseconds.
const HISTOGRAM_BUCKETS: [f64; 11] = [
5.0,
25.0,
100.0,
500.0,
1_000.0,
2_500.0,
10_000.0,
25_000.0,
100_000.0,
1_000_000.0,
10_000_000.0,
];
/// Metrics for RPC middleware storing information about the number of requests started/completed,
/// calls started/completed and their timings.
#[derive(Debug, Clone)]
pub struct RpcMetrics {
/// Histogram over RPC execution times.
calls_time: HistogramVec,
/// Number of calls started.
calls_started: CounterVec<U64>,
/// Number of calls completed.
calls_finished: CounterVec<U64>,
/// Number of Websocket sessions opened.
ws_sessions_opened: Option<Counter<U64>>,
/// Number of Websocket sessions closed.
ws_sessions_closed: Option<Counter<U64>>,
/// Histogram over RPC websocket sessions.
ws_sessions_time: HistogramVec,
}
impl RpcMetrics {
/// Create an instance of metrics
pub fn new(metrics_registry: Option<&Registry>) -> Result<Option<Self>, PrometheusError> {
if let Some(metrics_registry) = metrics_registry {
Ok(Some(Self {
calls_time: register(
HistogramVec::new(
HistogramOpts::new(
"substrate_rpc_calls_time",
"Total time [μs] of processed RPC calls",
)
.buckets(HISTOGRAM_BUCKETS.to_vec()),
&["protocol", "method"],
)?,
metrics_registry,
)?,
calls_started: register(
CounterVec::new(
Opts::new(
"substrate_rpc_calls_started",
"Number of received RPC calls (unique un-batched requests)",
),
&["protocol", "method"],
)?,
metrics_registry,
)?,
calls_finished: register(
CounterVec::new(
Opts::new(
"substrate_rpc_calls_finished",
"Number of processed RPC calls (unique un-batched requests)",
),
&["protocol", "method", "is_error"],
)?,
metrics_registry,
)?,
ws_sessions_opened: register(
Counter::new(
"substrate_rpc_sessions_opened",
"Number of persistent RPC sessions opened",
)?,
metrics_registry,
)?
.into(),
ws_sessions_closed: register(
Counter::new(
"substrate_rpc_sessions_closed",
"Number of persistent RPC sessions closed",
)?,
metrics_registry,
)?
.into(),
ws_sessions_time: register(
HistogramVec::new(
HistogramOpts::new(
"substrate_rpc_sessions_time",
"Total time [s] for each websocket session",
)
.buckets(HISTOGRAM_BUCKETS.to_vec()),
&["protocol"],
)?,
metrics_registry,
)?,
}))
} else {
Ok(None)
}
}
pub(crate) fn ws_connect(&self) {
self.ws_sessions_opened.as_ref().map(|counter| counter.inc());
}
pub(crate) fn ws_disconnect(&self, now: Instant) {
let micros = now.elapsed().as_secs();
self.ws_sessions_closed.as_ref().map(|counter| counter.inc());
self.ws_sessions_time.with_label_values(&["ws"]).observe(micros as _);
}
}
/// Metrics layer.
#[derive(Clone)]
pub struct MetricsLayer {
inner: RpcMetrics,
transport_label: &'static str,
}
impl MetricsLayer {
/// Create a new [`MetricsLayer`].
pub fn new(metrics: RpcMetrics, transport_label: &'static str) -> Self {
Self { inner: metrics, transport_label }
}
pub(crate) fn ws_connect(&self) {
self.inner.ws_connect();
}
pub(crate) fn ws_disconnect(&self, now: Instant) {
self.inner.ws_disconnect(now)
}
}
impl<S> tower::Layer<S> for MetricsLayer {
type Service = Metrics<S>;
fn layer(&self, inner: S) -> Self::Service {
Metrics::new(inner, self.inner.clone(), self.transport_label)
}
}
/// Metrics middleware.
#[derive(Clone)]
pub struct Metrics<S> {
service: S,
metrics: RpcMetrics,
transport_label: &'static str,
}
impl<S> Metrics<S> {
/// Create a new metrics middleware.
pub fn new(service: S, metrics: RpcMetrics, transport_label: &'static str) -> Metrics<S> {
Metrics { service, metrics, transport_label }
}
}
impl<'a, S> RpcServiceT<'a> for Metrics<S>
where
S: Send + Sync + RpcServiceT<'a>,
{
type Future = ResponseFuture<'a, S::Future>;
fn call(&self, req: Request<'a>) -> Self::Future {
let now = Instant::now();
log::trace!(
target: "rpc_metrics",
"[{}] on_call name={} params={:?}",
self.transport_label,
req.method_name(),
req.params(),
);
self.metrics
.calls_started
.with_label_values(&[self.transport_label, req.method_name()])
.inc();
ResponseFuture {
fut: self.service.call(req.clone()),
metrics: self.metrics.clone(),
req,
now,
transport_label: self.transport_label,
}
}
}
/// Response future for metrics.
#[pin_project]
pub struct ResponseFuture<'a, F> {
#[pin]
fut: F,
metrics: RpcMetrics,
req: Request<'a>,
now: Instant,
transport_label: &'static str,
}
impl<'a, F> std::fmt::Debug for ResponseFuture<'a, F> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str("ResponseFuture")
}
}
impl<'a, F: Future<Output = MethodResponse>> Future for ResponseFuture<'a, F> {
type Output = F::Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let res = this.fut.poll(cx);
if let Poll::Ready(rp) = &res {
let method_name = this.req.method_name();
let transport_label = &this.transport_label;
let now = this.now;
let metrics = &this.metrics;
log::trace!(target: "rpc_metrics", "[{transport_label}] on_response started_at={:?}", now);
log::trace!(target: "rpc_metrics::extra", "[{transport_label}] result={:?}", rp);
let micros = now.elapsed().as_micros();
log::debug!(
target: "rpc_metrics",
"[{transport_label}] {method_name} call took {} μs",
micros,
);
metrics
.calls_time
.with_label_values(&[transport_label, method_name])
.observe(micros as _);
metrics
.calls_finished
.with_label_values(&[
transport_label,
method_name,
// the label "is_error", so `success` should be regarded as false
// and vice-versa to be registrered correctly.
if rp.is_success() { "false" } else { "true" },
])
.inc();
}
res
}
}
@@ -0,0 +1,23 @@
// This file is part of Substrate.
// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! JSON-RPC specific middleware.
pub mod metrics;
pub use metrics::*;
+1 -1
View File
@@ -16,7 +16,7 @@ workspace = true
targets = ["x86_64-unknown-linux-gnu"]
[dependencies]
jsonrpsee = { version = "0.20.3", features = ["client-core", "macros", "server"] }
jsonrpsee = { version = "0.22", features = ["client-core", "macros", "server"] }
# Internal chain structures for "chain_spec".
sc-chain-spec = { path = "../chain-spec" }
# Pool for submitting extrinsics required by "transaction"
@@ -32,8 +32,7 @@ use super::{
use assert_matches::assert_matches;
use codec::{Decode, Encode};
use jsonrpsee::{
core::{EmptyServerParams as EmptyParams, Error},
rpc_params, RpcModule,
core::EmptyServerParams as EmptyParams, rpc_params, MethodsError as Error, RpcModule,
};
use sc_block_builder::BlockBuilderBuilder;
use sc_client_api::ChildInfo;
@@ -294,7 +293,7 @@ async fn archive_call() {
)
.await
.unwrap_err();
assert_matches!(err, Error::Call(err) if err.code() == 3001 && err.message().contains("Invalid parameter"));
assert_matches!(err, Error::JsonRpc(err) if err.code() == 3001 && err.message().contains("Invalid parameter"));
// Pass an invalid parameters that cannot be decode.
let err = api
@@ -305,7 +304,7 @@ async fn archive_call() {
)
.await
.unwrap_err();
assert_matches!(err, Error::Call(err) if err.code() == 3001 && err.message().contains("Invalid parameter"));
assert_matches!(err, Error::JsonRpc(err) if err.code() == 3001 && err.message().contains("Invalid parameter"));
// Invalid hash.
let result: MethodResult = api
@@ -26,7 +26,7 @@ use crate::{
},
common::events::StorageQuery,
};
use jsonrpsee::proc_macros::rpc;
use jsonrpsee::{proc_macros::rpc, server::ResponsePayload};
use sp_rpc::list::ListOrValue;
#[rpc(client, server)]
@@ -59,7 +59,7 @@ pub trait ChainHeadApi<Hash> {
&self,
follow_subscription: String,
hash: Hash,
) -> Result<MethodResponse, Error>;
) -> ResponsePayload<'static, MethodResponse>;
/// Retrieves the header of a pinned block.
///
@@ -92,7 +92,7 @@ pub trait ChainHeadApi<Hash> {
hash: Hash,
items: Vec<StorageQuery<String>>,
child_trie: Option<String>,
) -> Result<MethodResponse, Error>;
) -> ResponsePayload<'static, MethodResponse>;
/// Call into the Runtime API at a specified block's state.
///
@@ -106,7 +106,7 @@ pub trait ChainHeadApi<Hash> {
hash: Hash,
function: String,
call_parameters: String,
) -> Result<MethodResponse, Error>;
) -> ResponsePayload<'static, MethodResponse>;
/// Unpin a block or multiple blocks reported by the `follow` method.
///
@@ -36,7 +36,8 @@ use crate::{
use codec::Encode;
use futures::future::FutureExt;
use jsonrpsee::{
core::async_trait, types::SubscriptionId, PendingSubscriptionSink, SubscriptionSink,
core::async_trait, server::ResponsePayload, types::SubscriptionId, MethodResponseFuture,
PendingSubscriptionSink, SubscriptionSink,
};
use log::debug;
use sc_client_api::{
@@ -218,16 +219,17 @@ where
&self,
follow_subscription: String,
hash: Block::Hash,
) -> Result<MethodResponse, ChainHeadRpcError> {
) -> ResponsePayload<'static, MethodResponse> {
let mut block_guard = match self.subscriptions.lock_block(&follow_subscription, hash, 1) {
Ok(block) => block,
Err(SubscriptionManagementError::SubscriptionAbsent) |
Err(SubscriptionManagementError::ExceededLimits) => return Ok(MethodResponse::LimitReached),
Err(SubscriptionManagementError::ExceededLimits) =>
return ResponsePayload::success(MethodResponse::LimitReached),
Err(SubscriptionManagementError::BlockHashAbsent) => {
// Block is not part of the subscription.
return Err(ChainHeadRpcError::InvalidBlock.into())
return ResponsePayload::error(ChainHeadRpcError::InvalidBlock);
},
Err(_) => return Err(ChainHeadRpcError::InvalidBlock.into()),
Err(_) => return ResponsePayload::error(ChainHeadRpcError::InvalidBlock),
};
let operation_id = block_guard.operation().operation_id();
@@ -254,7 +256,7 @@ where
hash
);
self.subscriptions.remove_subscription(&follow_subscription);
return Err(ChainHeadRpcError::InvalidBlock.into())
return ResponsePayload::error(ChainHeadRpcError::InvalidBlock)
},
Err(error) => FollowEvent::<Block::Hash>::OperationError(OperationError {
operation_id: operation_id.clone(),
@@ -262,8 +264,20 @@ where
}),
};
let _ = block_guard.response_sender().unbounded_send(event);
Ok(MethodResponse::Started(MethodResponseStarted { operation_id, discarded_items: None }))
let (rp, rp_fut) = method_started_response(operation_id, None);
let fut = async move {
// Events should only by generated
// if the response was successfully propagated.
if rp_fut.await.is_err() {
return;
}
let _ = block_guard.response_sender().unbounded_send(event);
};
self.executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed());
rp
}
fn chain_head_unstable_header(
@@ -294,31 +308,40 @@ where
hash: Block::Hash,
items: Vec<StorageQuery<String>>,
child_trie: Option<String>,
) -> Result<MethodResponse, ChainHeadRpcError> {
) -> ResponsePayload<'static, MethodResponse> {
// Gain control over parameter parsing and returned error.
let items = items
let items = match items
.into_iter()
.map(|query| {
let key = StorageKey(parse_hex_param(query.key)?);
Ok(StorageQuery { key, query_type: query.query_type })
})
.collect::<Result<Vec<_>, ChainHeadRpcError>>()?;
.collect::<Result<Vec<_>, ChainHeadRpcError>>()
{
Ok(items) => items,
Err(err) => {
return ResponsePayload::error(err);
},
};
let child_trie = child_trie
.map(|child_trie| parse_hex_param(child_trie))
.transpose()?
.map(ChildInfo::new_default_from_vec);
let child_trie = match child_trie.map(|child_trie| parse_hex_param(child_trie)).transpose()
{
Ok(c) => c.map(ChildInfo::new_default_from_vec),
Err(e) => return ResponsePayload::error(e),
};
let mut block_guard =
match self.subscriptions.lock_block(&follow_subscription, hash, items.len()) {
Ok(block) => block,
Err(SubscriptionManagementError::SubscriptionAbsent) |
Err(SubscriptionManagementError::ExceededLimits) => return Ok(MethodResponse::LimitReached),
Err(SubscriptionManagementError::ExceededLimits) => {
return ResponsePayload::success(MethodResponse::LimitReached);
},
Err(SubscriptionManagementError::BlockHashAbsent) => {
// Block is not part of the subscription.
return Err(ChainHeadRpcError::InvalidBlock.into())
return ResponsePayload::error(ChainHeadRpcError::InvalidBlock)
},
Err(_) => return Err(ChainHeadRpcError::InvalidBlock.into()),
Err(_) => return ResponsePayload::error(ChainHeadRpcError::InvalidBlock),
};
let mut storage_client = ChainHeadStorage::<Client, Block, BE>::new(
@@ -334,16 +357,21 @@ where
let mut items = items;
items.truncate(num_operations);
let (rp, rp_is_success) = method_started_response(operation_id, Some(discarded));
let fut = async move {
// Events should only by generated
// if the response was successfully propagated.
if rp_is_success.await.is_err() {
return;
}
storage_client.generate_events(block_guard, hash, items, child_trie).await;
};
self.executor
.spawn_blocking("substrate-rpc-subscription", Some("rpc"), fut.boxed());
Ok(MethodResponse::Started(MethodResponseStarted {
operation_id,
discarded_items: Some(discarded),
}))
rp
}
fn chain_head_unstable_call(
@@ -352,29 +380,31 @@ where
hash: Block::Hash,
function: String,
call_parameters: String,
) -> Result<MethodResponse, ChainHeadRpcError> {
let call_parameters = Bytes::from(parse_hex_param(call_parameters)?);
) -> ResponsePayload<'static, MethodResponse> {
let call_parameters = match parse_hex_param(call_parameters) {
Ok(hex) => Bytes::from(hex),
Err(err) => return ResponsePayload::error(err),
};
let mut block_guard = match self.subscriptions.lock_block(&follow_subscription, hash, 1) {
Ok(block) => block,
Err(SubscriptionManagementError::SubscriptionAbsent) |
Err(SubscriptionManagementError::ExceededLimits) => {
// Invalid invalid subscription ID.
return Ok(MethodResponse::LimitReached)
return ResponsePayload::success(MethodResponse::LimitReached)
},
Err(SubscriptionManagementError::BlockHashAbsent) => {
// Block is not part of the subscription.
return Err(ChainHeadRpcError::InvalidBlock.into())
return ResponsePayload::error(ChainHeadRpcError::InvalidBlock)
},
Err(_) => return Err(ChainHeadRpcError::InvalidBlock.into()),
Err(_) => return ResponsePayload::error(ChainHeadRpcError::InvalidBlock),
};
// Reject subscription if with_runtime is false.
if !block_guard.has_runtime() {
return Err(ChainHeadRpcError::InvalidRuntimeCall(
return ResponsePayload::error(ChainHeadRpcError::InvalidRuntimeCall(
"The runtime updates flag must be set".to_string(),
)
.into())
));
}
let operation_id = block_guard.operation().operation_id();
@@ -395,11 +425,20 @@ where
})
});
let _ = block_guard.response_sender().unbounded_send(event);
Ok(MethodResponse::Started(MethodResponseStarted {
operation_id: operation_id.clone(),
discarded_items: None,
}))
let (rp, rp_fut) = method_started_response(operation_id, None);
let fut = async move {
// Events should only by generated
// if the response was successfully propagated.
if rp_fut.await.is_err() {
return;
}
let _ = block_guard.response_sender().unbounded_send(event);
};
self.executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed());
rp
}
fn chain_head_unstable_unpin(
@@ -463,3 +502,11 @@ where
Ok(())
}
}
fn method_started_response(
operation_id: String,
discarded_items: Option<usize>,
) -> (ResponsePayload<'static, MethodResponse>, MethodResponseFuture) {
let rp = MethodResponse::Started(MethodResponseStarted { operation_id, discarded_items });
ResponsePayload::success(rp).notify_on_completion()
}
@@ -27,8 +27,7 @@ use assert_matches::assert_matches;
use codec::{Decode, Encode};
use futures::Future;
use jsonrpsee::{
core::{error::Error, server::Subscription as RpcSubscription},
rpc_params, RpcModule,
core::server::Subscription as RpcSubscription, rpc_params, MethodsError as Error, RpcModule,
};
use sc_block_builder::BlockBuilderBuilder;
use sc_client_api::ChildInfo;
@@ -359,7 +358,7 @@ async fn get_header() {
.await
.unwrap_err();
assert_matches!(err,
Error::Call(err) if err.code() == super::error::rpc_spec_v2::INVALID_BLOCK_ERROR && err.message() == "Invalid block hash"
Error::JsonRpc(ref err) if err.code() == super::error::rpc_spec_v2::INVALID_BLOCK_ERROR && err.message() == "Invalid block hash"
);
// Obtain the valid header.
@@ -388,7 +387,7 @@ async fn get_body() {
.await
.unwrap_err();
assert_matches!(err,
Error::Call(err) if err.code() == super::error::rpc_spec_v2::INVALID_BLOCK_ERROR && err.message() == "Invalid block hash"
Error::JsonRpc(ref err) if err.code() == super::error::rpc_spec_v2::INVALID_BLOCK_ERROR && err.message() == "Invalid block hash"
);
// Valid call.
@@ -473,7 +472,7 @@ async fn call_runtime() {
.await
.unwrap_err();
assert_matches!(err,
Error::Call(err) if err.code() == super::error::rpc_spec_v2::INVALID_BLOCK_ERROR && err.message() == "Invalid block hash"
Error::JsonRpc(ref err) if err.code() == super::error::rpc_spec_v2::INVALID_BLOCK_ERROR && err.message() == "Invalid block hash"
);
// Pass an invalid parameters that cannot be decode.
@@ -486,7 +485,7 @@ async fn call_runtime() {
.await
.unwrap_err();
assert_matches!(err,
Error::Call(err) if err.code() == super::error::json_rpc_spec::INVALID_PARAM_ERROR && err.message().contains("Invalid parameter")
Error::JsonRpc(err) if err.code() == super::error::json_rpc_spec::INVALID_PARAM_ERROR && err.message().contains("Invalid parameter")
);
// Valid call.
@@ -589,7 +588,7 @@ async fn call_runtime_without_flag() {
.unwrap_err();
assert_matches!(err,
Error::Call(err) if err.code() == super::error::rpc_spec_v2::INVALID_RUNTIME_CALL && err.message().contains("subscription was started with `withRuntime` set to `false`")
Error::JsonRpc(ref err) if err.code() == super::error::rpc_spec_v2::INVALID_RUNTIME_CALL && err.message().contains("subscription was started with `withRuntime` set to `false`")
);
}
@@ -627,7 +626,7 @@ async fn get_storage_hash() {
.await
.unwrap_err();
assert_matches!(err,
Error::Call(err) if err.code() == super::error::rpc_spec_v2::INVALID_BLOCK_ERROR && err.message() == "Invalid block hash"
Error::JsonRpc(ref err) if err.code() == super::error::rpc_spec_v2::INVALID_BLOCK_ERROR && err.message() == "Invalid block hash"
);
// Valid call without storage at the key.
@@ -895,7 +894,7 @@ async fn get_storage_value() {
.await
.unwrap_err();
assert_matches!(err,
Error::Call(err) if err.code() == super::error::rpc_spec_v2::INVALID_BLOCK_ERROR && err.message() == "Invalid block hash"
Error::JsonRpc(ref err) if err.code() == super::error::rpc_spec_v2::INVALID_BLOCK_ERROR && err.message() == "Invalid block hash"
);
// Valid call without storage at the key.
@@ -1571,7 +1570,7 @@ async fn follow_with_unpin() {
.await
.unwrap_err();
assert_matches!(err,
Error::Call(err) if err.code() == super::error::rpc_spec_v2::INVALID_BLOCK_ERROR && err.message() == "Invalid block hash"
Error::JsonRpc(ref err) if err.code() == super::error::rpc_spec_v2::INVALID_BLOCK_ERROR && err.message() == "Invalid block hash"
);
// To not exceed the number of pinned blocks, we need to unpin before the next import.
@@ -1674,7 +1673,7 @@ async fn unpin_duplicate_hashes() {
.await
.unwrap_err();
assert_matches!(err,
Error::Call(err) if err.code() == super::error::rpc_spec_v2::INVALID_DUPLICATE_HASHES && err.message() == "Received duplicate hashes for the `chainHead_unpin` method"
Error::JsonRpc(err) if err.code() == super::error::rpc_spec_v2::INVALID_DUPLICATE_HASHES && err.message() == "Received duplicate hashes for the `chainHead_unpin` method"
);
// Block tree:
@@ -1709,7 +1708,7 @@ async fn unpin_duplicate_hashes() {
.await
.unwrap_err();
assert_matches!(err,
Error::Call(err) if err.code() == super::error::rpc_spec_v2::INVALID_DUPLICATE_HASHES && err.message() == "Received duplicate hashes for the `chainHead_unpin` method"
Error::JsonRpc(err) if err.code() == super::error::rpc_spec_v2::INVALID_DUPLICATE_HASHES && err.message() == "Received duplicate hashes for the `chainHead_unpin` method"
);
// Can unpin blocks.
@@ -1822,7 +1821,7 @@ async fn follow_with_multiple_unpin_hashes() {
.await
.unwrap_err();
assert_matches!(err,
Error::Call(err) if err.code() == super::error::rpc_spec_v2::INVALID_BLOCK_ERROR && err.message() == "Invalid block hash"
Error::JsonRpc(ref err) if err.code() == super::error::rpc_spec_v2::INVALID_BLOCK_ERROR && err.message() == "Invalid block hash"
);
let _res: () = api
@@ -1839,7 +1838,7 @@ async fn follow_with_multiple_unpin_hashes() {
.await
.unwrap_err();
assert_matches!(err,
Error::Call(err) if err.code() == super::error::rpc_spec_v2::INVALID_BLOCK_ERROR && err.message() == "Invalid block hash"
Error::JsonRpc(ref err) if err.code() == super::error::rpc_spec_v2::INVALID_BLOCK_ERROR && err.message() == "Invalid block hash"
);
// Unpin multiple blocks.
@@ -1857,7 +1856,7 @@ async fn follow_with_multiple_unpin_hashes() {
.await
.unwrap_err();
assert_matches!(err,
Error::Call(err) if err.code() == super::error::rpc_spec_v2::INVALID_BLOCK_ERROR && err.message() == "Invalid block hash"
Error::JsonRpc(ref err) if err.code() == super::error::rpc_spec_v2::INVALID_BLOCK_ERROR && err.message() == "Invalid block hash"
);
let err = api
@@ -1868,7 +1867,7 @@ async fn follow_with_multiple_unpin_hashes() {
.await
.unwrap_err();
assert_matches!(err,
Error::Call(err) if err.code() == super::error::rpc_spec_v2::INVALID_BLOCK_ERROR && err.message() == "Invalid block hash"
Error::JsonRpc(ref err) if err.code() == super::error::rpc_spec_v2::INVALID_BLOCK_ERROR && err.message() == "Invalid block hash"
);
}
@@ -24,7 +24,7 @@ use crate::{
use assert_matches::assert_matches;
use codec::Encode;
use futures::Future;
use jsonrpsee::{core::error::Error, rpc_params, RpcModule};
use jsonrpsee::{rpc_params, MethodsError as Error, RpcModule};
use sc_transaction_pool::*;
use sc_transaction_pool_api::{ChainEvent, MaintainedTransactionPool, TransactionPool};
use sp_core::{testing::TaskExecutor, traits::SpawnNamed};
@@ -194,7 +194,7 @@ async fn tx_broadcast_invalid_tx() {
.await
.unwrap_err();
assert_matches!(err,
Error::Call(err) if err.code() == super::error::json_rpc_spec::INVALID_PARAM_ERROR && err.message() == "Invalid params"
Error::JsonRpc(err) if err.code() == super::error::json_rpc_spec::INVALID_PARAM_ERROR && err.message() == "Invalid params"
);
assert_eq!(0, pool.status().ready);
@@ -219,7 +219,7 @@ async fn tx_broadcast_invalid_tx() {
.await
.unwrap_err();
assert_matches!(err,
Error::Call(err) if err.code() == super::error::json_rpc_spec::INVALID_PARAM_ERROR && err.message() == "Invalid operation id"
Error::JsonRpc(err) if err.code() == super::error::json_rpc_spec::INVALID_PARAM_ERROR && err.message() == "Invalid operation id"
);
}
@@ -233,6 +233,6 @@ async fn tx_invalid_stop() {
.await
.unwrap_err();
assert_matches!(err,
Error::Call(err) if err.code() == super::error::json_rpc_spec::INVALID_PARAM_ERROR && err.message() == "Invalid operation id"
Error::JsonRpc(err) if err.code() == super::error::json_rpc_spec::INVALID_PARAM_ERROR && err.message() == "Invalid operation id"
);
}
+1 -1
View File
@@ -18,7 +18,7 @@ targets = ["x86_64-unknown-linux-gnu"]
[dependencies]
codec = { package = "parity-scale-codec", version = "3.6.1" }
futures = "0.3.21"
jsonrpsee = { version = "0.20.3", features = ["server"] }
jsonrpsee = { version = "0.22", features = ["server"] }
log = { workspace = true, default-features = true }
parking_lot = "0.12.1"
serde_json = "1.0.111"
+4 -7
View File
@@ -21,10 +21,7 @@ use super::*;
use crate::testing::{test_executor, timeout_secs};
use assert_matches::assert_matches;
use codec::Encode;
use jsonrpsee::{
core::{EmptyServerParams as EmptyParams, Error as RpcError},
RpcModule,
};
use jsonrpsee::{core::EmptyServerParams as EmptyParams, MethodsError as RpcError, RpcModule};
use sc_transaction_pool::{BasicPool, FullChainApi};
use sc_transaction_pool_api::TransactionStatus;
use sp_core::{
@@ -103,7 +100,7 @@ async fn author_submit_transaction_should_not_cause_error() {
assert_matches!(
api.call::<_, H256>("author_submitExtrinsic", [xt]).await,
Err(RpcError::Call(err)) if err.message().contains("Already Imported") && err.code() == 1013
Err(RpcError::JsonRpc(err)) if err.message().contains("Already Imported") && err.code() == 1013
);
}
@@ -160,7 +157,7 @@ async fn author_should_return_watch_validation_error() {
assert_matches!(
failed_sub,
Err(RpcError::Call(err)) if err.message().contains("Invalid Transaction") && err.code() == 1010
Err(RpcError::JsonRpc(err)) if err.message().contains("Invalid Transaction") && err.code() == 1010
);
}
@@ -276,7 +273,7 @@ async fn author_has_session_keys() {
assert_matches!(
api.call::<_, bool>("author_hasSessionKeys", vec![Bytes::from(vec![1, 2, 3])]).await,
Err(RpcError::Call(err)) if err.message().contains("Session keys are not encoded correctly")
Err(RpcError::JsonRpc(err)) if err.message().contains("Session keys are not encoded correctly")
);
}
+1 -1
View File
@@ -100,7 +100,7 @@ async fn deny_unsafe_works() {
let (resp, _) = api.raw_json_request(&request, 1).await.expect("Raw calls should succeed");
assert_eq!(
resp.result,
resp,
r#"{"jsonrpc":"2.0","error":{"code":-32601,"message":"RPC call is unsafe to be called externally"},"id":1}"#
);
}
+2 -2
View File
@@ -21,7 +21,7 @@ use super::*;
use crate::testing::{test_executor, timeout_secs};
use assert_matches::assert_matches;
use futures::executor;
use jsonrpsee::core::{EmptyServerParams as EmptyParams, Error as RpcError};
use jsonrpsee::{core::EmptyServerParams as EmptyParams, MethodsError as RpcError};
use sc_block_builder::BlockBuilderBuilder;
use sc_rpc_api::DenyUnsafe;
use sp_consensus::BlockOrigin;
@@ -525,7 +525,7 @@ async fn wildcard_storage_subscriptions_are_rpc_unsafe() {
let api_rpc = api.into_rpc();
let err = api_rpc.subscribe_unbounded("state_subscribeStorage", EmptyParams::new()).await;
assert_matches!(err, Err(RpcError::Call(e)) if e.message() == "RPC call is unsafe to be called externally");
assert_matches!(err, Err(RpcError::JsonRpc(e)) if e.message() == "RPC call is unsafe to be called externally");
}
#[tokio::test]
+3 -6
View File
@@ -19,10 +19,7 @@
use super::{helpers::SyncState, *};
use assert_matches::assert_matches;
use futures::prelude::*;
use jsonrpsee::{
core::{EmptyServerParams as EmptyParams, Error as RpcError},
RpcModule,
};
use jsonrpsee::{core::EmptyServerParams as EmptyParams, MethodsError as RpcError, RpcModule};
use sc_network::{self, config::Role, PeerId};
use sc_rpc_api::system::helpers::PeerInfo;
use sc_utils::mpsc::tracing_unbounded;
@@ -311,7 +308,7 @@ async fn system_network_add_reserved() {
let bad_peer_id = ["/ip4/198.51.100.19/tcp/30333"];
assert_matches!(
api(None).call::<_, ()>("system_addReservedPeer", bad_peer_id).await,
Err(RpcError::Call(err)) if err.message().contains("Peer id is missing from the address")
Err(RpcError::JsonRpc(err)) if err.message().contains("Peer id is missing from the address")
);
}
@@ -327,7 +324,7 @@ async fn system_network_remove_reserved() {
assert_matches!(
api(None).call::<_, String>("system_removeReservedPeer", bad_peer_id).await,
Err(RpcError::Call(err)) if err.message().contains("base-58 decode error: provided string contained invalid character '/' at byte 0")
Err(RpcError::JsonRpc(err)) if err.message().contains("base-58 decode error: provided string contained invalid character '/' at byte 0")
);
}
#[tokio::test]
+8 -2
View File
@@ -80,7 +80,7 @@ where
Either::Left((Ok(sink), _)) => break sink,
Either::Right((Some(msg), f)) => {
if buf.push_back(msg).is_err() {
log::warn!(target: "rpc", "Subscription::accept failed buffer limit={} exceed; dropping subscription", buf.max_cap);
log::warn!(target: "rpc", "Subscription::accept failed buffer limit={} exceeded; dropping subscription", buf.max_cap);
return
}
accept_fut = f;
@@ -125,7 +125,13 @@ async fn inner_pipe_from_stream<S, T>(
// New item from the stream
Either::Right((Either::Right((Some(v), n)), c)) => {
if buf.push_back(v).is_err() {
log::warn!(target: "rpc", "Subscription buffer limit={} exceed; dropping subscription", buf.max_cap);
log::warn!(
target: "rpc",
"Subscription buffer limit={} exceeded for subscription={} conn_id={}; dropping subscription",
buf.max_cap,
sink.method_name(),
sink.connection_id()
);
return
}
+1 -1
View File
@@ -28,7 +28,7 @@ runtime-benchmarks = [
]
[dependencies]
jsonrpsee = { version = "0.20.3", features = ["server"] }
jsonrpsee = { version = "0.22", features = ["server"] }
thiserror = "1.0.48"
futures = "0.3.21"
rand = "0.8.5"
+3 -6
View File
@@ -38,7 +38,7 @@ use std::{collections::HashMap, net::SocketAddr};
use codec::{Decode, Encode};
use futures::{pin_mut, FutureExt, StreamExt};
use jsonrpsee::{core::Error as JsonRpseeError, RpcModule};
use jsonrpsee::RpcModule;
use log::{debug, error, warn};
use sc_client_api::{blockchain::HeaderBackend, BlockBackend, BlockchainEvents, ProofProvider};
use sc_network::{
@@ -109,17 +109,14 @@ impl RpcHandlers {
pub async fn rpc_query(
&self,
json_query: &str,
) -> Result<(String, tokio::sync::mpsc::Receiver<String>), JsonRpseeError> {
) -> Result<(String, tokio::sync::mpsc::Receiver<String>), serde_json::Error> {
// Because `tokio::sync::mpsc::channel` is used under the hood
// it will panic if it's set to usize::MAX.
//
// This limit is used to prevent panics and is large enough.
const TOKIO_MPSC_MAX_SIZE: usize = tokio::sync::Semaphore::MAX_PERMITS;
self.0
.raw_json_request(json_query, TOKIO_MPSC_MAX_SIZE)
.await
.map(|(method_res, recv)| (method_res.result, recv))
self.0.raw_json_request(json_query, TOKIO_MPSC_MAX_SIZE).await
}
/// Provides access to the underlying `RpcModule`
+1 -1
View File
@@ -16,7 +16,7 @@ targets = ["x86_64-unknown-linux-gnu"]
[dependencies]
codec = { package = "parity-scale-codec", version = "3.6.1" }
jsonrpsee = { version = "0.20.3", features = ["client-core", "macros", "server"] }
jsonrpsee = { version = "0.22", features = ["client-core", "macros", "server"] }
serde = { version = "1.0.195", features = ["derive"] }
serde_json = "1.0.111"
thiserror = "1.0.48"