mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-14 05:11:09 +00:00
Task manager for background/async tasks in service (#5092)
* Reorganize tasks into task manager * move to separate file and improve api * address api issues * fix spawning inside closures * decouple executor * tasks_setup -> tasks_builder * remove drops * add deprecatiion comment * add pub(super) * fix identation
This commit is contained in:
@@ -15,7 +15,7 @@
|
||||
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
use crate::{Service, NetworkStatus, NetworkState, error::Error, DEFAULT_PROTOCOL_ID, MallocSizeOfWasm};
|
||||
use crate::{SpawnTaskHandle, start_rpc_servers, build_network_future, TransactionPoolAdapter};
|
||||
use crate::{TaskManagerBuilder, start_rpc_servers, build_network_future, TransactionPoolAdapter};
|
||||
use crate::status_sinks;
|
||||
use crate::config::{Configuration, DatabaseConfig, KeystoreConfig};
|
||||
use sc_client_api::{
|
||||
@@ -30,7 +30,7 @@ use sp_consensus::import_queue::ImportQueue;
|
||||
use futures::{
|
||||
Future, FutureExt, StreamExt,
|
||||
channel::mpsc,
|
||||
future::{select, ready}
|
||||
future::ready,
|
||||
};
|
||||
use sc_keystore::{Store as Keystore};
|
||||
use log::{info, warn, error};
|
||||
@@ -44,7 +44,6 @@ use sp_runtime::traits::{
|
||||
use sp_api::ProvideRuntimeApi;
|
||||
use sc_executor::{NativeExecutor, NativeExecutionDispatch};
|
||||
use std::{
|
||||
borrow::Cow,
|
||||
io::{Read, Write, Seek},
|
||||
marker::PhantomData, sync::Arc, pin::Pin
|
||||
};
|
||||
@@ -117,6 +116,7 @@ pub struct ServiceBuilder<TBl, TRtApi, TGen, TCSExt, TCl, TFchr, TSc, TImpQu, TF
|
||||
config: Configuration<TGen, TCSExt>,
|
||||
pub (crate) client: Arc<TCl>,
|
||||
backend: Arc<Backend>,
|
||||
tasks_builder: TaskManagerBuilder,
|
||||
keystore: Arc<RwLock<Keystore>>,
|
||||
fetcher: Option<TFchr>,
|
||||
select_chain: Option<TSc>,
|
||||
@@ -181,6 +181,7 @@ type TFullParts<TBl, TRtApi, TExecDisp> = (
|
||||
TFullClient<TBl, TRtApi, TExecDisp>,
|
||||
Arc<TFullBackend<TBl>>,
|
||||
Arc<RwLock<sc_keystore::Store>>,
|
||||
TaskManagerBuilder,
|
||||
);
|
||||
|
||||
/// Creates a new full client for the given config.
|
||||
@@ -212,6 +213,8 @@ fn new_full_parts<TBl, TRtApi, TExecDisp, TGen, TCSExt>(
|
||||
KeystoreConfig::None => return Err("No keystore config provided!".into()),
|
||||
};
|
||||
|
||||
let tasks_builder = TaskManagerBuilder::new();
|
||||
|
||||
let executor = NativeExecutor::<TExecDisp>::new(
|
||||
config.wasm_method,
|
||||
config.default_heap_pages,
|
||||
@@ -262,7 +265,7 @@ fn new_full_parts<TBl, TRtApi, TExecDisp, TGen, TCSExt>(
|
||||
)?
|
||||
};
|
||||
|
||||
Ok((client, backend, keystore))
|
||||
Ok((client, backend, keystore, tasks_builder))
|
||||
}
|
||||
|
||||
impl<TGen, TCSExt> ServiceBuilder<(), (), TGen, TCSExt, (), (), (), (), (), (), (), (), ()>
|
||||
@@ -285,7 +288,7 @@ where TGen: RuntimeGenesis, TCSExt: Extension {
|
||||
(),
|
||||
TFullBackend<TBl>,
|
||||
>, Error> {
|
||||
let (client, backend, keystore) = new_full_parts(&config)?;
|
||||
let (client, backend, keystore, tasks_builder) = new_full_parts(&config)?;
|
||||
|
||||
let client = Arc::new(client);
|
||||
|
||||
@@ -294,6 +297,7 @@ where TGen: RuntimeGenesis, TCSExt: Extension {
|
||||
client,
|
||||
backend,
|
||||
keystore,
|
||||
tasks_builder,
|
||||
fetcher: None,
|
||||
select_chain: None,
|
||||
import_queue: (),
|
||||
@@ -326,6 +330,8 @@ where TGen: RuntimeGenesis, TCSExt: Extension {
|
||||
(),
|
||||
TLightBackend<TBl>,
|
||||
>, Error> {
|
||||
let tasks_builder = TaskManagerBuilder::new();
|
||||
|
||||
let keystore = match &config.keystore {
|
||||
KeystoreConfig::Path { path, password } => Keystore::open(
|
||||
path.clone(),
|
||||
@@ -378,6 +384,7 @@ where TGen: RuntimeGenesis, TCSExt: Extension {
|
||||
config,
|
||||
client,
|
||||
backend,
|
||||
tasks_builder,
|
||||
keystore,
|
||||
fetcher: Some(fetcher.clone()),
|
||||
select_chain: None,
|
||||
@@ -451,6 +458,7 @@ impl<TBl, TRtApi, TGen, TCSExt, TCl, TFchr, TSc, TImpQu, TFprb, TFpp, TExPool, T
|
||||
config: self.config,
|
||||
client: self.client,
|
||||
backend: self.backend,
|
||||
tasks_builder: self.tasks_builder,
|
||||
keystore: self.keystore,
|
||||
fetcher: self.fetcher,
|
||||
select_chain,
|
||||
@@ -494,6 +502,7 @@ impl<TBl, TRtApi, TGen, TCSExt, TCl, TFchr, TSc, TImpQu, TFprb, TFpp, TExPool, T
|
||||
config: self.config,
|
||||
client: self.client,
|
||||
backend: self.backend,
|
||||
tasks_builder: self.tasks_builder,
|
||||
keystore: self.keystore,
|
||||
fetcher: self.fetcher,
|
||||
select_chain: self.select_chain,
|
||||
@@ -534,6 +543,7 @@ impl<TBl, TRtApi, TGen, TCSExt, TCl, TFchr, TSc, TImpQu, TFprb, TFpp, TExPool, T
|
||||
config: self.config,
|
||||
client: self.client,
|
||||
backend: self.backend,
|
||||
tasks_builder: self.tasks_builder,
|
||||
keystore: self.keystore,
|
||||
fetcher: self.fetcher,
|
||||
select_chain: self.select_chain,
|
||||
@@ -598,6 +608,7 @@ impl<TBl, TRtApi, TGen, TCSExt, TCl, TFchr, TSc, TImpQu, TFprb, TFpp, TExPool, T
|
||||
config: self.config,
|
||||
client: self.client,
|
||||
backend: self.backend,
|
||||
tasks_builder: self.tasks_builder,
|
||||
keystore: self.keystore,
|
||||
fetcher: self.fetcher,
|
||||
select_chain: self.select_chain,
|
||||
@@ -657,6 +668,7 @@ impl<TBl, TRtApi, TGen, TCSExt, TCl, TFchr, TSc, TImpQu, TFprb, TFpp, TExPool, T
|
||||
Ok(ServiceBuilder {
|
||||
config: self.config,
|
||||
client: self.client,
|
||||
tasks_builder: self.tasks_builder,
|
||||
backend: self.backend,
|
||||
keystore: self.keystore,
|
||||
fetcher: self.fetcher,
|
||||
@@ -686,6 +698,7 @@ impl<TBl, TRtApi, TGen, TCSExt, TCl, TFchr, TSc, TImpQu, TFprb, TFpp, TExPool, T
|
||||
config: self.config,
|
||||
client: self.client,
|
||||
backend: self.backend,
|
||||
tasks_builder: self.tasks_builder,
|
||||
keystore: self.keystore,
|
||||
fetcher: self.fetcher,
|
||||
select_chain: self.select_chain,
|
||||
@@ -707,6 +720,7 @@ impl<TBl, TRtApi, TGen, TCSExt, TCl, TFchr, TSc, TImpQu, TFprb, TFpp, TExPool, T
|
||||
config: self.config,
|
||||
client: self.client,
|
||||
backend: self.backend,
|
||||
tasks_builder: self.tasks_builder,
|
||||
keystore: self.keystore,
|
||||
fetcher: self.fetcher,
|
||||
select_chain: self.select_chain,
|
||||
@@ -819,6 +833,7 @@ ServiceBuilder<
|
||||
marker: _,
|
||||
mut config,
|
||||
client,
|
||||
tasks_builder,
|
||||
fetcher: on_demand,
|
||||
backend,
|
||||
keystore,
|
||||
@@ -839,12 +854,6 @@ ServiceBuilder<
|
||||
config.dev_key_seed.clone().map(|s| vec![s]).unwrap_or_default(),
|
||||
)?;
|
||||
|
||||
let (signal, exit) = exit_future::signal();
|
||||
|
||||
// List of asynchronous tasks to spawn. We collect them, then spawn them all at once.
|
||||
let (to_spawn_tx, to_spawn_rx) =
|
||||
mpsc::unbounded::<(Pin<Box<dyn Future<Output = ()> + Send>>, Cow<'static, str>)>();
|
||||
|
||||
// A side-channel for essential tasks to communicate shutdown.
|
||||
let (essential_failed_tx, essential_failed_rx) = mpsc::unbounded();
|
||||
|
||||
@@ -869,7 +878,7 @@ ServiceBuilder<
|
||||
imports_external_transactions: !config.roles.is_light(),
|
||||
pool: transaction_pool.clone(),
|
||||
client: client.clone(),
|
||||
executor: SpawnTaskHandle { sender: to_spawn_tx.clone(), on_exit: exit.clone() },
|
||||
executor: tasks_builder.spawn_handle(),
|
||||
});
|
||||
|
||||
let protocol_id = {
|
||||
@@ -899,11 +908,9 @@ ServiceBuilder<
|
||||
let network_params = sc_network::config::Params {
|
||||
roles: config.roles,
|
||||
executor: {
|
||||
let to_spawn_tx = to_spawn_tx.clone();
|
||||
let spawn_handle = tasks_builder.spawn_handle();
|
||||
Some(Box::new(move |fut| {
|
||||
if let Err(e) = to_spawn_tx.unbounded_send((fut, From::from("libp2p-node"))) {
|
||||
error!("Failed to spawn libp2p background task: {:?}", e);
|
||||
}
|
||||
spawn_handle.spawn("libp2p-node", fut);
|
||||
}))
|
||||
},
|
||||
network_config: config.network.clone(),
|
||||
@@ -935,20 +942,19 @@ ServiceBuilder<
|
||||
_ => None,
|
||||
};
|
||||
|
||||
let spawn_handle = tasks_builder.spawn_handle();
|
||||
|
||||
// Spawn background tasks which were stacked during the
|
||||
// service building.
|
||||
for (title, background_task) in background_tasks {
|
||||
let _ = to_spawn_tx.unbounded_send((
|
||||
background_task,
|
||||
title.into(),
|
||||
));
|
||||
spawn_handle.spawn(title, background_task);
|
||||
}
|
||||
|
||||
{
|
||||
// block notifications
|
||||
let txpool = Arc::downgrade(&transaction_pool);
|
||||
let offchain = offchain_workers.as_ref().map(Arc::downgrade);
|
||||
let to_spawn_tx_ = to_spawn_tx.clone();
|
||||
let notifications_spawn_handle = tasks_builder.spawn_handle();
|
||||
let network_state_info: Arc<dyn NetworkStateInfo + Send + Sync> = network.clone();
|
||||
let is_validator = config.roles.is_authority();
|
||||
|
||||
@@ -970,15 +976,14 @@ ServiceBuilder<
|
||||
let offchain = offchain.as_ref().and_then(|o| o.upgrade());
|
||||
match offchain {
|
||||
Some(offchain) if is_new_best => {
|
||||
let future = offchain.on_block_imported(
|
||||
&header,
|
||||
network_state_info.clone(),
|
||||
is_validator,
|
||||
notifications_spawn_handle.spawn(
|
||||
"offchain-on-block",
|
||||
offchain.on_block_imported(
|
||||
&header,
|
||||
network_state_info.clone(),
|
||||
is_validator,
|
||||
),
|
||||
);
|
||||
let _ = to_spawn_tx_.unbounded_send((
|
||||
Box::pin(future),
|
||||
From::from("offchain-on-block"),
|
||||
));
|
||||
},
|
||||
Some(_) => log::debug!(
|
||||
target: "sc_offchain",
|
||||
@@ -991,20 +996,19 @@ ServiceBuilder<
|
||||
|
||||
let txpool = txpool.upgrade();
|
||||
if let Some(txpool) = txpool.as_ref() {
|
||||
let future = txpool.maintain(event);
|
||||
let _ = to_spawn_tx_.unbounded_send((
|
||||
Box::pin(future),
|
||||
From::from("txpool-maintain")
|
||||
));
|
||||
notifications_spawn_handle.spawn(
|
||||
"txpool-maintain",
|
||||
txpool.maintain(event),
|
||||
);
|
||||
}
|
||||
|
||||
ready(())
|
||||
});
|
||||
|
||||
let _ = to_spawn_tx.unbounded_send((
|
||||
Box::pin(select(events, exit.clone()).map(drop)),
|
||||
From::from("txpool-and-offchain-notif"),
|
||||
));
|
||||
spawn_handle.spawn(
|
||||
"txpool-and-offchain-notif",
|
||||
events,
|
||||
);
|
||||
}
|
||||
|
||||
{
|
||||
@@ -1024,28 +1028,20 @@ ServiceBuilder<
|
||||
ready(())
|
||||
});
|
||||
|
||||
let _ = to_spawn_tx.unbounded_send((
|
||||
Box::pin(select(events, exit.clone()).map(drop)),
|
||||
From::from("telemetry-on-block"),
|
||||
));
|
||||
spawn_handle.spawn(
|
||||
"telemetry-on-block",
|
||||
events,
|
||||
);
|
||||
}
|
||||
|
||||
// Prometheus metrics
|
||||
let metrics = if let Some((registry, port)) = prometheus_registry_and_port.clone() {
|
||||
let metrics = ServiceMetrics::register(®istry)?;
|
||||
|
||||
metrics.node_roles.set(u64::from(config.roles.bits()));
|
||||
|
||||
let future = select(
|
||||
prometheus_endpoint::init_prometheus(port, registry).boxed(),
|
||||
exit.clone()
|
||||
).map(drop);
|
||||
|
||||
let _ = to_spawn_tx.unbounded_send((
|
||||
Box::pin(future),
|
||||
From::from("prometheus-endpoint")
|
||||
));
|
||||
|
||||
spawn_handle.spawn(
|
||||
"prometheus-endpoint",
|
||||
prometheus_endpoint::init_prometheus(port, registry).map(drop)
|
||||
);
|
||||
Some(metrics)
|
||||
} else {
|
||||
None
|
||||
@@ -1123,10 +1119,10 @@ ServiceBuilder<
|
||||
ready(())
|
||||
});
|
||||
|
||||
let _ = to_spawn_tx.unbounded_send((
|
||||
Box::pin(select(tel_task, exit.clone()).map(drop)),
|
||||
From::from("telemetry-periodic-send"),
|
||||
));
|
||||
spawn_handle.spawn(
|
||||
"telemetry-periodic-send",
|
||||
tel_task,
|
||||
);
|
||||
|
||||
// Periodically send the network state to the telemetry.
|
||||
let (netstat_tx, netstat_rx) = mpsc::unbounded::<(NetworkStatus<_>, NetworkState)>();
|
||||
@@ -1139,10 +1135,10 @@ ServiceBuilder<
|
||||
);
|
||||
ready(())
|
||||
});
|
||||
let _ = to_spawn_tx.unbounded_send((
|
||||
Box::pin(select(tel_task_2, exit.clone()).map(drop)),
|
||||
From::from("telemetry-periodic-network-state"),
|
||||
));
|
||||
spawn_handle.spawn(
|
||||
"telemetry-periodic-network-state",
|
||||
tel_task_2,
|
||||
);
|
||||
|
||||
// RPC
|
||||
let (system_rpc_tx, system_rpc_rx) = mpsc::unbounded();
|
||||
@@ -1156,10 +1152,7 @@ ServiceBuilder<
|
||||
properties: chain_spec.properties().clone(),
|
||||
};
|
||||
|
||||
let subscriptions = sc_rpc::Subscriptions::new(Arc::new(SpawnTaskHandle {
|
||||
sender: to_spawn_tx.clone(),
|
||||
on_exit: exit.clone()
|
||||
}));
|
||||
let subscriptions = sc_rpc::Subscriptions::new(Arc::new(tasks_builder.spawn_handle()));
|
||||
|
||||
let (chain, state) = if let (Some(remote_backend), Some(on_demand)) =
|
||||
(remote_backend.as_ref(), on_demand.as_ref()) {
|
||||
@@ -1217,18 +1210,17 @@ ServiceBuilder<
|
||||
let rpc_handlers = gen_handler();
|
||||
let rpc = start_rpc_servers(&config, gen_handler)?;
|
||||
|
||||
|
||||
let _ = to_spawn_tx.unbounded_send((
|
||||
Box::pin(select(build_network_future(
|
||||
spawn_handle.spawn(
|
||||
"network-worker",
|
||||
build_network_future(
|
||||
config.roles,
|
||||
network_mut,
|
||||
client.clone(),
|
||||
network_status_sinks.clone(),
|
||||
system_rpc_rx,
|
||||
has_bootnodes,
|
||||
), exit.clone()).map(drop)),
|
||||
From::from("network-worker"),
|
||||
));
|
||||
),
|
||||
);
|
||||
|
||||
let telemetry_connection_sinks: Arc<Mutex<Vec<futures::channel::mpsc::UnboundedSender<()>>>> = Default::default();
|
||||
|
||||
@@ -1269,9 +1261,12 @@ ServiceBuilder<
|
||||
});
|
||||
ready(())
|
||||
});
|
||||
let _ = to_spawn_tx.unbounded_send((Box::pin(select(
|
||||
future, exit.clone()
|
||||
).map(drop)), From::from("telemetry-worker")));
|
||||
|
||||
spawn_handle.spawn(
|
||||
"telemetry-worker",
|
||||
future,
|
||||
);
|
||||
|
||||
telemetry
|
||||
});
|
||||
|
||||
@@ -1288,21 +1283,13 @@ ServiceBuilder<
|
||||
|
||||
Ok(Service {
|
||||
client,
|
||||
task_manager: tasks_builder.into_task_manager(config.task_executor.ok_or(Error::TaskExecutorRequired)?),
|
||||
network,
|
||||
network_status_sinks,
|
||||
select_chain,
|
||||
transaction_pool,
|
||||
exit,
|
||||
signal: Some(signal),
|
||||
essential_failed_tx,
|
||||
essential_failed_rx,
|
||||
to_spawn_tx,
|
||||
to_spawn_rx,
|
||||
task_executor: if let Some(exec) = config.task_executor {
|
||||
exec
|
||||
} else {
|
||||
return Err(Error::TaskExecutorRequired);
|
||||
},
|
||||
rpc_handlers,
|
||||
_rpc: rpc,
|
||||
_telemetry: telemetry,
|
||||
|
||||
@@ -26,6 +26,7 @@ pub mod error;
|
||||
|
||||
mod builder;
|
||||
mod status_sinks;
|
||||
mod task_manager;
|
||||
|
||||
use std::{borrow::Cow, io, pin::Pin};
|
||||
use std::marker::PhantomData;
|
||||
@@ -37,10 +38,9 @@ use std::task::{Poll, Context};
|
||||
use parking_lot::Mutex;
|
||||
|
||||
use sc_client::Client;
|
||||
use exit_future::Signal;
|
||||
use futures::{
|
||||
Future, FutureExt, Stream, StreamExt,
|
||||
future::select, channel::mpsc,
|
||||
channel::mpsc,
|
||||
compat::*,
|
||||
sink::SinkExt,
|
||||
task::{Spawn, FutureObj, SpawnError},
|
||||
@@ -69,6 +69,8 @@ pub use sc_executor::NativeExecutionDispatch;
|
||||
pub use std::{ops::Deref, result::Result, sync::Arc};
|
||||
#[doc(hidden)]
|
||||
pub use sc_network::config::{FinalityProofProvider, OnDemand, BoxFinalityProofRequestBuilder};
|
||||
pub use task_manager::{TaskManagerBuilder, SpawnTaskHandle};
|
||||
use task_manager::TaskManager;
|
||||
|
||||
const DEFAULT_PROTOCOL_ID: &str = "sup";
|
||||
|
||||
@@ -85,28 +87,18 @@ impl<T> MallocSizeOfWasm for T {}
|
||||
/// Substrate service.
|
||||
pub struct Service<TBl, TCl, TSc, TNetStatus, TNet, TTxPool, TOc> {
|
||||
client: Arc<TCl>,
|
||||
task_manager: TaskManager,
|
||||
select_chain: Option<TSc>,
|
||||
network: Arc<TNet>,
|
||||
/// Sinks to propagate network status updates.
|
||||
/// For each element, every time the `Interval` fires we push an element on the sender.
|
||||
network_status_sinks: Arc<Mutex<status_sinks::StatusSinks<(TNetStatus, NetworkState)>>>,
|
||||
transaction_pool: Arc<TTxPool>,
|
||||
/// A future that resolves when the service has exited, this is useful to
|
||||
/// make sure any internally spawned futures stop when the service does.
|
||||
exit: exit_future::Exit,
|
||||
/// A signal that makes the exit future above resolve, fired on service drop.
|
||||
signal: Option<Signal>,
|
||||
/// Send a signal when a spawned essential task has concluded. The next time
|
||||
/// the service future is polled it should complete with an error.
|
||||
essential_failed_tx: mpsc::UnboundedSender<()>,
|
||||
/// A receiver for spawned essential-tasks concluding.
|
||||
essential_failed_rx: mpsc::UnboundedReceiver<()>,
|
||||
/// Sender for futures that must be spawned as background tasks.
|
||||
to_spawn_tx: mpsc::UnboundedSender<(Pin<Box<dyn Future<Output = ()> + Send>>, Cow<'static, str>)>,
|
||||
/// Receiver for futures that must be spawned as background tasks.
|
||||
to_spawn_rx: mpsc::UnboundedReceiver<(Pin<Box<dyn Future<Output = ()> + Send>>, Cow<'static, str>)>,
|
||||
/// How to spawn background tasks.
|
||||
task_executor: Arc<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Send + Sync>,
|
||||
rpc_handlers: sc_rpc_server::RpcHandler<sc_rpc::Metadata>,
|
||||
_rpc: Box<dyn std::any::Any + Send + Sync>,
|
||||
_telemetry: Option<sc_telemetry::Telemetry>,
|
||||
@@ -119,48 +111,6 @@ pub struct Service<TBl, TCl, TSc, TNetStatus, TNet, TTxPool, TOc> {
|
||||
|
||||
impl<TBl, TCl, TSc, TNetStatus, TNet, TTxPool, TOc> Unpin for Service<TBl, TCl, TSc, TNetStatus, TNet, TTxPool, TOc> {}
|
||||
|
||||
/// Alias for a an implementation of `futures::future::Executor`.
|
||||
pub type TaskExecutor = Arc<dyn Spawn + Send + Sync>;
|
||||
|
||||
/// An handle for spawning tasks in the service.
|
||||
#[derive(Clone)]
|
||||
pub struct SpawnTaskHandle {
|
||||
sender: mpsc::UnboundedSender<(Pin<Box<dyn Future<Output = ()> + Send>>, Cow<'static, str>)>,
|
||||
on_exit: exit_future::Exit,
|
||||
}
|
||||
|
||||
impl SpawnTaskHandle {
|
||||
/// Spawns the given task with the given name.
|
||||
pub fn spawn(&self, name: impl Into<Cow<'static, str>>, task: impl Future<Output = ()> + Send + 'static) {
|
||||
let on_exit = self.on_exit.clone();
|
||||
let future = async move {
|
||||
futures::pin_mut!(task);
|
||||
let _ = select(on_exit, task).await;
|
||||
};
|
||||
if self.sender.unbounded_send((Box::pin(future), name.into())).is_err() {
|
||||
error!("Failed to send task to spawn over channel");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Spawn for SpawnTaskHandle {
|
||||
fn spawn_obj(&self, future: FutureObj<'static, ()>)
|
||||
-> Result<(), SpawnError> {
|
||||
let future = select(self.on_exit.clone(), future).map(drop);
|
||||
self.sender.unbounded_send((Box::pin(future), From::from("unnamed")))
|
||||
.map_err(|_| SpawnError::shutdown())
|
||||
}
|
||||
}
|
||||
|
||||
type Boxed01Future01 = Box<dyn futures01::Future<Item = (), Error = ()> + Send + 'static>;
|
||||
|
||||
impl futures01::future::Executor<Boxed01Future01> for SpawnTaskHandle {
|
||||
fn execute(&self, future: Boxed01Future01) -> Result<(), futures01::future::ExecuteError<Boxed01Future01>>{
|
||||
self.spawn("unnamed", future.compat().map(drop));
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Abstraction over a Substrate service.
|
||||
pub trait AbstractService: 'static + Future<Output = Result<(), Error>> +
|
||||
Spawn + Send + Unpin {
|
||||
@@ -225,6 +175,7 @@ pub trait AbstractService: 'static + Future<Output = Result<(), Error>> +
|
||||
fn transaction_pool(&self) -> Arc<Self::TransactionPool>;
|
||||
|
||||
/// Get a handle to a future that will resolve on exit.
|
||||
#[deprecated(note = "Use `spawn_task`/`spawn_essential_task` instead, those functions will attach on_exit signal.")]
|
||||
fn on_exit(&self) -> ::exit_future::Exit;
|
||||
|
||||
/// Get the prometheus metrics registry, if available.
|
||||
@@ -265,12 +216,7 @@ where
|
||||
}
|
||||
|
||||
fn spawn_task(&self, name: impl Into<Cow<'static, str>>, task: impl Future<Output = ()> + Send + 'static) {
|
||||
let on_exit = self.on_exit();
|
||||
let task = async move {
|
||||
futures::pin_mut!(task);
|
||||
let _ = select(on_exit, task).await;
|
||||
};
|
||||
let _ = self.to_spawn_tx.unbounded_send((Box::pin(task), name.into()));
|
||||
self.task_manager.spawn(name, task)
|
||||
}
|
||||
|
||||
fn spawn_essential_task(&self, name: impl Into<Cow<'static, str>>, task: impl Future<Output = ()> + Send + 'static) {
|
||||
@@ -281,20 +227,12 @@ where
|
||||
error!("Essential task failed. Shutting down service.");
|
||||
let _ = essential_failed.send(());
|
||||
});
|
||||
let on_exit = self.on_exit();
|
||||
let task = async move {
|
||||
futures::pin_mut!(essential_task);
|
||||
let _ = select(on_exit, essential_task).await;
|
||||
};
|
||||
|
||||
let _ = self.to_spawn_tx.unbounded_send((Box::pin(task), name.into()));
|
||||
let _ = self.spawn_task(name, essential_task);
|
||||
}
|
||||
|
||||
fn spawn_task_handle(&self) -> SpawnTaskHandle {
|
||||
SpawnTaskHandle {
|
||||
sender: self.to_spawn_tx.clone(),
|
||||
on_exit: self.on_exit(),
|
||||
}
|
||||
self.task_manager.spawn_handle()
|
||||
}
|
||||
|
||||
fn rpc_query(&self, mem: &RpcSession, request: &str) -> Pin<Box<dyn Future<Output = Option<String>> + Send>> {
|
||||
@@ -330,7 +268,7 @@ where
|
||||
}
|
||||
|
||||
fn on_exit(&self) -> exit_future::Exit {
|
||||
self.exit.clone()
|
||||
self.task_manager.on_exit()
|
||||
}
|
||||
|
||||
fn prometheus_registry(&self) -> Option<prometheus_endpoint::Registry> {
|
||||
@@ -355,9 +293,7 @@ impl<TBl, TCl, TSc, TNetStatus, TNet, TTxPool, TOc> Future for
|
||||
}
|
||||
}
|
||||
|
||||
while let Poll::Ready(Some((task_to_spawn, name))) = Pin::new(&mut this.to_spawn_rx).poll_next(cx) {
|
||||
(this.task_executor)(Box::pin(futures_diagnose::diagnose(name, task_to_spawn)));
|
||||
}
|
||||
this.task_manager.process_receiver(cx);
|
||||
|
||||
// The service future never ends.
|
||||
Poll::Pending
|
||||
@@ -371,7 +307,7 @@ impl<TBl, TCl, TSc, TNetStatus, TNet, TTxPool, TOc> Spawn for
|
||||
&self,
|
||||
future: FutureObj<'static, ()>
|
||||
) -> Result<(), SpawnError> {
|
||||
self.to_spawn_tx.unbounded_send((Box::pin(future), From::from("unnamed")))
|
||||
self.task_manager.scheduler().unbounded_send((Box::pin(future), From::from("unnamed")))
|
||||
.map_err(|_| SpawnError::shutdown())
|
||||
}
|
||||
}
|
||||
@@ -525,17 +461,6 @@ pub struct NetworkStatus<B: BlockT> {
|
||||
pub average_upload_per_sec: u64,
|
||||
}
|
||||
|
||||
impl<TBl, TCl, TSc, TNetStatus, TNet, TTxPool, TOc> Drop for
|
||||
Service<TBl, TCl, TSc, TNetStatus, TNet, TTxPool, TOc>
|
||||
{
|
||||
fn drop(&mut self) {
|
||||
debug!(target: "service", "Substrate service shutdown");
|
||||
if let Some(signal) = self.signal.take() {
|
||||
let _ = signal.fire();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(target_os = "unknown"))]
|
||||
// Wrapper for HTTP and WS servers that makes sure they are properly shut down.
|
||||
mod waiting {
|
||||
|
||||
@@ -0,0 +1,190 @@
|
||||
// Copyright 2020 Parity Technologies (UK) Ltd.
|
||||
// This file is part of Substrate.
|
||||
|
||||
// Substrate is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// Substrate is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
//! Substrate service tasks management module.
|
||||
|
||||
use std::{
|
||||
result::Result, sync::Arc,
|
||||
task::{Poll, Context},
|
||||
borrow::Cow, pin::Pin,
|
||||
};
|
||||
use exit_future::Signal;
|
||||
use log::{debug, error};
|
||||
use futures::{
|
||||
Future, FutureExt, Stream,
|
||||
future::select, channel::mpsc,
|
||||
compat::*,
|
||||
task::{Spawn, FutureObj, SpawnError},
|
||||
};
|
||||
|
||||
/// Type alias for service task executor (usually runtime).
|
||||
pub type ServiceTaskExecutor = Arc<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Send + Sync>;
|
||||
|
||||
/// Type alias for the task scheduler.
|
||||
pub type TaskScheduler = mpsc::UnboundedSender<(Pin<Box<dyn Future<Output = ()> + Send>>, Cow<'static, str>)>;
|
||||
|
||||
/// Helper struct to setup background tasks execution for service.
|
||||
pub struct TaskManagerBuilder {
|
||||
/// A future that resolves when the service has exited, this is useful to
|
||||
/// make sure any internally spawned futures stop when the service does.
|
||||
on_exit: exit_future::Exit,
|
||||
/// A signal that makes the exit future above resolve, fired on service drop.
|
||||
signal: Option<Signal>,
|
||||
/// Sender for futures that must be spawned as background tasks.
|
||||
to_spawn_tx: TaskScheduler,
|
||||
/// Receiver for futures that must be spawned as background tasks.
|
||||
to_spawn_rx: mpsc::UnboundedReceiver<(Pin<Box<dyn Future<Output = ()> + Send>>, Cow<'static, str>)>,
|
||||
}
|
||||
|
||||
impl TaskManagerBuilder {
|
||||
/// New asynchronous task manager setup.
|
||||
pub fn new() -> Self {
|
||||
let (signal, on_exit) = exit_future::signal();
|
||||
let (to_spawn_tx, to_spawn_rx) = mpsc::unbounded();
|
||||
Self {
|
||||
on_exit,
|
||||
signal: Some(signal),
|
||||
to_spawn_tx,
|
||||
to_spawn_rx,
|
||||
}
|
||||
}
|
||||
|
||||
/// Get spawn handle.
|
||||
///
|
||||
/// Tasks spawned through this handle will get scheduled once
|
||||
/// service is up and running.
|
||||
pub fn spawn_handle(&self) -> SpawnTaskHandle {
|
||||
SpawnTaskHandle {
|
||||
on_exit: self.on_exit.clone(),
|
||||
sender: self.to_spawn_tx.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Convert into actual task manager from initial setup.
|
||||
pub(crate) fn into_task_manager(self, executor: ServiceTaskExecutor) -> TaskManager {
|
||||
let TaskManagerBuilder {
|
||||
on_exit,
|
||||
signal,
|
||||
to_spawn_rx,
|
||||
to_spawn_tx
|
||||
} = self;
|
||||
TaskManager {
|
||||
on_exit,
|
||||
signal,
|
||||
to_spawn_tx,
|
||||
to_spawn_rx,
|
||||
executor,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// An handle for spawning tasks in the service.
|
||||
#[derive(Clone)]
|
||||
pub struct SpawnTaskHandle {
|
||||
sender: TaskScheduler,
|
||||
on_exit: exit_future::Exit,
|
||||
}
|
||||
|
||||
impl SpawnTaskHandle {
|
||||
/// Spawns the given task with the given name.
|
||||
pub fn spawn(&self, name: impl Into<Cow<'static, str>>, task: impl Future<Output = ()> + Send + 'static) {
|
||||
let on_exit = self.on_exit.clone();
|
||||
let future = async move {
|
||||
futures::pin_mut!(task);
|
||||
let _ = select(on_exit, task).await;
|
||||
};
|
||||
if self.sender.unbounded_send((Box::pin(future), name.into())).is_err() {
|
||||
error!("Failed to send task to spawn over channel");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Spawn for SpawnTaskHandle {
|
||||
fn spawn_obj(&self, future: FutureObj<'static, ()>)
|
||||
-> Result<(), SpawnError> {
|
||||
let future = select(self.on_exit.clone(), future).map(drop);
|
||||
self.sender.unbounded_send((Box::pin(future), From::from("unnamed")))
|
||||
.map_err(|_| SpawnError::shutdown())
|
||||
}
|
||||
}
|
||||
|
||||
type Boxed01Future01 = Box<dyn futures01::Future<Item = (), Error = ()> + Send + 'static>;
|
||||
|
||||
impl futures01::future::Executor<Boxed01Future01> for SpawnTaskHandle {
|
||||
fn execute(&self, future: Boxed01Future01) -> Result<(), futures01::future::ExecuteError<Boxed01Future01>>{
|
||||
self.spawn("unnamed", future.compat().map(drop));
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Helper struct to manage background/async tasks in Service.
|
||||
pub struct TaskManager {
|
||||
/// A future that resolves when the service has exited, this is useful to
|
||||
/// make sure any internally spawned futures stop when the service does.
|
||||
on_exit: exit_future::Exit,
|
||||
/// A signal that makes the exit future above resolve, fired on service drop.
|
||||
signal: Option<Signal>,
|
||||
/// Sender for futures that must be spawned as background tasks.
|
||||
to_spawn_tx: TaskScheduler,
|
||||
/// Receiver for futures that must be spawned as background tasks.
|
||||
to_spawn_rx: mpsc::UnboundedReceiver<(Pin<Box<dyn Future<Output = ()> + Send>>, Cow<'static, str>)>,
|
||||
/// How to spawn background tasks.
|
||||
executor: ServiceTaskExecutor,
|
||||
}
|
||||
|
||||
impl TaskManager {
|
||||
/// Spawn background/async task, which will be aware on exit signal.
|
||||
pub(super) fn spawn(&self, name: impl Into<Cow<'static, str>>, task: impl Future<Output = ()> + Send + 'static) {
|
||||
let on_exit = self.on_exit.clone();
|
||||
let future = async move {
|
||||
futures::pin_mut!(task);
|
||||
let _ = select(on_exit, task).await;
|
||||
};
|
||||
if self.to_spawn_tx.unbounded_send((Box::pin(future), name.into())).is_err() {
|
||||
error!("Failed to send task to spawn over channel");
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) fn spawn_handle(&self) -> SpawnTaskHandle {
|
||||
SpawnTaskHandle {
|
||||
on_exit: self.on_exit.clone(),
|
||||
sender: self.to_spawn_tx.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Get sender where background/async tasks can be sent.
|
||||
pub(super) fn scheduler(&self) -> TaskScheduler {
|
||||
self.to_spawn_tx.clone()
|
||||
}
|
||||
|
||||
/// Process background task receiver.
|
||||
pub(super) fn process_receiver(&mut self, cx: &mut Context) {
|
||||
while let Poll::Ready(Some((task_to_spawn, name))) = Pin::new(&mut self.to_spawn_rx).poll_next(cx) {
|
||||
(self.executor)(Box::pin(futures_diagnose::diagnose(name, task_to_spawn)));
|
||||
}
|
||||
}
|
||||
|
||||
/// Clone on exit signal.
|
||||
pub(super) fn on_exit(&self) -> exit_future::Exit {
|
||||
self.on_exit.clone()
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for TaskManager {
|
||||
fn drop(&mut self) {
|
||||
debug!(target: "service", "Tasks manager shutdown");
|
||||
if let Some(signal) = self.signal.take() {
|
||||
let _ = signal.fire();
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user