Make unbounded channels size warning exact (part 1) (#13490)

* Replace `futures-channel` with `async-channel` in `out_events`

* Apply suggestions from code review

Co-authored-by: Koute <koute@users.noreply.github.com>

* Also print the backtrace of `send()` call

* Switch from `backtrace` crate to `std::backtrace`

* Remove outdated `backtrace` dependency

* Remove `backtrace` from `Cargo.lock`

---------

Co-authored-by: Koute <koute@users.noreply.github.com>
This commit is contained in:
Dmitry Markin
2023-03-01 12:54:31 +03:00
committed by GitHub
parent 55263fa2a1
commit ad410738e7
3 changed files with 32 additions and 31 deletions
+12 -1
View File
@@ -326,6 +326,17 @@ version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b34d609dfbaf33d6889b2b7106d3ca345eacad44200913df5ba02bfd31d2ba9"
[[package]]
name = "async-channel"
version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf46fee83e5ccffc220104713af3292ff9bc7c64c7de289f66dae8e38d826833"
dependencies = [
"concurrent-queue",
"event-listener",
"futures-core",
]
[[package]]
name = "async-io"
version = "1.12.0"
@@ -8626,9 +8637,9 @@ version = "0.10.0-dev"
dependencies = [
"array-bytes",
"assert_matches",
"async-channel",
"async-trait",
"asynchronous-codec",
"backtrace",
"bytes",
"either",
"fnv",
+1 -1
View File
@@ -15,9 +15,9 @@ targets = ["x86_64-unknown-linux-gnu"]
[dependencies]
array-bytes = "4.1"
async-channel = "1.8.0"
async-trait = "0.1"
asynchronous-codec = "0.6"
backtrace = "0.3.67"
bytes = "1"
codec = { package = "parity-scale-codec", version = "3.2.2", features = ["derive"] }
either = "1.5.3"
@@ -31,20 +31,17 @@
//! - Send events by calling [`OutChannels::send`]. Events are cloned for each sender in the
//! collection.
use backtrace::Backtrace;
use futures::{channel::mpsc, prelude::*, ready, stream::FusedStream};
use futures::{prelude::*, ready, stream::FusedStream};
use log::error;
use parking_lot::Mutex;
use prometheus_endpoint::{register, CounterVec, GaugeVec, Opts, PrometheusError, Registry, U64};
use sc_network_common::protocol::event::Event;
use std::{
backtrace::Backtrace,
cell::RefCell,
fmt,
pin::Pin,
sync::{
atomic::{AtomicI64, Ordering},
Arc,
},
sync::Arc,
task::{Context, Poll},
};
@@ -52,20 +49,18 @@ use std::{
///
/// The name is used in Prometheus reports, the queue size threshold is used
/// to warn if there are too many unprocessed events in the channel.
pub fn channel(name: &'static str, queue_size_warning: i64) -> (Sender, Receiver) {
let (tx, rx) = mpsc::unbounded();
pub fn channel(name: &'static str, queue_size_warning: usize) -> (Sender, Receiver) {
let (tx, rx) = async_channel::unbounded();
let metrics = Arc::new(Mutex::new(None));
let queue_size = Arc::new(AtomicI64::new(0));
let tx = Sender {
inner: tx,
name,
queue_size: queue_size.clone(),
queue_size_warning,
warning_fired: false,
creation_backtrace: Backtrace::new_unresolved(),
creation_backtrace: Backtrace::force_capture(),
metrics: metrics.clone(),
};
let rx = Receiver { inner: rx, name, queue_size, metrics };
let rx = Receiver { inner: rx, name, metrics };
(tx, rx)
}
@@ -77,16 +72,11 @@ pub fn channel(name: &'static str, queue_size_warning: i64) -> (Sender, Receiver
/// implement the `Clone` trait e.g. in Order to not complicate the logic keeping the metrics in
/// sync on drop. If someone adds a `#[derive(Clone)]` below, it is **wrong**.
pub struct Sender {
inner: mpsc::UnboundedSender<Event>,
inner: async_channel::Sender<Event>,
/// Name to identify the channel (e.g., in Prometheus and logs).
name: &'static str,
/// Number of events in the queue. Clone of [`Receiver::in_transit`].
// To not bother with ordering and possible underflow errors of the unsigned counter
// we just use `i64` and `Ordering::Relaxed`, and perceive `queue_size` as approximate.
// It can turn < 0 though.
queue_size: Arc<AtomicI64>,
/// Threshold queue size to generate an error message in the logs.
queue_size_warning: i64,
queue_size_warning: usize,
/// We generate the error message only once to not spam the logs.
warning_fired: bool,
/// Backtrace of a place where the channel was created.
@@ -113,9 +103,8 @@ impl Drop for Sender {
/// Receiving side of a channel.
pub struct Receiver {
inner: mpsc::UnboundedReceiver<Event>,
inner: async_channel::Receiver<Event>,
name: &'static str,
queue_size: Arc<AtomicI64>,
/// Initially contains `None`, and will be set to a value once the corresponding [`Sender`]
/// is assigned to an instance of [`OutChannels`].
metrics: Arc<Mutex<Option<Arc<Option<Metrics>>>>>,
@@ -126,7 +115,6 @@ impl Stream for Receiver {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Event>> {
if let Some(ev) = ready!(Pin::new(&mut self.inner).poll_next(cx)) {
let _ = self.queue_size.fetch_sub(1, Ordering::Relaxed);
let metrics = self.metrics.lock().clone();
match metrics.as_ref().map(|m| m.as_ref()) {
Some(Some(metrics)) => metrics.event_out(&ev, self.name),
@@ -191,17 +179,19 @@ impl OutChannels {
/// Sends an event.
pub fn send(&mut self, event: Event) {
self.event_streams.retain_mut(|sender| {
let queue_size = sender.queue_size.fetch_add(1, Ordering::Relaxed);
if queue_size == sender.queue_size_warning && !sender.warning_fired {
if sender.inner.len() >= sender.queue_size_warning && !sender.warning_fired {
sender.warning_fired = true;
sender.creation_backtrace.resolve();
error!(
"The number of unprocessed events in channel `{}` reached {}.\n\
The channel was created at:\n{:?}",
sender.name, sender.queue_size_warning, sender.creation_backtrace,
"The number of unprocessed events in channel `{}` exceeded {}.\n\
The channel was created at:\n{:}\n
The last event was sent from:\n{:}",
sender.name,
sender.queue_size_warning,
sender.creation_backtrace,
Backtrace::force_capture(),
);
}
sender.inner.unbounded_send(event.clone()).is_ok()
sender.inner.try_send(event.clone()).is_ok()
});
if let Some(metrics) = &*self.metrics {