diff --git a/substrate/client/network/src/protocol.rs b/substrate/client/network/src/protocol.rs index 357cd3baca..24d502ef2f 100644 --- a/substrate/client/network/src/protocol.rs +++ b/substrate/client/network/src/protocol.rs @@ -40,7 +40,7 @@ use sp_runtime::traits::{ use sp_arithmetic::traits::SaturatedConversion; use message::{BlockAnnounce, Message}; use message::generic::{Message as GenericMessage, ConsensusMessage}; -use prometheus_endpoint::{Registry, Gauge, GaugeVec, PrometheusError, Opts, register, U64}; +use prometheus_endpoint::{Registry, Gauge, GaugeVec, HistogramVec, PrometheusError, Opts, register, U64}; use sync::{ChainSync, SyncState}; use crate::service::{TransactionPool, ExHashT}; use crate::config::{BoxFinalityProofRequestBuilder, Roles}; @@ -324,6 +324,7 @@ impl Protocol { block_announce_validator: Box + Send>, metrics_registry: Option<&Registry>, boot_node_ids: Arc>, + queue_size_report: Option, ) -> error::Result<(Protocol, sc_peerset::PeersetHandle)> { let info = chain.info(); let sync = ChainSync::new( @@ -346,7 +347,7 @@ impl Protocol { let (peerset, peerset_handle) = sc_peerset::Peerset::from_config(peerset_config); let versions = &((MIN_VERSION as u8)..=(CURRENT_VERSION as u8)).collect::>(); - let mut behaviour = GenericProto::new(protocol_id.clone(), versions, peerset); + let mut behaviour = GenericProto::new(protocol_id.clone(), versions, peerset, queue_size_report); let mut legacy_equiv_by_name = HashMap::new(); @@ -2052,6 +2053,7 @@ mod tests { Box::new(DefaultBlockAnnounceValidator::new(client.clone())), None, Default::default(), + None, ).unwrap(); let dummy_peer_id = PeerId::random(); diff --git a/substrate/client/network/src/protocol/generic_proto/behaviour.rs b/substrate/client/network/src/protocol/generic_proto/behaviour.rs index e52ac5575f..c398f6df2d 100644 --- a/substrate/client/network/src/protocol/generic_proto/behaviour.rs +++ b/substrate/client/network/src/protocol/generic_proto/behaviour.rs @@ -24,6 +24,7 @@ use futures::prelude::*; use libp2p::core::{ConnectedPoint, Multiaddr, PeerId}; use libp2p::swarm::{NetworkBehaviour, NetworkBehaviourAction, PollParameters}; use log::{debug, error, trace, warn}; +use prometheus_endpoint::HistogramVec; use rand::distributions::{Distribution as _, Uniform}; use smallvec::SmallVec; use std::task::{Context, Poll}; @@ -99,6 +100,9 @@ pub struct GenericProto { /// Events to produce from `poll()`. events: SmallVec<[NetworkBehaviourAction; 4]>, + + /// If `Some`, report the message queue sizes on this `Histogram`. + queue_size_report: Option, } /// State of a peer we're connected to. @@ -267,10 +271,14 @@ pub enum GenericProtoOut { impl GenericProto { /// Creates a `CustomProtos`. + /// + /// The `queue_size_report` is an optional Prometheus metric that can report the size of the + /// messages queue. If passed, it must have one label for the protocol name. pub fn new( protocol: impl Into, versions: &[u8], peerset: sc_peerset::Peerset, + queue_size_report: Option, ) -> Self { let legacy_protocol = RegisteredProtocol::new(protocol, versions); @@ -282,6 +290,7 @@ impl GenericProto { incoming: SmallVec::new(), next_incoming_index: sc_peerset::IncomingIndex(0), events: SmallVec::new(), + queue_size_report, } } @@ -723,7 +732,11 @@ impl NetworkBehaviour for GenericProto { type OutEvent = GenericProtoOut; fn new_handler(&mut self) -> Self::ProtocolsHandler { - NotifsHandlerProto::new(self.legacy_protocol.clone(), self.notif_protocols.clone()) + NotifsHandlerProto::new( + self.legacy_protocol.clone(), + self.notif_protocols.clone(), + self.queue_size_report.clone() + ) } fn addresses_of_peer(&mut self, _: &PeerId) -> Vec { diff --git a/substrate/client/network/src/protocol/generic_proto/handler/group.rs b/substrate/client/network/src/protocol/generic_proto/handler/group.rs index 6b23263b14..21dc4091c0 100644 --- a/substrate/client/network/src/protocol/generic_proto/handler/group.rs +++ b/substrate/client/network/src/protocol/generic_proto/handler/group.rs @@ -64,6 +64,7 @@ use libp2p::swarm::{ NegotiatedSubstream, }; use log::{debug, error}; +use prometheus_endpoint::HistogramVec; use std::{borrow::Cow, error, io, str, task::{Context, Poll}}; /// Implements the `IntoProtocolsHandler` trait of libp2p. @@ -225,12 +226,30 @@ pub enum NotifsHandlerOut { impl NotifsHandlerProto { /// Builds a new handler. - pub fn new(legacy: RegisteredProtocol, list: impl Into, Vec)>>) -> Self { + /// + /// The `queue_size_report` is an optional Prometheus metric that can report the size of the + /// messages queue. If passed, it must have one label for the protocol name. + pub fn new(legacy: RegisteredProtocol, list: impl Into, Vec)>>, queue_size_report: Option) -> Self { let list = list.into(); + let out_handlers = list + .clone() + .into_iter() + .map(|(p, _)| { + let queue_size_report = queue_size_report.as_ref().and_then(|qs| { + if let Ok(utf8) = str::from_utf8(&p) { + Some(qs.with_label_values(&[utf8])) + } else { + log::warn!("Ignoring Prometheus metric because {:?} isn't UTF-8", p); + None + } + }); + NotifsOutHandlerProto::new(p, queue_size_report) + }).collect(); + NotifsHandlerProto { in_handlers: list.clone().into_iter().map(|(p, _)| NotifsInHandlerProto::new(p)).collect(), - out_handlers: list.clone().into_iter().map(|(p, _)| NotifsOutHandlerProto::new(p)).collect(), + out_handlers, legacy: LegacyProtoHandlerProto::new(legacy), } } diff --git a/substrate/client/network/src/protocol/generic_proto/handler/notif_out.rs b/substrate/client/network/src/protocol/generic_proto/handler/notif_out.rs index 62d2409be8..63b460bf8c 100644 --- a/substrate/client/network/src/protocol/generic_proto/handler/notif_out.rs +++ b/substrate/client/network/src/protocol/generic_proto/handler/notif_out.rs @@ -34,6 +34,7 @@ use libp2p::swarm::{ NegotiatedSubstream, }; use log::error; +use prometheus_endpoint::Histogram; use smallvec::SmallVec; use std::{borrow::Cow, fmt, mem, pin::Pin, task::{Context, Poll}, time::Duration}; use wasm_timer::Instant; @@ -56,14 +57,17 @@ const INITIAL_KEEPALIVE_TIME: Duration = Duration::from_secs(5); pub struct NotifsOutHandlerProto { /// Name of the protocol to negotiate. protocol_name: Cow<'static, [u8]>, + /// Optional Prometheus histogram to report message queue size variations. + queue_size_report: Option, } impl NotifsOutHandlerProto { /// Builds a new [`NotifsOutHandlerProto`]. Will use the given protocol name for the /// notifications substream. - pub fn new(protocol_name: impl Into>) -> Self { + pub fn new(protocol_name: impl Into>, queue_size_report: Option) -> Self { NotifsOutHandlerProto { protocol_name: protocol_name.into(), + queue_size_report, } } } @@ -79,6 +83,7 @@ impl IntoProtocolsHandler for NotifsOutHandlerProto { NotifsOutHandler { protocol_name: self.protocol_name, when_connection_open: Instant::now(), + queue_size_report: self.queue_size_report, state: State::Disabled, events_queue: SmallVec::new(), } @@ -103,6 +108,9 @@ pub struct NotifsOutHandler { /// When the connection with the remote has been successfully established. when_connection_open: Instant, + /// Optional prometheus histogram to report message queue sizes variations. + queue_size_report: Option, + /// Queue of events to send to the outside. /// /// This queue must only ever be modified to insert elements at the back, or remove the first @@ -301,6 +309,9 @@ impl ProtocolsHandler for NotifsOutHandler { NotifsOutHandlerIn::Send(msg) => if let State::Open { substream, .. } = &mut self.state { if let Some(Ok(_)) = substream.send(msg).now_or_never() { + if let Some(metric) = &self.queue_size_report { + metric.observe(substream.queue_len() as f64); + } } else { log::warn!( target: "sub-libp2p", diff --git a/substrate/client/network/src/protocol/generic_proto/tests.rs b/substrate/client/network/src/protocol/generic_proto/tests.rs index 00b840d581..c0582d5f5c 100644 --- a/substrate/client/network/src/protocol/generic_proto/tests.rs +++ b/substrate/client/network/src/protocol/generic_proto/tests.rs @@ -82,7 +82,7 @@ fn build_nodes() -> (Swarm, Swarm) { }); let behaviour = CustomProtoWithAddr { - inner: GenericProto::new(&b"test"[..], &[1], peerset), + inner: GenericProto::new(&b"test"[..], &[1], peerset, None), addrs: addrs .iter() .enumerate() diff --git a/substrate/client/network/src/protocol/generic_proto/upgrade/notifications.rs b/substrate/client/network/src/protocol/generic_proto/upgrade/notifications.rs index b6ae1425f1..de42d85c80 100644 --- a/substrate/client/network/src/protocol/generic_proto/upgrade/notifications.rs +++ b/substrate/client/network/src/protocol/generic_proto/upgrade/notifications.rs @@ -38,7 +38,7 @@ use futures::{prelude::*, ready}; use futures_codec::Framed; use libp2p::core::{UpgradeInfo, InboundUpgrade, OutboundUpgrade, upgrade}; use log::error; -use std::{borrow::Cow, collections::VecDeque, io, iter, mem, pin::Pin, task::{Context, Poll}}; +use std::{borrow::Cow, collections::VecDeque, convert::TryFrom as _, io, iter, mem, pin::Pin, task::{Context, Poll}}; use unsigned_varint::codec::UviBytes; /// Maximum allowed size of the two handshake messages, in bytes. @@ -280,6 +280,13 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static, } } +impl NotificationsOutSubstream { + /// Returns the number of items in the queue, capped to `u32::max_value()`. + pub fn queue_len(&self) -> u32 { + u32::try_from(self.messages_queue.len()).unwrap_or(u32::max_value()) + } +} + impl Sink> for NotificationsOutSubstream where TSubstream: AsyncRead + AsyncWrite + Unpin, { diff --git a/substrate/client/network/src/service.rs b/substrate/client/network/src/service.rs index de22f203bc..8d2f0e033f 100644 --- a/substrate/client/network/src/service.rs +++ b/substrate/client/network/src/service.rs @@ -42,7 +42,7 @@ use libp2p::{kad::record, Multiaddr, PeerId}; use log::{error, info, trace, warn}; use parking_lot::Mutex; use prometheus_endpoint::{ - register, Counter, CounterVec, Gauge, GaugeVec, Opts, PrometheusError, Registry, U64, + register, Counter, CounterVec, Gauge, GaugeVec, HistogramOpts, HistogramVec, Opts, PrometheusError, Registry, U64, }; use sc_peerset::PeersetHandle; use sp_consensus::import_queue::{BlockImportError, BlockImportResult, ImportQueue, Link}; @@ -239,6 +239,12 @@ impl NetworkWorker { let local_peer_id = local_public.clone().into_peer_id(); info!(target: "sub-libp2p", "🏷 Local node identity is: {}", local_peer_id.to_base58()); + // Initialize the metrics. + let metrics = match ¶ms.metrics_registry { + Some(registry) => Some(Metrics::register(®istry)?), + None => None + }; + let checker = params.on_demand.as_ref() .map(|od| od.checker().clone()) .unwrap_or(Arc::new(AlwaysBadChecker)); @@ -259,6 +265,7 @@ impl NetworkWorker { params.block_announce_validator, params.metrics_registry.as_ref(), boot_node_ids.clone(), + metrics.as_ref().map(|m| m.notifications_queues_size.clone()), )?; // Build the swarm. @@ -343,10 +350,7 @@ impl NetworkWorker { from_worker, light_client_rqs: params.on_demand.and_then(|od| od.extract_receiver()), event_streams: Vec::new(), - metrics: match params.metrics_registry { - Some(registry) => Some(Metrics::register(®istry)?), - None => None - }, + metrics, boot_node_ids, }) } @@ -802,6 +806,7 @@ struct Metrics { issued_light_requests: Counter, kbuckets_num_nodes: Gauge, network_per_sec_bytes: GaugeVec, + notifications_queues_size: HistogramVec, notifications_total: CounterVec, num_event_stream_channels: Gauge, opened_notification_streams: GaugeVec, @@ -847,6 +852,16 @@ impl Metrics { ), &["direction"] )?, registry)?, + notifications_queues_size: register(HistogramVec::new( + HistogramOpts { + common_opts: Opts::new( + "sub_libp2p_notifications_queues_size", + "Total size of all the notification queues" + ), + buckets: vec![0.0, 1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 64.0, 128.0, 256.0], + }, + &["protocol"] + )?, registry)?, notifications_total: register(CounterVec::new( Opts::new( "sub_libp2p_notifications_total", diff --git a/substrate/utils/prometheus/src/lib.rs b/substrate/utils/prometheus/src/lib.rs index 4a8f05d929..9dab2a2695 100644 --- a/substrate/utils/prometheus/src/lib.rs +++ b/substrate/utils/prometheus/src/lib.rs @@ -17,6 +17,7 @@ use futures_util::{FutureExt, future::Future}; pub use prometheus::{ Registry, Error as PrometheusError, Opts, + Histogram, HistogramOpts, HistogramVec, core::{ GenericGauge as Gauge, GenericCounter as Counter, GenericGaugeVec as GaugeVec, GenericCounterVec as CounterVec,