Runtime diagnostics for leaked messages in unbounded channels (#12971)

This commit is contained in:
Dmitry Markin
2022-12-23 16:03:08 +03:00
committed by GitHub
parent 70e9f8e920
commit 34eb463d99
37 changed files with 257 additions and 134 deletions
@@ -32,25 +32,40 @@
//! collection.
use futures::{channel::mpsc, prelude::*, ready, stream::FusedStream};
use log::error;
use parking_lot::Mutex;
use prometheus_endpoint::{register, CounterVec, GaugeVec, Opts, PrometheusError, Registry, U64};
use sc_network_common::protocol::event::Event;
use std::{
backtrace::{Backtrace, BacktraceStatus},
cell::RefCell,
fmt,
pin::Pin,
sync::Arc,
sync::{
atomic::{AtomicI64, Ordering},
Arc,
},
task::{Context, Poll},
};
/// Creates a new channel that can be associated to a [`OutChannels`].
///
/// The name is used in Prometheus reports.
pub fn channel(name: &'static str) -> (Sender, Receiver) {
/// The name is used in Prometheus reports, the queue size threshold is used
/// to warn if there are too many unprocessed events in the channel.
pub fn channel(name: &'static str, queue_size_warning: i64) -> (Sender, Receiver) {
let (tx, rx) = mpsc::unbounded();
let metrics = Arc::new(Mutex::new(None));
let tx = Sender { inner: tx, name, metrics: metrics.clone() };
let rx = Receiver { inner: rx, name, metrics };
let queue_size = Arc::new(AtomicI64::new(0));
let tx = Sender {
inner: tx,
name,
queue_size: queue_size.clone(),
queue_size_warning,
warning_fired: false,
creation_backtrace: Backtrace::capture(),
metrics: metrics.clone(),
};
let rx = Receiver { inner: rx, name, queue_size, metrics };
(tx, rx)
}
@@ -63,7 +78,19 @@ pub fn channel(name: &'static str) -> (Sender, Receiver) {
/// sync on drop. If someone adds a `#[derive(Clone)]` below, it is **wrong**.
pub struct Sender {
inner: mpsc::UnboundedSender<Event>,
/// Name to identify the channel (e.g., in Prometheus and logs).
name: &'static str,
/// Number of events in the queue. Clone of [`Receiver::in_transit`].
// To not bother with ordering and possible underflow errors of the unsigned counter
// we just use `i64` and `Ordering::Relaxed`, and perceive `queue_size` as approximate.
// It can turn < 0 though.
queue_size: Arc<AtomicI64>,
/// Threshold queue size to generate an error message in the logs.
queue_size_warning: i64,
/// We generate the error message only once to not spam the logs.
warning_fired: bool,
/// Backtrace of a place where the channel was created.
creation_backtrace: Backtrace,
/// Clone of [`Receiver::metrics`].
metrics: Arc<Mutex<Option<Arc<Option<Metrics>>>>>,
}
@@ -87,6 +114,7 @@ impl Drop for Sender {
pub struct Receiver {
inner: mpsc::UnboundedReceiver<Event>,
name: &'static str,
queue_size: Arc<AtomicI64>,
/// Initially contains `None`, and will be set to a value once the corresponding [`Sender`]
/// is assigned to an instance of [`OutChannels`].
metrics: Arc<Mutex<Option<Arc<Option<Metrics>>>>>,
@@ -97,6 +125,7 @@ impl Stream for Receiver {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Event>> {
if let Some(ev) = ready!(Pin::new(&mut self.inner).poll_next(cx)) {
let _ = self.queue_size.fetch_sub(1, Ordering::Relaxed);
let metrics = self.metrics.lock().clone();
match metrics.as_ref().map(|m| m.as_ref()) {
Some(Some(metrics)) => metrics.event_out(&ev, self.name),
@@ -160,12 +189,28 @@ impl OutChannels {
/// Sends an event.
pub fn send(&mut self, event: Event) {
self.event_streams
.retain(|sender| sender.inner.unbounded_send(event.clone()).is_ok());
self.event_streams.retain_mut(|sender| {
let queue_size = sender.queue_size.fetch_add(1, Ordering::Relaxed);
if queue_size == sender.queue_size_warning && !sender.warning_fired {
sender.warning_fired = true;
match sender.creation_backtrace.status() {
BacktraceStatus::Captured => error!(
"The number of unprocessed events in channel `{}` reached {}.\n\
The channel was created at:\n{}",
sender.name, sender.queue_size_warning, sender.creation_backtrace,
),
_ => error!(
"The number of unprocessed events in channel `{}` reached {}.",
sender.name, sender.queue_size_warning,
),
}
}
sender.inner.unbounded_send(event.clone()).is_ok()
});
if let Some(metrics) = &*self.metrics {
for ev in &self.event_streams {
metrics.event_in(&event, 1, ev.name);
metrics.event_in(&event, ev.name);
}
}
}
@@ -232,45 +277,35 @@ impl Metrics {
})
}
fn event_in(&self, event: &Event, num: u64, name: &str) {
fn event_in(&self, event: &Event, name: &str) {
match event {
Event::Dht(_) => {
self.events_total.with_label_values(&["dht", "sent", name]).inc_by(num);
self.events_total.with_label_values(&["dht", "sent", name]).inc();
},
Event::SyncConnected { .. } => {
self.events_total
.with_label_values(&["sync-connected", "sent", name])
.inc_by(num);
self.events_total.with_label_values(&["sync-connected", "sent", name]).inc();
},
Event::SyncDisconnected { .. } => {
self.events_total
.with_label_values(&["sync-disconnected", "sent", name])
.inc_by(num);
self.events_total.with_label_values(&["sync-disconnected", "sent", name]).inc();
},
Event::NotificationStreamOpened { protocol, .. } => {
format_label("notif-open-", protocol, |protocol_label| {
self.events_total
.with_label_values(&[protocol_label, "sent", name])
.inc_by(num);
self.events_total.with_label_values(&[protocol_label, "sent", name]).inc();
});
},
Event::NotificationStreamClosed { protocol, .. } => {
format_label("notif-closed-", protocol, |protocol_label| {
self.events_total
.with_label_values(&[protocol_label, "sent", name])
.inc_by(num);
self.events_total.with_label_values(&[protocol_label, "sent", name]).inc();
});
},
Event::NotificationsReceived { messages, .. } =>
for (protocol, message) in messages {
format_label("notif-", protocol, |protocol_label| {
self.events_total
.with_label_values(&[protocol_label, "sent", name])
.inc_by(num);
self.events_total.with_label_values(&[protocol_label, "sent", name]).inc();
});
self.notifications_sizes.with_label_values(&[protocol, "sent", name]).inc_by(
num.saturating_mul(u64::try_from(message.len()).unwrap_or(u64::MAX)),
);
self.notifications_sizes
.with_label_values(&[protocol, "sent", name])
.inc_by(u64::try_from(message.len()).unwrap_or(u64::MAX));
},
}
}