mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-15 04:31:08 +00:00
Augment every task spawned by Service with on_exit (#3581)
* Augment every task spawned by Service with `on_exit` * Add CI test that the node exits
This commit is contained in:
@@ -216,6 +216,13 @@ check-web-wasm:
|
|||||||
- time cargo web build -p substrate-trie
|
- time cargo web build -p substrate-trie
|
||||||
- sccache -s
|
- sccache -s
|
||||||
|
|
||||||
|
node-exits:
|
||||||
|
stage: test
|
||||||
|
<<: *docker-env
|
||||||
|
except:
|
||||||
|
- /^v[0-9]+\.[0-9]+.*$/ # i.e. v1.0, v2.1rc1
|
||||||
|
script:
|
||||||
|
- ./ci/check_for_exit.sh
|
||||||
|
|
||||||
#### stage: build
|
#### stage: build
|
||||||
|
|
||||||
|
|||||||
Executable
+16
@@ -0,0 +1,16 @@
|
|||||||
|
#!/usr/bin/env bash
|
||||||
|
|
||||||
|
# Script that checks that a node exits after `SIGINT` was send.
|
||||||
|
|
||||||
|
set -e
|
||||||
|
|
||||||
|
cargo build --release
|
||||||
|
./target/release/substrate --dev &
|
||||||
|
PID=$!
|
||||||
|
|
||||||
|
# Let the chain running for 60 seconds
|
||||||
|
sleep 60
|
||||||
|
|
||||||
|
# Send `SIGINT` and give the process 30 seconds to end
|
||||||
|
kill -INT $PID
|
||||||
|
timeout 30 tail --pid=$PID -f /dev/null
|
||||||
@@ -111,13 +111,15 @@ pub type TaskExecutor = Arc<dyn Executor<Box<dyn Future<Item = (), Error = ()> +
|
|||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct SpawnTaskHandle {
|
pub struct SpawnTaskHandle {
|
||||||
sender: mpsc::UnboundedSender<Box<dyn Future<Item = (), Error = ()> + Send>>,
|
sender: mpsc::UnboundedSender<Box<dyn Future<Item = (), Error = ()> + Send>>,
|
||||||
|
on_exit: exit_future::Exit,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Executor<Box<dyn Future<Item = (), Error = ()> + Send>> for SpawnTaskHandle {
|
impl Executor<Box<dyn Future<Item = (), Error = ()> + Send>> for SpawnTaskHandle {
|
||||||
fn execute(
|
fn execute(
|
||||||
&self,
|
&self,
|
||||||
future: Box<dyn Future<Item = (), Error = ()> + Send>
|
future: Box<dyn Future<Item = (), Error = ()> + Send>,
|
||||||
) -> Result<(), futures::future::ExecuteError<Box<dyn Future<Item = (), Error = ()> + Send>>> {
|
) -> Result<(), futures::future::ExecuteError<Box<dyn Future<Item = (), Error = ()> + Send>>> {
|
||||||
|
let future = Box::new(future.select(self.on_exit.clone()).then(|_| Ok(())));
|
||||||
if let Err(err) = self.sender.unbounded_send(future) {
|
if let Err(err) = self.sender.unbounded_send(future) {
|
||||||
let kind = futures::future::ExecuteErrorKind::Shutdown;
|
let kind = futures::future::ExecuteErrorKind::Shutdown;
|
||||||
Err(futures::future::ExecuteError::new(kind, err.into_inner()))
|
Err(futures::future::ExecuteError::new(kind, err.into_inner()))
|
||||||
@@ -350,7 +352,7 @@ macro_rules! new_impl {
|
|||||||
//light_components.clone(),
|
//light_components.clone(),
|
||||||
system_rpc_tx.clone(),
|
system_rpc_tx.clone(),
|
||||||
system_info.clone(),
|
system_info.clone(),
|
||||||
Arc::new(SpawnTaskHandle { sender: to_spawn_tx.clone() }),
|
Arc::new(SpawnTaskHandle { sender: to_spawn_tx.clone(), on_exit: exit.clone() }),
|
||||||
transaction_pool.clone(),
|
transaction_pool.clone(),
|
||||||
rpc_extensions.clone(),
|
rpc_extensions.clone(),
|
||||||
keystore.clone(),
|
keystore.clone(),
|
||||||
@@ -544,22 +546,25 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn spawn_task(&self, task: impl Future<Item = (), Error = ()> + Send + 'static) {
|
fn spawn_task(&self, task: impl Future<Item = (), Error = ()> + Send + 'static) {
|
||||||
|
let task = task.select(self.on_exit()).then(|_| Ok(()));
|
||||||
let _ = self.to_spawn_tx.unbounded_send(Box::new(task));
|
let _ = self.to_spawn_tx.unbounded_send(Box::new(task));
|
||||||
}
|
}
|
||||||
|
|
||||||
fn spawn_essential_task(&self, task: impl Future<Item = (), Error = ()> + Send + 'static) {
|
fn spawn_essential_task(&self, task: impl Future<Item = (), Error = ()> + Send + 'static) {
|
||||||
let essential_failed = self.essential_failed.clone();
|
let essential_failed = self.essential_failed.clone();
|
||||||
let essential_task = Box::new(task.map_err(move |_| {
|
let essential_task = task.map_err(move |_| {
|
||||||
error!("Essential task failed. Shutting down service.");
|
error!("Essential task failed. Shutting down service.");
|
||||||
essential_failed.store(true, Ordering::Relaxed);
|
essential_failed.store(true, Ordering::Relaxed);
|
||||||
}));
|
});
|
||||||
|
let task = essential_task.select(self.on_exit()).then(|_| Ok(()));
|
||||||
|
|
||||||
let _ = self.to_spawn_tx.unbounded_send(essential_task);
|
let _ = self.to_spawn_tx.unbounded_send(Box::new(task));
|
||||||
}
|
}
|
||||||
|
|
||||||
fn spawn_task_handle(&self) -> SpawnTaskHandle {
|
fn spawn_task_handle(&self) -> SpawnTaskHandle {
|
||||||
SpawnTaskHandle {
|
SpawnTaskHandle {
|
||||||
sender: self.to_spawn_tx.clone(),
|
sender: self.to_spawn_tx.clone(),
|
||||||
|
on_exit: self.on_exit(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -589,7 +594,7 @@ where
|
|||||||
self.transaction_pool.clone()
|
self.transaction_pool.clone()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn on_exit(&self) -> ::exit_future::Exit {
|
fn on_exit(&self) -> exit_future::Exit {
|
||||||
self.exit.clone()
|
self.exit.clone()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -102,7 +102,6 @@ macro_rules! new_full_start {
|
|||||||
/// concrete types instead.
|
/// concrete types instead.
|
||||||
macro_rules! new_full {
|
macro_rules! new_full {
|
||||||
($config:expr) => {{
|
($config:expr) => {{
|
||||||
use futures::Future;
|
|
||||||
use futures::sync::mpsc;
|
use futures::sync::mpsc;
|
||||||
use network::DhtEvent;
|
use network::DhtEvent;
|
||||||
|
|
||||||
@@ -118,7 +117,7 @@ macro_rules! new_full {
|
|||||||
$config.disable_grandpa
|
$config.disable_grandpa
|
||||||
);
|
);
|
||||||
|
|
||||||
let (builder, mut import_setup, inherent_data_providers, mut tasks_to_spawn) = new_full_start!($config);
|
let (builder, mut import_setup, inherent_data_providers, tasks_to_spawn) = new_full_start!($config);
|
||||||
|
|
||||||
// Dht event channel from the network to the authority discovery module. Use bounded channel to ensure
|
// Dht event channel from the network to the authority discovery module. Use bounded channel to ensure
|
||||||
// back-pressure. Authority discovery is triggering one event per authority within the current authority set.
|
// back-pressure. Authority discovery is triggering one event per authority within the current authority set.
|
||||||
@@ -138,13 +137,7 @@ macro_rules! new_full {
|
|||||||
.expect("Link Half and Block Import are present for Full Services or setup failed before. qed");
|
.expect("Link Half and Block Import are present for Full Services or setup failed before. qed");
|
||||||
|
|
||||||
// spawn any futures that were created in the previous setup steps
|
// spawn any futures that were created in the previous setup steps
|
||||||
for task in tasks_to_spawn.drain(..) {
|
tasks_to_spawn.into_iter().for_each(|t| service.spawn_task(t));
|
||||||
service.spawn_task(
|
|
||||||
task.select(service.on_exit())
|
|
||||||
.map(|_| ())
|
|
||||||
.map_err(|_| ())
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
if is_authority {
|
if is_authority {
|
||||||
let proposer = substrate_basic_authorship::ProposerFactory {
|
let proposer = substrate_basic_authorship::ProposerFactory {
|
||||||
@@ -170,15 +163,14 @@ macro_rules! new_full {
|
|||||||
};
|
};
|
||||||
|
|
||||||
let babe = babe::start_babe(babe_config)?;
|
let babe = babe::start_babe(babe_config)?;
|
||||||
let select = babe.select(service.on_exit()).then(|_| Ok(()));
|
service.spawn_essential_task(babe);
|
||||||
service.spawn_task(Box::new(select));
|
|
||||||
|
|
||||||
let authority_discovery = authority_discovery::AuthorityDiscovery::new(
|
let authority_discovery = authority_discovery::AuthorityDiscovery::new(
|
||||||
service.client(),
|
service.client(),
|
||||||
service.network(),
|
service.network(),
|
||||||
dht_event_rx,
|
dht_event_rx,
|
||||||
);
|
);
|
||||||
service.spawn_task(Box::new(authority_discovery));
|
service.spawn_task(authority_discovery);
|
||||||
}
|
}
|
||||||
|
|
||||||
let config = grandpa::Config {
|
let config = grandpa::Config {
|
||||||
|
|||||||
Reference in New Issue
Block a user