rpc server: add prometheus label is_rate_limited (#3504)

After some discussion with @kogeler after the we added the rate-limit
middleware it may slow down
the rpc call timings metrics significantly because it works as follows:

1. The rate limit guard is checked when the call comes and if a slot is
available -> process the call
2. If no free spot is available then the call will be sleeping
`jitter_delay + min_time_rate_guard` then woken up and checked at most
ten times
3. If no spot is available after 10 iterations -> the call is rejected
(this may take tens of seconds)

Thus, this PR adds a label "is_rate_limited" to filter those out on the
metrics "substrate_rpc_calls_time" and "substrate_rpc_calls_finished".

I had to merge two middleware layers Metrics and RateLimit to avoid
shared state in a hacky way.

---------

Co-authored-by: James Wilson <james@jsdw.me>
This commit is contained in:
Niklas Adolfsson
2024-03-05 10:50:57 +01:00
committed by GitHub
parent 0eda0b3f17
commit efcea0edab
7 changed files with 240 additions and 213 deletions
Generated
-1
View File
@@ -16745,7 +16745,6 @@ dependencies = [
"hyper",
"jsonrpsee",
"log",
"pin-project",
"serde_json",
"substrate-prometheus-endpoint",
"tokio",
+13
View File
@@ -0,0 +1,13 @@
# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0
# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json
title: add prometheus label "is_rate_limited" to rpc calls
doc:
- audience: Node Operator
description: |
This PR adds a label "is_rate_limited" to the prometheus metrics "substrate_rpc_calls_time" and "substrate_rpc_calls_finished"
than can be used to distinguish rate-limited RPC calls from other RPC calls. Because rate-limited RPC calls may take
tens of seconds.
crates: [ ]
-1
View File
@@ -26,5 +26,4 @@ tower = { version = "0.4.13", features = ["util"] }
http = "0.2.8"
hyper = "0.14.27"
futures = "0.3.29"
pin-project = "1.1.3"
governor = "0.6.0"
+17 -8
View File
@@ -49,7 +49,7 @@ pub use jsonrpsee::{
},
server::{middleware::rpc::RpcServiceBuilder, BatchRequestConfig},
};
pub use middleware::{MetricsLayer, RateLimitLayer, RpcMetrics};
pub use middleware::{Metrics, MiddlewareLayer, RpcMetrics};
const MEGABYTE: u32 = 1024 * 1024;
@@ -173,13 +173,22 @@ where
let is_websocket = ws::is_upgrade_request(&req);
let transport_label = if is_websocket { "ws" } else { "http" };
let metrics = metrics.map(|m| MetricsLayer::new(m, transport_label));
let rate_limit = rate_limit.map(|r| RateLimitLayer::per_minute(r));
let middleware_layer = match (metrics, rate_limit) {
(None, None) => None,
(Some(metrics), None) => Some(
MiddlewareLayer::new().with_metrics(Metrics::new(metrics, transport_label)),
),
(None, Some(rate_limit)) =>
Some(MiddlewareLayer::new().with_rate_limit_per_minute(rate_limit)),
(Some(metrics), Some(rate_limit)) => Some(
MiddlewareLayer::new()
.with_metrics(Metrics::new(metrics, transport_label))
.with_rate_limit_per_minute(rate_limit),
),
};
// NOTE: The metrics needs to run first to include rate-limited calls in the
// metrics.
let rpc_middleware =
RpcServiceBuilder::new().option_layer(metrics.clone()).option_layer(rate_limit);
RpcServiceBuilder::new().option_layer(middleware_layer.clone());
let mut svc =
service_builder.set_rpc_middleware(rpc_middleware).build(methods, stop_handle);
@@ -191,9 +200,9 @@ where
// Spawn a task to handle when the connection is closed.
tokio_handle.spawn(async move {
let now = std::time::Instant::now();
metrics.as_ref().map(|m| m.ws_connect());
middleware_layer.as_ref().map(|m| m.ws_connect());
on_disconnect.await;
metrics.as_ref().map(|m| m.ws_disconnect(now));
middleware_layer.as_ref().map(|m| m.ws_disconnect(now));
});
}
@@ -18,15 +18,9 @@
//! RPC middleware to collect prometheus metrics on RPC calls.
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
time::Instant,
};
use std::time::Instant;
use jsonrpsee::{server::middleware::rpc::RpcServiceT, types::Request, MethodResponse};
use pin_project::pin_project;
use jsonrpsee::{types::Request, MethodResponse};
use prometheus_endpoint::{
register, Counter, CounterVec, HistogramOpts, HistogramVec, Opts, PrometheusError, Registry,
U64,
@@ -77,7 +71,7 @@ impl RpcMetrics {
"Total time [μs] of processed RPC calls",
)
.buckets(HISTOGRAM_BUCKETS.to_vec()),
&["protocol", "method"],
&["protocol", "method", "is_rate_limited"],
)?,
metrics_registry,
)?,
@@ -97,7 +91,7 @@ impl RpcMetrics {
"substrate_rpc_calls_finished",
"Number of processed RPC calls (unique un-batched requests)",
),
&["protocol", "method", "is_error"],
&["protocol", "method", "is_error", "is_rate_limited"],
)?,
metrics_registry,
)?,
@@ -144,17 +138,67 @@ impl RpcMetrics {
self.ws_sessions_closed.as_ref().map(|counter| counter.inc());
self.ws_sessions_time.with_label_values(&["ws"]).observe(micros as _);
}
pub(crate) fn on_call(&self, req: &Request, transport_label: &'static str) {
log::trace!(
target: "rpc_metrics",
"[{transport_label}] on_call name={} params={:?}",
req.method_name(),
req.params(),
);
self.calls_started
.with_label_values(&[transport_label, req.method_name()])
.inc();
}
pub(crate) fn on_response(
&self,
req: &Request,
rp: &MethodResponse,
is_rate_limited: bool,
transport_label: &'static str,
now: Instant,
) {
log::trace!(target: "rpc_metrics", "[{transport_label}] on_response started_at={:?}", now);
log::trace!(target: "rpc_metrics::extra", "[{transport_label}] result={}", rp.as_result());
let micros = now.elapsed().as_micros();
log::debug!(
target: "rpc_metrics",
"[{transport_label}] {} call took {} μs",
req.method_name(),
micros,
);
self.calls_time
.with_label_values(&[
transport_label,
req.method_name(),
if is_rate_limited { "true" } else { "false" },
])
.observe(micros as _);
self.calls_finished
.with_label_values(&[
transport_label,
req.method_name(),
// the label "is_error", so `success` should be regarded as false
// and vice-versa to be registrered correctly.
if rp.is_success() { "false" } else { "true" },
if is_rate_limited { "true" } else { "false" },
])
.inc();
}
}
/// Metrics layer.
#[derive(Clone)]
pub struct MetricsLayer {
inner: RpcMetrics,
transport_label: &'static str,
/// Metrics with transport label.
#[derive(Clone, Debug)]
pub struct Metrics {
pub(crate) inner: RpcMetrics,
pub(crate) transport_label: &'static str,
}
impl MetricsLayer {
/// Create a new [`MetricsLayer`].
impl Metrics {
/// Create a new [`Metrics`].
pub fn new(metrics: RpcMetrics, transport_label: &'static str) -> Self {
Self { inner: metrics, transport_label }
}
@@ -166,116 +210,18 @@ impl MetricsLayer {
pub(crate) fn ws_disconnect(&self, now: Instant) {
self.inner.ws_disconnect(now)
}
}
impl<S> tower::Layer<S> for MetricsLayer {
type Service = Metrics<S>;
pub(crate) fn on_call(&self, req: &Request) {
self.inner.on_call(req, self.transport_label)
}
fn layer(&self, inner: S) -> Self::Service {
Metrics::new(inner, self.inner.clone(), self.transport_label)
}
}
/// Metrics middleware.
#[derive(Clone)]
pub struct Metrics<S> {
service: S,
metrics: RpcMetrics,
transport_label: &'static str,
}
impl<S> Metrics<S> {
/// Create a new metrics middleware.
pub fn new(service: S, metrics: RpcMetrics, transport_label: &'static str) -> Metrics<S> {
Metrics { service, metrics, transport_label }
}
}
impl<'a, S> RpcServiceT<'a> for Metrics<S>
where
S: Send + Sync + RpcServiceT<'a>,
{
type Future = ResponseFuture<'a, S::Future>;
fn call(&self, req: Request<'a>) -> Self::Future {
let now = Instant::now();
log::trace!(
target: "rpc_metrics",
"[{}] on_call name={} params={:?}",
self.transport_label,
req.method_name(),
req.params(),
);
self.metrics
.calls_started
.with_label_values(&[self.transport_label, req.method_name()])
.inc();
ResponseFuture {
fut: self.service.call(req.clone()),
metrics: self.metrics.clone(),
req,
now,
transport_label: self.transport_label,
}
}
}
/// Response future for metrics.
#[pin_project]
pub struct ResponseFuture<'a, F> {
#[pin]
fut: F,
metrics: RpcMetrics,
req: Request<'a>,
now: Instant,
transport_label: &'static str,
}
impl<'a, F> std::fmt::Debug for ResponseFuture<'a, F> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str("ResponseFuture")
}
}
impl<'a, F: Future<Output = MethodResponse>> Future for ResponseFuture<'a, F> {
type Output = F::Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let res = this.fut.poll(cx);
if let Poll::Ready(rp) = &res {
let method_name = this.req.method_name();
let transport_label = &this.transport_label;
let now = this.now;
let metrics = &this.metrics;
log::trace!(target: "rpc_metrics", "[{transport_label}] on_response started_at={:?}", now);
log::trace!(target: "rpc_metrics::extra", "[{transport_label}] result={:?}", rp);
let micros = now.elapsed().as_micros();
log::debug!(
target: "rpc_metrics",
"[{transport_label}] {method_name} call took {} μs",
micros,
);
metrics
.calls_time
.with_label_values(&[transport_label, method_name])
.observe(micros as _);
metrics
.calls_finished
.with_label_values(&[
transport_label,
method_name,
// the label "is_error", so `success` should be regarded as false
// and vice-versa to be registrered correctly.
if rp.is_success() { "false" } else { "true" },
])
.inc();
}
res
pub(crate) fn on_response(
&self,
req: &Request,
rp: &MethodResponse,
is_rate_limited: bool,
now: Instant,
) {
self.inner.on_response(req, rp, is_rate_limited, self.transport_label, now)
}
}
@@ -18,10 +18,131 @@
//! JSON-RPC specific middleware.
/// Grafana metrics middleware.
pub mod metrics;
/// Rate limit middleware.
pub mod rate_limit;
use std::{
num::NonZeroU32,
time::{Duration, Instant},
};
use futures::future::{BoxFuture, FutureExt};
use governor::{clock::Clock, Jitter};
use jsonrpsee::{
server::middleware::rpc::RpcServiceT,
types::{ErrorObject, Id, Request},
MethodResponse,
};
mod metrics;
mod rate_limit;
pub use metrics::*;
pub use rate_limit::*;
const MAX_JITTER: Duration = Duration::from_millis(50);
const MAX_RETRIES: usize = 10;
/// JSON-RPC middleware layer.
#[derive(Debug, Clone, Default)]
pub struct MiddlewareLayer {
rate_limit: Option<RateLimit>,
metrics: Option<Metrics>,
}
impl MiddlewareLayer {
/// Create an empty MiddlewareLayer.
pub fn new() -> Self {
Self::default()
}
/// Enable new rate limit middleware enforced per minute.
pub fn with_rate_limit_per_minute(self, n: NonZeroU32) -> Self {
Self { rate_limit: Some(RateLimit::per_minute(n)), metrics: self.metrics }
}
/// Enable metrics middleware.
pub fn with_metrics(self, metrics: Metrics) -> Self {
Self { rate_limit: self.rate_limit, metrics: Some(metrics) }
}
/// Register a new websocket connection.
pub fn ws_connect(&self) {
self.metrics.as_ref().map(|m| m.ws_connect());
}
/// Register that a websocket connection was closed.
pub fn ws_disconnect(&self, now: Instant) {
self.metrics.as_ref().map(|m| m.ws_disconnect(now));
}
}
impl<S> tower::Layer<S> for MiddlewareLayer {
type Service = Middleware<S>;
fn layer(&self, service: S) -> Self::Service {
Middleware { service, rate_limit: self.rate_limit.clone(), metrics: self.metrics.clone() }
}
}
/// JSON-RPC middleware that handles metrics
/// and rate-limiting.
///
/// These are part of the same middleware
/// because the metrics needs to know whether
/// a call was rate-limited or not because
/// it will impact the roundtrip for a call.
pub struct Middleware<S> {
service: S,
rate_limit: Option<RateLimit>,
metrics: Option<Metrics>,
}
impl<'a, S> RpcServiceT<'a> for Middleware<S>
where
S: Send + Sync + RpcServiceT<'a> + Clone + 'static,
{
type Future = BoxFuture<'a, MethodResponse>;
fn call(&self, req: Request<'a>) -> Self::Future {
let now = Instant::now();
self.metrics.as_ref().map(|m| m.on_call(&req));
let service = self.service.clone();
let rate_limit = self.rate_limit.clone();
let metrics = self.metrics.clone();
async move {
let mut is_rate_limited = false;
if let Some(limit) = rate_limit.as_ref() {
let mut attempts = 0;
let jitter = Jitter::up_to(MAX_JITTER);
loop {
if attempts >= MAX_RETRIES {
return reject_too_many_calls(req.id);
}
if let Err(rejected) = limit.inner.check() {
tokio::time::sleep(jitter + rejected.wait_time_from(limit.clock.now()))
.await;
} else {
break;
}
is_rate_limited = true;
attempts += 1;
}
}
let rp = service.call(req.clone()).await;
metrics.as_ref().map(|m| m.on_response(&req, &rp, is_rate_limited, now));
rp
}
.boxed()
}
}
fn reject_too_many_calls(id: Id) -> MethodResponse {
MethodResponse::error(id, ErrorObject::owned(-32999, "RPC rate limit exceeded", None::<()>))
}
@@ -16,92 +16,32 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! RPC rate limiting middleware.
//! RPC rate limit.
use std::{num::NonZeroU32, sync::Arc, time::Duration};
use futures::future::{BoxFuture, FutureExt};
use governor::{
clock::{Clock, DefaultClock, QuantaClock},
clock::{DefaultClock, QuantaClock},
middleware::NoOpMiddleware,
state::{InMemoryState, NotKeyed},
Jitter,
};
use jsonrpsee::{
server::middleware::rpc::RpcServiceT,
types::{ErrorObject, Id, Request},
MethodResponse,
Quota,
};
use std::{num::NonZeroU32, sync::Arc};
type RateLimitInner = governor::RateLimiter<NotKeyed, InMemoryState, DefaultClock, NoOpMiddleware>;
const MAX_JITTER: Duration = Duration::from_millis(50);
const MAX_RETRIES: usize = 10;
/// JSON-RPC rate limit middleware layer.
/// Rate limit.
#[derive(Debug, Clone)]
pub struct RateLimitLayer(governor::Quota);
pub struct RateLimit {
pub(crate) inner: Arc<RateLimitInner>,
pub(crate) clock: QuantaClock,
}
impl RateLimitLayer {
/// Create new rate limit enforced per minute.
impl RateLimit {
/// Create a new `RateLimit` per minute.
pub fn per_minute(n: NonZeroU32) -> Self {
Self(governor::Quota::per_minute(n))
}
}
/// JSON-RPC rate limit middleware
pub struct RateLimit<S> {
service: S,
rate_limit: Arc<RateLimitInner>,
clock: QuantaClock,
}
impl<S> tower::Layer<S> for RateLimitLayer {
type Service = RateLimit<S>;
fn layer(&self, service: S) -> Self::Service {
let clock = QuantaClock::default();
RateLimit {
service,
rate_limit: Arc::new(RateLimitInner::direct_with_clock(self.0, &clock)),
Self {
inner: Arc::new(RateLimitInner::direct_with_clock(Quota::per_minute(n), &clock)),
clock,
}
}
}
impl<'a, S> RpcServiceT<'a> for RateLimit<S>
where
S: Send + Sync + RpcServiceT<'a> + Clone + 'static,
{
type Future = BoxFuture<'a, MethodResponse>;
fn call(&self, req: Request<'a>) -> Self::Future {
let service = self.service.clone();
let rate_limit = self.rate_limit.clone();
let clock = self.clock.clone();
async move {
let mut attempts = 0;
let jitter = Jitter::up_to(MAX_JITTER);
loop {
if attempts >= MAX_RETRIES {
break reject_too_many_calls(req.id);
}
if let Err(rejected) = rate_limit.check() {
tokio::time::sleep(jitter + rejected.wait_time_from(clock.now())).await;
} else {
break service.call(req).await;
}
attempts += 1;
}
}
.boxed()
}
}
fn reject_too_many_calls(id: Id) -> MethodResponse {
MethodResponse::error(id, ErrorObject::owned(-32999, "RPC rate limit exceeded", None::<()>))
}