Add diagnostics to tasks (#4752)

This commit is contained in:
Pierre Krieger
2020-01-29 11:46:39 +01:00
committed by GitHub
parent 34c1c4b954
commit b452867eb7
10 changed files with 117 additions and 52 deletions
+42 -22
View File
@@ -27,7 +27,7 @@ pub mod error;
mod builder;
mod status_sinks;
use std::{io, pin::Pin};
use std::{borrow::Cow, io, pin::Pin};
use std::marker::PhantomData;
use std::net::SocketAddr;
use std::collections::HashMap;
@@ -42,7 +42,7 @@ use futures::{
future::select, channel::mpsc,
compat::*,
sink::SinkExt,
task::{Spawn, SpawnExt, FutureObj, SpawnError},
task::{Spawn, FutureObj, SpawnError},
};
use sc_network::{
NetworkService, NetworkState, specialization::NetworkSpecialization,
@@ -92,9 +92,9 @@ pub struct Service<TBl, TCl, TSc, TNetStatus, TNet, TTxPool, TOc> {
/// A receiver for spawned essential-tasks concluding.
essential_failed_rx: mpsc::UnboundedReceiver<()>,
/// Sender for futures that must be spawned as background tasks.
to_spawn_tx: mpsc::UnboundedSender<Pin<Box<dyn Future<Output = ()> + Send>>>,
to_spawn_tx: mpsc::UnboundedSender<(Pin<Box<dyn Future<Output = ()> + Send>>, Cow<'static, str>)>,
/// Receiver for futures that must be spawned as background tasks.
to_spawn_rx: mpsc::UnboundedReceiver<Pin<Box<dyn Future<Output = ()> + Send>>>,
to_spawn_rx: mpsc::UnboundedReceiver<(Pin<Box<dyn Future<Output = ()> + Send>>, Cow<'static, str>)>,
/// How to spawn background tasks.
tasks_executor: Box<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Send>,
rpc_handlers: sc_rpc_server::RpcHandler<sc_rpc::Metadata>,
@@ -112,15 +112,29 @@ pub type TaskExecutor = Arc<dyn Spawn + Send + Sync>;
/// An handle for spawning tasks in the service.
#[derive(Clone)]
pub struct SpawnTaskHandle {
sender: mpsc::UnboundedSender<Pin<Box<dyn Future<Output = ()> + Send>>>,
sender: mpsc::UnboundedSender<(Pin<Box<dyn Future<Output = ()> + Send>>, Cow<'static, str>)>,
on_exit: exit_future::Exit,
}
impl SpawnTaskHandle {
/// Spawns the given task with the given name.
pub fn spawn(&self, name: impl Into<Cow<'static, str>>, task: impl Future<Output = ()> + Send + 'static) {
let on_exit = self.on_exit.clone();
let future = async move {
futures::pin_mut!(task);
let _ = select(on_exit, task).await;
};
if self.sender.unbounded_send((Box::pin(future), name.into())).is_err() {
error!("Failed to send task to spawn over channel");
}
}
}
impl Spawn for SpawnTaskHandle {
fn spawn_obj(&self, future: FutureObj<'static, ()>)
-> Result<(), SpawnError> {
let future = select(self.on_exit.clone(), future).map(drop);
self.sender.unbounded_send(Box::pin(future))
self.sender.unbounded_send((Box::pin(future), From::from("unnamed")))
.map_err(|_| SpawnError::shutdown())
}
}
@@ -129,7 +143,7 @@ type Boxed01Future01 = Box<dyn futures01::Future<Item = (), Error = ()> + Send +
impl futures01::future::Executor<Boxed01Future01> for SpawnTaskHandle {
fn execute(&self, future: Boxed01Future01) -> Result<(), futures01::future::ExecuteError<Boxed01Future01>>{
self.spawn(future.compat().map(drop));
self.spawn("unnamed", future.compat().map(drop));
Ok(())
}
}
@@ -159,12 +173,12 @@ pub trait AbstractService: 'static + Future<Output = Result<(), Error>> +
fn telemetry(&self) -> Option<sc_telemetry::Telemetry>;
/// Spawns a task in the background that runs the future passed as parameter.
fn spawn_task(&self, task: impl Future<Output = ()> + Send + Unpin + 'static);
fn spawn_task(&self, name: impl Into<Cow<'static, str>>, task: impl Future<Output = ()> + Send + 'static);
/// Spawns a task in the background that runs the future passed as
/// parameter. The given task is considered essential, i.e. if it errors we
/// trigger a service exit.
fn spawn_essential_task(&self, task: impl Future<Output = ()> + Send + Unpin + 'static);
fn spawn_essential_task(&self, name: impl Into<Cow<'static, str>>, task: impl Future<Output = ()> + Send + 'static);
/// Returns a handle for spawning tasks.
fn spawn_task_handle(&self) -> SpawnTaskHandle;
@@ -238,12 +252,16 @@ where
self.keystore.clone()
}
fn spawn_task(&self, task: impl Future<Output = ()> + Send + Unpin + 'static) {
let task = select(self.on_exit(), task).map(drop);
let _ = self.to_spawn_tx.unbounded_send(Box::pin(task));
fn spawn_task(&self, name: impl Into<Cow<'static, str>>, task: impl Future<Output = ()> + Send + 'static) {
let on_exit = self.on_exit();
let task = async move {
futures::pin_mut!(task);
let _ = select(on_exit, task).await;
};
let _ = self.to_spawn_tx.unbounded_send((Box::pin(task), name.into()));
}
fn spawn_essential_task(&self, task: impl Future<Output = ()> + Send + Unpin + 'static) {
fn spawn_essential_task(&self, name: impl Into<Cow<'static, str>>, task: impl Future<Output = ()> + Send + 'static) {
let mut essential_failed = self.essential_failed_tx.clone();
let essential_task = std::panic::AssertUnwindSafe(task)
.catch_unwind()
@@ -251,9 +269,13 @@ where
error!("Essential task failed. Shutting down service.");
let _ = essential_failed.send(());
});
let task = select(self.on_exit(), essential_task).map(drop);
let on_exit = self.on_exit();
let task = async move {
futures::pin_mut!(essential_task);
let _ = select(on_exit, essential_task).await;
};
let _ = self.to_spawn_tx.unbounded_send(Box::pin(task));
let _ = self.to_spawn_tx.unbounded_send((Box::pin(task), name.into()));
}
fn spawn_task_handle(&self) -> SpawnTaskHandle {
@@ -317,8 +339,8 @@ impl<TBl: Unpin, TCl, TSc: Unpin, TNetStatus, TNet, TTxPool, TOc> Future for
}
}
while let Poll::Ready(Some(task_to_spawn)) = Pin::new(&mut this.to_spawn_rx).poll_next(cx) {
(this.tasks_executor)(task_to_spawn);
while let Poll::Ready(Some((task_to_spawn, name))) = Pin::new(&mut this.to_spawn_rx).poll_next(cx) {
(this.tasks_executor)(Box::pin(futures_diagnose::diagnose(name, task_to_spawn)));
}
// The service future never ends.
@@ -333,7 +355,7 @@ impl<TBl, TCl, TSc, TNetStatus, TNet, TTxPool, TOc> Spawn for
&self,
future: FutureObj<'static, ()>
) -> Result<(), SpawnError> {
self.to_spawn_tx.unbounded_send(Box::pin(future))
self.to_spawn_tx.unbounded_send((Box::pin(future), From::from("unnamed")))
.map_err(|_| SpawnError::shutdown())
}
}
@@ -575,7 +597,7 @@ pub struct TransactionPoolAdapter<C, P> {
imports_external_transactions: bool,
pool: Arc<P>,
client: Arc<C>,
executor: TaskExecutor,
executor: SpawnTaskHandle,
}
/// Get transactions for propagation.
@@ -649,9 +671,7 @@ where
}
});
if let Err(e) = self.executor.spawn(Box::new(import_future)) {
warn!("Error scheduling extrinsic import: {:?}", e);
}
self.executor.spawn("extrinsic-import", import_future);
}
Err(e) => debug!("Error decoding transaction {}", e),
}