diff --git a/substrate/client/rpc-servers/src/lib.rs b/substrate/client/rpc-servers/src/lib.rs index a833002fcd..6e09a0ea36 100644 --- a/substrate/client/rpc-servers/src/lib.rs +++ b/substrate/client/rpc-servers/src/lib.rs @@ -24,6 +24,7 @@ mod middleware; use jsonrpc_core::{IoHandlerExtension, MetaIoHandler}; use log::error; +use prometheus_endpoint::{register, Counter, PrometheusError, Registry, U64}; use pubsub::PubSubMetadata; use std::io; @@ -42,7 +43,7 @@ const HTTP_THREADS: usize = 4; pub type RpcHandler = pubsub::PubSubHandler; pub use self::inner::*; -pub use middleware::{RpcMetrics, RpcMiddleware}; +pub use middleware::{method_names, RpcMetrics, RpcMiddleware}; /// Construct rpc `IoHandler` pub fn rpc_handler( @@ -73,6 +74,43 @@ pub fn rpc_handler( io } +/// RPC server-specific prometheus metrics. +#[derive(Debug, Clone, Default)] +pub struct ServerMetrics { + /// Number of sessions opened. + session_opened: Option>, + /// Number of sessions closed. + session_closed: Option>, +} + +impl ServerMetrics { + /// Create new WebSocket RPC server metrics. + pub fn new(registry: Option<&Registry>) -> Result { + registry + .map(|r| { + Ok(Self { + session_opened: register( + Counter::new( + "rpc_sessions_opened", + "Number of persistent RPC sessions opened", + )?, + r, + )? + .into(), + session_closed: register( + Counter::new( + "rpc_sessions_closed", + "Number of persistent RPC sessions closed", + )?, + r, + )? + .into(), + }) + }) + .unwrap_or_else(|| Ok(Default::default())) + } +} + #[cfg(not(target_os = "unknown"))] mod inner { use super::*; @@ -84,6 +122,16 @@ mod inner { /// Type alias for ws server pub type WsServer = ws::Server; + impl ws::SessionStats for ServerMetrics { + fn open_session(&self, _id: ws::SessionId) { + self.session_opened.as_ref().map(|m| m.inc()); + } + + fn close_session(&self, _id: ws::SessionId) { + self.session_closed.as_ref().map(|m| m.inc()); + } + } + /// Start HTTP server listening on given address. 
/// /// **Note**: Only available if `not(target_os = "unknown")`. @@ -114,6 +162,7 @@ mod inner { pub fn start_ipc( addr: &str, io: RpcHandler, + server_metrics: ServerMetrics, ) -> io::Result { let builder = ipc::ServerBuilder::new(io); #[cfg(target_os = "unix")] @@ -122,7 +171,7 @@ mod inner { security_attributes.set_mode(0o600)?; security_attributes }); - builder.start(addr) + builder.session_stats(server_metrics).start(addr) } /// Start WS server listening on given address. @@ -136,6 +185,7 @@ mod inner { cors: Option<&Vec>, io: RpcHandler, maybe_max_payload_mb: Option, + server_metrics: ServerMetrics, ) -> io::Result { let rpc_max_payload = maybe_max_payload_mb .map(|mb| mb.saturating_mul(MEGABYTE)) @@ -147,6 +197,7 @@ mod inner { .max_connections(max_connections.unwrap_or(WS_MAX_CONNECTIONS)) .allowed_origins(map_cors(cors)) .allowed_hosts(hosts_filtering(cors.is_some())) + .session_stats(server_metrics) .start(addr) .map_err(|err| match err { ws::Error::Io(io) => io, diff --git a/substrate/client/rpc-servers/src/middleware.rs b/substrate/client/rpc-servers/src/middleware.rs index 5ba5c18a8e..4338097745 100644 --- a/substrate/client/rpc-servers/src/middleware.rs +++ b/substrate/client/rpc-servers/src/middleware.rs @@ -18,41 +18,104 @@ //! Middleware for RPC requests. 
-use jsonrpc_core::{ - FutureOutput, FutureResponse, Metadata, Middleware as RequestMiddleware, Request, Response, -}; -use prometheus_endpoint::{register, CounterVec, Opts, PrometheusError, Registry, U64}; +use std::collections::HashSet; -use futures::{future::Either, Future}; +use jsonrpc_core::{FutureOutput, FutureResponse, Metadata, Middleware as RequestMiddleware}; +use prometheus_endpoint::{ + register, CounterVec, HistogramOpts, HistogramVec, Opts, PrometheusError, Registry, U64, +}; + +use futures::{future::Either, Future, FutureExt}; +use pubsub::PubSubMetadata; + +use crate::RpcHandler; /// Metrics for RPC middleware #[derive(Debug, Clone)] pub struct RpcMetrics { - rpc_calls: Option>, + requests_started: CounterVec, + requests_finished: CounterVec, + calls_time: HistogramVec, + calls_started: CounterVec, + calls_finished: CounterVec, } impl RpcMetrics { /// Create an instance of metrics - pub fn new(metrics_registry: Option<&Registry>) -> Result { - Ok(Self { - rpc_calls: metrics_registry - .map(|r| { - register( - CounterVec::new( - Opts::new("rpc_calls_total", "Number of rpc calls received"), - &["protocol"], - )?, - r, - ) - }) - .transpose()?, - }) + pub fn new(metrics_registry: Option<&Registry>) -> Result, PrometheusError> { + if let Some(r) = metrics_registry { + Ok(Some(Self { + requests_started: register( + CounterVec::new( + Opts::new( + "rpc_requests_started", + "Number of RPC requests (not calls) received by the server.", + ), + &["protocol"], + )?, + r, + )?, + requests_finished: register( + CounterVec::new( + Opts::new( + "rpc_requests_finished", + "Number of RPC requests (not calls) processed by the server.", + ), + &["protocol"], + )?, + r, + )?, + calls_time: register( + HistogramVec::new( + HistogramOpts::new( + "rpc_calls_time", + "Total time [μs] of processed RPC calls", + ), + &["protocol", "method"], + )?, + r, + )?, + calls_started: register( + CounterVec::new( + Opts::new( + "rpc_calls_started", + "Number of received RPC calls 
(unique un-batched requests)", + ), + &["protocol", "method"], + )?, + r, + )?, + calls_finished: register( + CounterVec::new( + Opts::new( + "rpc_calls_finished", + "Number of processed RPC calls (unique un-batched requests)", + ), + &["protocol", "method", "is_error"], + )?, + r, + )?, + })) + } else { + Ok(None) + } } } +/// Instantiates a dummy `IoHandler` given a builder function to extract supported method names. +pub fn method_names(gen_handler: F) -> Result, E> +where + F: FnOnce(RpcMiddleware) -> Result, E>, + M: PubSubMetadata, +{ + let io = gen_handler(RpcMiddleware::new(None, HashSet::new(), "dummy"))?; + Ok(io.iter().map(|x| x.0.clone()).collect()) +} + /// Middleware for RPC calls pub struct RpcMiddleware { - metrics: RpcMetrics, + metrics: Option, + known_rpc_method_names: HashSet, transport_label: String, } @@ -61,8 +124,12 @@ impl RpcMiddleware { /// /// - `metrics`: Will be used to report statistics. /// - `transport_label`: The label that is used when reporting the statistics. 
- pub fn new(metrics: RpcMetrics, transport_label: &str) -> Self { - RpcMiddleware { metrics, transport_label: String::from(transport_label) } + pub fn new( + metrics: Option, + known_rpc_method_names: HashSet, + transport_label: &str, + ) -> Self { + RpcMiddleware { metrics, known_rpc_method_names, transport_label: transport_label.into() } } } @@ -70,15 +137,113 @@ impl RequestMiddleware for RpcMiddleware { type Future = FutureResponse; type CallFuture = FutureOutput; - fn on_request(&self, request: Request, meta: M, next: F) -> Either + fn on_request( + &self, + request: jsonrpc_core::Request, + meta: M, + next: F, + ) -> Either where - F: Fn(Request, M) -> X + Send + Sync, - X: Future> + Send + 'static, + F: Fn(jsonrpc_core::Request, M) -> X + Send + Sync, + X: Future> + Send + 'static, { - if let Some(ref rpc_calls) = self.metrics.rpc_calls { - rpc_calls.with_label_values(&[self.transport_label.as_str()]).inc(); + let metrics = self.metrics.clone(); + let transport_label = self.transport_label.clone(); + if let Some(ref metrics) = metrics { + metrics.requests_started.with_label_values(&[transport_label.as_str()]).inc(); } + let r = next(request, meta); + Either::Left( + async move { + let r = r.await; + if let Some(ref metrics) = metrics { + metrics.requests_finished.with_label_values(&[transport_label.as_str()]).inc(); + } + r + } + .boxed(), + ) + } - Either::Right(next(request, meta)) + fn on_call( + &self, + call: jsonrpc_core::Call, + meta: M, + next: F, + ) -> Either + where + F: Fn(jsonrpc_core::Call, M) -> X + Send + Sync, + X: Future> + Send + 'static, + { + #[cfg(not(target_os = "unknown"))] + let start = std::time::Instant::now(); + let name = call_name(&call, &self.known_rpc_method_names).to_owned(); + let metrics = self.metrics.clone(); + let transport_label = self.transport_label.clone(); + log::trace!(target: "rpc_metrics", "[{}] {} call: {:?}", transport_label, name, &call); + if let Some(ref metrics) = metrics { + metrics + .calls_started + 
.with_label_values(&[transport_label.as_str(), name.as_str()]) + .inc(); + } + let r = next(call, meta); + Either::Left( + async move { + let r = r.await; + #[cfg(not(target_os = "unknown"))] + let micros = start.elapsed().as_micros(); + // seems that std::time is not implemented for browser target + #[cfg(target_os = "unknown")] + let micros = 1; + if let Some(ref metrics) = metrics { + metrics + .calls_time + .with_label_values(&[transport_label.as_str(), name.as_str()]) + .observe(micros as _); + metrics + .calls_finished + .with_label_values(&[ + transport_label.as_str(), + name.as_str(), + if is_success(&r) { "true" } else { "false" }, + ]) + .inc(); + } + log::debug!( + target: "rpc_metrics", + "[{}] {} call took {} μs", + transport_label, + name, + micros, + ); + r + } + .boxed(), + ) + } +} + +fn call_name<'a>(call: &'a jsonrpc_core::Call, known_methods: &HashSet) -> &'a str { + // To prevent bloating metric with all invalid method names we filter them out here. + let only_known = |method: &'a String| { + if known_methods.contains(method) { + method.as_str() + } else { + "invalid method" + } + }; + + match call { + jsonrpc_core::Call::Invalid { .. 
} => "invalid call", + jsonrpc_core::Call::MethodCall(ref call) => only_known(&call.method), + jsonrpc_core::Call::Notification(ref notification) => only_known(&notification.method), + } +} + +fn is_success(output: &Option) -> bool { + match output { + Some(jsonrpc_core::Output::Success(..)) => true, + _ => false, } } diff --git a/substrate/client/service/src/builder.rs b/substrate/client/service/src/builder.rs index dea9953633..a1fb1b9097 100644 --- a/substrate/client/service/src/builder.rs +++ b/substrate/client/service/src/builder.rs @@ -639,12 +639,15 @@ where ) }; let rpc_metrics = sc_rpc_server::RpcMetrics::new(config.prometheus_registry())?; - let rpc = start_rpc_servers(&config, gen_handler, rpc_metrics.clone())?; + let server_metrics = sc_rpc_server::ServerMetrics::new(config.prometheus_registry())?; + let rpc = start_rpc_servers(&config, gen_handler, rpc_metrics.clone(), server_metrics)?; // This is used internally, so don't restrict access to unsafe RPC + let known_rpc_method_names = + sc_rpc_server::method_names(|m| gen_handler(sc_rpc::DenyUnsafe::No, m))?; let rpc_handlers = RpcHandlers(Arc::new( gen_handler( sc_rpc::DenyUnsafe::No, - sc_rpc_server::RpcMiddleware::new(rpc_metrics, "inbrowser"), + sc_rpc_server::RpcMiddleware::new(rpc_metrics, known_rpc_method_names, "inbrowser"), )? 
.into(), )); diff --git a/substrate/client/service/src/lib.rs b/substrate/client/service/src/lib.rs index 24506a977e..883ece4236 100644 --- a/substrate/client/service/src/lib.rs +++ b/substrate/client/service/src/lib.rs @@ -349,7 +349,8 @@ fn start_rpc_servers< >( config: &Configuration, mut gen_handler: H, - rpc_metrics: sc_rpc_server::RpcMetrics, + rpc_metrics: Option, + server_metrics: sc_rpc_server::ServerMetrics, ) -> Result, Error> { fn maybe_start_server( address: Option, @@ -383,6 +384,7 @@ fn start_rpc_servers< } } + let rpc_method_names = sc_rpc_server::method_names(|m| gen_handler(sc_rpc::DenyUnsafe::No, m))?; Ok(Box::new(( config .rpc_ipc @@ -392,8 +394,13 @@ fn start_rpc_servers< &*path, gen_handler( sc_rpc::DenyUnsafe::No, - sc_rpc_server::RpcMiddleware::new(rpc_metrics.clone(), "ipc"), + sc_rpc_server::RpcMiddleware::new( + rpc_metrics.clone(), + rpc_method_names.clone(), + "ipc", + ), )?, + server_metrics.clone(), ) .map_err(Error::from) }) @@ -405,7 +412,11 @@ fn start_rpc_servers< config.rpc_cors.as_ref(), gen_handler( deny_unsafe(&address, &config.rpc_methods), - sc_rpc_server::RpcMiddleware::new(rpc_metrics.clone(), "http"), + sc_rpc_server::RpcMiddleware::new( + rpc_metrics.clone(), + rpc_method_names.clone(), + "http", + ), )?, config.rpc_max_payload, ) @@ -419,9 +430,14 @@ fn start_rpc_servers< config.rpc_cors.as_ref(), gen_handler( deny_unsafe(&address, &config.rpc_methods), - sc_rpc_server::RpcMiddleware::new(rpc_metrics.clone(), "ws"), + sc_rpc_server::RpcMiddleware::new( + rpc_metrics.clone(), + rpc_method_names.clone(), + "ws", + ), )?, config.rpc_max_payload, + server_metrics.clone(), ) .map_err(Error::from) })? @@ -440,8 +456,9 @@ fn start_rpc_servers< >( _: &Configuration, _: H, - _: sc_rpc_server::RpcMetrics, -) -> Result, error::Error> { + _: Option, + _: sc_rpc_server::ServerMetrics, +) -> Result, error::Error> { Ok(Box::new(())) }