Print "stalled" task on shutdown (#13022)

* Print "stalled" task on shutdown

When the node is shutting down, we give the Tokio runtime 60 seconds to shut down. If there are
still running tasks after these 60 seconds, we now print them. This should help with debugging
nodes that have stalled tasks.

This PR introduces a `TaskRegistry` that keeps track of all running tasks. Each task registers and
unregisters itself in this `TaskRegistry`.

* Fix rustdoc

* Update client/service/src/lib.rs
This commit is contained in:
Bastian Köcher
2022-12-28 09:16:52 +01:00
committed by GitHub
parent e9646fdc7e
commit 0a94112c9d
5 changed files with 192 additions and 25 deletions
+1 -1
View File
@@ -83,7 +83,7 @@ pub use sc_transaction_pool::Options as TransactionPoolOptions;
pub use sc_transaction_pool_api::{error::IntoPoolError, InPoolTransaction, TransactionPool};
#[doc(hidden)]
pub use std::{ops::Deref, result::Result, sync::Arc};
pub use task_manager::{SpawnTaskHandle, TaskManager, DEFAULT_GROUP_NAME};
pub use task_manager::{SpawnTaskHandle, Task, TaskManager, TaskRegistry, DEFAULT_GROUP_NAME};
const DEFAULT_PROTOCOL_ID: &str = "sup";
@@ -24,12 +24,19 @@ use futures::{
future::{pending, select, try_join_all, BoxFuture, Either},
Future, FutureExt, StreamExt,
};
use parking_lot::Mutex;
use prometheus_endpoint::{
exponential_buckets, register, CounterVec, HistogramOpts, HistogramVec, Opts, PrometheusError,
Registry, U64,
};
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
use std::{panic, pin::Pin, result::Result};
use std::{
collections::{hash_map::Entry, HashMap},
panic,
pin::Pin,
result::Result,
sync::Arc,
};
use tokio::runtime::Handle;
use tracing_futures::Instrument;
@@ -72,6 +79,7 @@ pub struct SpawnTaskHandle {
on_exit: exit_future::Exit,
tokio_handle: Handle,
metrics: Option<Metrics>,
task_registry: TaskRegistry,
}
impl SpawnTaskHandle {
@@ -113,6 +121,7 @@ impl SpawnTaskHandle {
) {
let on_exit = self.on_exit.clone();
let metrics = self.metrics.clone();
let registry = self.task_registry.clone();
let group = match group.into() {
GroupName::Specific(var) => var,
@@ -129,6 +138,10 @@ impl SpawnTaskHandle {
}
let future = async move {
// Register the task and keep the "token" alive until the task is ended. Then this
// "token" will unregister this task.
let _registry_token = registry.register_task(name, group);
if let Some(metrics) = metrics {
// Add some wrappers around `task`.
let task = {
@@ -298,6 +311,8 @@ pub struct TaskManager {
/// terminates and gracefully shutdown. Also ends the parent `future()` if a child's essential
/// task fails.
children: Vec<TaskManager>,
/// The registry of all running tasks.
task_registry: TaskRegistry,
}
impl TaskManager {
@@ -324,6 +339,7 @@ impl TaskManager {
essential_failed_rx,
keep_alive: Box::new(()),
children: Vec::new(),
task_registry: Default::default(),
})
}
@@ -333,6 +349,7 @@ impl TaskManager {
on_exit: self.on_exit.clone(),
tokio_handle: self.tokio_handle.clone(),
metrics: self.metrics.clone(),
task_registry: self.task_registry.clone(),
}
}
@@ -385,6 +402,14 @@ impl TaskManager {
pub fn add_child(&mut self, child: TaskManager) {
// Take ownership of the child and append it to this manager's list of children.
self.children.push(child);
}
/// Consume `self` and return the [`TaskRegistry`].
///
/// This [`TaskRegistry`] can be used to check for still running tasks after this task manager
/// was dropped.
pub fn into_task_registry(self) -> TaskRegistry {
self.task_registry
}
}
#[derive(Clone)]
@@ -434,3 +459,74 @@ impl Metrics {
})
}
}
/// Ensures that a [`Task`] is unregistered when this object is dropped.
///
/// Returned by `TaskRegistry::register_task`. Dropping it decrements the counter stored for
/// `task` in `registry` and removes the entry once that counter reaches zero.
struct UnregisterOnDrop {
/// The task this guard was created for.
task: Task,
/// Handle to the registry in which `task` was registered.
registry: TaskRegistry,
}
impl Drop for UnregisterOnDrop {
    /// Removes one registration of `self.task` from the registry.
    ///
    /// The entry is deleted entirely once its counter drops to zero. Nothing happens when the
    /// registry holds no entry for the task.
    fn drop(&mut self) {
        let mut running = self.registry.tasks.lock();

        if let Entry::Occupied(mut slot) = running.entry(self.task.clone()) {
            // Decrement first and remember whether this was the last registration.
            let was_last = {
                let count = slot.get_mut();
                *count -= 1;
                *count == 0
            };

            if was_last {
                slot.remove();
            }
        }
    }
}
/// Represents a running async task in the [`TaskManager`].
///
/// A task is identified only by its name and its group, so it is perfectly valid for several
/// running tasks to share the same name and group.
///
/// Implements `Debug` so that tasks can be logged directly, e.g. when reporting tasks that are
/// still running.
#[derive(Clone, Debug, Hash, Eq, PartialEq)]
pub struct Task {
    /// The name of the task.
    pub name: &'static str,
    /// The group this task is associated to.
    pub group: &'static str,
}
impl Task {
/// Returns whether the `group` of this task is the [`DEFAULT_GROUP_NAME`].
pub fn is_default_group(&self) -> bool {
self.group == DEFAULT_GROUP_NAME
}
}
/// Keeps track of all running [`Task`]s in [`TaskManager`].
///
/// The map is stored behind an `Arc`, so cloning the registry yields another handle to the same
/// underlying map rather than a copy of it.
#[derive(Clone, Default)]
pub struct TaskRegistry {
/// Number of currently registered instances per [`Task`], guarded by a mutex.
tasks: Arc<Mutex<HashMap<Task, usize>>>,
}
impl TaskRegistry {
    /// Records a new running task identified by `name` and `group`.
    ///
    /// The counter for this identifier is incremented (starting at one for a new identifier).
    /// The returned [`UnregisterOnDrop`] undoes this registration as soon as it is dropped.
    fn register_task(&self, name: &'static str, group: &'static str) -> UnregisterOnDrop {
        let task = Task { name, group };

        // The lock guard is a temporary, so it is released at the end of this statement and
        // before the guard object is built.
        self.tasks.lock().entry(task.clone()).and_modify(|count| *count += 1).or_insert(1);

        UnregisterOnDrop { task, registry: self.clone() }
    }

    /// Returns a snapshot of the tasks that are currently registered.
    ///
    /// Tasks are only told apart by `name` and `group`, so the same identifier may be running
    /// several times. The value stored per task is the number of concurrently running tasks
    /// with that identifier.
    pub fn running_tasks(&self) -> HashMap<Task, usize> {
        let guard = self.tasks.lock();
        (*guard).clone()
    }
}