Remove redundant sync primitives for metrics (#14564)

* Remove redundant locks

* Re-enable the warning for a sender once its queue has been processed

* Apply suggestions from code review

Co-authored-by: Bastian Köcher <git@kchr.de>
Co-authored-by: Anton <anton.kalyaev@gmail.com>

* Use debug level for subsequent logging

* Update client/network/src/service/out_events.rs

Co-authored-by: Bastian Köcher <git@kchr.de>

---------

Co-authored-by: Bastian Köcher <git@kchr.de>
Co-authored-by: Anton <anton.kalyaev@gmail.com>
Co-authored-by: parity-processbot <>
This commit is contained in:
Vsevolod Stakhov
2023-07-14 11:47:38 +01:00
committed by GitHub
parent d908e9bebe
commit 242e31ec2d
@@ -34,37 +34,48 @@
use crate::event::Event; use crate::event::Event;
use futures::{prelude::*, ready, stream::FusedStream}; use futures::{prelude::*, ready, stream::FusedStream};
use log::error; use log::{debug, error};
use parking_lot::Mutex;
use prometheus_endpoint::{register, CounterVec, GaugeVec, Opts, PrometheusError, Registry, U64}; use prometheus_endpoint::{register, CounterVec, GaugeVec, Opts, PrometheusError, Registry, U64};
use std::{ use std::{
backtrace::Backtrace, backtrace::Backtrace,
cell::RefCell, cell::RefCell,
fmt, fmt,
pin::Pin, pin::Pin,
sync::Arc,
task::{Context, Poll}, task::{Context, Poll},
}; };
/// Log target for this file.
///
/// Passed as `target:` to the `debug!` messages emitted by `OutChannels::send` when a channel
/// becomes overflowed again or is no longer overflowed.
pub const LOG_TARGET: &str = "sub-libp2p::out_events";
/// Creates a new channel that can be associated to a [`OutChannels`]. /// Creates a new channel that can be associated to a [`OutChannels`].
/// ///
/// The name is used in Prometheus reports, the queue size threshold is used /// The name is used in Prometheus reports, the queue size threshold is used
/// to warn if there are too many unprocessed events in the channel. /// to warn if there are too many unprocessed events in the channel.
pub fn channel(name: &'static str, queue_size_warning: usize) -> (Sender, Receiver) { pub fn channel(name: &'static str, queue_size_warning: usize) -> (Sender, Receiver) {
let (tx, rx) = async_channel::unbounded(); let (tx, rx) = async_channel::unbounded();
let metrics = Arc::new(Mutex::new(None));
let tx = Sender { let tx = Sender {
inner: tx, inner: tx,
name, name,
queue_size_warning, queue_size_warning,
warning_fired: false, warning_fired: SenderWarningState::NotFired,
creation_backtrace: Backtrace::force_capture(), creation_backtrace: Backtrace::force_capture(),
metrics: metrics.clone(), metrics: None,
}; };
let rx = Receiver { inner: rx, name, metrics }; let rx = Receiver { inner: rx, name, metrics: None };
(tx, rx) (tx, rx)
} }
/// A state of a sender warning that is used to avoid spamming the logs.
///
/// `OutChannels::send` compares the number of pending events in a sender's queue with
/// `Sender::queue_size_warning` and moves the sender between these states. The underlying
/// channel is unbounded, so "full" below means "reached the warning threshold", not "cannot
/// accept more events".
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SenderWarningState {
/// The warning has not been fired yet.
NotFired,
/// The warning has been fired, and the queue has not yet dropped below half of the warning
/// threshold since it last reached the threshold.
FiredFull,
/// The warning has been fired, and the queue has since dropped below half of the warning
/// threshold. Reaching the threshold again from this state is reported on debug level only.
FiredFree,
}
/// Sending side of a channel. /// Sending side of a channel.
/// ///
/// Must be associated with an [`OutChannels`] before anything can be sent on it /// Must be associated with an [`OutChannels`] before anything can be sent on it
@@ -78,13 +89,14 @@ pub struct Sender {
name: &'static str, name: &'static str,
/// Threshold queue size to generate an error message in the logs. /// Threshold queue size to generate an error message in the logs.
queue_size_warning: usize, queue_size_warning: usize,
/// We generate the error message only once to not spam the logs. /// We generate the error message only once to not spam the logs after the first error.
warning_fired: bool, /// Subsequently we indicate channel fullness on debug level.
warning_fired: SenderWarningState,
/// Backtrace of a place where the channel was created. /// Backtrace of a place where the channel was created.
creation_backtrace: Backtrace, creation_backtrace: Backtrace,
/// Clone of [`Receiver::metrics`]. Will be initialized when [`Sender`] is added to /// Clone of [`Receiver::metrics`]. Will be initialized when [`Sender`] is added to
/// [`OutChannels`] with `OutChannels::push()`. /// [`OutChannels`] with `OutChannels::push()`.
metrics: Arc<Mutex<Option<Arc<Option<Metrics>>>>>, metrics: Option<Metrics>,
} }
impl fmt::Debug for Sender { impl fmt::Debug for Sender {
@@ -95,8 +107,7 @@ impl fmt::Debug for Sender {
impl Drop for Sender { impl Drop for Sender {
fn drop(&mut self) { fn drop(&mut self) {
let metrics = self.metrics.lock(); if let Some(metrics) = self.metrics.as_ref() {
if let Some(Some(metrics)) = metrics.as_ref().map(|m| &**m) {
metrics.num_channels.with_label_values(&[self.name]).dec(); metrics.num_channels.with_label_values(&[self.name]).dec();
} }
} }
@@ -108,7 +119,7 @@ pub struct Receiver {
name: &'static str, name: &'static str,
/// Initially contains `None`, and will be set to a value once the corresponding [`Sender`] /// Initially contains `None`, and will be set to a value once the corresponding [`Sender`]
/// is assigned to an instance of [`OutChannels`]. /// is assigned to an instance of [`OutChannels`].
metrics: Arc<Mutex<Option<Arc<Option<Metrics>>>>>, metrics: Option<Metrics>,
} }
impl Stream for Receiver { impl Stream for Receiver {
@@ -116,13 +127,8 @@ impl Stream for Receiver {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Event>> { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Event>> {
if let Some(ev) = ready!(Pin::new(&mut self.inner).poll_next(cx)) { if let Some(ev) = ready!(Pin::new(&mut self.inner).poll_next(cx)) {
let metrics = self.metrics.lock().clone(); if let Some(metrics) = &self.metrics {
match metrics.as_ref().map(|m| m.as_ref()) { metrics.event_out(&ev, self.name);
Some(Some(metrics)) => metrics.event_out(&ev, self.name),
Some(None) => (), // no registry
None => log::warn!(
"Inconsistency in out_events: event happened before sender associated"
),
} }
Poll::Ready(Some(ev)) Poll::Ready(Some(ev))
} else { } else {
@@ -151,7 +157,7 @@ pub struct OutChannels {
event_streams: Vec<Sender>, event_streams: Vec<Sender>,
/// The metrics we collect. A clone of this is sent to each [`Receiver`] associated with this /// The metrics we collect. A clone of this is sent to each [`Receiver`] associated with this
/// object. /// object.
metrics: Arc<Option<Metrics>>, metrics: Option<Metrics>,
} }
impl OutChannels { impl OutChannels {
@@ -160,17 +166,15 @@ impl OutChannels {
let metrics = let metrics =
if let Some(registry) = registry { Some(Metrics::register(registry)?) } else { None }; if let Some(registry) = registry { Some(Metrics::register(registry)?) } else { None };
Ok(Self { event_streams: Vec::new(), metrics: Arc::new(metrics) }) Ok(Self { event_streams: Vec::new(), metrics })
} }
/// Adds a new [`Sender`] to the collection. /// Adds a new [`Sender`] to the collection.
pub fn push(&mut self, sender: Sender) { pub fn push(&mut self, mut sender: Sender) {
let mut metrics = sender.metrics.lock(); debug_assert!(sender.metrics.is_none());
debug_assert!(metrics.is_none()); sender.metrics = self.metrics.clone();
*metrics = Some(self.metrics.clone());
drop(metrics);
if let Some(metrics) = &*self.metrics { if let Some(metrics) = &self.metrics {
metrics.num_channels.with_label_values(&[sender.name]).inc(); metrics.num_channels.with_label_values(&[sender.name]).inc();
} }
@@ -180,22 +184,42 @@ impl OutChannels {
/// Sends an event. /// Sends an event.
pub fn send(&mut self, event: Event) { pub fn send(&mut self, event: Event) {
self.event_streams.retain_mut(|sender| { self.event_streams.retain_mut(|sender| {
if sender.inner.len() >= sender.queue_size_warning && !sender.warning_fired { let current_pending = sender.inner.len();
sender.warning_fired = true; if current_pending >= sender.queue_size_warning {
error!( if sender.warning_fired == SenderWarningState::NotFired {
"The number of unprocessed events in channel `{}` exceeded {}.\n\ error!(
The channel was created at:\n{:}\n "The number of unprocessed events in channel `{}` exceeded {}.\n\
The last event was sent from:\n{:}", The channel was created at:\n{:}\n
sender.name, The last event was sent from:\n{:}",
sender.queue_size_warning, sender.name,
sender.creation_backtrace, sender.queue_size_warning,
Backtrace::force_capture(), sender.creation_backtrace,
Backtrace::force_capture(),
);
} else if sender.warning_fired == SenderWarningState::FiredFree {
// We don't want to spam the logs, so we only log on debug level
debug!(
target: LOG_TARGET,
"Channel `{}` is overflowed again. Number of events: {}",
sender.name, current_pending
);
}
sender.warning_fired = SenderWarningState::FiredFull;
} else if sender.warning_fired == SenderWarningState::FiredFull &&
current_pending < sender.queue_size_warning.wrapping_div(2)
{
sender.warning_fired = SenderWarningState::FiredFree;
debug!(
target: LOG_TARGET,
"Channel `{}` is no longer overflowed. Number of events: {}",
sender.name, current_pending
); );
} }
sender.inner.try_send(event.clone()).is_ok() sender.inner.try_send(event.clone()).is_ok()
}); });
if let Some(metrics) = &*self.metrics { if let Some(metrics) = &self.metrics {
for ev in &self.event_streams { for ev in &self.event_streams {
metrics.event_in(&event, ev.name); metrics.event_in(&event, ev.name);
} }
@@ -211,6 +235,7 @@ impl fmt::Debug for OutChannels {
} }
} }
#[derive(Clone)]
struct Metrics { struct Metrics {
// This list is ordered alphabetically // This list is ordered alphabetically
events_total: CounterVec<U64>, events_total: CounterVec<U64>,