mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-04-27 23:18:01 +00:00
Report tasks panics in metrics (#5878)
* Report tasks panics in metrics * Renames * Renames
This commit is contained in:
@@ -13,15 +13,12 @@
|
||||
|
||||
//! Substrate service tasks management module.
|
||||
|
||||
use std::{
|
||||
pin::Pin,
|
||||
result::Result, sync::Arc
|
||||
};
|
||||
use std::{panic, pin::Pin, result::Result, sync::Arc};
|
||||
use exit_future::Signal;
|
||||
use log::{debug};
|
||||
use futures::{
|
||||
Future, FutureExt,
|
||||
future::select,
|
||||
future::{select, Either},
|
||||
compat::*,
|
||||
task::{Spawn, FutureObj, SpawnError},
|
||||
};
|
||||
@@ -74,17 +71,36 @@ impl SpawnTaskHandle {
|
||||
if let Some(metrics) = &self.metrics {
|
||||
metrics.tasks_spawned.with_label_values(&[name]).inc();
|
||||
// We do a dummy increase in order for the task to show up in metrics.
|
||||
metrics.tasks_ended.with_label_values(&[name]).inc_by(0);
|
||||
metrics.tasks_ended.with_label_values(&[name, "finished"]).inc_by(0);
|
||||
}
|
||||
|
||||
let future = async move {
|
||||
if let Some(metrics) = metrics {
|
||||
let poll_duration = metrics.poll_duration.with_label_values(&[name]);
|
||||
let poll_start = metrics.poll_start.with_label_values(&[name]);
|
||||
let task = prometheus_future::with_poll_durations(poll_duration, poll_start, task);
|
||||
// Add some wrappers around `task`.
|
||||
let task = {
|
||||
let poll_duration = metrics.poll_duration.with_label_values(&[name]);
|
||||
let poll_start = metrics.poll_start.with_label_values(&[name]);
|
||||
let inner = prometheus_future::with_poll_durations(poll_duration, poll_start, task);
|
||||
// The logic of `AssertUnwindSafe` here is ok considering that we throw
|
||||
// away the `Future` after it has panicked.
|
||||
panic::AssertUnwindSafe(inner).catch_unwind()
|
||||
};
|
||||
futures::pin_mut!(task);
|
||||
let _ = select(on_exit, task).await;
|
||||
metrics.tasks_ended.with_label_values(&[name]).inc();
|
||||
|
||||
match select(on_exit, task).await {
|
||||
Either::Right((Err(payload), _)) => {
|
||||
metrics.tasks_ended.with_label_values(&[name, "panic"]).inc();
|
||||
panic::resume_unwind(payload)
|
||||
}
|
||||
Either::Right((Ok(()), _)) => {
|
||||
metrics.tasks_ended.with_label_values(&[name, "finished"]).inc();
|
||||
}
|
||||
Either::Left(((), _)) => {
|
||||
// The `on_exit` has triggered.
|
||||
metrics.tasks_ended.with_label_values(&[name, "interrupted"]).inc();
|
||||
}
|
||||
}
|
||||
|
||||
} else {
|
||||
futures::pin_mut!(task);
|
||||
let _ = select(on_exit, task).await;
|
||||
@@ -98,7 +114,7 @@ impl SpawnTaskHandle {
|
||||
impl Spawn for SpawnTaskHandle {
|
||||
fn spawn_obj(&self, future: FutureObj<'static, ()>)
|
||||
-> Result<(), SpawnError> {
|
||||
self.spawn("unamed", future);
|
||||
self.spawn("unnamed", future);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -220,9 +236,9 @@ impl Metrics {
|
||||
tasks_ended: register(CounterVec::new(
|
||||
Opts::new(
|
||||
"tasks_ended_total",
|
||||
"Total number of tasks for which Future::poll has returned Ready(())"
|
||||
"Total number of tasks for which Future::poll has returned Ready(()) or panicked"
|
||||
),
|
||||
&["task_name"]
|
||||
&["task_name", "reason"]
|
||||
)?, registry)?,
|
||||
})
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user