diff --git a/substrate/Cargo.lock b/substrate/Cargo.lock index c8e7de7761..b75ecf5fec 100644 --- a/substrate/Cargo.lock +++ b/substrate/Cargo.lock @@ -6592,7 +6592,6 @@ dependencies = [ "exit-future", "futures 0.1.29", "futures 0.3.4", - "futures-diagnose", "futures-timer 3.0.2", "lazy_static", "log", @@ -6601,6 +6600,7 @@ dependencies = [ "parity-scale-codec", "parity-util-mem", "parking_lot 0.10.2", + "pin-project", "procfs", "sc-chain-spec", "sc-client", diff --git a/substrate/client/service/Cargo.toml b/substrate/client/service/Cargo.toml index b8d1e71097..83cb6d717f 100644 --- a/substrate/client/service/Cargo.toml +++ b/substrate/client/service/Cargo.toml @@ -24,7 +24,6 @@ wasmtime = [ derive_more = "0.99.2" futures01 = { package = "futures", version = "0.1.29" } futures = "0.3.4" -futures-diagnose = "1.0" parking_lot = "0.10.0" lazy_static = "1.4.0" log = "0.4.8" @@ -32,6 +31,7 @@ slog = { version = "2.5.2", features = ["nested-values"] } futures-timer = "3.0.1" wasm-timer = "0.2" exit-future = "0.2.0" +pin-project = "0.4.8" serde = "1.0.101" serde_json = "1.0.41" sysinfo = "0.12.0" diff --git a/substrate/client/service/src/builder.rs b/substrate/client/service/src/builder.rs index 0eefbe730f..acb546fc60 100644 --- a/substrate/client/service/src/builder.rs +++ b/substrate/client/service/src/builder.rs @@ -170,7 +170,10 @@ fn new_full_parts( KeystoreConfig::InMemory => Keystore::new_in_memory(), }; - let tasks_builder = TaskManagerBuilder::new(); + let tasks_builder = { + let registry = config.prometheus_config.as_ref().map(|cfg| &cfg.registry); + TaskManagerBuilder::new(registry)? + }; let executor = NativeExecutor::::new( config.wasm_method, @@ -280,7 +283,10 @@ impl ServiceBuilder<(), (), (), (), (), (), (), (), (), (), ()> { (), TLightBackend, >, Error> { - let tasks_builder = TaskManagerBuilder::new(); + let tasks_builder = { + let registry = config.prometheus_config.as_ref().map(|cfg| &cfg.registry); + TaskManagerBuilder::new(registry)? + }; let keystore = match &config.keystore { KeystoreConfig::Path { path, password } => Keystore::open( diff --git a/substrate/client/service/src/lib.rs b/substrate/client/service/src/lib.rs index d5db64ea46..969453210e 100644 --- a/substrate/client/service/src/lib.rs +++ b/substrate/client/service/src/lib.rs @@ -29,7 +29,7 @@ mod builder; mod status_sinks; mod task_manager; -use std::{borrow::Cow, io, pin::Pin}; +use std::{io, pin::Pin}; use std::marker::PhantomData; use std::net::SocketAddr; use std::collections::HashMap; @@ -139,12 +139,18 @@ pub trait AbstractService: 'static + Future> + fn telemetry(&self) -> Option; /// Spawns a task in the background that runs the future passed as parameter. - fn spawn_task(&self, name: impl Into>, task: impl Future + Send + 'static); + /// + /// Information about this task will be reported to Prometheus. + /// + /// The task name is a `&'static str` as opposed to a `String`. The reason for that is that + /// in order to avoid memory consumption issues with the Prometheus metrics, the set of + /// possible task names has to be bounded. + fn spawn_task(&self, name: &'static str, task: impl Future + Send + 'static); /// Spawns a task in the background that runs the future passed as /// parameter. The given task is considered essential, i.e. if it errors we /// trigger a service exit. - fn spawn_essential_task(&self, name: impl Into>, task: impl Future + Send + 'static); + fn spawn_essential_task(&self, name: &'static str, task: impl Future + Send + 'static); /// Returns a handle for spawning tasks. fn spawn_task_handle(&self) -> SpawnTaskHandle; @@ -220,11 +226,11 @@ where self.keystore.clone() } - fn spawn_task(&self, name: impl Into>, task: impl Future + Send + 'static) { + fn spawn_task(&self, name: &'static str, task: impl Future + Send + 'static) { self.task_manager.spawn(name, task) } - fn spawn_essential_task(&self, name: impl Into>, task: impl Future + Send + 'static) { + fn spawn_essential_task(&self, name: &'static str, task: impl Future + Send + 'static) { let mut essential_failed = self.essential_failed_tx.clone(); let essential_task = std::panic::AssertUnwindSafe(task) .catch_unwind() @@ -312,8 +318,8 @@ impl Spawn for &self, future: FutureObj<'static, ()> ) -> Result<(), SpawnError> { - self.task_manager.scheduler().unbounded_send((Box::pin(future), From::from("unnamed"))) - .map_err(|_| SpawnError::shutdown()) + self.task_manager.spawn_handle().spawn("unnamed", future); + Ok(()) } } diff --git a/substrate/client/service/src/task_manager.rs b/substrate/client/service/src/task_manager.rs index 7c5862e853..fd7fc62ab5 100644 --- a/substrate/client/service/src/task_manager.rs +++ b/substrate/client/service/src/task_manager.rs @@ -14,9 +14,9 @@ //! Substrate service tasks management module. use std::{ + pin::Pin, result::Result, sync::Arc, task::{Poll, Context}, - borrow::Cow, pin::Pin, }; use exit_future::Signal; use log::{debug, error}; @@ -26,14 +26,21 @@ use futures::{ compat::*, task::{Spawn, FutureObj, SpawnError}, }; +use prometheus_endpoint::{ + exponential_buckets, register, + PrometheusError, + CounterVec, HistogramOpts, HistogramVec, Opts, Registry, U64 +}; use sc_client_api::CloneableSpawn; use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedSender, TracingUnboundedReceiver}; +mod prometheus_future; + /// Type alias for service task executor (usually runtime). pub type ServiceTaskExecutor = Arc + Send>>) + Send + Sync>; /// Type alias for the task scheduler. -pub type TaskScheduler = TracingUnboundedSender<(Pin + Send>>, Cow<'static, str>)>; +pub type TaskScheduler = TracingUnboundedSender + Send>>>; /// Helper struct to setup background tasks execution for service. pub struct TaskManagerBuilder { @@ -45,20 +52,29 @@ pub struct TaskManagerBuilder { /// Sender for futures that must be spawned as background tasks. to_spawn_tx: TaskScheduler, /// Receiver for futures that must be spawned as background tasks. - to_spawn_rx: TracingUnboundedReceiver<(Pin + Send>>, Cow<'static, str>)>, + to_spawn_rx: TracingUnboundedReceiver + Send>>>, + /// Prometheus metrics where to report the stats about tasks. + metrics: Option, } impl TaskManagerBuilder { /// New asynchronous task manager setup. - pub fn new() -> Self { + /// + /// If a Prometheus registry is passed, it will be used to report statistics about the + /// service tasks. + pub fn new(prometheus_registry: Option<&Registry>) -> Result { let (signal, on_exit) = exit_future::signal(); let (to_spawn_tx, to_spawn_rx) = tracing_unbounded("mpsc_task_manager"); - Self { + + let metrics = prometheus_registry.map(Metrics::register).transpose()?; + + Ok(Self { on_exit, signal: Some(signal), to_spawn_tx, to_spawn_rx, - } + metrics, + }) } /// Get spawn handle. @@ -69,6 +85,7 @@ impl TaskManagerBuilder { SpawnTaskHandle { on_exit: self.on_exit.clone(), sender: self.to_spawn_tx.clone(), + metrics: self.metrics.clone(), } } @@ -78,7 +95,8 @@ impl TaskManagerBuilder { on_exit, signal, to_spawn_rx, - to_spawn_tx + to_spawn_tx, + metrics, } = self; TaskManager { on_exit, @@ -86,6 +104,7 @@ impl TaskManagerBuilder { to_spawn_tx, to_spawn_rx, executor, + metrics, } } } @@ -95,17 +114,45 @@ impl TaskManagerBuilder { pub struct SpawnTaskHandle { sender: TaskScheduler, on_exit: exit_future::Exit, + metrics: Option, } impl SpawnTaskHandle { /// Spawns the given task with the given name. - pub fn spawn(&self, name: impl Into>, task: impl Future + Send + 'static) { + /// + /// Note that the `name` is a `&'static str`. The reason for this choice is that statistics + /// about this task are getting reported to the Prometheus endpoint (if enabled), and that + /// therefore the set of possible task names must be bounded. + /// + /// In other words, it would be a bad idea for someone to do for example + /// `spawn(format!("{:?}", some_public_key))`. + pub fn spawn(&self, name: &'static str, task: impl Future + Send + 'static) { let on_exit = self.on_exit.clone(); + let metrics = self.metrics.clone(); + + // Note that we increase the started counter here and not within the future. This way, + // we could properly visualize on Prometheus situations where the spawning doesn't work. + if let Some(metrics) = &self.metrics { + metrics.tasks_spawned.with_label_values(&[name]).inc(); + // We do a dummy increase in order for the task to show up in metrics. + metrics.tasks_ended.with_label_values(&[name]).inc_by(0); + } + let future = async move { - futures::pin_mut!(task); - let _ = select(on_exit, task).await; + if let Some(metrics) = metrics { + let poll_duration = metrics.poll_duration.with_label_values(&[name]); + let poll_start = metrics.poll_start.with_label_values(&[name]); + let task = prometheus_future::with_poll_durations(poll_duration, poll_start, task); + futures::pin_mut!(task); + let _ = select(on_exit, task).await; + metrics.tasks_ended.with_label_values(&[name]).inc(); + } else { + futures::pin_mut!(task); + let _ = select(on_exit, task).await; + } }; - if self.sender.unbounded_send((Box::pin(future), name.into())).is_err() { + + if self.sender.unbounded_send(Box::pin(future)).is_err() { error!("Failed to send task to spawn over channel"); } } @@ -114,9 +161,8 @@ impl SpawnTaskHandle { impl Spawn for SpawnTaskHandle { fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> { - let future = select(self.on_exit.clone(), future).map(drop); - self.sender.unbounded_send((Box::pin(future), From::from("unnamed"))) - .map_err(|_| SpawnError::shutdown()) + self.spawn("unamed", future); + Ok(()) } } @@ -145,40 +191,34 @@ pub struct TaskManager { /// Sender for futures that must be spawned as background tasks. to_spawn_tx: TaskScheduler, /// Receiver for futures that must be spawned as background tasks. - to_spawn_rx: TracingUnboundedReceiver<(Pin + Send>>, Cow<'static, str>)>, + /// Note: please read comment on [`SpawnTaskHandle::spawn`] for why this is a `&'static str`. + to_spawn_rx: TracingUnboundedReceiver + Send>>>, /// How to spawn background tasks. executor: ServiceTaskExecutor, + /// Prometheus metric where to report the polling times. + metrics: Option, } impl TaskManager { /// Spawn background/async task, which will be aware on exit signal. - pub(super) fn spawn(&self, name: impl Into>, task: impl Future + Send + 'static) { - let on_exit = self.on_exit.clone(); - let future = async move { - futures::pin_mut!(task); - let _ = select(on_exit, task).await; - }; - if self.to_spawn_tx.unbounded_send((Box::pin(future), name.into())).is_err() { - error!("Failed to send task to spawn over channel"); - } + /// + /// See also the documentation of [`SpawnTaskHandler::spawn`]. + pub(super) fn spawn(&self, name: &'static str, task: impl Future + Send + 'static) { + self.spawn_handle().spawn(name, task) } pub(super) fn spawn_handle(&self) -> SpawnTaskHandle { SpawnTaskHandle { on_exit: self.on_exit.clone(), sender: self.to_spawn_tx.clone(), + metrics: self.metrics.clone(), } } - /// Get sender where background/async tasks can be sent. - pub(super) fn scheduler(&self) -> TaskScheduler { - self.to_spawn_tx.clone() - } - /// Process background task receiver. pub(super) fn process_receiver(&mut self, cx: &mut Context) { - while let Poll::Ready(Some((task_to_spawn, name))) = Pin::new(&mut self.to_spawn_rx).poll_next(cx) { - (self.executor)(Box::pin(futures_diagnose::diagnose(name, task_to_spawn))); + while let Poll::Ready(Some(task_to_spawn)) = Pin::new(&mut self.to_spawn_rx).poll_next(cx) { + (self.executor)(task_to_spawn); } } @@ -196,3 +236,51 @@ impl Drop for TaskManager { } } } + +#[derive(Clone)] +struct Metrics { + // This list is ordered alphabetically + poll_duration: HistogramVec, + poll_start: CounterVec, + tasks_spawned: CounterVec, + tasks_ended: CounterVec, +} + +impl Metrics { + fn register(registry: &Registry) -> Result { + Ok(Self { + poll_duration: register(HistogramVec::new( + HistogramOpts { + common_opts: Opts::new( + "tasks_polling_duration", + "Duration in seconds of each invocation of Future::poll" + ), + buckets: exponential_buckets(0.001, 4.0, 9) + .expect("function parameters are constant and always valid; qed"), + }, + &["task_name"] + )?, registry)?, + poll_start: register(CounterVec::new( + Opts::new( + "tasks_polling_started_total", + "Total number of times we started invoking Future::poll" + ), + &["task_name"] + )?, registry)?, + tasks_spawned: register(CounterVec::new( + Opts::new( + "tasks_spawned_total", + "Total number of tasks that have been spawned on the Service" + ), + &["task_name"] + )?, registry)?, + tasks_ended: register(CounterVec::new( + Opts::new( + "tasks_ended_total", + "Total number of tasks for which Future::poll has returned Ready(())" + ), + &["task_name"] + )?, registry)?, + }) + } +} diff --git a/substrate/client/service/src/task_manager/prometheus_future.rs b/substrate/client/service/src/task_manager/prometheus_future.rs new file mode 100644 index 0000000000..53bd59aa7a --- /dev/null +++ b/substrate/client/service/src/task_manager/prometheus_future.rs @@ -0,0 +1,69 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +//! Wrapper around a `Future` that reports statistics about when the `Future` is polled. + +use futures::prelude::*; +use prometheus_endpoint::{Counter, Histogram, U64}; +use std::{fmt, pin::Pin, task::{Context, Poll}}; + +/// Wraps around a `Future`. Report the polling duration to the `Histogram` and when the polling +/// starts to the `Counter`. +pub fn with_poll_durations( + poll_duration: Histogram, + poll_start: Counter, + inner: T +) -> PrometheusFuture { + PrometheusFuture { + inner, + poll_duration, + poll_start, + } +} + +/// Wraps around `Future` and adds diagnostics to it. +#[pin_project::pin_project] +#[derive(Clone)] +pub struct PrometheusFuture { + /// The inner future doing the actual work. + #[pin] + inner: T, + poll_duration: Histogram, + poll_start: Counter, +} + +impl Future for PrometheusFuture +where + T: Future, +{ + type Output = T::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + let this = self.project(); + + this.poll_start.inc(); + let _timer = this.poll_duration.start_timer(); + Future::poll(this.inner, cx) + + // `_timer` is dropped here and will observe the duration + } +} + +impl fmt::Debug for PrometheusFuture +where + T: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + fmt::Debug::fmt(&self.inner, f) + } +}