From 3d5f093ddbc57406dbeab818638b2f4b591a9695 Mon Sep 17 00:00:00 2001 From: Koute Date: Fri, 22 Oct 2021 01:54:15 +0900 Subject: [PATCH] Speed up logging once again (#9981) * Update `tracing`-related dependencies * Enable `parking_lot` feature in `tracing-subscriber` * Add an asynchronous stderr logger * Make clippy happy * Add an integration test for the logger * Refactor `test_logger_filters`'s subprocess machinery into a separate function * Use a child process instead of hooking into stderr for the test * Add a doc comment for `MakeStderrWriter` * Move the initialization into the `MakeStderrWriter`'s constructor * Add an extra test case to trigger the logger's emergency flush mechanism * Use the buffer's mutex for asynchronous flushes * Remove vestigial `nix` dependency from one of the previous commits --- substrate/Cargo.lock | 10 +- substrate/client/tracing/Cargo.toml | 3 +- .../client/tracing/src/logging/directives.rs | 2 +- substrate/client/tracing/src/logging/mod.rs | 147 ++++++++++- .../tracing/src/logging/stderr_writer.rs | 228 ++++++++++++++++++ substrate/primitives/tracing/Cargo.toml | 4 +- 6 files changed, 373 insertions(+), 21 deletions(-) create mode 100644 substrate/client/tracing/src/logging/stderr_writer.rs diff --git a/substrate/Cargo.lock b/substrate/Cargo.lock index f15e363bfd..6325304bfc 100644 --- a/substrate/Cargo.lock +++ b/substrate/Cargo.lock @@ -4085,9 +4085,9 @@ dependencies = [ [[package]] name = "memoffset" -version = "0.6.1" +version = "0.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "157b4208e3059a8f9e78d559edc658e13df41410cb3ae03979c83130067fdd87" +checksum = "59accc507f1338036a0477ef61afdae33cde60840f4dfe481319ce3ad116ddf9" dependencies = [ "autocfg 1.0.1", ] @@ -8466,6 +8466,7 @@ dependencies = [ "chrono", "criterion", "lazy_static", + "libc", "log 0.4.14", "once_cell", "parking_lot 0.11.1", @@ -10635,14 +10636,15 @@ dependencies = [ [[package]] name = "tracing-subscriber" -version = "0.2.19" +version = "0.2.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ab69019741fca4d98be3c62d2b75254528b5432233fd8a4d2739fec20278de48" +checksum = "0e0d2eaa99c3c2e41547cfa109e910a68ea03823cccad4a0525dcbc9b01e8c71" dependencies = [ "ansi_term 0.12.1", "chrono", "lazy_static", "matchers", + "parking_lot 0.11.1", "regex", "serde", "serde_json", diff --git a/substrate/client/tracing/Cargo.toml b/substrate/client/tracing/Cargo.toml index b4049fa097..4939e6a731 100644 --- a/substrate/client/tracing/Cargo.toml +++ b/substrate/client/tracing/Cargo.toml @@ -17,6 +17,7 @@ ansi_term = "0.12.1" atty = "0.2.13" chrono = "0.4.19" lazy_static = "1.4.0" +libc = "0.2.95" log = { version = "0.4.8" } once_cell = "1.8.0" parking_lot = "0.11.1" @@ -26,7 +27,7 @@ serde = "1.0.126" thiserror = "1.0.21" tracing = "0.1.29" tracing-log = "0.1.2" -tracing-subscriber = "0.2.19" +tracing-subscriber = { version = "0.2.25", features = ["parking_lot"] } sp-tracing = { version = "4.0.0-dev", path = "../../primitives/tracing" } sp-rpc = { version = "4.0.0-dev", path = "../../primitives/rpc" } sp-runtime = { version = "4.0.0-dev", path = "../../primitives/runtime" } diff --git a/substrate/client/tracing/src/logging/directives.rs b/substrate/client/tracing/src/logging/directives.rs index 16f68654de..fe7d6a780d 100644 --- a/substrate/client/tracing/src/logging/directives.rs +++ b/substrate/client/tracing/src/logging/directives.rs @@ -109,5 +109,5 @@ pub(crate) fn set_reload_handle(handle: Handle) { type SCSubscriber< N = tracing_fmt::format::DefaultFields, E = crate::logging::EventFormat, - W = fn() -> std::io::Stderr, + W = crate::logging::DefaultLogger, > = layer::Layered, Registry>; diff --git a/substrate/client/tracing/src/logging/mod.rs b/substrate/client/tracing/src/logging/mod.rs index c6a4f07017..7f995615a2 100644 --- a/substrate/client/tracing/src/logging/mod.rs +++ b/substrate/client/tracing/src/logging/mod.rs @@ -26,6 +26,9 @@ mod directives; mod event_format; mod fast_local_time; mod layers; +mod stderr_writer; + +pub(crate) type DefaultLogger = stderr_writer::MakeStderrWriter; pub use directives::*; pub use sc_tracing_proc_macro::*; @@ -47,6 +50,8 @@ pub use event_format::*; pub use fast_local_time::FastLocalTime; pub use layers::*; +use stderr_writer::MakeStderrWriter; + /// Logging Result typedef. pub type Result = std::result::Result; @@ -91,7 +96,7 @@ fn prepare_subscriber( profiling_targets: Option<&str>, force_colors: Option, builder_hook: impl Fn( - SubscriberBuilder std::io::Stderr>, + SubscriberBuilder, ) -> SubscriberBuilder, ) -> Result LookupSpan<'a>> where @@ -172,7 +177,7 @@ where let builder = builder.with_span_events(format::FmtSpan::NONE); - let builder = builder.with_writer(std::io::stderr as _); + let builder = builder.with_writer(MakeStderrWriter::default()); let builder = builder.event_format(event_format); @@ -282,7 +287,16 @@ impl LoggerBuilder { mod tests { use super::*; use crate as sc_tracing; - use std::{env, process::Command}; + use log::info; + use std::{ + collections::BTreeMap, + env, + process::Command, + sync::{ + atomic::{AtomicBool, AtomicUsize, Ordering}, + Arc, + }, + }; use tracing::{metadata::Kind, subscriber::Interest, Callsite, Level, Metadata}; const EXPECTED_LOG_MESSAGE: &'static str = "yeah logging works as expected"; @@ -292,9 +306,28 @@ mod tests { let _ = LoggerBuilder::new(directives).init().unwrap(); } + fn run_test_in_another_process( + test_name: &str, + test_body: impl FnOnce(), + ) -> Option { + if env::var("RUN_FORKED_TEST").is_ok() { + test_body(); + None + } else { + let output = Command::new(env::current_exe().unwrap()) + .arg(test_name) + .env("RUN_FORKED_TEST", "1") + .output() + .unwrap(); + + assert!(output.status.success()); + Some(output) + } + } + #[test] fn test_logger_filters() { - if env::var("RUN_TEST_LOGGER_FILTERS").is_ok() { + run_test_in_another_process("test_logger_filters", || { let test_directives = "afg=debug,sync=trace,client=warn,telemetry,something-with-dash=error"; init_logger(&test_directives); @@ -331,15 +364,7 @@ mod tests { assert!(test_filter("telemetry", Level::TRACE)); assert!(test_filter("something-with-dash", Level::ERROR)); }); - } else { - let status = Command::new(env::current_exe().unwrap()) - .arg("test_logger_filters") - .env("RUN_TEST_LOGGER_FILTERS", "1") - .output() - .unwrap() - .status; - assert!(status.success()); - } + }); } /// This test ensures that using dash (`-`) in the target name in logs and directives actually @@ -474,4 +499,100 @@ mod tests { assert_eq!("MAX_LOG_LEVEL=Trace", run_test(None, Some("test=info".into()))); } } + + // This creates a bunch of threads and makes sure they start executing + // a given callback almost exactly at the same time. + fn run_on_many_threads(thread_count: usize, callback: impl Fn(usize) + 'static + Send + Clone) { + let started_count = Arc::new(AtomicUsize::new(0)); + let barrier = Arc::new(AtomicBool::new(false)); + let threads: Vec<_> = (0..thread_count) + .map(|nth_thread| { + let started_count = started_count.clone(); + let barrier = barrier.clone(); + let callback = callback.clone(); + + std::thread::spawn(move || { + started_count.fetch_add(1, Ordering::SeqCst); + while !barrier.load(Ordering::SeqCst) { + std::thread::yield_now(); + } + + callback(nth_thread); + }) + }) + .collect(); + + while started_count.load(Ordering::SeqCst) != thread_count { + std::thread::yield_now(); + } + barrier.store(true, Ordering::SeqCst); + + for thread in threads { + if let Err(error) = thread.join() { + println!("error: failed to join thread: {:?}", error); + unsafe { libc::abort() } + } + } + } + + #[test] + fn parallel_logs_from_multiple_threads_are_properly_gathered() { + const THREAD_COUNT: usize = 128; + const LOGS_PER_THREAD: usize = 1024; + + let output = run_test_in_another_process( + "parallel_logs_from_multiple_threads_are_properly_gathered", + || { + let builder = LoggerBuilder::new(""); + builder.init().unwrap(); + + run_on_many_threads(THREAD_COUNT, |nth_thread| { + for _ in 0..LOGS_PER_THREAD { + info!("Thread <<{}>>", nth_thread); + } + }); + }, + ); + + if let Some(output) = output { + let stderr = String::from_utf8(output.stderr).unwrap(); + let mut count_per_thread = BTreeMap::new(); + for line in stderr.split("\n") { + if let Some(index_s) = line.find("Thread <<") { + let index_s = index_s + "Thread <<".len(); + let index_e = line.find(">>").unwrap(); + let nth_thread: usize = line[index_s..index_e].parse().unwrap(); + *count_per_thread.entry(nth_thread).or_insert(0) += 1; + } + } + + assert_eq!(count_per_thread.len(), THREAD_COUNT); + for (_, count) in count_per_thread { + assert_eq!(count, LOGS_PER_THREAD); + } + } + } + + #[test] + fn huge_single_line_log_is_properly_printed_out() { + let mut line = String::new(); + line.push_str("$$START$$"); + for n in 0..16 * 1024 * 1024 { + let ch = b'a' + (n as u8 % (b'z' - b'a')); + line.push(char::from(ch)); + } + line.push_str("$$END$$"); + + let output = + run_test_in_another_process("huge_single_line_log_is_properly_printed_out", || { + let builder = LoggerBuilder::new(""); + builder.init().unwrap(); + info!("{}", line); + }); + + if let Some(output) = output { + let stderr = String::from_utf8(output.stderr).unwrap(); + assert!(stderr.contains(&line)); + } + } } diff --git a/substrate/client/tracing/src/logging/stderr_writer.rs b/substrate/client/tracing/src/logging/stderr_writer.rs new file mode 100644 index 0000000000..9aab2491fb --- /dev/null +++ b/substrate/client/tracing/src/logging/stderr_writer.rs @@ -0,0 +1,228 @@ +// This file is part of Substrate. + +// Copyright (C) 2021 Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +//! This module contains a buffered semi-asynchronous stderr writer. +//! +//! Depending on how we were started writing to stderr can take a surprisingly long time. +//! +//! If the other side takes their sweet sweet time reading whatever we send them then writing +//! to stderr might block for a long time, since it is effectively a synchronous operation. +//! And every time we write to stderr we need to grab a global lock, which affects every thread +//! which also tries to log something at the same time. +//! +//! Of course we *will* be ultimately limited by how fast the recipient can ingest our logs, +//! but it's not like logging is the only thing we're doing. And we still can't entirely +//! avoid the problem of multiple threads contending for the same lock. (Well, technically +//! we could employ something like a lock-free circular buffer, but that might be like +//! killing a fly with a sledgehammer considering the complexity involved; this is only +//! a logger after all.) +//! +//! But we can try to make things a little better. We can offload actually writing to stderr +//! to another thread and flush the logs in bulk instead of doing it per-line, which should +//! reduce the amount of CPU time we waste on making syscalls and on spinning waiting for locks. +//! +//! How much this helps depends on a multitude of factors, including the hardware we're running on, +//! how much we're logging, from how many threads, which exact set of threads are logging, to what +//! stderr is actually connected to (is it a terminal emulator? a file? an UDP socket?), etc. +//! +//! In general this can reduce the real time execution time as much as 75% in certain cases, or it +//! can make absolutely no difference in others. + +use parking_lot::{Condvar, Mutex, Once}; +use std::{ + io::Write, + sync::atomic::{AtomicBool, Ordering}, + time::Duration, +}; +use tracing::{Level, Metadata}; + +/// How many bytes of buffered logs will trigger an async flush on another thread? +const ASYNC_FLUSH_THRESHOLD: usize = 16 * 1024; + +/// How many bytes of buffered logs will trigger a sync flush on the current thread? +const SYNC_FLUSH_THRESHOLD: usize = 768 * 1024; + +/// How many bytes can be buffered at maximum? +const EMERGENCY_FLUSH_THRESHOLD: usize = 2 * 1024 * 1024; + +/// If there isn't enough printed out this is how often the logs will be automatically flushed. +const AUTOFLUSH_EVERY: Duration = Duration::from_millis(50); + +/// The least serious level at which a synchronous flush will be triggered. +const SYNC_FLUSH_LEVEL_THRESHOLD: Level = Level::ERROR; + +/// The amount of time we'll block until the buffer is fully flushed on exit. +/// +/// This should be completely unnecessary in normal circumstances. +const ON_EXIT_FLUSH_TIMEOUT: Duration = Duration::from_secs(5); + +/// A global buffer to which we'll append all of our logs before flushing them out to stderr. +static BUFFER: Mutex> = parking_lot::const_mutex(Vec::new()); + +/// A spare buffer which we'll swap with the main buffer on each flush to minimize lock contention. +static SPARE_BUFFER: Mutex> = parking_lot::const_mutex(Vec::new()); + +/// A conditional variable used to forcefully trigger asynchronous flushes. +static ASYNC_FLUSH_CONDVAR: Condvar = Condvar::new(); + +static ENABLE_ASYNC_LOGGING: AtomicBool = AtomicBool::new(true); + +fn flush_logs(mut buffer: parking_lot::lock_api::MutexGuard>) { + let mut spare_buffer = SPARE_BUFFER.lock(); + std::mem::swap(&mut *spare_buffer, &mut *buffer); + std::mem::drop(buffer); + + let stderr = std::io::stderr(); + let mut stderr_lock = stderr.lock(); + let _ = stderr_lock.write_all(&*spare_buffer); + std::mem::drop(stderr_lock); + + spare_buffer.clear(); +} + +fn log_autoflush_thread() { + let mut buffer = BUFFER.lock(); + loop { + ASYNC_FLUSH_CONDVAR.wait_for(&mut buffer, AUTOFLUSH_EVERY); + loop { + flush_logs(buffer); + + buffer = BUFFER.lock(); + if buffer.len() >= ASYNC_FLUSH_THRESHOLD { + // While we were busy flushing we picked up enough logs to do another flush. + continue + } else { + break + } + } + } +} + +#[cold] +fn initialize() { + std::thread::Builder::new() + .name("log-autoflush".to_owned()) + .spawn(log_autoflush_thread) + .expect("thread spawning doesn't normally fail; qed"); + + // SAFETY: This is safe since we pass a valid pointer to `atexit`. + let errcode = unsafe { libc::atexit(on_exit) }; + assert_eq!(errcode, 0, "atexit failed while setting up the logger: {}", errcode); +} + +extern "C" fn on_exit() { + ENABLE_ASYNC_LOGGING.store(false, Ordering::SeqCst); + + if let Some(buffer) = BUFFER.try_lock_for(ON_EXIT_FLUSH_TIMEOUT) { + flush_logs(buffer); + } +} + +/// A drop-in replacement for [`std::io::stderr`] for use anywhere +/// a [`tracing_subscriber::fmt::MakeWriter`] is accepted. +pub struct MakeStderrWriter { + // A dummy field so that the structure is not publicly constructible. + _dummy: (), +} + +impl Default for MakeStderrWriter { + fn default() -> Self { + static ONCE: Once = Once::new(); + ONCE.call_once(initialize); + MakeStderrWriter { _dummy: () } + } +} + +impl tracing_subscriber::fmt::MakeWriter for MakeStderrWriter { + type Writer = StderrWriter; + + fn make_writer(&self) -> Self::Writer { + StderrWriter::new(false) + } + + // The `tracing-subscriber` crate calls this for every line logged. + fn make_writer_for(&self, meta: &Metadata<'_>) -> Self::Writer { + StderrWriter::new(*meta.level() <= SYNC_FLUSH_LEVEL_THRESHOLD) + } +} + +pub struct StderrWriter { + buffer: Option>>, + sync_flush_on_drop: bool, + original_len: usize, +} + +impl StderrWriter { + fn new(mut sync_flush_on_drop: bool) -> Self { + if !ENABLE_ASYNC_LOGGING.load(Ordering::Relaxed) { + sync_flush_on_drop = true; + } + + // This lock isn't as expensive as it might look, since this is only called once the full + // line to be logged is already serialized into a thread-local buffer inside of the + // `tracing-subscriber` crate, and basically the only thing we'll do when holding this lock + // is to copy that over to our global shared buffer in one go in `Write::write_all` and be + // immediately dropped. + let buffer = BUFFER.lock(); + StderrWriter { original_len: buffer.len(), buffer: Some(buffer), sync_flush_on_drop } + } +} + +#[cold] +fn emergency_flush(buffer: &mut Vec, input: &[u8]) { + let stderr = std::io::stderr(); + let mut stderr_lock = stderr.lock(); + let _ = stderr_lock.write_all(buffer); + buffer.clear(); + + let _ = stderr_lock.write_all(input); +} + +impl Write for StderrWriter { + fn write(&mut self, input: &[u8]) -> Result { + let buffer = self.buffer.as_mut().expect("buffer is only None after `drop`; qed"); + if buffer.len() + input.len() >= EMERGENCY_FLUSH_THRESHOLD { + // Make sure we don't blow our memory budget. Normally this should never happen, + // but there are cases where we directly print out untrusted user input which + // can potentially be megabytes in size. + emergency_flush(buffer, input); + } else { + buffer.extend_from_slice(input); + } + Ok(input.len()) + } + + fn write_all(&mut self, input: &[u8]) -> Result<(), std::io::Error> { + self.write(input).map(|_| ()) + } + + fn flush(&mut self) -> Result<(), std::io::Error> { + Ok(()) + } +} + +impl Drop for StderrWriter { + fn drop(&mut self) { + let buf = self.buffer.take().expect("buffer is only None after `drop`; qed"); + if self.sync_flush_on_drop || buf.len() >= SYNC_FLUSH_THRESHOLD { + flush_logs(buf); + } else if self.original_len < ASYNC_FLUSH_THRESHOLD && buf.len() >= ASYNC_FLUSH_THRESHOLD { + ASYNC_FLUSH_CONDVAR.notify_one(); + } + } +} diff --git a/substrate/primitives/tracing/Cargo.toml b/substrate/primitives/tracing/Cargo.toml index 85eb22d6df..46930a674f 100644 --- a/substrate/primitives/tracing/Cargo.toml +++ b/substrate/primitives/tracing/Cargo.toml @@ -23,8 +23,8 @@ codec = { version = "2.0.0", package = "parity-scale-codec", default-features = "derive", ] } tracing = { version = "0.1.29", default-features = false } -tracing-core = { version = "0.1.17", default-features = false } -tracing-subscriber = { version = "0.2.19", optional = true, features = [ +tracing-core = { version = "0.1.21", default-features = false } +tracing-subscriber = { version = "0.2.25", optional = true, features = [ "tracing-log", ] }