Add group name in task metrics (#10196)

* SpawnNamed: add new trait methods

Signed-off-by: Andrei Sandu <sandu.andrei@gmail.com>

* Implement new methods

Signed-off-by: Andrei Sandu <sandu.andrei@gmail.com>

* cargo fmt

Signed-off-by: Andrei Sandu <sandu.andrei@gmail.com>

* SpawnNamed: add new trait methods

Signed-off-by: Andrei Sandu <sandu.andrei@gmail.com>

* Implement new methods

Signed-off-by: Andrei Sandu <sandu.andrei@gmail.com>

* cargo fmt

Signed-off-by: Andrei Sandu <sandu.andrei@gmail.com>

* New approach - spaw() group param

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* Update traits: SpawnNamed and SpawnNamed

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* Update TaskManager tests

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* Update test TaskExecutor

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* Fix typo

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* grunt work: fix spawn() calls

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* cargo fmt

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* remove old code

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* cargo fmt - the right version

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* Implement review feedback

- use Option group name in SpawnNamed methods
- switch to kebab case
- implement default group name
- add group name to some tasks

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>
This commit is contained in:
sandreim
2021-11-11 19:15:09 +02:00
committed by GitHub
parent 2c5337e4b2
commit fdb3c64243
19 changed files with 232 additions and 100 deletions
@@ -89,7 +89,7 @@ pub fn new_partial(
let client = Arc::new(client);
let telemetry = telemetry.map(|(worker, telemetry)| {
task_manager.spawn_handle().spawn("telemetry", worker.run());
task_manager.spawn_handle().spawn("telemetry", None, worker.run());
telemetry
});
@@ -289,7 +289,9 @@ pub fn new_full(mut config: Configuration) -> Result<TaskManager, ServiceError>
// the AURA authoring task is considered essential, i.e. if it
// fails we take down the service with it.
task_manager.spawn_essential_handle().spawn_blocking("aura", aura);
task_manager
.spawn_essential_handle()
.spawn_blocking("aura", Some("block-authoring"), aura);
}
// if the node isn't actively participating in consensus then it doesn't
@@ -329,6 +331,7 @@ pub fn new_full(mut config: Configuration) -> Result<TaskManager, ServiceError>
// if it fails we take down the service with it.
task_manager.spawn_essential_handle().spawn_blocking(
"grandpa-voter",
None,
sc_finality_grandpa::run_grandpa_voter(grandpa_config)?,
);
}
+16 -8
View File
@@ -170,7 +170,7 @@ pub fn new_partial(
let client = Arc::new(client);
let telemetry = telemetry.map(|(worker, telemetry)| {
task_manager.spawn_handle().spawn("telemetry", worker.run());
task_manager.spawn_handle().spawn("telemetry", None, worker.run());
telemetry
});
@@ -436,7 +436,11 @@ pub fn new_full_base(
};
let babe = sc_consensus_babe::start_babe(babe_config)?;
task_manager.spawn_essential_handle().spawn_blocking("babe-proposer", babe);
task_manager.spawn_essential_handle().spawn_blocking(
"babe-proposer",
Some("block-authoring"),
babe,
);
}
// Spawn authority discovery module.
@@ -463,9 +467,11 @@ pub fn new_full_base(
prometheus_registry.clone(),
);
task_manager
.spawn_handle()
.spawn("authority-discovery-worker", authority_discovery_worker.run());
task_manager.spawn_handle().spawn(
"authority-discovery-worker",
Some("networking"),
authority_discovery_worker.run(),
);
}
// if the node isn't actively participating in consensus then it doesn't
@@ -503,9 +509,11 @@ pub fn new_full_base(
// the GRANDPA voter task is considered infallible, i.e.
// if it fails we take down the service with it.
task_manager
.spawn_essential_handle()
.spawn_blocking("grandpa-voter", grandpa::run_grandpa_voter(grandpa_config)?);
task_manager.spawn_essential_handle().spawn_blocking(
"grandpa-voter",
None,
grandpa::run_grandpa_voter(grandpa_config)?,
);
}
network_starter.start_network();
+12 -2
View File
@@ -243,11 +243,21 @@ impl TaskExecutor {
}
impl SpawnNamed for TaskExecutor {
fn spawn(&self, _: &'static str, future: futures::future::BoxFuture<'static, ()>) {
fn spawn(
&self,
_: &'static str,
_: Option<&'static str>,
future: futures::future::BoxFuture<'static, ()>,
) {
self.pool.spawn_ok(future);
}
fn spawn_blocking(&self, _: &'static str, future: futures::future::BoxFuture<'static, ()>) {
fn spawn_blocking(
&self,
_: &'static str,
_: Option<&'static str>,
future: futures::future::BoxFuture<'static, ()>,
) {
self.pool.spawn_ok(future);
}
}
@@ -270,6 +270,7 @@ where
spawn_handle.spawn_blocking(
"basic-authorship-proposer",
None,
Box::pin(async move {
// leave some time for evaluation and block finalization (33%)
let deadline = (self.now)() + max_duration - max_duration / 3;
@@ -89,7 +89,11 @@ impl<B: BlockT, Transaction: Send + 'static> BasicQueue<B, Transaction> {
metrics,
);
spawner.spawn_essential_blocking("basic-block-import-worker", future.boxed());
spawner.spawn_essential_blocking(
"basic-block-import-worker",
Some("block-import"),
future.boxed(),
);
Self { justification_sender, block_import_sender, result_port, _phantom: PhantomData }
}
@@ -399,6 +399,7 @@ impl RuntimeSpawn for RuntimeInstanceSpawn {
let scheduler = self.scheduler.clone();
self.scheduler.spawn(
"executor-extra-runtime-instance",
None,
Box::pin(async move {
let module = AssertUnwindSafe(module);
+1
View File
@@ -226,6 +226,7 @@ pub async fn notification_future<Client, Block, Spawner>(
if n.is_new_best {
spawner.spawn(
"offchain-on-block",
Some("offchain-worker"),
offchain
.on_block_imported(&n.header, network_provider.clone(), is_validator)
.boxed(),
+2 -1
View File
@@ -54,7 +54,8 @@ impl SubscriptionTaskExecutor {
impl Spawn for SubscriptionTaskExecutor {
fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> {
self.0.spawn("substrate-rpc-subscription", future.map(drop).boxed());
self.0
.spawn("substrate-rpc-subscription", Some("rpc"), future.map(drop).boxed());
Ok(())
}
+13 -7
View File
@@ -424,6 +424,7 @@ where
if let Some(offchain) = offchain_workers.clone() {
spawn_handle.spawn(
"offchain-notifications",
Some("offchain-worker"),
sc_offchain::notification_future(
config.role.is_authority(),
client.clone(),
@@ -505,11 +506,13 @@ where
// Inform the tx pool about imported and finalized blocks.
spawn_handle.spawn(
"txpool-notifications",
Some("transaction-pool"),
sc_transaction_pool::notification_future(client.clone(), transaction_pool.clone()),
);
spawn_handle.spawn(
"on-transaction-imported",
Some("transaction-pool"),
transaction_notifications(transaction_pool.clone(), network.clone(), telemetry.clone()),
);
@@ -520,6 +523,7 @@ where
let metrics = MetricsService::with_prometheus(telemetry.clone(), &registry, &config)?;
spawn_handle.spawn(
"prometheus-endpoint",
None,
prometheus_endpoint::init_prometheus(port, registry).map(drop),
);
@@ -531,6 +535,7 @@ where
// Periodically updated metrics and telemetry updates.
spawn_handle.spawn(
"telemetry-periodic-send",
None,
metrics_service.run(client.clone(), transaction_pool.clone(), network.clone()),
);
@@ -567,6 +572,7 @@ where
// Spawn informant task
spawn_handle.spawn(
"informant",
None,
sc_informant::build(
client.clone(),
network.clone(),
@@ -798,7 +804,7 @@ where
config.network.default_peers_set.in_peers as usize +
config.network.default_peers_set.out_peers as usize,
);
spawn_handle.spawn("block_request_handler", handler.run());
spawn_handle.spawn("block-request-handler", Some("networking"), handler.run());
protocol_config
}
};
@@ -815,7 +821,7 @@ where
config.network.default_peers_set.in_peers as usize +
config.network.default_peers_set.out_peers as usize,
);
spawn_handle.spawn("state_request_handler", handler.run());
spawn_handle.spawn("state-request-handler", Some("networking"), handler.run());
protocol_config
}
};
@@ -828,7 +834,7 @@ where
// Allow both outgoing and incoming requests.
let (handler, protocol_config) =
WarpSyncRequestHandler::new(protocol_id.clone(), provider.clone());
spawn_handle.spawn("warp_sync_request_handler", handler.run());
spawn_handle.spawn("warp-sync-request-handler", Some("networking"), handler.run());
protocol_config
};
(provider, protocol_config)
@@ -842,7 +848,7 @@ where
// Allow both outgoing and incoming requests.
let (handler, protocol_config) =
LightClientRequestHandler::new(&protocol_id, client.clone());
spawn_handle.spawn("light_client_request_handler", handler.run());
spawn_handle.spawn("light-client-request-handler", Some("networking"), handler.run());
protocol_config
}
};
@@ -852,13 +858,13 @@ where
executor: {
let spawn_handle = Clone::clone(&spawn_handle);
Some(Box::new(move |fut| {
spawn_handle.spawn("libp2p-node", fut);
spawn_handle.spawn("libp2p-node", Some("networking"), fut);
}))
},
transactions_handler_executor: {
let spawn_handle = Clone::clone(&spawn_handle);
Box::new(move |fut| {
spawn_handle.spawn("network-transactions-handler", fut);
spawn_handle.spawn("network-transactions-handler", Some("networking"), fut);
})
},
network_config: config.network.clone(),
@@ -920,7 +926,7 @@ where
// issue, and ideally we would like to fix the network future to take as little time as
// possible, but we also take the extra harm-prevention measure to execute the networking
// future using `spawn_blocking`.
spawn_handle.spawn_blocking("network-worker", async move {
spawn_handle.spawn_blocking("network-worker", Some("networking"), async move {
if network_start_rx.await.is_err() {
log::warn!(
"The NetworkStart returned as part of `build_network` has been silently dropped"
+1 -1
View File
@@ -75,7 +75,7 @@ pub use sc_transaction_pool::Options as TransactionPoolOptions;
pub use sc_transaction_pool_api::{error::IntoPoolError, InPoolTransaction, TransactionPool};
#[doc(hidden)]
pub use std::{ops::Deref, result::Result, sync::Arc};
pub use task_manager::{SpawnTaskHandle, TaskManager};
pub use task_manager::{SpawnTaskHandle, TaskManager, DEFAULT_GROUP_NAME};
const DEFAULT_PROTOCOL_ID: &str = "sup";
@@ -38,6 +38,9 @@ mod prometheus_future;
#[cfg(test)]
mod tests;
/// Default task group name.
pub const DEFAULT_GROUP_NAME: &'static str = "default";
/// An handle for spawning tasks in the service.
#[derive(Clone)]
pub struct SpawnTaskHandle {
@@ -48,31 +51,39 @@ pub struct SpawnTaskHandle {
}
impl SpawnTaskHandle {
/// Spawns the given task with the given name.
/// Spawns the given task with the given name and an optional group name.
/// If group is not specified `DEFAULT_GROUP_NAME` will be used.
///
/// Note that the `name` is a `&'static str`. The reason for this choice is that statistics
/// about this task are getting reported to the Prometheus endpoint (if enabled), and that
/// therefore the set of possible task names must be bounded.
/// Note that the `name`/`group` is a `&'static str`. The reason for this choice is that
/// statistics about this task are getting reported to the Prometheus endpoint (if enabled), and
/// that therefore the set of possible task names must be bounded.
///
/// In other words, it would be a bad idea for someone to do for example
/// `spawn(format!("{:?}", some_public_key))`.
pub fn spawn(&self, name: &'static str, task: impl Future<Output = ()> + Send + 'static) {
self.spawn_inner(name, task, TaskType::Async)
pub fn spawn(
&self,
name: &'static str,
group: Option<&'static str>,
task: impl Future<Output = ()> + Send + 'static,
) {
self.spawn_inner(name, group, task, TaskType::Async)
}
/// Spawns the blocking task with the given name. See also `spawn`.
pub fn spawn_blocking(
&self,
name: &'static str,
group: Option<&'static str>,
task: impl Future<Output = ()> + Send + 'static,
) {
self.spawn_inner(name, task, TaskType::Blocking)
self.spawn_inner(name, group, task, TaskType::Blocking)
}
/// Helper function that implements the spawning logic. See `spawn` and `spawn_blocking`.
fn spawn_inner(
&self,
name: &'static str,
group: Option<&'static str>,
task: impl Future<Output = ()> + Send + 'static,
task_type: TaskType,
) {
@@ -83,21 +94,23 @@ impl SpawnTaskHandle {
let on_exit = self.on_exit.clone();
let metrics = self.metrics.clone();
// If no group is specified use default.
let group = group.unwrap_or(DEFAULT_GROUP_NAME);
// Note that we increase the started counter here and not within the future. This way,
// we could properly visualize on Prometheus situations where the spawning doesn't work.
if let Some(metrics) = &self.metrics {
metrics.tasks_spawned.with_label_values(&[name]).inc();
metrics.tasks_spawned.with_label_values(&[name, group]).inc();
// We do a dummy increase in order for the task to show up in metrics.
metrics.tasks_ended.with_label_values(&[name, "finished"]).inc_by(0);
metrics.tasks_ended.with_label_values(&[name, "finished", group]).inc_by(0);
}
let future = async move {
if let Some(metrics) = metrics {
// Add some wrappers around `task`.
let task = {
let poll_duration = metrics.poll_duration.with_label_values(&[name]);
let poll_start = metrics.poll_start.with_label_values(&[name]);
let poll_duration = metrics.poll_duration.with_label_values(&[name, group]);
let poll_start = metrics.poll_start.with_label_values(&[name, group]);
let inner =
prometheus_future::with_poll_durations(poll_duration, poll_start, task);
// The logic of `AssertUnwindSafe` here is ok considering that we throw
@@ -108,15 +121,15 @@ impl SpawnTaskHandle {
match select(on_exit, task).await {
Either::Right((Err(payload), _)) => {
metrics.tasks_ended.with_label_values(&[name, "panic"]).inc();
metrics.tasks_ended.with_label_values(&[name, "panic", group]).inc();
panic::resume_unwind(payload)
},
Either::Right((Ok(()), _)) => {
metrics.tasks_ended.with_label_values(&[name, "finished"]).inc();
metrics.tasks_ended.with_label_values(&[name, "finished", group]).inc();
},
Either::Left(((), _)) => {
// The `on_exit` has triggered.
metrics.tasks_ended.with_label_values(&[name, "interrupted"]).inc();
metrics.tasks_ended.with_label_values(&[name, "interrupted", group]).inc();
},
}
} else {
@@ -141,12 +154,22 @@ impl SpawnTaskHandle {
}
impl sp_core::traits::SpawnNamed for SpawnTaskHandle {
fn spawn_blocking(&self, name: &'static str, future: BoxFuture<'static, ()>) {
self.spawn_blocking(name, future);
fn spawn_blocking(
&self,
name: &'static str,
group: Option<&'static str>,
future: BoxFuture<'static, ()>,
) {
self.spawn_inner(name, group, future, TaskType::Blocking)
}
fn spawn(&self, name: &'static str, future: BoxFuture<'static, ()>) {
self.spawn(name, future);
fn spawn(
&self,
name: &'static str,
group: Option<&'static str>,
future: BoxFuture<'static, ()>,
) {
self.spawn_inner(name, group, future, TaskType::Async)
}
}
@@ -172,8 +195,13 @@ impl SpawnEssentialTaskHandle {
/// Spawns the given task with the given name.
///
/// See also [`SpawnTaskHandle::spawn`].
pub fn spawn(&self, name: &'static str, task: impl Future<Output = ()> + Send + 'static) {
self.spawn_inner(name, task, TaskType::Async)
pub fn spawn(
&self,
name: &'static str,
group: Option<&'static str>,
task: impl Future<Output = ()> + Send + 'static,
) {
self.spawn_inner(name, group, task, TaskType::Async)
}
/// Spawns the blocking task with the given name.
@@ -182,14 +210,16 @@ impl SpawnEssentialTaskHandle {
pub fn spawn_blocking(
&self,
name: &'static str,
group: Option<&'static str>,
task: impl Future<Output = ()> + Send + 'static,
) {
self.spawn_inner(name, task, TaskType::Blocking)
self.spawn_inner(name, group, task, TaskType::Blocking)
}
fn spawn_inner(
&self,
name: &'static str,
group: Option<&'static str>,
task: impl Future<Output = ()> + Send + 'static,
task_type: TaskType,
) {
@@ -199,17 +229,27 @@ impl SpawnEssentialTaskHandle {
let _ = essential_failed.close_channel();
});
let _ = self.inner.spawn_inner(name, essential_task, task_type);
let _ = self.inner.spawn_inner(name, group, essential_task, task_type);
}
}
impl sp_core::traits::SpawnEssentialNamed for SpawnEssentialTaskHandle {
fn spawn_essential_blocking(&self, name: &'static str, future: BoxFuture<'static, ()>) {
self.spawn_blocking(name, future);
fn spawn_essential_blocking(
&self,
name: &'static str,
group: Option<&'static str>,
future: BoxFuture<'static, ()>,
) {
self.spawn_blocking(name, group, future);
}
fn spawn_essential(&self, name: &'static str, future: BoxFuture<'static, ()>) {
self.spawn(name, future);
fn spawn_essential(
&self,
name: &'static str,
group: Option<&'static str>,
future: BoxFuture<'static, ()>,
) {
self.spawn(name, group, future);
}
}
@@ -396,28 +436,28 @@ impl Metrics {
buckets: exponential_buckets(0.001, 4.0, 9)
.expect("function parameters are constant and always valid; qed"),
},
&["task_name"]
&["task_name", "task_group"]
)?, registry)?,
poll_start: register(CounterVec::new(
Opts::new(
"tasks_polling_started_total",
"Total number of times we started invoking Future::poll"
),
&["task_name"]
&["task_name", "task_group"]
)?, registry)?,
tasks_spawned: register(CounterVec::new(
Opts::new(
"tasks_spawned_total",
"Total number of tasks that have been spawned on the Service"
),
&["task_name"]
&["task_name", "task_group"]
)?, registry)?,
tasks_ended: register(CounterVec::new(
Opts::new(
"tasks_ended_total",
"Total number of tasks for which Future::poll has returned Ready(()) or panicked"
),
&["task_name", "reason"]
&["task_name", "reason", "task_group"]
)?, registry)?,
})
}
@@ -96,8 +96,8 @@ fn ensure_tasks_are_awaited_on_shutdown() {
let task_manager = new_task_manager(handle);
let spawn_handle = task_manager.spawn_handle();
let drop_tester = DropTester::new();
spawn_handle.spawn("task1", run_background_task(drop_tester.new_ref()));
spawn_handle.spawn("task2", run_background_task(drop_tester.new_ref()));
spawn_handle.spawn("task1", None, run_background_task(drop_tester.new_ref()));
spawn_handle.spawn("task2", None, run_background_task(drop_tester.new_ref()));
assert_eq!(drop_tester, 2);
// allow the tasks to even start
runtime.block_on(async { tokio::time::sleep(Duration::from_secs(1)).await });
@@ -115,7 +115,7 @@ fn ensure_keep_alive_during_shutdown() {
let spawn_handle = task_manager.spawn_handle();
let drop_tester = DropTester::new();
task_manager.keep_alive(drop_tester.new_ref());
spawn_handle.spawn("task1", run_background_task(()));
spawn_handle.spawn("task1", None, run_background_task(()));
assert_eq!(drop_tester, 1);
// allow the tasks to even start
runtime.block_on(async { tokio::time::sleep(Duration::from_secs(1)).await });
@@ -134,10 +134,12 @@ fn ensure_blocking_futures_are_awaited_on_shutdown() {
let drop_tester = DropTester::new();
spawn_handle.spawn(
"task1",
None,
run_background_task_blocking(Duration::from_secs(3), drop_tester.new_ref()),
);
spawn_handle.spawn(
"task2",
None,
run_background_task_blocking(Duration::from_secs(3), drop_tester.new_ref()),
);
assert_eq!(drop_tester, 2);
@@ -156,14 +158,14 @@ fn ensure_no_task_can_be_spawn_after_terminate() {
let mut task_manager = new_task_manager(handle);
let spawn_handle = task_manager.spawn_handle();
let drop_tester = DropTester::new();
spawn_handle.spawn("task1", run_background_task(drop_tester.new_ref()));
spawn_handle.spawn("task2", run_background_task(drop_tester.new_ref()));
spawn_handle.spawn("task1", None, run_background_task(drop_tester.new_ref()));
spawn_handle.spawn("task2", None, run_background_task(drop_tester.new_ref()));
assert_eq!(drop_tester, 2);
// allow the tasks to even start
runtime.block_on(async { tokio::time::sleep(Duration::from_secs(1)).await });
assert_eq!(drop_tester, 2);
task_manager.terminate();
spawn_handle.spawn("task3", run_background_task(drop_tester.new_ref()));
spawn_handle.spawn("task3", None, run_background_task(drop_tester.new_ref()));
runtime.block_on(task_manager.clean_shutdown());
drop_tester.wait_on_drop();
}
@@ -176,8 +178,8 @@ fn ensure_task_manager_future_ends_when_task_manager_terminated() {
let mut task_manager = new_task_manager(handle);
let spawn_handle = task_manager.spawn_handle();
let drop_tester = DropTester::new();
spawn_handle.spawn("task1", run_background_task(drop_tester.new_ref()));
spawn_handle.spawn("task2", run_background_task(drop_tester.new_ref()));
spawn_handle.spawn("task1", None, run_background_task(drop_tester.new_ref()));
spawn_handle.spawn("task2", None, run_background_task(drop_tester.new_ref()));
assert_eq!(drop_tester, 2);
// allow the tasks to even start
runtime.block_on(async { tokio::time::sleep(Duration::from_secs(1)).await });
@@ -197,13 +199,13 @@ fn ensure_task_manager_future_ends_with_error_when_essential_task_fails() {
let spawn_handle = task_manager.spawn_handle();
let spawn_essential_handle = task_manager.spawn_essential_handle();
let drop_tester = DropTester::new();
spawn_handle.spawn("task1", run_background_task(drop_tester.new_ref()));
spawn_handle.spawn("task2", run_background_task(drop_tester.new_ref()));
spawn_handle.spawn("task1", None, run_background_task(drop_tester.new_ref()));
spawn_handle.spawn("task2", None, run_background_task(drop_tester.new_ref()));
assert_eq!(drop_tester, 2);
// allow the tasks to even start
runtime.block_on(async { tokio::time::sleep(Duration::from_secs(1)).await });
assert_eq!(drop_tester, 2);
spawn_essential_handle.spawn("task3", async { panic!("task failed") });
spawn_essential_handle.spawn("task3", None, async { panic!("task failed") });
runtime
.block_on(task_manager.future())
.expect_err("future()'s Result must be Err");
@@ -226,10 +228,10 @@ fn ensure_children_tasks_ends_when_task_manager_terminated() {
task_manager.add_child(child_2);
let spawn_handle = task_manager.spawn_handle();
let drop_tester = DropTester::new();
spawn_handle.spawn("task1", run_background_task(drop_tester.new_ref()));
spawn_handle.spawn("task2", run_background_task(drop_tester.new_ref()));
spawn_handle_child_1.spawn("task3", run_background_task(drop_tester.new_ref()));
spawn_handle_child_2.spawn("task4", run_background_task(drop_tester.new_ref()));
spawn_handle.spawn("task1", None, run_background_task(drop_tester.new_ref()));
spawn_handle.spawn("task2", None, run_background_task(drop_tester.new_ref()));
spawn_handle_child_1.spawn("task3", None, run_background_task(drop_tester.new_ref()));
spawn_handle_child_2.spawn("task4", None, run_background_task(drop_tester.new_ref()));
assert_eq!(drop_tester, 4);
// allow the tasks to even start
runtime.block_on(async { tokio::time::sleep(Duration::from_secs(1)).await });
@@ -255,15 +257,15 @@ fn ensure_task_manager_future_ends_with_error_when_childs_essential_task_fails()
task_manager.add_child(child_2);
let spawn_handle = task_manager.spawn_handle();
let drop_tester = DropTester::new();
spawn_handle.spawn("task1", run_background_task(drop_tester.new_ref()));
spawn_handle.spawn("task2", run_background_task(drop_tester.new_ref()));
spawn_handle_child_1.spawn("task3", run_background_task(drop_tester.new_ref()));
spawn_handle_child_2.spawn("task4", run_background_task(drop_tester.new_ref()));
spawn_handle.spawn("task1", None, run_background_task(drop_tester.new_ref()));
spawn_handle.spawn("task2", None, run_background_task(drop_tester.new_ref()));
spawn_handle_child_1.spawn("task3", None, run_background_task(drop_tester.new_ref()));
spawn_handle_child_2.spawn("task4", None, run_background_task(drop_tester.new_ref()));
assert_eq!(drop_tester, 4);
// allow the tasks to even start
runtime.block_on(async { tokio::time::sleep(Duration::from_secs(1)).await });
assert_eq!(drop_tester, 4);
spawn_essential_handle_child_1.spawn("task5", async { panic!("task failed") });
spawn_essential_handle_child_1.spawn("task5", None, async { panic!("task failed") });
runtime
.block_on(task_manager.future())
.expect_err("future()'s Result must be Err");
@@ -286,15 +288,15 @@ fn ensure_task_manager_future_continues_when_childs_not_essential_task_fails() {
task_manager.add_child(child_2);
let spawn_handle = task_manager.spawn_handle();
let drop_tester = DropTester::new();
spawn_handle.spawn("task1", run_background_task(drop_tester.new_ref()));
spawn_handle.spawn("task2", run_background_task(drop_tester.new_ref()));
spawn_handle_child_1.spawn("task3", run_background_task(drop_tester.new_ref()));
spawn_handle_child_2.spawn("task4", run_background_task(drop_tester.new_ref()));
spawn_handle.spawn("task1", None, run_background_task(drop_tester.new_ref()));
spawn_handle.spawn("task2", None, run_background_task(drop_tester.new_ref()));
spawn_handle_child_1.spawn("task3", None, run_background_task(drop_tester.new_ref()));
spawn_handle_child_2.spawn("task4", None, run_background_task(drop_tester.new_ref()));
assert_eq!(drop_tester, 4);
// allow the tasks to even start
runtime.block_on(async { tokio::time::sleep(Duration::from_secs(1)).await });
assert_eq!(drop_tester, 4);
spawn_handle_child_1.spawn("task5", async { panic!("task failed") });
spawn_handle_child_1.spawn("task5", None, async { panic!("task failed") });
runtime.block_on(async {
let t1 = task_manager.future().fuse();
let t2 = tokio::time::sleep(Duration::from_secs(3)).fuse();
@@ -64,6 +64,7 @@ fn spawn_validation_pool_task(
) {
spawner.spawn_essential_blocking(
name,
Some("transaction-pool"),
async move {
loop {
let task = receiver.lock().await.next().await;
+1 -1
View File
@@ -217,7 +217,7 @@ where
};
if let Some(background_task) = background_task {
spawner.spawn_essential("txpool-background", background_task);
spawner.spawn_essential("txpool-background", Some("transaction-pool"), background_task);
}
Self {
+19 -3
View File
@@ -152,10 +152,20 @@ impl Default for TaskExecutor {
#[cfg(feature = "std")]
impl crate::traits::SpawnNamed for TaskExecutor {
fn spawn_blocking(&self, _: &'static str, future: futures::future::BoxFuture<'static, ()>) {
fn spawn_blocking(
&self,
_name: &'static str,
_group: Option<&'static str>,
future: futures::future::BoxFuture<'static, ()>,
) {
self.0.spawn_ok(future);
}
fn spawn(&self, _: &'static str, future: futures::future::BoxFuture<'static, ()>) {
fn spawn(
&self,
_name: &'static str,
_group: Option<&'static str>,
future: futures::future::BoxFuture<'static, ()>,
) {
self.0.spawn_ok(future);
}
}
@@ -165,11 +175,17 @@ impl crate::traits::SpawnEssentialNamed for TaskExecutor {
fn spawn_essential_blocking(
&self,
_: &'static str,
_: Option<&'static str>,
future: futures::future::BoxFuture<'static, ()>,
) {
self.0.spawn_ok(future);
}
fn spawn_essential(&self, _: &'static str, future: futures::future::BoxFuture<'static, ()>) {
fn spawn_essential(
&self,
_: &'static str,
_: Option<&'static str>,
future: futures::future::BoxFuture<'static, ()>,
) {
self.0.spawn_ok(future);
}
}
+50 -17
View File
@@ -190,58 +190,91 @@ sp_externalities::decl_extension! {
pub struct RuntimeSpawnExt(Box<dyn RuntimeSpawn>);
}
/// Something that can spawn tasks (blocking and non-blocking) with an assigned name.
/// Something that can spawn tasks (blocking and non-blocking) with an assigned name
/// and optional group.
#[dyn_clonable::clonable]
pub trait SpawnNamed: Clone + Send + Sync {
/// Spawn the given blocking future.
///
/// The given `name` is used to identify the future in tracing.
fn spawn_blocking(&self, name: &'static str, future: futures::future::BoxFuture<'static, ()>);
/// The given `group` and `name` is used to identify the future in tracing.
fn spawn_blocking(
&self,
name: &'static str,
group: Option<&'static str>,
future: futures::future::BoxFuture<'static, ()>,
);
/// Spawn the given non-blocking future.
///
/// The given `name` is used to identify the future in tracing.
fn spawn(&self, name: &'static str, future: futures::future::BoxFuture<'static, ()>);
/// The given `group` and `name` is used to identify the future in tracing.
fn spawn(
&self,
name: &'static str,
group: Option<&'static str>,
future: futures::future::BoxFuture<'static, ()>,
);
}
impl SpawnNamed for Box<dyn SpawnNamed> {
fn spawn_blocking(&self, name: &'static str, future: futures::future::BoxFuture<'static, ()>) {
(**self).spawn_blocking(name, future)
fn spawn_blocking(
&self,
name: &'static str,
group: Option<&'static str>,
future: futures::future::BoxFuture<'static, ()>,
) {
(**self).spawn_blocking(name, group, future)
}
fn spawn(&self, name: &'static str, future: futures::future::BoxFuture<'static, ()>) {
(**self).spawn(name, future)
fn spawn(
&self,
name: &'static str,
group: Option<&'static str>,
future: futures::future::BoxFuture<'static, ()>,
) {
(**self).spawn(name, group, future)
}
}
/// Something that can spawn essential tasks (blocking and non-blocking) with an assigned name.
/// Something that can spawn essential tasks (blocking and non-blocking) with an assigned name
/// and optional group.
///
/// Essential tasks are special tasks that should take down the node when they end.
#[dyn_clonable::clonable]
pub trait SpawnEssentialNamed: Clone + Send + Sync {
/// Spawn the given blocking future.
///
/// The given `name` is used to identify the future in tracing.
/// The given `group` and `name` is used to identify the future in tracing.
fn spawn_essential_blocking(
&self,
name: &'static str,
group: Option<&'static str>,
future: futures::future::BoxFuture<'static, ()>,
);
/// Spawn the given non-blocking future.
///
/// The given `name` is used to identify the future in tracing.
fn spawn_essential(&self, name: &'static str, future: futures::future::BoxFuture<'static, ()>);
/// The given `group` and `name` is used to identify the future in tracing.
fn spawn_essential(
&self,
name: &'static str,
group: Option<&'static str>,
future: futures::future::BoxFuture<'static, ()>,
);
}
impl SpawnEssentialNamed for Box<dyn SpawnEssentialNamed> {
fn spawn_essential_blocking(
&self,
name: &'static str,
group: Option<&'static str>,
future: futures::future::BoxFuture<'static, ()>,
) {
(**self).spawn_essential_blocking(name, future)
(**self).spawn_essential_blocking(name, group, future)
}
fn spawn_essential(&self, name: &'static str, future: futures::future::BoxFuture<'static, ()>) {
(**self).spawn_essential(name, future)
fn spawn_essential(
&self,
name: &'static str,
group: Option<&'static str>,
future: futures::future::BoxFuture<'static, ()>,
) {
(**self).spawn_essential(name, group, future)
}
}
@@ -74,6 +74,7 @@ impl BatchVerifier {
self.scheduler.spawn(
name,
None,
async move {
if !f() {
invalid_clone.store(true, AtomicOrdering::Relaxed);
@@ -177,7 +178,8 @@ impl BatchVerifier {
if pending.len() > 0 {
let (sender, receiver) = std::sync::mpsc::channel();
self.scheduler.spawn(
"substrate_batch_verify_join",
"substrate-batch-verify-join",
None,
async move {
futures::future::join_all(pending).await;
sender.send(()).expect(
+1
View File
@@ -95,6 +95,7 @@ mod inner {
let extra_scheduler = scheduler.clone();
scheduler.spawn(
"parallel-runtime-spawn",
Some("substrate-runtime"),
Box::pin(async move {
let result = match crate::new_async_externalities(extra_scheduler) {
Ok(mut ext) => {
@@ -235,7 +235,9 @@ where
});
// spawn the authorship task as an essential task.
task_manager.spawn_essential_handle().spawn("manual-seal", authorship_future);
task_manager
.spawn_essential_handle()
.spawn("manual-seal", None, authorship_future);
network_starter.start_network();
let rpc_handler = rpc_handlers.io_handler();