From c42d756fb7b1a9fd34f0da80bfba2044f925aaf3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20K=C3=B6cher?= Date: Mon, 1 Feb 2021 15:54:21 +0100 Subject: [PATCH] Fix tracing spans are not being forwarded to spawned task (#8009) * Fix tracing spans are not being forwarded to spawned task There is a bug that tracing spans are not forwarded to spawned task. The problem was that only the telemetry span was forwarded. The solution to this is to use the tracing provided `in_current_span` to capture the current active span and pass the telemetry span explicitly. We will now always enter the span when the future is polled. This is essentially the same strategy as tracing is doing with its `Instrumented`, but now extended for our use case with having multiple spans active. * More tests --- substrate/Cargo.lock | 47 ++++----- substrate/client/service/Cargo.toml | 3 +- .../client/service/src/task_manager/mod.rs | 42 +++++++- .../client/service/src/task_manager/tests.rs | 98 ++++++++++++++++++- 4 files changed, 158 insertions(+), 32 deletions(-) diff --git a/substrate/Cargo.lock b/substrate/Cargo.lock index a68f014e98..d8f52aa141 100644 --- a/substrate/Cargo.lock +++ b/substrate/Cargo.lock @@ -2105,7 +2105,7 @@ dependencies = [ "http 0.2.1", "indexmap", "slab", - "tokio 0.2.23", + "tokio 0.2.25", "tokio-util", "tracing", "tracing-futures", @@ -2346,7 +2346,7 @@ dependencies = [ "itoa", "pin-project 1.0.2", "socket2", - "tokio 0.2.23", + "tokio 0.2.25", "tower-service", "tracing", "want 0.3.0", @@ -2365,7 +2365,7 @@ dependencies = [ "log", "rustls 0.18.1", "rustls-native-certs", - "tokio 0.2.23", + "tokio 0.2.25", "tokio-rustls", "webpki", ] @@ -3499,9 +3499,9 @@ dependencies = [ [[package]] name = "mio" -version = "0.6.22" +version = "0.6.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fce347092656428bc8eaf6201042cb551b8d67855af7374542a92a0fbfcac430" +checksum = "4afd66f5b91bf2a3bc13fad0e21caedac168ca4c707504e75585648ae80e4cc4" dependencies
= [ "cfg-if 0.1.10", "fuchsia-zircon", @@ -3510,7 +3510,7 @@ dependencies = [ "kernel32-sys", "libc", "log", - "miow 0.2.1", + "miow 0.2.2", "net2", "slab", "winapi 0.2.8", @@ -3553,9 +3553,9 @@ dependencies = [ [[package]] name = "miow" -version = "0.2.1" +version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8c1f2f3b1cf331de6896aabf6e9d55dca90356cc9960cca7eaaf408a355ae919" +checksum = "ebd808424166322d4a38da87083bfddd3ac4c131334ed55856112eb06d46944d" dependencies = [ "kernel32-sys", "net2", @@ -3665,9 +3665,9 @@ dependencies = [ [[package]] name = "net2" -version = "0.2.35" +version = "0.2.37" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3ebc3ec692ed7c9a255596c67808dee269f64655d8baf7b4f0638e51ba1d6853" +checksum = "391630d12b68002ae1e25e8f974306474966550ad82dac6886fb8910c19568ae" dependencies = [ "cfg-if 0.1.10", "libc", @@ -6535,7 +6535,7 @@ dependencies = [ "tempfile", "thiserror", "tiny-bip39", - "tokio 0.2.23", + "tokio 0.2.25", ] [[package]] @@ -6791,7 +6791,7 @@ dependencies = [ "substrate-test-runtime-client", "substrate-test-runtime-transaction-pool", "tempfile", - "tokio 0.2.23", + "tokio 0.2.25", ] [[package]] @@ -6986,7 +6986,7 @@ dependencies = [ "substrate-prometheus-endpoint", "substrate-test-runtime-client", "tempfile", - "tokio 0.2.23", + "tokio 0.2.25", ] [[package]] @@ -7230,7 +7230,7 @@ dependencies = [ "sp-utils", "substrate-test-runtime-client", "threadpool", - "tokio 0.2.23", + "tokio 0.2.25", ] [[package]] @@ -7413,9 +7413,10 @@ dependencies = [ "substrate-test-runtime-client", "tempfile", "thiserror", - "tokio 0.2.23", + "tokio 0.2.25", "tracing", "tracing-futures", + "tracing-subscriber", "wasm-timer", ] @@ -8922,7 +8923,7 @@ dependencies = [ "sc-rpc-api", "serde", "sp-storage", - "tokio 0.2.23", + "tokio 0.2.25", ] [[package]] @@ -8960,7 +8961,7 @@ dependencies = [ "hyper 0.13.9", "log", "prometheus", - "tokio 0.2.23", + "tokio 0.2.25", ] [[package]] @@ -9075,7 
+9076,7 @@ dependencies = [ "futures 0.3.9", "sc-service", "substrate-test-utils-derive", - "tokio 0.2.23", + "tokio 0.2.25", "trybuild", ] @@ -9094,7 +9095,7 @@ version = "0.1.0" dependencies = [ "sc-service", "substrate-test-utils", - "tokio 0.2.23", + "tokio 0.2.25", ] [[package]] @@ -9323,9 +9324,9 @@ dependencies = [ [[package]] name = "tokio" -version = "0.2.23" +version = "0.2.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a6d7ad61edd59bfcc7e80dababf0f4aed2e6d5e0ba1659356ae889752dfc12ff" +checksum = "6703a273949a90131b290be1fe7b039d0fc884aa1935860dfcbe056f28cd8092" dependencies = [ "bytes 0.5.6", "fnv", @@ -9459,7 +9460,7 @@ checksum = "e12831b255bcfa39dc0436b01e19fea231a37db570686c06ee72c423479f889a" dependencies = [ "futures-core", "rustls 0.18.1", - "tokio 0.2.23", + "tokio 0.2.25", "webpki", ] @@ -9569,7 +9570,7 @@ dependencies = [ "futures-sink", "log", "pin-project-lite 0.1.11", - "tokio 0.2.23", + "tokio 0.2.25", ] [[package]] diff --git a/substrate/client/service/Cargo.toml b/substrate/client/service/Cargo.toml index 10caca86e6..78c5f94baf 100644 --- a/substrate/client/service/Cargo.toml +++ b/substrate/client/service/Cargo.toml @@ -89,5 +89,6 @@ substrate-test-runtime = { version = "2.0.0", path = "../../test-utils/runtime/" sp-consensus-babe = { version = "0.8.0", path = "../../primitives/consensus/babe" } grandpa = { version = "0.8.0", package = "sc-finality-grandpa", path = "../finality-grandpa" } grandpa-primitives = { version = "2.0.0", package = "sp-finality-grandpa", path = "../../primitives/finality-grandpa" } -tokio = { version = "0.2", default-features = false } +tokio = { version = "0.2.25", default-features = false } async-std = { version = "1.6.5", default-features = false } +tracing-subscriber = "0.2.15" diff --git a/substrate/client/service/src/task_manager/mod.rs b/substrate/client/service/src/task_manager/mod.rs index 6b14fbeec2..9a1fd15952 100644 --- 
a/substrate/client/service/src/task_manager/mod.rs +++ b/substrate/client/service/src/task_manager/mod.rs @@ -24,7 +24,7 @@ use log::{debug, error}; use futures::{ Future, FutureExt, StreamExt, future::{select, Either, BoxFuture, join_all, try_join_all, pending}, - sink::SinkExt, + sink::SinkExt, task::{Context, Poll}, }; use prometheus_endpoint::{ exponential_buckets, register, @@ -40,6 +40,37 @@ mod prometheus_future; #[cfg(test)] mod tests; +/// A wrapper around a `[Option]` and a [`Future`]. +/// +/// The telemetry in Substrate uses a span to identify the telemetry context. The span "infrastructure" +/// is provided by the tracing-crate. Now it is possible to have your own spans as well. To support +/// this with the [`TaskManager`] we have this wrapper. This wrapper enters the telemetry span every +/// time the future is polled and polls the inner future. So, the inner future can still have its +/// own span attached and we get our telemetry span ;) +struct WithTelemetrySpan { + span: Option, + inner: T, +} + +impl WithTelemetrySpan { + fn new(span: Option, inner: T) -> Self { + Self { + span, + inner, + } + } +} + +impl + Unpin> Future for WithTelemetrySpan { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll { + let span = self.span.clone(); + let _enter = span.as_ref().map(|s| s.enter()); + Pin::new(&mut self.inner).poll(ctx) + } +} + /// An handle for spawning tasks in the service. 
#[derive(Clone)] pub struct SpawnTaskHandle { @@ -124,10 +155,11 @@ impl SpawnTaskHandle { } }; - let join_handle = { - let _span = self.telemetry_span.as_ref().map(|s| s.enter()); - self.executor.spawn(Box::pin(future.in_current_span()), task_type) - }; + let future = future.in_current_span().boxed(); + let join_handle = self.executor.spawn( + WithTelemetrySpan::new(self.telemetry_span.clone(), future).boxed(), + task_type, + ); let mut task_notifier = self.task_notifier.clone(); self.executor.spawn( diff --git a/substrate/client/service/src/task_manager/tests.rs b/substrate/client/service/src/task_manager/tests.rs index f0ede1fc38..257f7db198 100644 --- a/substrate/client/service/src/task_manager/tests.rs +++ b/substrate/client/service/src/task_manager/tests.rs @@ -20,9 +20,10 @@ use crate::config::TaskExecutor; use crate::task_manager::TaskManager; use futures::{future::FutureExt, pin_mut, select}; use parking_lot::Mutex; -use std::any::Any; -use std::sync::Arc; -use std::time::Duration; +use std::{any::Any, sync::Arc, time::Duration}; +use tracing_subscriber::{layer::{SubscriberExt, Context}, Layer}; +use tracing::{subscriber::Subscriber, span::{Attributes, Id, Record, Span}, event::Event}; +use sc_telemetry::TelemetrySpan; #[derive(Clone, Debug)] struct DropTester(Arc>); @@ -312,3 +313,94 @@ fn ensure_task_manager_future_continues_when_childs_not_essential_task_fails() { runtime.block_on(task_manager.clean_shutdown()); assert_eq!(drop_tester, 0); } + +struct TestLayer { + spans_entered: Arc>>, + spans: Arc>>, +} + +impl Layer for TestLayer { + fn new_span(&self, attrs: &Attributes<'_>, id: &Id, _ctx: Context) { + self.spans.lock().insert(id.clone(), attrs.metadata().name().to_string()); + } + + fn on_record(&self, _: &Id, _: &Record<'_>, _: Context) {} + + fn on_event(&self, _: &Event<'_>, _: Context) {} + + fn on_enter(&self, span: &Id, _: Context) { + let name = self.spans.lock().get(span).unwrap().clone(); + self.spans_entered.lock().push(name); + } + + fn 
on_exit(&self, _: &Id, _: Context) {} + + fn on_close(&self, _: Id, _: Context) {} +} + +type TestSubscriber = tracing_subscriber::layer::Layered< + TestLayer, + tracing_subscriber::fmt::Subscriber +>; + +fn setup_subscriber() -> ( + TestSubscriber, + Arc>>, +) { + let spans_entered = Arc::new(Mutex::new(Default::default())); + let layer = TestLayer { + spans: Arc::new(Mutex::new(Default::default())), + spans_entered: spans_entered.clone(), + }; + let subscriber = tracing_subscriber::fmt().finish().with(layer); + (subscriber, spans_entered) +} + +#[test] +fn telemetry_span_is_forwarded_to_task() { + let (subscriber, spans_entered) = setup_subscriber(); + let _sub_guard = tracing::subscriber::set_global_default(subscriber); + + let telemetry_span = TelemetrySpan::new(); + + let span = tracing::info_span!("test"); + let _enter = span.enter(); + + let mut runtime = tokio::runtime::Runtime::new().unwrap(); + let handle = runtime.handle().clone(); + let task_executor = TaskExecutor::from(move |fut, _| handle.spawn(fut).map(|_| ())); + let task_manager = TaskManager::new(task_executor, None, Some(telemetry_span.clone())).unwrap(); + + let (sender, receiver) = futures::channel::oneshot::channel(); + let spawn_handle = task_manager.spawn_handle(); + + let span = span.clone(); + task_manager.spawn_handle().spawn( + "test", + async move { + assert_eq!(span, Span::current()); + spawn_handle.spawn("test-nested", async move { + assert_eq!(span, Span::current()); + sender.send(()).unwrap(); + }.boxed()); + }.boxed(), + ); + + // We need to exit the span here. If tokio is not running with multithreading, this + // would lead to duplicate spans being "active" and forwarding the wrong one. + drop(_enter); + runtime.block_on(receiver).unwrap(); + runtime.block_on(task_manager.clean_shutdown()); + drop(runtime); + + let spans = spans_entered.lock(); + // We entered the telemetry span and the "test" in the future, the nested future and + // the "test" span outside of the future.
So, we should have recorded 5 span entries. + assert_eq!(5, spans.len()); + + assert_eq!(spans[0], "test"); + assert_eq!(spans[1], telemetry_span.span().metadata().unwrap().name()); + assert_eq!(spans[2], "test"); + assert_eq!(spans[3], telemetry_span.span().metadata().unwrap().name()); + assert_eq!(spans[4], "test"); +}