From f8db199b97bd2ab3caf46ea209a313043fa1ea75 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Thu, 1 Aug 2019 09:43:37 +0200 Subject: [PATCH] Replace the telemetry Mutex with a channel (#3266) * Replace the telemetry Mutex with a channel * Don't return Err() if channel is full * Change polling pattern * Add more issue numbers --- substrate/Cargo.lock | 28 ++++++++ substrate/core/telemetry/Cargo.toml | 3 + substrate/core/telemetry/src/lib.rs | 106 ++++++++++++++++++---------- 3 files changed, 100 insertions(+), 37 deletions(-) diff --git a/substrate/Cargo.lock b/substrate/Cargo.lock index 191b308754..adb20893d8 100644 --- a/substrate/Cargo.lock +++ b/substrate/Cargo.lock @@ -517,6 +517,14 @@ dependencies = [ "itertools 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "crossbeam-channel" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "crossbeam-utils 0.6.6 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "crossbeam-deque" version = "0.6.3" @@ -3442,6 +3450,17 @@ dependencies = [ "erased-serde 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "slog-async" +version = "2.3.0" +source = "git+https://github.com/paritytech/slog-async#107848e7ded5e80dc43f6296c2b96039eb92c0a5" +dependencies = [ + "crossbeam-channel 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", + "slog 2.5.2 (registry+https://github.com/rust-lang/crates.io-index)", + "take_mut 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", + "thread_local 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "slog-json" version = "2.3.0" @@ -4838,6 +4857,7 @@ dependencies = [ "rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.97 (registry+https://github.com/rust-lang/crates.io-index)", "slog 2.5.2 (registry+https://github.com/rust-lang/crates.io-index)", + "slog-async 2.3.0 (git+https://github.com/paritytech/slog-async)", "slog-json 2.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "slog-scope 4.1.2 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-io 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)", @@ -5019,6 +5039,11 @@ dependencies = [ "winapi 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "take_mut" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "target_info" version = "0.1.0" @@ -5980,6 +6005,7 @@ dependencies = [ "checksum crc32fast 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ba125de2af0df55319f41944744ad91c71113bf74a4646efff39afe1f6842db1" "checksum criterion 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)" = "0363053954f3e679645fc443321ca128b7b950a6fe288cf5f9335cc22ee58394" "checksum criterion-plot 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "76f9212ddf2f4a9eb2d401635190600656a1f88a932ef53d06e7fa4c7e02fb8e" +"checksum crossbeam-channel 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)" = "c8ec7fcd21571dc78f96cc96243cab8d8f035247c3efd16c687be154c3fa9efa" "checksum crossbeam-deque 0.6.3 (registry+https://github.com/rust-lang/crates.io-index)" = "05e44b8cf3e1a625844d1750e1f7820da46044ff6d28f4d43e455ba3e5bb2c13" "checksum crossbeam-deque 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)" = "b18cd2e169ad86297e6bc0ad9aa679aee9daa4f19e8163860faf7c164e4f5a71" "checksum crossbeam-epoch 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)" = "fedcd6772e37f3da2a9af9bf12ebe046c0dfe657992377b4df982a2b54cd37a9" @@ -6262,6 +6288,7 @@ dependencies = [ "checksum shell32-sys 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "9ee04b46101f57121c9da2b151988283b6beb79b34f5bb29a58ee48cb695122c" "checksum slab 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8" "checksum slog 2.5.2 (registry+https://github.com/rust-lang/crates.io-index)" = "1cc9c640a4adbfbcc11ffb95efe5aa7af7309e002adab54b185507dbf2377b99" +"checksum slog-async 2.3.0 (git+https://github.com/paritytech/slog-async)" = "" "checksum slog-json 2.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ddc0d2aff1f8f325ef660d9a0eb6e6dcd20b30b3f581a5897f58bf42d061c37a" "checksum slog-scope 4.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "d1d3ec6214d46e57a7ec87c1972bbca66c59172a0cfffa5233c54726afb946bf" "checksum slog_derive 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "9eff3b513cf2e0d1a60e1aba152dc72bedc5b05585722bb3cebd7bcb1e31b98f" @@ -6287,6 +6314,7 @@ dependencies = [ "checksum syn 0.15.42 (registry+https://github.com/rust-lang/crates.io-index)" = "eadc09306ca51a40555dd6fc2b415538e9e18bc9f870e47b1a524a79fe2dcf5e" "checksum synstructure 0.10.2 (registry+https://github.com/rust-lang/crates.io-index)" = "02353edf96d6e4dc81aea2d8490a7e9db177bf8acb0e951c24940bf866cb313f" "checksum sysinfo 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)" = "c3e2cab189e59f72710e3dd5e1e0d5be0f6c5c999c326f2fdcdf3bf4483ec9fd" +"checksum take_mut 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "f764005d11ee5f36500a149ace24e00e3da98b0158b3e2d53a7495660d3f4d60" "checksum target_info 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "c63f48baada5c52e65a29eef93ab4f8982681b67f9e8d29c7b05abcfec2b9ffe" "checksum tempdir 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)" = "15f2b5fb00ccdf689e0149d1b1b3c03fead81c2b37735d812fa8bddbbf41b6d8" "checksum tempfile 3.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "7a6e24d9338a0a5be79593e2fa15a648add6138caa803e2d5bc782c371732ca9" diff --git a/substrate/core/telemetry/Cargo.toml b/substrate/core/telemetry/Cargo.toml index 27c94f0656..540305435f 100644 --- a/substrate/core/telemetry/Cargo.toml +++ b/substrate/core/telemetry/Cargo.toml @@ -16,6 +16,9 @@ log = "0.4" rand = "0.6" serde = { version = "1.0.81", features = ["derive"] } slog = { version = "^2", features = ["nested-values"] } +# TODO: we're using slog-async just to be able to clone records; See https://github.com/slog-rs/slog/issues/221, +# https://github.com/paritytech/substrate/issues/2823 and https://github.com/paritytech/substrate/issues/3260 +slog-async = { git = "https://github.com/paritytech/slog-async", features = ["nested-values"] } slog-json = { version = "^2", features = ["nested-values"] } slog-scope = "^4" tokio-io = "0.1" diff --git a/substrate/core/telemetry/src/lib.rs b/substrate/core/telemetry/src/lib.rs index 88d515e538..71a86defb6 100644 --- a/substrate/core/telemetry/src/lib.rs +++ b/substrate/core/telemetry/src/lib.rs @@ -58,12 +58,12 @@ //! ``` //! -use futures::{prelude::*, task::AtomicWaker}; +use futures::{prelude::*, channel::mpsc}; use libp2p::{Multiaddr, wasm_ext}; use log::warn; use parking_lot::Mutex; use serde::{Serialize, Deserialize}; -use std::{pin::Pin, sync::{Arc, Weak}, task::{Context, Poll}, time::{Duration, Instant}}; +use std::{pin::Pin, sync::Arc, task::{Context, Poll}, time::{Duration, Instant}}; pub use slog_scope::with_logger; pub use slog; @@ -112,31 +112,32 @@ pub const CONSENSUS_INFO: &str = "1"; /// Telemetry object. Implements `Future` and must be polled regularly. /// Contains an `Arc` and can be cloned and pass around. Only one clone needs to be polled -/// regularly. +/// regularly and should be polled regularly. /// Dropping all the clones unregisters the telemetry. #[derive(Clone)] pub struct Telemetry { - inner: Arc, + inner: Arc>, /// Slog guard so that we don't get deregistered. _guard: Arc, } -// Implementation notes: considering that logging can happen at any moment, we only have two -// options: locking a mutex (which we currently do), or using a channel (which we should do). -// At the moment, `slog` doesn't provide any easy way to serialize records in order to send them -// over a channel, but ideally that's what should be done. - -/// Shared between `Telemetry` and `TelemetryDrain`. +/// Behind the `Mutex` in `Telemetry`. +/// +/// Note that ideally we wouldn't have to make the `Telemetry` clonable, as that would remove the +/// need for a `Mutex`. However there is currently a weird hack in place in `substrate-service` +/// where we extract the telemetry registration so that it continues running during the shutdown +/// process. struct TelemetryInner { /// Worker for the telemetry. - worker: Mutex, - /// Waker to wake up when we add a log entry to the worker. - polling_waker: AtomicWaker, + worker: worker::TelemetryWorker, + /// Receives log entries for them to be dispatched to the worker. + receiver: mpsc::Receiver, } /// Implements `slog::Drain`. struct TelemetryDrain { - inner: std::panic::AssertUnwindSafe>, + /// Sends log entries. + sender: std::panic::AssertUnwindSafe>, } /// Initializes the telemetry. See the crate root documentation for more information. @@ -153,19 +154,18 @@ pub fn init_telemetry(config: TelemetryConfig) -> Telemetry { } } - let inner = Arc::new(TelemetryInner { - worker: Mutex::new(worker::TelemetryWorker::new(endpoints, config.wasm_external_transport)), - polling_waker: AtomicWaker::new(), - }); - + let (sender, receiver) = mpsc::channel(16); let guard = { - let logger = TelemetryDrain { inner: std::panic::AssertUnwindSafe(Arc::downgrade(&inner)) }; + let logger = TelemetryDrain { sender: std::panic::AssertUnwindSafe(sender) }; let root = slog::Logger::root(slog::Drain::fuse(logger), slog::o!()); slog_scope::set_global_logger(root) }; Telemetry { - inner, + inner: Arc::new(Mutex::new(TelemetryInner { + worker: worker::TelemetryWorker::new(endpoints, config.wasm_external_transport), + receiver, + })), _guard: Arc::new(guard), } } @@ -184,12 +184,42 @@ impl Stream for Telemetry { fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { let before = Instant::now(); + // Because the `Telemetry` is clonable, we need to put the actual fields behind a `Mutex`. + // However, the user is only ever supposed to poll from one instance of `Telemetry`, while + // the other instances are used only for RAII purposes. + // We assume that the user is following this advice and therefore that the `Mutex` is only + // ever locked once at a time. + let mut inner = match self.inner.try_lock() { + Some(l) => l, + None => { + warn!( + target: "telemetry", + "The telemetry seems to be polled multiple times simultaneously" + ); + // Returning `Pending` here means that we may never get polled again, but this is + // ok because we're in a situation where something else is actually currently doing + // the polling. + return Poll::Pending; + } + }; + let mut has_connected = false; - while let Poll::Ready(event) = self.inner.worker.lock().poll(cx) { - // Right now we only have one possible event. This line is here in order to not - // forget to handle any possible new event type. - let worker::TelemetryWorkerEvent::Connected = event; - has_connected = true; + + // The polling pattern is: poll the worker so that it processes its queue, then add one + // message from the receiver (if possible), then poll the worker again, and so on. + loop { + while let Poll::Ready(event) = inner.worker.poll(cx) { + // Right now we only have one possible event. This line is here in order to not + // forget to handle any possible new event type. + let worker::TelemetryWorkerEvent::Connected = event; + has_connected = true; + } + + if let Poll::Ready(Some(log_entry)) = Stream::poll_next(Pin::new(&mut inner.receiver), cx) { + log_entry.as_record_values(|rec, val| { let _ = inner.worker.log(rec, val); }); + } else { + break; + } } if before.elapsed() > Duration::from_millis(200) { @@ -199,7 +229,6 @@ impl Stream for Telemetry { if has_connected { Poll::Ready(Some(TelemetryEvent::Connected)) } else { - self.inner.polling_waker.register(cx.waker()); Poll::Pending } } @@ -210,17 +239,20 @@ impl slog::Drain for TelemetryDrain { type Err = (); fn log(&self, record: &slog::Record, values: &slog::OwnedKVList) -> Result { - if let Some(inner) = self.inner.0.upgrade() { - let before = Instant::now(); - let result = inner.worker.lock().log(record, values); - inner.polling_waker.wake(); - if before.elapsed() > Duration::from_millis(50) { - warn!(target: "telemetry", "Writing a telemetry log took more than 50ms"); - } - result - } else { - Ok(()) + let before = Instant::now(); + + let serialized = slog_async::AsyncRecord::from(record, values); + // Note: interestingly, `try_send` requires a `&mut` because it modifies some internal value, while `clone()` + // is lock-free. + if let Err(err) = self.sender.clone().try_send(serialized) { + warn!(target: "telemetry", "Ignored telemetry message because of error on channel: {:?}", err); } + + if before.elapsed() > Duration::from_millis(50) { + warn!(target: "telemetry", "Writing a telemetry log took more than 50ms"); + } + + Ok(()) } }