Add a sub_libp2p_notifications_queues_size Prometheus metric (#5503)

* Add a sub_libp2p_notifications_queues_size Prometheus metric

* Fix network tests

* Address review
This commit is contained in:
Pierre Krieger
2020-04-03 08:43:55 +02:00
committed by GitHub
parent 7983a8184b
commit e8dfb37aaf
8 changed files with 81 additions and 13 deletions
@@ -24,6 +24,7 @@ use futures::prelude::*;
use libp2p::core::{ConnectedPoint, Multiaddr, PeerId};
use libp2p::swarm::{NetworkBehaviour, NetworkBehaviourAction, PollParameters};
use log::{debug, error, trace, warn};
use prometheus_endpoint::HistogramVec;
use rand::distributions::{Distribution as _, Uniform};
use smallvec::SmallVec;
use std::task::{Context, Poll};
@@ -99,6 +100,9 @@ pub struct GenericProto {
/// Events to produce from `poll()`.
events: SmallVec<[NetworkBehaviourAction<NotifsHandlerIn, GenericProtoOut>; 4]>,
/// If `Some`, report the message queue sizes on this `Histogram`.
queue_size_report: Option<HistogramVec>,
}
/// State of a peer we're connected to.
@@ -267,10 +271,14 @@ pub enum GenericProtoOut {
impl GenericProto {
/// Creates a `CustomProtos`.
///
/// The `queue_size_report` is an optional Prometheus metric that can report the size of the
/// messages queue. If passed, it must have one label for the protocol name.
pub fn new(
protocol: impl Into<ProtocolId>,
versions: &[u8],
peerset: sc_peerset::Peerset,
queue_size_report: Option<HistogramVec>,
) -> Self {
let legacy_protocol = RegisteredProtocol::new(protocol, versions);
@@ -282,6 +290,7 @@ impl GenericProto {
incoming: SmallVec::new(),
next_incoming_index: sc_peerset::IncomingIndex(0),
events: SmallVec::new(),
queue_size_report,
}
}
@@ -723,7 +732,11 @@ impl NetworkBehaviour for GenericProto {
type OutEvent = GenericProtoOut;
fn new_handler(&mut self) -> Self::ProtocolsHandler {
NotifsHandlerProto::new(self.legacy_protocol.clone(), self.notif_protocols.clone())
NotifsHandlerProto::new(
self.legacy_protocol.clone(),
self.notif_protocols.clone(),
self.queue_size_report.clone()
)
}
fn addresses_of_peer(&mut self, _: &PeerId) -> Vec<Multiaddr> {
@@ -64,6 +64,7 @@ use libp2p::swarm::{
NegotiatedSubstream,
};
use log::{debug, error};
use prometheus_endpoint::HistogramVec;
use std::{borrow::Cow, error, io, str, task::{Context, Poll}};
/// Implements the `IntoProtocolsHandler` trait of libp2p.
@@ -225,12 +226,30 @@ pub enum NotifsHandlerOut {
impl NotifsHandlerProto {
/// Builds a new handler.
pub fn new(legacy: RegisteredProtocol, list: impl Into<Vec<(Cow<'static, [u8]>, Vec<u8>)>>) -> Self {
///
/// The `queue_size_report` is an optional Prometheus metric that can report the size of the
/// messages queue. If passed, it must have one label for the protocol name.
pub fn new(legacy: RegisteredProtocol, list: impl Into<Vec<(Cow<'static, [u8]>, Vec<u8>)>>, queue_size_report: Option<HistogramVec>) -> Self {
let list = list.into();
let out_handlers = list
.clone()
.into_iter()
.map(|(p, _)| {
let queue_size_report = queue_size_report.as_ref().and_then(|qs| {
if let Ok(utf8) = str::from_utf8(&p) {
Some(qs.with_label_values(&[utf8]))
} else {
log::warn!("Ignoring Prometheus metric because {:?} isn't UTF-8", p);
None
}
});
NotifsOutHandlerProto::new(p, queue_size_report)
}).collect();
NotifsHandlerProto {
in_handlers: list.clone().into_iter().map(|(p, _)| NotifsInHandlerProto::new(p)).collect(),
out_handlers: list.clone().into_iter().map(|(p, _)| NotifsOutHandlerProto::new(p)).collect(),
out_handlers,
legacy: LegacyProtoHandlerProto::new(legacy),
}
}
@@ -34,6 +34,7 @@ use libp2p::swarm::{
NegotiatedSubstream,
};
use log::error;
use prometheus_endpoint::Histogram;
use smallvec::SmallVec;
use std::{borrow::Cow, fmt, mem, pin::Pin, task::{Context, Poll}, time::Duration};
use wasm_timer::Instant;
@@ -56,14 +57,17 @@ const INITIAL_KEEPALIVE_TIME: Duration = Duration::from_secs(5);
pub struct NotifsOutHandlerProto {
/// Name of the protocol to negotiate.
protocol_name: Cow<'static, [u8]>,
/// Optional Prometheus histogram to report message queue size variations.
queue_size_report: Option<Histogram>,
}
impl NotifsOutHandlerProto {
/// Builds a new [`NotifsOutHandlerProto`]. Will use the given protocol name for the
/// notifications substream.
pub fn new(protocol_name: impl Into<Cow<'static, [u8]>>) -> Self {
pub fn new(protocol_name: impl Into<Cow<'static, [u8]>>, queue_size_report: Option<Histogram>) -> Self {
NotifsOutHandlerProto {
protocol_name: protocol_name.into(),
queue_size_report,
}
}
}
@@ -79,6 +83,7 @@ impl IntoProtocolsHandler for NotifsOutHandlerProto {
NotifsOutHandler {
protocol_name: self.protocol_name,
when_connection_open: Instant::now(),
queue_size_report: self.queue_size_report,
state: State::Disabled,
events_queue: SmallVec::new(),
}
@@ -103,6 +108,9 @@ pub struct NotifsOutHandler {
/// When the connection with the remote has been successfully established.
when_connection_open: Instant,
/// Optional prometheus histogram to report message queue sizes variations.
queue_size_report: Option<Histogram>,
/// Queue of events to send to the outside.
///
/// This queue must only ever be modified to insert elements at the back, or remove the first
@@ -301,6 +309,9 @@ impl ProtocolsHandler for NotifsOutHandler {
NotifsOutHandlerIn::Send(msg) =>
if let State::Open { substream, .. } = &mut self.state {
if let Some(Ok(_)) = substream.send(msg).now_or_never() {
if let Some(metric) = &self.queue_size_report {
metric.observe(substream.queue_len() as f64);
}
} else {
log::warn!(
target: "sub-libp2p",
@@ -82,7 +82,7 @@ fn build_nodes() -> (Swarm<CustomProtoWithAddr>, Swarm<CustomProtoWithAddr>) {
});
let behaviour = CustomProtoWithAddr {
inner: GenericProto::new(&b"test"[..], &[1], peerset),
inner: GenericProto::new(&b"test"[..], &[1], peerset, None),
addrs: addrs
.iter()
.enumerate()
@@ -38,7 +38,7 @@ use futures::{prelude::*, ready};
use futures_codec::Framed;
use libp2p::core::{UpgradeInfo, InboundUpgrade, OutboundUpgrade, upgrade};
use log::error;
use std::{borrow::Cow, collections::VecDeque, io, iter, mem, pin::Pin, task::{Context, Poll}};
use std::{borrow::Cow, collections::VecDeque, convert::TryFrom as _, io, iter, mem, pin::Pin, task::{Context, Poll}};
use unsigned_varint::codec::UviBytes;
/// Maximum allowed size of the two handshake messages, in bytes.
@@ -280,6 +280,13 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static,
}
}
impl<TSubstream> NotificationsOutSubstream<TSubstream> {
/// Returns the number of items in the queue, capped to `u32::max_value()`.
pub fn queue_len(&self) -> u32 {
u32::try_from(self.messages_queue.len()).unwrap_or(u32::max_value())
}
}
impl<TSubstream> Sink<Vec<u8>> for NotificationsOutSubstream<TSubstream>
where TSubstream: AsyncRead + AsyncWrite + Unpin,
{