Report tasks metrics to Prometheus (#5619)

Co-Authored-By: Bastian Köcher <bkchr@users.noreply.github.com>
This commit is contained in:
Pierre Krieger
2020-04-15 11:10:03 +02:00
committed by GitHub
parent b267b1728f
commit cff1390456
6 changed files with 211 additions and 42 deletions
+119 -31
View File
@@ -14,9 +14,9 @@
//! Substrate service tasks management module.
use std::{
pin::Pin,
result::Result, sync::Arc,
task::{Poll, Context},
borrow::Cow, pin::Pin,
};
use exit_future::Signal;
use log::{debug, error};
@@ -26,14 +26,21 @@ use futures::{
compat::*,
task::{Spawn, FutureObj, SpawnError},
};
use prometheus_endpoint::{
exponential_buckets, register,
PrometheusError,
CounterVec, HistogramOpts, HistogramVec, Opts, Registry, U64
};
use sc_client_api::CloneableSpawn;
use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedSender, TracingUnboundedReceiver};
mod prometheus_future;
/// Type alias for service task executor (usually runtime).
pub type ServiceTaskExecutor = Arc<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Send + Sync>;
/// Type alias for the task scheduler.
pub type TaskScheduler = TracingUnboundedSender<(Pin<Box<dyn Future<Output = ()> + Send>>, Cow<'static, str>)>;
pub type TaskScheduler = TracingUnboundedSender<Pin<Box<dyn Future<Output = ()> + Send>>>;
/// Helper struct to setup background tasks execution for service.
pub struct TaskManagerBuilder {
@@ -45,20 +52,29 @@ pub struct TaskManagerBuilder {
/// Sender for futures that must be spawned as background tasks.
to_spawn_tx: TaskScheduler,
/// Receiver for futures that must be spawned as background tasks.
to_spawn_rx: TracingUnboundedReceiver<(Pin<Box<dyn Future<Output = ()> + Send>>, Cow<'static, str>)>,
to_spawn_rx: TracingUnboundedReceiver<Pin<Box<dyn Future<Output = ()> + Send>>>,
/// Prometheus metrics where to report the stats about tasks.
metrics: Option<Metrics>,
}
impl TaskManagerBuilder {
/// New asynchronous task manager setup.
pub fn new() -> Self {
///
/// If a Prometheus registry is passed, it will be used to report statistics about the
/// service tasks.
pub fn new(prometheus_registry: Option<&Registry>) -> Result<Self, PrometheusError> {
let (signal, on_exit) = exit_future::signal();
let (to_spawn_tx, to_spawn_rx) = tracing_unbounded("mpsc_task_manager");
Self {
let metrics = prometheus_registry.map(Metrics::register).transpose()?;
Ok(Self {
on_exit,
signal: Some(signal),
to_spawn_tx,
to_spawn_rx,
}
metrics,
})
}
/// Get spawn handle.
@@ -69,6 +85,7 @@ impl TaskManagerBuilder {
SpawnTaskHandle {
on_exit: self.on_exit.clone(),
sender: self.to_spawn_tx.clone(),
metrics: self.metrics.clone(),
}
}
@@ -78,7 +95,8 @@ impl TaskManagerBuilder {
on_exit,
signal,
to_spawn_rx,
to_spawn_tx
to_spawn_tx,
metrics,
} = self;
TaskManager {
on_exit,
@@ -86,6 +104,7 @@ impl TaskManagerBuilder {
to_spawn_tx,
to_spawn_rx,
executor,
metrics,
}
}
}
@@ -95,17 +114,45 @@ impl TaskManagerBuilder {
pub struct SpawnTaskHandle {
sender: TaskScheduler,
on_exit: exit_future::Exit,
metrics: Option<Metrics>,
}
impl SpawnTaskHandle {
/// Spawns the given task with the given name.
pub fn spawn(&self, name: impl Into<Cow<'static, str>>, task: impl Future<Output = ()> + Send + 'static) {
///
/// Note that the `name` is a `&'static str`. The reason for this choice is that statistics
/// about this task are getting reported to the Prometheus endpoint (if enabled), and that
/// therefore the set of possible task names must be bounded.
///
/// In other words, it would be a bad idea for someone to do for example
/// `spawn(format!("{:?}", some_public_key))`.
pub fn spawn(&self, name: &'static str, task: impl Future<Output = ()> + Send + 'static) {
let on_exit = self.on_exit.clone();
let metrics = self.metrics.clone();
// Note that we increase the started counter here and not within the future. This way,
// we could properly visualize on Prometheus situations where the spawning doesn't work.
if let Some(metrics) = &self.metrics {
metrics.tasks_spawned.with_label_values(&[name]).inc();
// We do a dummy increase in order for the task to show up in metrics.
metrics.tasks_ended.with_label_values(&[name]).inc_by(0);
}
let future = async move {
futures::pin_mut!(task);
let _ = select(on_exit, task).await;
if let Some(metrics) = metrics {
let poll_duration = metrics.poll_duration.with_label_values(&[name]);
let poll_start = metrics.poll_start.with_label_values(&[name]);
let task = prometheus_future::with_poll_durations(poll_duration, poll_start, task);
futures::pin_mut!(task);
let _ = select(on_exit, task).await;
metrics.tasks_ended.with_label_values(&[name]).inc();
} else {
futures::pin_mut!(task);
let _ = select(on_exit, task).await;
}
};
if self.sender.unbounded_send((Box::pin(future), name.into())).is_err() {
if self.sender.unbounded_send(Box::pin(future)).is_err() {
error!("Failed to send task to spawn over channel");
}
}
@@ -114,9 +161,8 @@ impl SpawnTaskHandle {
impl Spawn for SpawnTaskHandle {
fn spawn_obj(&self, future: FutureObj<'static, ()>)
-> Result<(), SpawnError> {
let future = select(self.on_exit.clone(), future).map(drop);
self.sender.unbounded_send((Box::pin(future), From::from("unnamed")))
.map_err(|_| SpawnError::shutdown())
self.spawn("unamed", future);
Ok(())
}
}
@@ -145,40 +191,34 @@ pub struct TaskManager {
/// Sender for futures that must be spawned as background tasks.
to_spawn_tx: TaskScheduler,
/// Receiver for futures that must be spawned as background tasks.
to_spawn_rx: TracingUnboundedReceiver<(Pin<Box<dyn Future<Output = ()> + Send>>, Cow<'static, str>)>,
/// Note: please read comment on [`SpawnTaskHandle::spawn`] for why this is a `&'static str`.
to_spawn_rx: TracingUnboundedReceiver<Pin<Box<dyn Future<Output = ()> + Send>>>,
/// How to spawn background tasks.
executor: ServiceTaskExecutor,
/// Prometheus metric where to report the polling times.
metrics: Option<Metrics>,
}
impl TaskManager {
/// Spawn background/async task, which will be aware on exit signal.
pub(super) fn spawn(&self, name: impl Into<Cow<'static, str>>, task: impl Future<Output = ()> + Send + 'static) {
let on_exit = self.on_exit.clone();
let future = async move {
futures::pin_mut!(task);
let _ = select(on_exit, task).await;
};
if self.to_spawn_tx.unbounded_send((Box::pin(future), name.into())).is_err() {
error!("Failed to send task to spawn over channel");
}
///
/// See also the documentation of [`SpawnTaskHandler::spawn`].
pub(super) fn spawn(&self, name: &'static str, task: impl Future<Output = ()> + Send + 'static) {
self.spawn_handle().spawn(name, task)
}
pub(super) fn spawn_handle(&self) -> SpawnTaskHandle {
SpawnTaskHandle {
on_exit: self.on_exit.clone(),
sender: self.to_spawn_tx.clone(),
metrics: self.metrics.clone(),
}
}
/// Get sender where background/async tasks can be sent.
pub(super) fn scheduler(&self) -> TaskScheduler {
self.to_spawn_tx.clone()
}
/// Process background task receiver.
pub(super) fn process_receiver(&mut self, cx: &mut Context) {
while let Poll::Ready(Some((task_to_spawn, name))) = Pin::new(&mut self.to_spawn_rx).poll_next(cx) {
(self.executor)(Box::pin(futures_diagnose::diagnose(name, task_to_spawn)));
while let Poll::Ready(Some(task_to_spawn)) = Pin::new(&mut self.to_spawn_rx).poll_next(cx) {
(self.executor)(task_to_spawn);
}
}
@@ -196,3 +236,51 @@ impl Drop for TaskManager {
}
}
}
#[derive(Clone)]
struct Metrics {
// This list is ordered alphabetically
poll_duration: HistogramVec,
poll_start: CounterVec<U64>,
tasks_spawned: CounterVec<U64>,
tasks_ended: CounterVec<U64>,
}
impl Metrics {
fn register(registry: &Registry) -> Result<Self, PrometheusError> {
Ok(Self {
poll_duration: register(HistogramVec::new(
HistogramOpts {
common_opts: Opts::new(
"tasks_polling_duration",
"Duration in seconds of each invocation of Future::poll"
),
buckets: exponential_buckets(0.001, 4.0, 9)
.expect("function parameters are constant and always valid; qed"),
},
&["task_name"]
)?, registry)?,
poll_start: register(CounterVec::new(
Opts::new(
"tasks_polling_started_total",
"Total number of times we started invoking Future::poll"
),
&["task_name"]
)?, registry)?,
tasks_spawned: register(CounterVec::new(
Opts::new(
"tasks_spawned_total",
"Total number of tasks that have been spawned on the Service"
),
&["task_name"]
)?, registry)?,
tasks_ended: register(CounterVec::new(
Opts::new(
"tasks_ended_total",
"Total number of tasks for which Future::poll has returned Ready(())"
),
&["task_name"]
)?, registry)?,
})
}
}