Runtime diagnostics for leaked messages in unbounded channels (part 2) (#13020)

* Fix code review issues

* Clarify doc

* Get rid of backtrace mutex

* kick CI
This commit is contained in:
Dmitry Markin
2022-12-27 13:05:12 +03:00
committed by GitHub
parent 9f5ed21fe9
commit 67a50ffa54
5 changed files with 71 additions and 50 deletions
+47 -21
View File
@@ -18,7 +18,16 @@ version = "0.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b9ecd88a8c8378ca913a680cd98f0f13ac67383d35993f86c90a70e3f137816b"
dependencies = [
"gimli",
"gimli 0.26.1",
]
[[package]]
name = "addr2line"
version = "0.19.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a76fd60b23679b7d19bd066031410fb7e458ccc5e958eb5c325888ce4baedc97"
dependencies = [
"gimli 0.27.0",
]
[[package]]
@@ -316,16 +325,16 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
[[package]]
name = "backtrace"
version = "0.3.64"
version = "0.3.67"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5e121dee8023ce33ab248d9ce1493df03c3b38a659b240096fcbd7048ff9c31f"
checksum = "233d376d6d185f2a3093e58f283f60f880315b6c60075b01f36b3b85154564ca"
dependencies = [
"addr2line",
"addr2line 0.19.0",
"cc",
"cfg-if",
"libc",
"miniz_oxide",
"object 0.27.1",
"miniz_oxide 0.6.2",
"object 0.30.0",
"rustc-demangle",
]
@@ -1017,7 +1026,7 @@ dependencies = [
"cranelift-codegen-shared",
"cranelift-entity",
"cranelift-isle",
"gimli",
"gimli 0.26.1",
"log",
"regalloc2",
"smallvec",
@@ -1820,7 +1829,7 @@ dependencies = [
"crc32fast",
"libc",
"libz-sys",
"miniz_oxide",
"miniz_oxide 0.4.4",
]
[[package]]
@@ -2445,6 +2454,12 @@ dependencies = [
"stable_deref_trait",
]
[[package]]
name = "gimli"
version = "0.27.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dec7af912d60cdbd3677c1af9352ebae6fb8394d165568a2234df0fa00f87793"
[[package]]
name = "git2"
version = "0.14.2"
@@ -3877,6 +3892,15 @@ dependencies = [
"autocfg",
]
[[package]]
name = "miniz_oxide"
version = "0.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b275950c28b37e794e8c55d88aeb5e139d0ce23fdbbeda68f8d7174abdf9e8fa"
dependencies = [
"adler",
]
[[package]]
name = "mio"
version = "0.8.4"
@@ -4609,15 +4633,6 @@ dependencies = [
"libc",
]
[[package]]
name = "object"
version = "0.27.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "67ac1d3f9a1d3616fd9a60c8d74296f22406a238b6a72f5cc1e6f314df4ffbf9"
dependencies = [
"memchr",
]
[[package]]
name = "object"
version = "0.29.0"
@@ -4630,6 +4645,15 @@ dependencies = [
"memchr",
]
[[package]]
name = "object"
version = "0.30.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "239da7f290cfa979f43f85a8efeee9a8a76d0827c356d37f9d3d7254d6b537fb"
dependencies = [
"memchr",
]
[[package]]
name = "once_cell"
version = "1.16.0"
@@ -7756,6 +7780,7 @@ dependencies = [
"assert_matches",
"async-trait",
"asynchronous-codec",
"backtrace",
"bytes",
"either",
"fnv",
@@ -8403,6 +8428,7 @@ dependencies = [
name = "sc-utils"
version = "4.0.0-dev"
dependencies = [
"backtrace",
"futures",
"futures-timer",
"lazy_static",
@@ -11142,7 +11168,7 @@ dependencies = [
"cranelift-frontend",
"cranelift-native",
"cranelift-wasm",
"gimli",
"gimli 0.26.1",
"log",
"object 0.29.0",
"target-lexicon",
@@ -11159,7 +11185,7 @@ checksum = "5c587c62e91c5499df62012b87b88890d0eb470b2ffecc5964e9da967b70c77c"
dependencies = [
"anyhow",
"cranelift-entity",
"gimli",
"gimli 0.26.1",
"indexmap",
"log",
"object 0.29.0",
@@ -11176,12 +11202,12 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "047839b5dabeae5424a078c19b8cc897e5943a7fadc69e3d888b9c9a897666b3"
dependencies = [
"addr2line",
"addr2line 0.17.0",
"anyhow",
"bincode",
"cfg-if",
"cpp_demangle",
"gimli",
"gimli 0.26.1",
"log",
"object 0.29.0",
"rustc-demangle",
+1
View File
@@ -17,6 +17,7 @@ targets = ["x86_64-unknown-linux-gnu"]
array-bytes = "4.1"
async-trait = "0.1"
asynchronous-codec = "0.6"
backtrace = "0.3.67"
bytes = "1"
codec = { package = "parity-scale-codec", version = "3.0.0", features = ["derive"] }
either = "1.5.3"
@@ -31,13 +31,13 @@
//! - Send events by calling [`OutChannels::send`]. Events are cloned for each sender in the
//! collection.
use backtrace::Backtrace;
use futures::{channel::mpsc, prelude::*, ready, stream::FusedStream};
use log::error;
use parking_lot::Mutex;
use prometheus_endpoint::{register, CounterVec, GaugeVec, Opts, PrometheusError, Registry, U64};
use sc_network_common::protocol::event::Event;
use std::{
backtrace::{Backtrace, BacktraceStatus},
cell::RefCell,
fmt,
pin::Pin,
@@ -62,7 +62,7 @@ pub fn channel(name: &'static str, queue_size_warning: i64) -> (Sender, Receiver
queue_size: queue_size.clone(),
queue_size_warning,
warning_fired: false,
creation_backtrace: Backtrace::capture(),
creation_backtrace: Backtrace::new_unresolved(),
metrics: metrics.clone(),
};
let rx = Receiver { inner: rx, name, queue_size, metrics };
@@ -91,7 +91,8 @@ pub struct Sender {
warning_fired: bool,
/// Backtrace of a place where the channel was created.
creation_backtrace: Backtrace,
/// Clone of [`Receiver::metrics`].
/// Clone of [`Receiver::metrics`]. Will be initialized when [`Sender`] is added to
/// [`OutChannels`] with `OutChannels::push()`.
metrics: Arc<Mutex<Option<Arc<Option<Metrics>>>>>,
}
@@ -193,17 +194,12 @@ impl OutChannels {
let queue_size = sender.queue_size.fetch_add(1, Ordering::Relaxed);
if queue_size == sender.queue_size_warning && !sender.warning_fired {
sender.warning_fired = true;
match sender.creation_backtrace.status() {
BacktraceStatus::Captured => error!(
"The number of unprocessed events in channel `{}` reached {}.\n\
The channel was created at:\n{}",
sender.name, sender.queue_size_warning, sender.creation_backtrace,
),
_ => error!(
"The number of unprocessed events in channel `{}` reached {}.",
sender.name, sender.queue_size_warning,
),
}
sender.creation_backtrace.resolve();
error!(
"The number of unprocessed events in channel `{}` reached {}.\n\
The channel was created at:\n{:?}",
sender.name, sender.queue_size_warning, sender.creation_backtrace,
);
}
sender.inner.unbounded_send(event.clone()).is_ok()
});
+1
View File
@@ -10,6 +10,7 @@ description = "I/O for Substrate runtimes"
readme = "README.md"
[dependencies]
backtrace = "0.3.67"
futures = "0.3.21"
futures-timer = "3.0.2"
lazy_static = "1.4.0"
+12 -15
View File
@@ -37,6 +37,7 @@ mod inner {
mod inner {
// tracing implementation
use crate::metrics::UNBOUNDED_CHANNELS_COUNTER;
use backtrace::Backtrace;
use futures::{
channel::mpsc::{
self, SendError, TryRecvError, TrySendError, UnboundedReceiver, UnboundedSender,
@@ -47,7 +48,6 @@ mod inner {
};
use log::error;
use std::{
backtrace::{Backtrace, BacktraceStatus},
pin::Pin,
sync::{
atomic::{AtomicBool, AtomicI64, Ordering},
@@ -108,7 +108,7 @@ mod inner {
queue_size: queue_size.clone(),
queue_size_warning,
warning_fired: Arc::new(AtomicBool::new(false)),
creation_backtrace: Arc::new(Backtrace::capture()),
creation_backtrace: Arc::new(Backtrace::new_unresolved()),
};
let receiver = TracingUnboundedReceiver { inner: r, name, queue_size };
(sender, receiver)
@@ -149,23 +149,20 @@ mod inner {
let queue_size = self.queue_size.fetch_add(1, Ordering::Relaxed);
if queue_size == self.queue_size_warning &&
!self.warning_fired.load(Ordering::Relaxed)
self.warning_fired
.compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
.is_ok()
{
// `warning_fired` and `queue_size` are not synchronized, so it's possible
// that the warning is fired few times before the `warning_fired` is seen
// by all threads. This seems better than introducing a mutex guarding them.
self.warning_fired.store(true, Ordering::Relaxed);
match self.creation_backtrace.status() {
BacktraceStatus::Captured => error!(
"The number of unprocessed messages in channel `{}` reached {}.\n\
The channel was created at:\n{}",
self.name, self.queue_size_warning, self.creation_backtrace,
),
_ => error!(
"The number of unprocessed messages in channel `{}` reached {}.",
self.name, self.queue_size_warning,
),
}
let mut backtrace = (*self.creation_backtrace).clone();
backtrace.resolve();
error!(
"The number of unprocessed messages in channel `{}` reached {}.\n\
The channel was created at:\n{:?}",
self.name, self.queue_size_warning, backtrace,
);
}
s