From 877a7ab5310257819bb42141e07aea96f2510234 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Thu, 9 Apr 2020 20:30:34 +0200 Subject: [PATCH] Add metrics for the events in the network output channels (#5597) * Add metrics for the events in the network output channels * Documentation fixes * A couple fixes * Fix panic at destruction * Rework for direct Prometheus integration * Don't lock on the Receiver * Another review address * Address review * Update client/network/src/service/out_events.rs Co-Authored-By: Max Inden * Fix bad event name * Fix descriptions * Fix names * client/network/service/out_events: Apply remaining suggestions Co-authored-by: Max Inden --- substrate/client/network/src/service.rs | 17 +- .../client/network/src/service/out_events.rs | 269 ++++++++++++++++++ 2 files changed, 275 insertions(+), 11 deletions(-) create mode 100644 substrate/client/network/src/service/out_events.rs diff --git a/substrate/client/network/src/service.rs b/substrate/client/network/src/service.rs index ba4cdcecdf..f7075cf16b 100644 --- a/substrate/client/network/src/service.rs +++ b/substrate/client/network/src/service.rs @@ -68,6 +68,7 @@ use std::{ task::Poll, }; +mod out_events; #[cfg(test)] mod tests; @@ -386,7 +387,7 @@ impl NetworkWorker { import_queue: params.import_queue, from_worker, light_client_rqs: params.on_demand.and_then(|od| od.extract_receiver()), - event_streams: Vec::new(), + event_streams: out_events::OutChannels::new(params.metrics_registry.as_ref())?, metrics, boot_node_ids, }) @@ -576,7 +577,7 @@ impl NetworkService { /// The stream never ends (unless the `NetworkWorker` gets shut down). pub fn event_stream(&self) -> impl Stream { // Note: when transitioning to stable futures, remove the `Error` entirely - let (tx, rx) = tracing_unbounded("mpsc_network_event_stream"); + let (tx, rx) = out_events::channel(); let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::EventStream(tx)); rx } @@ -796,7 +797,7 @@ enum ServiceToWorkerMsg { PutValue(record::Key, Vec), AddKnownAddress(PeerId, Multiaddr), SyncFork(Vec, B::Hash, NumberFor), - EventStream(TracingUnboundedSender), + EventStream(out_events::Sender), WriteNotification { message: Vec, engine_id: ConsensusEngineId, @@ -831,7 +832,7 @@ pub struct NetworkWorker { /// Receiver for queries from the light client that must be processed. light_client_rqs: Option>>, /// Senders for events that happen on the network. - event_streams: Vec>, + event_streams: out_events::OutChannels, /// Prometheus network metrics. metrics: Option, /// The `PeerId`'s of all boot nodes. @@ -855,7 +856,6 @@ struct Metrics { network_per_sec_bytes: GaugeVec, notifications_queues_size: HistogramVec, notifications_sizes: HistogramVec, - num_event_stream_channels: Gauge, opened_notification_streams: GaugeVec, peers_count: Gauge, peerset_num_discovered: Gauge, @@ -948,10 +948,6 @@ impl Metrics { }, &["direction", "protocol"] )?, registry)?, - num_event_stream_channels: register(Gauge::new( - "sub_libp2p_num_event_stream_channels", - "Number of internal active channels that broadcast network events", - )?, registry)?, opened_notification_streams: register(GaugeVec::new( Opts::new( "sub_libp2p_opened_notification_streams", @@ -1105,10 +1101,10 @@ impl Future for NetworkWorker { } }, Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::Event(ev))) => { - this.event_streams.retain(|sender| sender.unbounded_send(ev.clone()).is_ok()); if let Some(metrics) = this.metrics.as_ref() { metrics.update_with_network_event(&ev); } + this.event_streams.send(ev); }, Poll::Ready(SwarmEvent::ConnectionEstablished { peer_id, endpoint, .. }) => { trace!(target: "sub-libp2p", "Libp2p => Connected({:?})", peer_id); @@ -1249,7 +1245,6 @@ impl Future for NetworkWorker { metrics.network_per_sec_bytes.with_label_values(&["out"]).set(this.service.bandwidth.average_upload_per_sec()); metrics.is_major_syncing.set(is_major_syncing as u64); metrics.kbuckets_num_nodes.set(this.network_service.num_kbuckets_entries() as u64); - metrics.num_event_stream_channels.set(this.event_streams.len() as u64); metrics.peers_count.set(num_connected_peers as u64); metrics.peerset_num_discovered.set(this.network_service.user_protocol().num_discovered_peers() as u64); metrics.peerset_num_requested.set(this.network_service.user_protocol().requested_peers().count() as u64); diff --git a/substrate/client/network/src/service/out_events.rs b/substrate/client/network/src/service/out_events.rs new file mode 100644 index 0000000000..10bb9b7e91 --- /dev/null +++ b/substrate/client/network/src/service/out_events.rs @@ -0,0 +1,269 @@ +// Copyright 2017-2020 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + +//! Registering events streams. +//! +//! This code holds the logic that is used for the network service to inform other parts of +//! Substrate about what is happening. +//! +//! # Usage +//! +//! - Create an instance of [`OutChannels`]. +//! - Create channels using the [`channel`] function. The receiving side implements the `Stream` +//! trait. +//! - You cannot directly send an event on a sender. Instead, you have to call +//! [`OutChannels::push`] to put the sender within a [`OutChannels`]. +//! - Send events by calling [`OutChannels::send`]. Events are cloned for each sender in the +//! collection. +//! + +use crate::Event; +use super::engine_id_to_string; + +use futures::{prelude::*, channel::mpsc, ready}; +use parking_lot::Mutex; +use prometheus_endpoint::{register, CounterVec, Gauge, Opts, PrometheusError, Registry, U64}; +use std::{ + convert::TryFrom as _, + fmt, pin::Pin, sync::Arc, + task::{Context, Poll} +}; + +/// Creates a new channel that can be associated to a [`OutChannels`]. +pub fn channel() -> (Sender, Receiver) { + let (tx, rx) = mpsc::unbounded(); + let metrics = Arc::new(Mutex::new(None)); + let tx = Sender { inner: tx, metrics: metrics.clone() }; + let rx = Receiver { inner: rx, metrics }; + (tx, rx) +} + +/// Sending side of a channel. +/// +/// Must be associated with an [`OutChannels`] before anything can be sent on it +/// +/// > **Note**: Contrary to regular channels, this `Sender` is purposefully designed to not +/// implement the `Clone` trait e.g. in Order to not complicate the logic keeping the metrics in +/// sync on drop. If someone adds a `#[derive(Clone)]` below, it is **wrong**. +pub struct Sender { + inner: mpsc::UnboundedSender, + /// Clone of [`Receiver::metrics`]. + metrics: Arc>>>>, +} + +impl fmt::Debug for Sender { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_tuple("Sender").finish() + } +} + +impl Drop for Sender { + fn drop(&mut self) { + let metrics = self.metrics.lock(); + if let Some(Some(metrics)) = metrics.as_ref().map(|m| &**m) { + metrics.num_channels.dec(); + } + } +} + +/// Receiving side of a channel. +pub struct Receiver { + inner: mpsc::UnboundedReceiver, + /// Initially contains `None`, and will be set to a value once the corresponding [`Sender`] + /// is assigned to an instance of [`OutChannels`]. + metrics: Arc>>>>, +} + +impl Stream for Receiver { + type Item = Event; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + if let Some(ev) = ready!(Pin::new(&mut self.inner).poll_next(cx)) { + let metrics = self.metrics.lock().clone(); + if let Some(Some(metrics)) = metrics.as_ref().map(|m| &**m) { + metrics.event_out(&ev); + } else { + log::warn!("Inconsistency in out_events: event happened before sender associated"); + } + Poll::Ready(Some(ev)) + } else { + Poll::Ready(None) + } + } +} + +impl fmt::Debug for Receiver { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_tuple("Receiver").finish() + } +} + +impl Drop for Receiver { + fn drop(&mut self) { + // Empty the list to properly decrease the metrics. + while let Some(Some(_)) = self.next().now_or_never() {} + } +} + +/// Collection of senders. +pub struct OutChannels { + event_streams: Vec, + /// The metrics we collect. A clone of this is sent to each [`Receiver`] associated with this + /// object. + metrics: Arc>, +} + +impl OutChannels { + /// Creates a new empty collection of senders. + pub fn new(registry: Option<&Registry>) -> Result { + let metrics = if let Some(registry) = registry { + Some(Metrics::register(registry)?) + } else { + None + }; + + Ok(OutChannels { + event_streams: Vec::new(), + metrics: Arc::new(metrics), + }) + } + + /// Adds a new [`Sender`] to the collection. + pub fn push(&mut self, sender: Sender) { + let mut metrics = sender.metrics.lock(); + debug_assert!(metrics.is_none()); + *metrics = Some(self.metrics.clone()); + drop(metrics); + self.event_streams.push(sender); + + if let Some(metrics) = &*self.metrics { + metrics.num_channels.inc(); + } + } + + /// Sends an event. + pub fn send(&mut self, event: Event) { + self.event_streams.retain(|sender| { + sender.inner.unbounded_send(event.clone()).is_ok() + }); + + if let Some(metrics) = &*self.metrics { + metrics.event_in(&event, self.event_streams.len() as u64); + } + } +} + +impl fmt::Debug for OutChannels { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("OutChannels") + .field("num_channels", &self.event_streams.len()) + .finish() + } +} + +struct Metrics { + // This list is ordered alphabetically + events_total: CounterVec, + notifications_sizes: CounterVec, + num_channels: Gauge, +} + +impl Metrics { + fn register(registry: &Registry) -> Result { + Ok(Self { + events_total: register(CounterVec::new( + Opts::new( + "sub_libp2p_out_events_events_total", + "Number of broadcast network events that have been sent or received across all \ + channels" + ), + &["event_name", "action"] + )?, registry)?, + notifications_sizes: register(CounterVec::new( + Opts::new( + "sub_libp2p_out_events_notifications_sizes", + "Size of notification events that have been sent or received across all \ + channels" + ), + &["protocol", "action"] + )?, registry)?, + num_channels: register(Gauge::new( + "sub_libp2p_out_events_num_channels", + "Number of internal active channels that broadcast network events", + )?, registry)?, + }) + } + + fn event_in(&self, event: &Event, num: u64) { + match event { + Event::Dht(_) => { + self.events_total + .with_label_values(&["dht", "sent"]) + .inc_by(num); + } + Event::NotificationStreamOpened { engine_id, .. } => { + self.events_total + .with_label_values(&[&format!("notif-open-{:?}", engine_id), "sent"]) + .inc_by(num); + }, + Event::NotificationStreamClosed { engine_id, .. } => { + self.events_total + .with_label_values(&[&format!("notif-closed-{:?}", engine_id), "sent"]) + .inc_by(num); + }, + Event::NotificationsReceived { messages, .. } => { + for (engine_id, message) in messages { + self.events_total + .with_label_values(&[&format!("notif-{:?}", engine_id), "sent"]) + .inc_by(num); + self.notifications_sizes + .with_label_values(&[&engine_id_to_string(engine_id), "sent"]) + .inc_by(num.saturating_mul(u64::try_from(message.len()).unwrap_or(u64::max_value()))); + } + }, + } + } + + fn event_out(&self, event: &Event) { + match event { + Event::Dht(_) => { + self.events_total + .with_label_values(&["dht", "received"]) + .inc(); + } + Event::NotificationStreamOpened { engine_id, .. } => { + self.events_total + .with_label_values(&[&format!("notif-open-{:?}", engine_id), "received"]) + .inc(); + }, + Event::NotificationStreamClosed { engine_id, .. } => { + self.events_total + .with_label_values(&[&format!("notif-closed-{:?}", engine_id), "received"]) + .inc(); + }, + Event::NotificationsReceived { messages, .. } => { + for (engine_id, message) in messages { + self.events_total + .with_label_values(&[&format!("notif-{:?}", engine_id), "received"]) + .inc(); + self.notifications_sizes + .with_label_values(&[&engine_id_to_string(engine_id), "received"]) + .inc_by(u64::try_from(message.len()).unwrap_or(u64::max_value())); + } + }, + } + } +}