mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-21 05:11:02 +00:00
Allow task manager to have children (#6771)
* Initial commit Forked at:7df97abab4Parent branch: origin/master * WIP Forked at:7df97abab4Parent branch: origin/master * WIP Forked at:7df97abab4Parent branch: origin/master * WIP Forked at:7df97abab4Parent branch: origin/master * WIP Forked at:7df97abab4Parent branch: origin/master * WIP Forked at:7df97abab4Parent branch: origin/master * WIP Forked at:7df97abab4Parent branch: origin/master * changelog * Remove Box * Make future nicer * Revert "Make future nicer" This reverts commit 49fb8fb6f245c3ca2c384468df14b34f34616736. * Simplify * Additional check * Simplify more Co-authored-by: Bastian Köcher <git@kchr.de>
This commit is contained in:
@@ -18,7 +18,7 @@ use exit_future::Signal;
|
|||||||
use log::{debug, error};
|
use log::{debug, error};
|
||||||
use futures::{
|
use futures::{
|
||||||
Future, FutureExt, StreamExt,
|
Future, FutureExt, StreamExt,
|
||||||
future::{select, Either, BoxFuture},
|
future::{select, Either, BoxFuture, join_all, try_join_all, pending},
|
||||||
sink::SinkExt,
|
sink::SinkExt,
|
||||||
};
|
};
|
||||||
use prometheus_endpoint::{
|
use prometheus_endpoint::{
|
||||||
@@ -214,8 +214,14 @@ pub struct TaskManager {
|
|||||||
essential_failed_rx: TracingUnboundedReceiver<()>,
|
essential_failed_rx: TracingUnboundedReceiver<()>,
|
||||||
/// Things to keep alive until the task manager is dropped.
|
/// Things to keep alive until the task manager is dropped.
|
||||||
keep_alive: Box<dyn std::any::Any + Send + Sync>,
|
keep_alive: Box<dyn std::any::Any + Send + Sync>,
|
||||||
|
/// A sender to a stream of background tasks. This is used for the completion future.
|
||||||
task_notifier: TracingUnboundedSender<JoinFuture>,
|
task_notifier: TracingUnboundedSender<JoinFuture>,
|
||||||
|
/// This future will complete when all the tasks are joined and the stream is closed.
|
||||||
completion_future: JoinFuture,
|
completion_future: JoinFuture,
|
||||||
|
/// A list of other `TaskManager`'s to terminate and gracefully shutdown when the parent
|
||||||
|
/// terminates and gracefully shutdown. Also ends the parent `future()` if a child's essential
|
||||||
|
/// task fails.
|
||||||
|
children: Vec<TaskManager>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl TaskManager {
|
impl TaskManager {
|
||||||
@@ -251,6 +257,7 @@ impl TaskManager {
|
|||||||
keep_alive: Box::new(()),
|
keep_alive: Box::new(()),
|
||||||
task_notifier,
|
task_notifier,
|
||||||
completion_future,
|
completion_future,
|
||||||
|
children: Vec::new(),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -271,12 +278,21 @@ impl TaskManager {
|
|||||||
|
|
||||||
/// Send the signal for termination, prevent new tasks to be created, await for all the existing
|
/// Send the signal for termination, prevent new tasks to be created, await for all the existing
|
||||||
/// tasks to be finished and drop the object. You can consider this as an async drop.
|
/// tasks to be finished and drop the object. You can consider this as an async drop.
|
||||||
|
///
|
||||||
|
/// It's always better to call and await this function before exiting the process as background
|
||||||
|
/// tasks may be running in the background. If the process exit and the background tasks are not
|
||||||
|
/// cancelled, this will lead to objects not getting dropped properly.
|
||||||
|
///
|
||||||
|
/// This is an issue in some cases as some of our dependencies do require that we drop all the
|
||||||
|
/// objects properly otherwise it triggers a SIGABRT on exit.
|
||||||
pub fn clean_shutdown(mut self) -> Pin<Box<dyn Future<Output = ()> + Send>> {
|
pub fn clean_shutdown(mut self) -> Pin<Box<dyn Future<Output = ()> + Send>> {
|
||||||
self.terminate();
|
self.terminate();
|
||||||
|
let children_shutdowns = self.children.into_iter().map(|x| x.clean_shutdown());
|
||||||
let keep_alive = self.keep_alive;
|
let keep_alive = self.keep_alive;
|
||||||
let completion_future = self.completion_future;
|
let completion_future = self.completion_future;
|
||||||
|
|
||||||
Box::pin(async move {
|
Box::pin(async move {
|
||||||
|
join_all(children_shutdowns).await;
|
||||||
completion_future.await;
|
completion_future.await;
|
||||||
drop(keep_alive);
|
drop(keep_alive);
|
||||||
})
|
})
|
||||||
@@ -293,10 +309,17 @@ impl TaskManager {
|
|||||||
Box::pin(async move {
|
Box::pin(async move {
|
||||||
let mut t1 = self.essential_failed_rx.next().fuse();
|
let mut t1 = self.essential_failed_rx.next().fuse();
|
||||||
let mut t2 = self.on_exit.clone().fuse();
|
let mut t2 = self.on_exit.clone().fuse();
|
||||||
|
let mut t3 = try_join_all(
|
||||||
|
self.children.iter_mut().map(|x| x.future())
|
||||||
|
// Never end this future if there is no error because if there is no children,
|
||||||
|
// it must not stop
|
||||||
|
.chain(std::iter::once(pending().boxed()))
|
||||||
|
).fuse();
|
||||||
|
|
||||||
futures::select! {
|
futures::select! {
|
||||||
_ = t1 => Err(Error::Other("Essential task failed.".into())),
|
_ = t1 => Err(Error::Other("Essential task failed.".into())),
|
||||||
_ = t2 => Ok(()),
|
_ = t2 => Ok(()),
|
||||||
|
res = t3 => Err(res.map(|_| ()).expect_err("this future never ends; qed")),
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@@ -305,15 +328,25 @@ impl TaskManager {
|
|||||||
pub fn terminate(&mut self) {
|
pub fn terminate(&mut self) {
|
||||||
if let Some(signal) = self.signal.take() {
|
if let Some(signal) = self.signal.take() {
|
||||||
let _ = signal.fire();
|
let _ = signal.fire();
|
||||||
// NOTE: task will prevent new tasks to be spawned
|
// NOTE: this will prevent new tasks to be spawned
|
||||||
self.task_notifier.close_channel();
|
self.task_notifier.close_channel();
|
||||||
|
for child in self.children.iter_mut() {
|
||||||
|
child.terminate();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Set what the task manager should keep alivei
|
/// Set what the task manager should keep alive.
|
||||||
pub(super) fn keep_alive<T: 'static + Send + Sync>(&mut self, to_keep_alive: T) {
|
pub(super) fn keep_alive<T: 'static + Send + Sync>(&mut self, to_keep_alive: T) {
|
||||||
self.keep_alive = Box::new(to_keep_alive);
|
self.keep_alive = Box::new(to_keep_alive);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Register another TaskManager to terminate and gracefully shutdown when the parent
|
||||||
|
/// terminates and gracefully shutdown. Also ends the parent `future()` if a child's essential
|
||||||
|
/// task fails. (But don't end the parent if a child's normal task fails.)
|
||||||
|
pub fn add_children(&mut self, child: TaskManager) {
|
||||||
|
self.children.push(child);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
|
|||||||
@@ -18,7 +18,7 @@
|
|||||||
|
|
||||||
use crate::config::TaskExecutor;
|
use crate::config::TaskExecutor;
|
||||||
use crate::task_manager::TaskManager;
|
use crate::task_manager::TaskManager;
|
||||||
use futures::future::FutureExt;
|
use futures::{future::FutureExt, pin_mut, select};
|
||||||
use parking_lot::Mutex;
|
use parking_lot::Mutex;
|
||||||
use std::any::Any;
|
use std::any::Any;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
@@ -82,7 +82,7 @@ async fn run_background_task_blocking(duration: Duration, _keep_alive: impl Any)
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn ensure_futures_are_awaited_on_shutdown() {
|
fn ensure_tasks_are_awaited_on_shutdown() {
|
||||||
let mut runtime = tokio::runtime::Runtime::new().unwrap();
|
let mut runtime = tokio::runtime::Runtime::new().unwrap();
|
||||||
let handle = runtime.handle().clone();
|
let handle = runtime.handle().clone();
|
||||||
let task_executor: TaskExecutor = (move |future, _| handle.spawn(future).map(|_| ())).into();
|
let task_executor: TaskExecutor = (move |future, _| handle.spawn(future).map(|_| ())).into();
|
||||||
@@ -187,7 +187,7 @@ fn ensure_task_manager_future_ends_when_task_manager_terminated() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn ensure_task_manager_future_ends_with_error_when_essential_task_ends() {
|
fn ensure_task_manager_future_ends_with_error_when_essential_task_fails() {
|
||||||
let mut runtime = tokio::runtime::Runtime::new().unwrap();
|
let mut runtime = tokio::runtime::Runtime::new().unwrap();
|
||||||
let handle = runtime.handle().clone();
|
let handle = runtime.handle().clone();
|
||||||
let task_executor: TaskExecutor = (move |future, _| handle.spawn(future).map(|_| ())).into();
|
let task_executor: TaskExecutor = (move |future, _| handle.spawn(future).map(|_| ())).into();
|
||||||
@@ -208,3 +208,103 @@ fn ensure_task_manager_future_ends_with_error_when_essential_task_ends() {
|
|||||||
runtime.block_on(task_manager.clean_shutdown());
|
runtime.block_on(task_manager.clean_shutdown());
|
||||||
assert_eq!(drop_tester, 0);
|
assert_eq!(drop_tester, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn ensure_children_tasks_ends_when_task_manager_terminated() {
|
||||||
|
let mut runtime = tokio::runtime::Runtime::new().unwrap();
|
||||||
|
let handle = runtime.handle().clone();
|
||||||
|
let task_executor: TaskExecutor = (move |future, _| handle.spawn(future).map(|_| ())).into();
|
||||||
|
|
||||||
|
let mut task_manager = TaskManager::new(task_executor.clone(), None).unwrap();
|
||||||
|
let child_1 = TaskManager::new(task_executor.clone(), None).unwrap();
|
||||||
|
let spawn_handle_child_1 = child_1.spawn_handle();
|
||||||
|
let child_2 = TaskManager::new(task_executor.clone(), None).unwrap();
|
||||||
|
let spawn_handle_child_2 = child_2.spawn_handle();
|
||||||
|
task_manager.add_children(child_1);
|
||||||
|
task_manager.add_children(child_2);
|
||||||
|
let spawn_handle = task_manager.spawn_handle();
|
||||||
|
let drop_tester = DropTester::new();
|
||||||
|
spawn_handle.spawn("task1", run_background_task(drop_tester.new_ref()));
|
||||||
|
spawn_handle.spawn("task2", run_background_task(drop_tester.new_ref()));
|
||||||
|
spawn_handle_child_1.spawn("task3", run_background_task(drop_tester.new_ref()));
|
||||||
|
spawn_handle_child_2.spawn("task4", run_background_task(drop_tester.new_ref()));
|
||||||
|
assert_eq!(drop_tester, 4);
|
||||||
|
// allow the tasks to even start
|
||||||
|
runtime.block_on(async { tokio::time::delay_for(Duration::from_secs(1)).await });
|
||||||
|
assert_eq!(drop_tester, 4);
|
||||||
|
task_manager.terminate();
|
||||||
|
runtime.block_on(task_manager.future()).expect("future has ended without error");
|
||||||
|
runtime.block_on(task_manager.clean_shutdown());
|
||||||
|
assert_eq!(drop_tester, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn ensure_task_manager_future_ends_with_error_when_childs_essential_task_fails() {
|
||||||
|
let mut runtime = tokio::runtime::Runtime::new().unwrap();
|
||||||
|
let handle = runtime.handle().clone();
|
||||||
|
let task_executor: TaskExecutor = (move |future, _| handle.spawn(future).map(|_| ())).into();
|
||||||
|
|
||||||
|
let mut task_manager = TaskManager::new(task_executor.clone(), None).unwrap();
|
||||||
|
let child_1 = TaskManager::new(task_executor.clone(), None).unwrap();
|
||||||
|
let spawn_handle_child_1 = child_1.spawn_handle();
|
||||||
|
let spawn_essential_handle_child_1 = child_1.spawn_essential_handle();
|
||||||
|
let child_2 = TaskManager::new(task_executor.clone(), None).unwrap();
|
||||||
|
let spawn_handle_child_2 = child_2.spawn_handle();
|
||||||
|
task_manager.add_children(child_1);
|
||||||
|
task_manager.add_children(child_2);
|
||||||
|
let spawn_handle = task_manager.spawn_handle();
|
||||||
|
let drop_tester = DropTester::new();
|
||||||
|
spawn_handle.spawn("task1", run_background_task(drop_tester.new_ref()));
|
||||||
|
spawn_handle.spawn("task2", run_background_task(drop_tester.new_ref()));
|
||||||
|
spawn_handle_child_1.spawn("task3", run_background_task(drop_tester.new_ref()));
|
||||||
|
spawn_handle_child_2.spawn("task4", run_background_task(drop_tester.new_ref()));
|
||||||
|
assert_eq!(drop_tester, 4);
|
||||||
|
// allow the tasks to even start
|
||||||
|
runtime.block_on(async { tokio::time::delay_for(Duration::from_secs(1)).await });
|
||||||
|
assert_eq!(drop_tester, 4);
|
||||||
|
spawn_essential_handle_child_1.spawn("task5", async { panic!("task failed") });
|
||||||
|
runtime.block_on(task_manager.future()).expect_err("future()'s Result must be Err");
|
||||||
|
assert_eq!(drop_tester, 4);
|
||||||
|
runtime.block_on(task_manager.clean_shutdown());
|
||||||
|
assert_eq!(drop_tester, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn ensure_task_manager_future_continues_when_childs_not_essential_task_fails() {
|
||||||
|
let mut runtime = tokio::runtime::Runtime::new().unwrap();
|
||||||
|
let handle = runtime.handle().clone();
|
||||||
|
let task_executor: TaskExecutor = (move |future, _| handle.spawn(future).map(|_| ())).into();
|
||||||
|
|
||||||
|
let mut task_manager = TaskManager::new(task_executor.clone(), None).unwrap();
|
||||||
|
let child_1 = TaskManager::new(task_executor.clone(), None).unwrap();
|
||||||
|
let spawn_handle_child_1 = child_1.spawn_handle();
|
||||||
|
let child_2 = TaskManager::new(task_executor.clone(), None).unwrap();
|
||||||
|
let spawn_handle_child_2 = child_2.spawn_handle();
|
||||||
|
task_manager.add_children(child_1);
|
||||||
|
task_manager.add_children(child_2);
|
||||||
|
let spawn_handle = task_manager.spawn_handle();
|
||||||
|
let drop_tester = DropTester::new();
|
||||||
|
spawn_handle.spawn("task1", run_background_task(drop_tester.new_ref()));
|
||||||
|
spawn_handle.spawn("task2", run_background_task(drop_tester.new_ref()));
|
||||||
|
spawn_handle_child_1.spawn("task3", run_background_task(drop_tester.new_ref()));
|
||||||
|
spawn_handle_child_2.spawn("task4", run_background_task(drop_tester.new_ref()));
|
||||||
|
assert_eq!(drop_tester, 4);
|
||||||
|
// allow the tasks to even start
|
||||||
|
runtime.block_on(async { tokio::time::delay_for(Duration::from_secs(1)).await });
|
||||||
|
assert_eq!(drop_tester, 4);
|
||||||
|
spawn_handle_child_1.spawn("task5", async { panic!("task failed") });
|
||||||
|
runtime.block_on(async {
|
||||||
|
let t1 = task_manager.future().fuse();
|
||||||
|
let t2 = tokio::time::delay_for(Duration::from_secs(3)).fuse();
|
||||||
|
|
||||||
|
pin_mut!(t1, t2);
|
||||||
|
|
||||||
|
select! {
|
||||||
|
res = t1 => panic!("task should not have stopped: {:?}", res),
|
||||||
|
_ = t2 => {},
|
||||||
|
}
|
||||||
|
});
|
||||||
|
assert_eq!(drop_tester, 4);
|
||||||
|
runtime.block_on(task_manager.clean_shutdown());
|
||||||
|
assert_eq!(drop_tester, 0);
|
||||||
|
}
|
||||||
|
|||||||
@@ -6,6 +6,11 @@ The format is based on [Keep a Changelog].
|
|||||||
|
|
||||||
## Unreleased
|
## Unreleased
|
||||||
|
|
||||||
|
Client
|
||||||
|
------
|
||||||
|
|
||||||
|
* Child nodes can be handled by adding a child `TaskManager` to the parent's `TaskManager` (#6771)
|
||||||
|
|
||||||
## 2.0.0-rc4 -> 2.0.0-rc5 – River Dolphin
|
## 2.0.0-rc4 -> 2.0.0-rc5 – River Dolphin
|
||||||
|
|
||||||
Runtime
|
Runtime
|
||||||
|
|||||||
Reference in New Issue
Block a user