Several tweaks to networking Prometheus metrics (#5636)

This commit is contained in:
Pierre Krieger
2020-04-16 15:18:35 +02:00
committed by GitHub
parent 9a60df2c56
commit 239d0998ea
2 changed files with 65 additions and 43 deletions
+53 -35
View File
@@ -859,12 +859,12 @@ pub struct NetworkWorker<B: BlockT + 'static, H: ExHashT> {
struct Metrics {
// This list is ordered alphabetically
connections: GaugeVec<U64>,
connections_closed_total: CounterVec<U64>,
connections_opened_total: CounterVec<U64>,
import_queue_blocks_submitted: Counter<U64>,
import_queue_finality_proofs_submitted: Counter<U64>,
import_queue_justifications_submitted: Counter<U64>,
incoming_connections_errors_total: Counter<U64>,
incoming_connections_errors_total: CounterVec<U64>,
incoming_connections_total: Counter<U64>,
is_major_syncing: Gauge<U64>,
issued_light_requests: Counter<U64>,
@@ -874,7 +874,8 @@ struct Metrics {
network_per_sec_bytes: GaugeVec<U64>,
notifications_queues_size: HistogramVec,
notifications_sizes: HistogramVec,
opened_notification_streams: GaugeVec<U64>,
notifications_streams_closed_total: CounterVec<U64>,
notifications_streams_opened_total: CounterVec<U64>,
peers_count: Gauge<U64>,
peerset_num_discovered: Gauge<U64>,
peerset_num_requested: Gauge<U64>,
@@ -887,19 +888,19 @@ impl Metrics {
fn register(registry: &Registry) -> Result<Self, PrometheusError> {
Ok(Self {
// This list is ordered alphabetically
connections: register(GaugeVec::new(
Opts::new(
"sub_libp2p_connections",
"Number of established libp2p connections"
),
&["direction"]
)?, registry)?,
connections_closed_total: register(CounterVec::new(
Opts::new(
"sub_libp2p_connections_closed_total",
"Total number of connections closed, by reason"
"Total number of connections closed, by reason and direction"
),
&["reason"]
&["direction", "reason"]
)?, registry)?,
connections_opened_total: register(CounterVec::new(
Opts::new(
"sub_libp2p_connections_opened_total",
"Total number of connections opened"
),
&["direction"]
)?, registry)?,
import_queue_blocks_submitted: register(Counter::new(
"import_queue_blocks_submitted",
@@ -913,9 +914,13 @@ impl Metrics {
"import_queue_justifications_submitted",
"Number of justifications submitted to the import queue.",
)?, registry)?,
incoming_connections_errors_total: register(Counter::new(
"sub_libp2p_incoming_connections_handshake_errors_total",
"Total number of incoming connections that have failed during the initial handshake"
incoming_connections_errors_total: register(CounterVec::new(
Opts::new(
"sub_libp2p_incoming_connections_handshake_errors_total",
"Total number of incoming connections that have failed during the \
initial handshake"
),
&["reason"]
)?, registry)?,
incoming_connections_total: register(Counter::new(
"sub_libp2p_incoming_connections_total",
@@ -966,10 +971,17 @@ impl Metrics {
},
&["direction", "protocol"]
)?, registry)?,
opened_notification_streams: register(GaugeVec::new(
notifications_streams_closed_total: register(CounterVec::new(
Opts::new(
"sub_libp2p_opened_notification_streams",
"Number of open notification substreams"
"sub_libp2p_notifications_streams_closed_total",
"Total number of notification substreams that have been closed"
),
&["protocol"]
)?, registry)?,
notifications_streams_opened_total: register(CounterVec::new(
Opts::new(
"sub_libp2p_notifications_streams_opened_total",
"Total number of notification substreams that have been opened"
),
&["protocol"]
)?, registry)?,
@@ -1002,10 +1014,10 @@ impl Metrics {
fn update_with_network_event(&self, event: &Event) {
match event {
Event::NotificationStreamOpened { engine_id, .. } => {
self.opened_notification_streams.with_label_values(&[&engine_id_to_string(&engine_id)]).inc();
self.notifications_streams_opened_total.with_label_values(&[&engine_id_to_string(&engine_id)]).inc();
},
Event::NotificationStreamClosed { engine_id, .. } => {
self.opened_notification_streams.with_label_values(&[&engine_id_to_string(&engine_id)]).dec();
self.notifications_streams_closed_total.with_label_values(&[&engine_id_to_string(&engine_id)]).inc();
},
Event::NotificationsReceived { messages, .. } => {
for (engine_id, message) in messages {
@@ -1129,34 +1141,33 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
if let Some(metrics) = this.metrics.as_ref() {
match endpoint {
ConnectedPoint::Dialer { .. } =>
metrics.connections.with_label_values(&["out"]).inc(),
metrics.connections_opened_total.with_label_values(&["out"]).inc(),
ConnectedPoint::Listener { .. } =>
metrics.connections.with_label_values(&["in"]).inc(),
metrics.connections_opened_total.with_label_values(&["in"]).inc(),
}
}
},
Poll::Ready(SwarmEvent::ConnectionClosed { peer_id, cause, endpoint, .. }) => {
trace!(target: "sub-libp2p", "Libp2p => Disconnected({:?}, {:?})", peer_id, cause);
if let Some(metrics) = this.metrics.as_ref() {
match endpoint {
ConnectedPoint::Dialer { .. } =>
metrics.connections.with_label_values(&["out"]).dec(),
ConnectedPoint::Listener { .. } =>
metrics.connections.with_label_values(&["in"]).dec(),
}
let dir = match endpoint {
ConnectedPoint::Dialer { .. } => "out",
ConnectedPoint::Listener { .. } => "in",
};
match cause {
ConnectionError::IO(_) =>
metrics.connections_closed_total.with_label_values(&["transport-error"]).inc(),
metrics.connections_closed_total.with_label_values(&[dir, "transport-error"]).inc(),
ConnectionError::Handler(NodeHandlerWrapperError::Handler(EitherError::A(EitherError::A(
EitherError::A(EitherError::B(EitherError::A(PingFailure::Timeout))))))) =>
metrics.connections_closed_total.with_label_values(&["ping-timeout"]).inc(),
metrics.connections_closed_total.with_label_values(&[dir, "ping-timeout"]).inc(),
ConnectionError::Handler(NodeHandlerWrapperError::Handler(EitherError::A(EitherError::A(
EitherError::A(EitherError::A(EitherError::B(LegacyConnectionKillError))))))) =>
metrics.connections_closed_total.with_label_values(&["force-closed"]).inc(),
metrics.connections_closed_total.with_label_values(&[dir, "force-closed"]).inc(),
ConnectionError::Handler(NodeHandlerWrapperError::Handler(_)) =>
metrics.connections_closed_total.with_label_values(&["protocol-error"]).inc(),
metrics.connections_closed_total.with_label_values(&[dir, "protocol-error"]).inc(),
ConnectionError::Handler(NodeHandlerWrapperError::KeepAliveTimeout) =>
metrics.connections_closed_total.with_label_values(&["keep-alive-timeout"]).inc(),
metrics.connections_closed_total.with_label_values(&[dir, "keep-alive-timeout"]).inc(),
}
}
},
@@ -1214,14 +1225,21 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
trace!(target: "sub-libp2p", "Libp2p => IncomingConnectionError({},{}): {}",
local_addr, send_back_addr, error);
if let Some(metrics) = this.metrics.as_ref() {
metrics.incoming_connections_errors_total.inc();
let reason = match error {
PendingConnectionError::ConnectionLimit(_) => "limit-reached",
PendingConnectionError::InvalidPeerId => "invalid-peer-id",
PendingConnectionError::Transport(_) |
PendingConnectionError::IO(_) => "transport-error",
};
metrics.incoming_connections_errors_total.with_label_values(&[reason]).inc();
}
},
Poll::Ready(SwarmEvent::BannedPeer { peer_id, endpoint }) => {
trace!(target: "sub-libp2p", "Libp2p => BannedPeer({}). Connected via {:?}.",
peer_id, endpoint);
if let Some(metrics) = this.metrics.as_ref() {
metrics.incoming_connections_errors_total.inc();
metrics.incoming_connections_errors_total.with_label_values(&["banned"]).inc();
}
},
Poll::Ready(SwarmEvent::UnknownPeerUnreachableAddr { address, error }) =>
@@ -35,7 +35,7 @@ use super::engine_id_to_string;
use futures::{prelude::*, channel::mpsc, ready};
use parking_lot::Mutex;
use prometheus_endpoint::{register, CounterVec, Gauge, Opts, PrometheusError, Registry, U64};
use prometheus_endpoint::{register, CounterVec, GaugeVec, Opts, PrometheusError, Registry, U64};
use std::{
convert::TryFrom as _,
fmt, pin::Pin, sync::Arc,
@@ -77,7 +77,7 @@ impl Drop for Sender {
fn drop(&mut self) {
let metrics = self.metrics.lock();
if let Some(Some(metrics)) = metrics.as_ref().map(|m| &**m) {
metrics.num_channels.dec();
metrics.num_channels.with_label_values(&[self.name]).dec();
}
}
}
@@ -151,11 +151,12 @@ impl OutChannels {
debug_assert!(metrics.is_none());
*metrics = Some(self.metrics.clone());
drop(metrics);
self.event_streams.push(sender);
if let Some(metrics) = &*self.metrics {
metrics.num_channels.inc();
metrics.num_channels.with_label_values(&[sender.name]).inc();
}
self.event_streams.push(sender);
}
/// Sends an event.
@@ -184,7 +185,7 @@ struct Metrics {
// This list is ordered alphabetically
events_total: CounterVec<U64>,
notifications_sizes: CounterVec<U64>,
num_channels: Gauge<U64>,
num_channels: GaugeVec<U64>,
}
impl Metrics {
@@ -206,9 +207,12 @@ impl Metrics {
),
&["protocol", "action", "name"]
)?, registry)?,
num_channels: register(Gauge::new(
"sub_libp2p_out_events_num_channels",
"Number of internal active channels that broadcast network events",
num_channels: register(GaugeVec::new(
Opts::new(
"sub_libp2p_out_events_num_channels",
"Number of internal active channels that broadcast network events",
),
&["name"]
)?, registry)?,
})
}