Better RPC prometheus metrics. (#9358)

* Better RPC prometheus metrics.

* Add session metrics.

* Add counting requests as well.

* Fix type for web build.

* Fix browser-node

* Filter out unknown method names.

* Change Gauge to Counters

* Use micros instead of millis.

* cargo fmt

* Update client/rpc-servers/src/lib.rs

Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>

* Apply suggestions from code review

Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>

* move log to separate lines.

* Fix compilation.

* cargo +nightly fmt --all

Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>
This commit is contained in:
Tomasz Drwięga
2021-08-24 12:48:23 +02:00
committed by GitHub
parent 3abe7cc4d0
commit d722c44248
4 changed files with 275 additions and 39 deletions
+53 -2
View File
@@ -24,6 +24,7 @@ mod middleware;
use jsonrpc_core::{IoHandlerExtension, MetaIoHandler}; use jsonrpc_core::{IoHandlerExtension, MetaIoHandler};
use log::error; use log::error;
use prometheus_endpoint::{register, Counter, PrometheusError, Registry, U64};
use pubsub::PubSubMetadata; use pubsub::PubSubMetadata;
use std::io; use std::io;
@@ -42,7 +43,7 @@ const HTTP_THREADS: usize = 4;
pub type RpcHandler<T> = pubsub::PubSubHandler<T, RpcMiddleware>; pub type RpcHandler<T> = pubsub::PubSubHandler<T, RpcMiddleware>;
pub use self::inner::*; pub use self::inner::*;
pub use middleware::{RpcMetrics, RpcMiddleware}; pub use middleware::{method_names, RpcMetrics, RpcMiddleware};
/// Construct rpc `IoHandler` /// Construct rpc `IoHandler`
pub fn rpc_handler<M: PubSubMetadata>( pub fn rpc_handler<M: PubSubMetadata>(
@@ -73,6 +74,43 @@ pub fn rpc_handler<M: PubSubMetadata>(
io io
} }
/// RPC server-specific prometheus metrics.
///
/// Holds the counters tracking how many RPC sessions were opened and closed.
/// Both fields are `None` on a `Default`-constructed value; `ServerMetrics::new`
/// only fills them in when it is handed a registry.
#[derive(Debug, Clone, Default)]
pub struct ServerMetrics {
/// Number of sessions opened (registered as `rpc_sessions_opened`).
session_opened: Option<Counter<U64>>,
/// Number of sessions closed (registered as `rpc_sessions_closed`).
session_closed: Option<Counter<U64>>,
}
impl ServerMetrics {
    /// Build the RPC session counters, registering them when a registry is available.
    ///
    /// With `Some(registry)` both counters are created and registered, and any failure
    /// along the way is returned as a `PrometheusError`. With `None` nothing is
    /// registered and the `Default` value (no counters) is returned.
    pub fn new(registry: Option<&Registry>) -> Result<Self, PrometheusError> {
        let registry = match registry {
            Some(registry) => registry,
            None => return Ok(Self::default()),
        };

        let session_opened = register(
            Counter::new("rpc_sessions_opened", "Number of persistent RPC sessions opened")?,
            registry,
        )?;
        let session_closed = register(
            Counter::new("rpc_sessions_closed", "Number of persistent RPC sessions closed")?,
            registry,
        )?;

        Ok(Self { session_opened: Some(session_opened), session_closed: Some(session_closed) })
    }
}
#[cfg(not(target_os = "unknown"))] #[cfg(not(target_os = "unknown"))]
mod inner { mod inner {
use super::*; use super::*;
@@ -84,6 +122,16 @@ mod inner {
/// Type alias for ws server /// Type alias for ws server
pub type WsServer = ws::Server; pub type WsServer = ws::Server;
impl ws::SessionStats for ServerMetrics {
    /// Bump the "sessions opened" counter; a no-op when the counter is absent.
    fn open_session(&self, _id: ws::SessionId) {
        if let Some(counter) = &self.session_opened {
            counter.inc();
        }
    }

    /// Bump the "sessions closed" counter; a no-op when the counter is absent.
    fn close_session(&self, _id: ws::SessionId) {
        if let Some(counter) = &self.session_closed {
            counter.inc();
        }
    }
}
/// Start HTTP server listening on given address. /// Start HTTP server listening on given address.
/// ///
/// **Note**: Only available if `not(target_os = "unknown")`. /// **Note**: Only available if `not(target_os = "unknown")`.
@@ -114,6 +162,7 @@ mod inner {
pub fn start_ipc<M: pubsub::PubSubMetadata + Default>( pub fn start_ipc<M: pubsub::PubSubMetadata + Default>(
addr: &str, addr: &str,
io: RpcHandler<M>, io: RpcHandler<M>,
server_metrics: ServerMetrics,
) -> io::Result<ipc::Server> { ) -> io::Result<ipc::Server> {
let builder = ipc::ServerBuilder::new(io); let builder = ipc::ServerBuilder::new(io);
#[cfg(target_os = "unix")] #[cfg(target_os = "unix")]
@@ -122,7 +171,7 @@ mod inner {
security_attributes.set_mode(0o600)?; security_attributes.set_mode(0o600)?;
security_attributes security_attributes
}); });
builder.start(addr) builder.session_stats(server_metrics).start(addr)
} }
/// Start WS server listening on given address. /// Start WS server listening on given address.
@@ -136,6 +185,7 @@ mod inner {
cors: Option<&Vec<String>>, cors: Option<&Vec<String>>,
io: RpcHandler<M>, io: RpcHandler<M>,
maybe_max_payload_mb: Option<usize>, maybe_max_payload_mb: Option<usize>,
server_metrics: ServerMetrics,
) -> io::Result<ws::Server> { ) -> io::Result<ws::Server> {
let rpc_max_payload = maybe_max_payload_mb let rpc_max_payload = maybe_max_payload_mb
.map(|mb| mb.saturating_mul(MEGABYTE)) .map(|mb| mb.saturating_mul(MEGABYTE))
@@ -147,6 +197,7 @@ mod inner {
.max_connections(max_connections.unwrap_or(WS_MAX_CONNECTIONS)) .max_connections(max_connections.unwrap_or(WS_MAX_CONNECTIONS))
.allowed_origins(map_cors(cors)) .allowed_origins(map_cors(cors))
.allowed_hosts(hosts_filtering(cors.is_some())) .allowed_hosts(hosts_filtering(cors.is_some()))
.session_stats(server_metrics)
.start(addr) .start(addr)
.map_err(|err| match err { .map_err(|err| match err {
ws::Error::Io(io) => io, ws::Error::Io(io) => io,
+194 -29
View File
@@ -18,41 +18,104 @@
//! Middleware for RPC requests. //! Middleware for RPC requests.
use jsonrpc_core::{ use std::collections::HashSet;
FutureOutput, FutureResponse, Metadata, Middleware as RequestMiddleware, Request, Response,
};
use prometheus_endpoint::{register, CounterVec, Opts, PrometheusError, Registry, U64};
use futures::{future::Either, Future}; use jsonrpc_core::{FutureOutput, FutureResponse, Metadata, Middleware as RequestMiddleware};
use prometheus_endpoint::{
register, CounterVec, HistogramOpts, HistogramVec, Opts, PrometheusError, Registry, U64,
};
use futures::{future::Either, Future, FutureExt};
use pubsub::PubSubMetadata;
use crate::RpcHandler;
/// Metrics for RPC middleware /// Metrics for RPC middleware
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct RpcMetrics { pub struct RpcMetrics {
rpc_calls: Option<CounterVec<U64>>, requests_started: CounterVec<U64>,
requests_finished: CounterVec<U64>,
calls_time: HistogramVec,
calls_started: CounterVec<U64>,
calls_finished: CounterVec<U64>,
} }
impl RpcMetrics { impl RpcMetrics {
/// Create an instance of metrics /// Create an instance of metrics
pub fn new(metrics_registry: Option<&Registry>) -> Result<Self, PrometheusError> { pub fn new(metrics_registry: Option<&Registry>) -> Result<Option<Self>, PrometheusError> {
Ok(Self { if let Some(r) = metrics_registry {
rpc_calls: metrics_registry Ok(Some(Self {
.map(|r| { requests_started: register(
register( CounterVec::new(
CounterVec::new( Opts::new(
Opts::new("rpc_calls_total", "Number of rpc calls received"), "rpc_requests_started",
&["protocol"], "Number of RPC requests (not calls) received by the server.",
)?, ),
r, &["protocol"],
) )?,
}) r,
.transpose()?, )?,
}) requests_finished: register(
CounterVec::new(
Opts::new(
"rpc_requests_finished",
"Number of RPC requests (not calls) processed by the server.",
),
&["protocol"],
)?,
r,
)?,
calls_time: register(
HistogramVec::new(
HistogramOpts::new(
"rpc_calls_time",
"Total time [μs] of processed RPC calls",
),
&["protocol", "method"],
)?,
r,
)?,
calls_started: register(
CounterVec::new(
Opts::new(
"rpc_calls_started",
"Number of received RPC calls (unique un-batched requests)",
),
&["protocol", "method"],
)?,
r,
)?,
calls_finished: register(
CounterVec::new(
Opts::new(
"rpc_calls_finished",
"Number of processed RPC calls (unique un-batched requests)",
),
&["protocol", "method", "is_error"],
)?,
r,
)?,
}))
} else {
Ok(None)
}
} }
} }
/// Collects the names of every RPC method exposed by the handler that `gen_handler` builds.
///
/// The handler is constructed once with a placeholder middleware (no metrics, an empty
/// set of known methods and the transport label `"dummy"`) and is used only to
/// enumerate its method names. Any error from `gen_handler` is returned unchanged.
pub fn method_names<F, M, E>(gen_handler: F) -> Result<HashSet<String>, E>
where
    F: FnOnce(RpcMiddleware) -> Result<RpcHandler<M>, E>,
    M: PubSubMetadata,
{
    let placeholder = RpcMiddleware::new(None, HashSet::new(), "dummy");
    gen_handler(placeholder)
        .map(|handler| handler.iter().map(|entry| entry.0.clone()).collect::<HashSet<String>>())
}
/// Middleware for RPC calls /// Middleware for RPC calls
pub struct RpcMiddleware { pub struct RpcMiddleware {
metrics: RpcMetrics, metrics: Option<RpcMetrics>,
known_rpc_method_names: HashSet<String>,
transport_label: String, transport_label: String,
} }
@@ -61,8 +124,12 @@ impl RpcMiddleware {
/// ///
/// - `metrics`: Will be used to report statistics. /// - `metrics`: Will be used to report statistics.
/// - `transport_label`: The label that is used when reporting the statistics. /// - `transport_label`: The label that is used when reporting the statistics.
pub fn new(metrics: RpcMetrics, transport_label: &str) -> Self { pub fn new(
RpcMiddleware { metrics, transport_label: String::from(transport_label) } metrics: Option<RpcMetrics>,
known_rpc_method_names: HashSet<String>,
transport_label: &str,
) -> Self {
RpcMiddleware { metrics, known_rpc_method_names, transport_label: transport_label.into() }
} }
} }
@@ -70,15 +137,113 @@ impl<M: Metadata> RequestMiddleware<M> for RpcMiddleware {
type Future = FutureResponse; type Future = FutureResponse;
type CallFuture = FutureOutput; type CallFuture = FutureOutput;
fn on_request<F, X>(&self, request: Request, meta: M, next: F) -> Either<FutureResponse, X> fn on_request<F, X>(
&self,
request: jsonrpc_core::Request,
meta: M,
next: F,
) -> Either<Self::Future, X>
where where
F: Fn(Request, M) -> X + Send + Sync, F: Fn(jsonrpc_core::Request, M) -> X + Send + Sync,
X: Future<Output = Option<Response>> + Send + 'static, X: Future<Output = Option<jsonrpc_core::Response>> + Send + 'static,
{ {
if let Some(ref rpc_calls) = self.metrics.rpc_calls { let metrics = self.metrics.clone();
rpc_calls.with_label_values(&[self.transport_label.as_str()]).inc(); let transport_label = self.transport_label.clone();
if let Some(ref metrics) = metrics {
metrics.requests_started.with_label_values(&[transport_label.as_str()]).inc();
} }
let r = next(request, meta);
Either::Left(
async move {
let r = r.await;
if let Some(ref metrics) = metrics {
metrics.requests_finished.with_label_values(&[transport_label.as_str()]).inc();
}
r
}
.boxed(),
)
}
Either::Right(next(request, meta)) fn on_call<F, X>(
&self,
call: jsonrpc_core::Call,
meta: M,
next: F,
) -> Either<Self::CallFuture, X>
where
F: Fn(jsonrpc_core::Call, M) -> X + Send + Sync,
X: Future<Output = Option<jsonrpc_core::Output>> + Send + 'static,
{
#[cfg(not(target_os = "unknown"))]
let start = std::time::Instant::now();
let name = call_name(&call, &self.known_rpc_method_names).to_owned();
let metrics = self.metrics.clone();
let transport_label = self.transport_label.clone();
log::trace!(target: "rpc_metrics", "[{}] {} call: {:?}", transport_label, name, &call);
if let Some(ref metrics) = metrics {
metrics
.calls_started
.with_label_values(&[transport_label.as_str(), name.as_str()])
.inc();
}
let r = next(call, meta);
Either::Left(
async move {
let r = r.await;
#[cfg(not(target_os = "unknown"))]
let micros = start.elapsed().as_micros();
// seems that std::time is not implemented for browser target
#[cfg(target_os = "unknown")]
let micros = 1;
if let Some(ref metrics) = metrics {
metrics
.calls_time
.with_label_values(&[transport_label.as_str(), name.as_str()])
.observe(micros as _);
metrics
.calls_finished
.with_label_values(&[
transport_label.as_str(),
name.as_str(),
if is_success(&r) { "true" } else { "false" },
])
.inc();
}
log::debug!(
target: "rpc_metrics",
"[{}] {} call took {} μs",
transport_label,
name,
micros,
);
r
}
.boxed(),
)
}
}
/// Returns the metric label to use for `call`.
///
/// Malformed calls map to `"invalid call"`. Method calls and notifications map to their
/// method name when it is in `known_methods`, and to `"invalid method"` otherwise, so
/// that arbitrary client-supplied names cannot bloat the metric's label set.
fn call_name<'a>(call: &'a jsonrpc_core::Call, known_methods: &HashSet<String>) -> &'a str {
    let method = match call {
        jsonrpc_core::Call::Invalid { .. } => return "invalid call",
        jsonrpc_core::Call::MethodCall(request) => &request.method,
        jsonrpc_core::Call::Notification(notification) => &notification.method,
    };

    if known_methods.contains(method) {
        method.as_str()
    } else {
        "invalid method"
    }
}
fn is_success(output: &Option<jsonrpc_core::Output>) -> bool {
match output {
Some(jsonrpc_core::Output::Success(..)) => true,
_ => false,
} }
} }
+5 -2
View File
@@ -639,12 +639,15 @@ where
) )
}; };
let rpc_metrics = sc_rpc_server::RpcMetrics::new(config.prometheus_registry())?; let rpc_metrics = sc_rpc_server::RpcMetrics::new(config.prometheus_registry())?;
let rpc = start_rpc_servers(&config, gen_handler, rpc_metrics.clone())?; let server_metrics = sc_rpc_server::ServerMetrics::new(config.prometheus_registry())?;
let rpc = start_rpc_servers(&config, gen_handler, rpc_metrics.clone(), server_metrics)?;
// This is used internally, so don't restrict access to unsafe RPC // This is used internally, so don't restrict access to unsafe RPC
let known_rpc_method_names =
sc_rpc_server::method_names(|m| gen_handler(sc_rpc::DenyUnsafe::No, m))?;
let rpc_handlers = RpcHandlers(Arc::new( let rpc_handlers = RpcHandlers(Arc::new(
gen_handler( gen_handler(
sc_rpc::DenyUnsafe::No, sc_rpc::DenyUnsafe::No,
sc_rpc_server::RpcMiddleware::new(rpc_metrics, "inbrowser"), sc_rpc_server::RpcMiddleware::new(rpc_metrics, known_rpc_method_names, "inbrowser"),
)? )?
.into(), .into(),
)); ));
+23 -6
View File
@@ -349,7 +349,8 @@ fn start_rpc_servers<
>( >(
config: &Configuration, config: &Configuration,
mut gen_handler: H, mut gen_handler: H,
rpc_metrics: sc_rpc_server::RpcMetrics, rpc_metrics: Option<sc_rpc_server::RpcMetrics>,
server_metrics: sc_rpc_server::ServerMetrics,
) -> Result<Box<dyn std::any::Any + Send>, Error> { ) -> Result<Box<dyn std::any::Any + Send>, Error> {
fn maybe_start_server<T, F>( fn maybe_start_server<T, F>(
address: Option<SocketAddr>, address: Option<SocketAddr>,
@@ -383,6 +384,7 @@ fn start_rpc_servers<
} }
} }
let rpc_method_names = sc_rpc_server::method_names(|m| gen_handler(sc_rpc::DenyUnsafe::No, m))?;
Ok(Box::new(( Ok(Box::new((
config config
.rpc_ipc .rpc_ipc
@@ -392,8 +394,13 @@ fn start_rpc_servers<
&*path, &*path,
gen_handler( gen_handler(
sc_rpc::DenyUnsafe::No, sc_rpc::DenyUnsafe::No,
sc_rpc_server::RpcMiddleware::new(rpc_metrics.clone(), "ipc"), sc_rpc_server::RpcMiddleware::new(
rpc_metrics.clone(),
rpc_method_names.clone(),
"ipc",
),
)?, )?,
server_metrics.clone(),
) )
.map_err(Error::from) .map_err(Error::from)
}) })
@@ -405,7 +412,11 @@ fn start_rpc_servers<
config.rpc_cors.as_ref(), config.rpc_cors.as_ref(),
gen_handler( gen_handler(
deny_unsafe(&address, &config.rpc_methods), deny_unsafe(&address, &config.rpc_methods),
sc_rpc_server::RpcMiddleware::new(rpc_metrics.clone(), "http"), sc_rpc_server::RpcMiddleware::new(
rpc_metrics.clone(),
rpc_method_names.clone(),
"http",
),
)?, )?,
config.rpc_max_payload, config.rpc_max_payload,
) )
@@ -419,9 +430,14 @@ fn start_rpc_servers<
config.rpc_cors.as_ref(), config.rpc_cors.as_ref(),
gen_handler( gen_handler(
deny_unsafe(&address, &config.rpc_methods), deny_unsafe(&address, &config.rpc_methods),
sc_rpc_server::RpcMiddleware::new(rpc_metrics.clone(), "ws"), sc_rpc_server::RpcMiddleware::new(
rpc_metrics.clone(),
rpc_method_names.clone(),
"ws",
),
)?, )?,
config.rpc_max_payload, config.rpc_max_payload,
server_metrics.clone(),
) )
.map_err(Error::from) .map_err(Error::from)
})? })?
@@ -440,8 +456,9 @@ fn start_rpc_servers<
>( >(
_: &Configuration, _: &Configuration,
_: H, _: H,
_: sc_rpc_server::RpcMetrics, _: Option<sc_rpc_server::RpcMetrics>,
) -> Result<Box<dyn std::any::Any + Send>, error::Error> { _: sc_rpc_server::ServerMetrics,
) -> Result<Box<dyn std::any::Any + Send + Sync>, error::Error> {
Ok(Box::new(())) Ok(Box::new(()))
} }