Fix tracing spans are not being forwarded to spawned task (#8009)

* Fix tracing spans are not being forwarded to spawned task

There is a bug that tracing spans are not forwarded to spawned task. The
problem was that only the telemetry span was forwarded. The solution to
this is to use the tracing provided `in_current_span` to capture the
current active span and pass the telemetry span explictely. We will now
always enter the span when the future is polled. This is essentially the
same strategy as tracing is doing with its `Instrumented`, but now
extended for our use case with having multiple spans active.

* More tests
This commit is contained in:
Bastian Köcher
2021-02-01 15:54:21 +01:00
committed by GitHub
parent 4da880ed41
commit c42d756fb7
4 changed files with 158 additions and 32 deletions
@@ -24,7 +24,7 @@ use log::{debug, error};
use futures::{
Future, FutureExt, StreamExt,
future::{select, Either, BoxFuture, join_all, try_join_all, pending},
sink::SinkExt,
sink::SinkExt, task::{Context, Poll},
};
use prometheus_endpoint::{
exponential_buckets, register,
@@ -40,6 +40,37 @@ mod prometheus_future;
#[cfg(test)]
mod tests;
/// A wrapper around a `[Option<TelemetrySpan>]` and a [`Future`].
///
/// The telemetry in Substrate uses a span to identify the telemetry context. The span "infrastructure"
/// is provided by the tracing-crate. Now it is possible to have your own spans as well. To support
/// this with the [`TaskManager`] we have this wrapper. This wrapper enters the telemetry span every
/// time the future is polled and polls the inner future. So, the inner future can still have its
/// own span attached and we get our telemetry span ;)
struct WithTelemetrySpan<T> {
span: Option<TelemetrySpan>,
inner: T,
}
impl<T> WithTelemetrySpan<T> {
fn new(span: Option<TelemetrySpan>, inner: T) -> Self {
Self {
span,
inner,
}
}
}
impl<T: Future<Output = ()> + Unpin> Future for WithTelemetrySpan<T> {
type Output = ();
fn poll(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
let span = self.span.clone();
let _enter = span.as_ref().map(|s| s.enter());
Pin::new(&mut self.inner).poll(ctx)
}
}
/// An handle for spawning tasks in the service.
#[derive(Clone)]
pub struct SpawnTaskHandle {
@@ -124,10 +155,11 @@ impl SpawnTaskHandle {
}
};
let join_handle = {
let _span = self.telemetry_span.as_ref().map(|s| s.enter());
self.executor.spawn(Box::pin(future.in_current_span()), task_type)
};
let future = future.in_current_span().boxed();
let join_handle = self.executor.spawn(
WithTelemetrySpan::new(self.telemetry_span.clone(), future).boxed(),
task_type,
);
let mut task_notifier = self.task_notifier.clone();
self.executor.spawn(