diff --git a/substrate/client/service/src/builder.rs b/substrate/client/service/src/builder.rs
index 1cdebe8b14..b819c47bee 100644
--- a/substrate/client/service/src/builder.rs
+++ b/substrate/client/service/src/builder.rs
@@ -15,7 +15,7 @@
// along with Substrate. If not, see .
use crate::{Service, NetworkStatus, NetworkState, error::Error, DEFAULT_PROTOCOL_ID, MallocSizeOfWasm};
-use crate::{TaskManagerBuilder, start_rpc_servers, build_network_future, TransactionPoolAdapter};
+use crate::{start_rpc_servers, build_network_future, TransactionPoolAdapter, TaskManager};
use crate::status_sinks;
use crate::config::{Configuration, KeystoreConfig, PrometheusConfig};
use crate::metrics::MetricsService;
@@ -81,7 +81,7 @@ pub struct ServiceBuilder,
backend: Arc,
- tasks_builder: TaskManagerBuilder,
+ task_manager: TaskManager,
keystore: Arc>,
fetcher: Option,
select_chain: Option,
@@ -145,7 +145,7 @@ type TFullParts = (
TFullClient,
Arc>,
Arc>,
- TaskManagerBuilder,
+ TaskManager,
);
/// Creates a new full client for the given config.
@@ -172,9 +172,9 @@ fn new_full_parts(
KeystoreConfig::InMemory => Keystore::new_in_memory(),
};
- let tasks_builder = {
+ let task_manager = {
let registry = config.prometheus_config.as_ref().map(|cfg| &cfg.registry);
- TaskManagerBuilder::new(registry)?
+ TaskManager::new(config.task_executor.clone(), registry)?
};
let executor = NativeExecutor::::new(
@@ -213,12 +213,12 @@ fn new_full_parts(
fork_blocks,
bad_blocks,
extensions,
- Box::new(tasks_builder.spawn_handle()),
+ Box::new(task_manager.spawn_handle()),
config.prometheus_config.as_ref().map(|config| config.registry.clone()),
)?
};
- Ok((client, backend, keystore, tasks_builder))
+ Ok((client, backend, keystore, task_manager))
}
impl ServiceBuilder<(), (), (), (), (), (), (), (), (), (), ()> {
@@ -238,7 +238,7 @@ impl ServiceBuilder<(), (), (), (), (), (), (), (), (), (), ()> {
(),
TFullBackend,
>, Error> {
- let (client, backend, keystore, tasks_builder) = new_full_parts(&config)?;
+ let (client, backend, keystore, task_manager) = new_full_parts(&config)?;
let client = Arc::new(client);
@@ -247,7 +247,7 @@ impl ServiceBuilder<(), (), (), (), (), (), (), (), (), (), ()> {
client,
backend,
keystore,
- tasks_builder,
+ task_manager,
fetcher: None,
select_chain: None,
import_queue: (),
@@ -277,9 +277,9 @@ impl ServiceBuilder<(), (), (), (), (), (), (), (), (), (), ()> {
(),
TLightBackend,
>, Error> {
- let tasks_builder = {
+ let task_manager = {
let registry = config.prometheus_config.as_ref().map(|cfg| &cfg.registry);
- TaskManagerBuilder::new(registry)?
+ TaskManager::new(config.task_executor.clone(), registry)?
};
let keystore = match &config.keystore {
@@ -311,7 +311,7 @@ impl ServiceBuilder<(), (), (), (), (), (), (), (), (), (), ()> {
sc_client::light::new_fetch_checker::<_, TBl, _>(
light_blockchain.clone(),
executor.clone(),
- Box::new(tasks_builder.spawn_handle()),
+ Box::new(task_manager.spawn_handle()),
),
);
let fetcher = Arc::new(sc_network::config::OnDemand::new(fetch_checker));
@@ -321,7 +321,7 @@ impl ServiceBuilder<(), (), (), (), (), (), (), (), (), (), ()> {
backend.clone(),
config.chain_spec.as_storage_builder(),
executor,
- Box::new(tasks_builder.spawn_handle()),
+ Box::new(task_manager.spawn_handle()),
config.prometheus_config.as_ref().map(|config| config.registry.clone()),
)?);
@@ -329,7 +329,7 @@ impl ServiceBuilder<(), (), (), (), (), (), (), (), (), (), ()> {
config,
client,
backend,
- tasks_builder,
+ task_manager,
keystore,
fetcher: Some(fetcher.clone()),
select_chain: None,
@@ -402,7 +402,7 @@ impl
config: self.config,
client: self.client,
backend: self.backend,
- tasks_builder: self.tasks_builder,
+ task_manager: self.task_manager,
keystore: self.keystore,
fetcher: self.fetcher,
select_chain,
@@ -445,7 +445,7 @@ impl
config: self.config,
client: self.client,
backend: self.backend,
- tasks_builder: self.tasks_builder,
+ task_manager: self.task_manager,
keystore: self.keystore,
fetcher: self.fetcher,
select_chain: self.select_chain,
@@ -483,7 +483,7 @@ impl
config: self.config,
client: self.client,
backend: self.backend,
- tasks_builder: self.tasks_builder,
+ task_manager: self.task_manager,
keystore: self.keystore,
fetcher: self.fetcher,
select_chain: self.select_chain,
@@ -545,7 +545,7 @@ impl
config: self.config,
client: self.client,
backend: self.backend,
- tasks_builder: self.tasks_builder,
+ task_manager: self.task_manager,
keystore: self.keystore,
fetcher: self.fetcher,
select_chain: self.select_chain,
@@ -606,7 +606,7 @@ impl
Ok(ServiceBuilder {
config: self.config,
client: self.client,
- tasks_builder: self.tasks_builder,
+ task_manager: self.task_manager,
backend: self.backend,
keystore: self.keystore,
fetcher: self.fetcher,
@@ -635,7 +635,7 @@ impl
config: self.config,
client: self.client,
backend: self.backend,
- tasks_builder: self.tasks_builder,
+ task_manager: self.task_manager,
keystore: self.keystore,
fetcher: self.fetcher,
select_chain: self.select_chain,
@@ -745,7 +745,7 @@ ServiceBuilder<
marker: _,
mut config,
client,
- tasks_builder,
+ task_manager,
fetcher: on_demand,
backend,
keystore,
@@ -789,7 +789,7 @@ ServiceBuilder<
imports_external_transactions: !matches!(config.role, Role::Light),
pool: transaction_pool.clone(),
client: client.clone(),
- executor: tasks_builder.spawn_handle(),
+ executor: task_manager.spawn_handle(),
});
let protocol_id = {
@@ -811,7 +811,7 @@ ServiceBuilder<
let network_params = sc_network::config::Params {
role: config.role.clone(),
executor: {
- let spawn_handle = tasks_builder.spawn_handle();
+ let spawn_handle = task_manager.spawn_handle();
Some(Box::new(move |fut| {
spawn_handle.spawn("libp2p-node", fut);
}))
@@ -845,7 +845,7 @@ ServiceBuilder<
_ => None,
};
- let spawn_handle = tasks_builder.spawn_handle();
+ let spawn_handle = task_manager.spawn_handle();
// Spawn background tasks which were stacked during the
// service building.
@@ -857,7 +857,7 @@ ServiceBuilder<
// block notifications
let txpool = Arc::downgrade(&transaction_pool);
let offchain = offchain_workers.as_ref().map(Arc::downgrade);
- let notifications_spawn_handle = tasks_builder.spawn_handle();
+ let notifications_spawn_handle = task_manager.spawn_handle();
let network_state_info: Arc = network.clone();
let is_validator = config.role.is_authority();
@@ -1013,7 +1013,7 @@ ServiceBuilder<
chain_type: chain_spec.chain_type().clone(),
};
- let subscriptions = sc_rpc::Subscriptions::new(Arc::new(tasks_builder.spawn_handle()));
+ let subscriptions = sc_rpc::Subscriptions::new(Arc::new(task_manager.spawn_handle()));
let (chain, state, child_state) = if let (Some(remote_backend), Some(on_demand)) =
(remote_backend.as_ref(), on_demand.as_ref()) {
@@ -1145,7 +1145,7 @@ ServiceBuilder<
Ok(Service {
client,
- task_manager: tasks_builder.into_task_manager(config.task_executor),
+ task_manager,
network,
network_status_sinks,
select_chain,
diff --git a/substrate/client/service/src/lib.rs b/substrate/client/service/src/lib.rs
index 039e0257ab..56fee6b6d7 100644
--- a/substrate/client/service/src/lib.rs
+++ b/substrate/client/service/src/lib.rs
@@ -74,7 +74,7 @@ pub use std::{ops::Deref, result::Result, sync::Arc};
#[doc(hidden)]
pub use sc_network::config::{FinalityProofProvider, OnDemand, BoxFinalityProofRequestBuilder};
pub use sc_tracing::TracingReceiver;
-pub use task_manager::{TaskManagerBuilder, SpawnTaskHandle};
+pub use task_manager::SpawnTaskHandle;
use task_manager::TaskManager;
const DEFAULT_PROTOCOL_ID: &str = "sup";
@@ -304,8 +304,6 @@ impl Future for
}
}
- this.task_manager.process_receiver(cx);
-
// The service future never ends.
Poll::Pending
}
diff --git a/substrate/client/service/src/task_manager.rs b/substrate/client/service/src/task_manager.rs
index fd7fc62ab5..e6847d0881 100644
--- a/substrate/client/service/src/task_manager.rs
+++ b/substrate/client/service/src/task_manager.rs
@@ -15,13 +15,12 @@
use std::{
pin::Pin,
- result::Result, sync::Arc,
- task::{Poll, Context},
+ result::Result, sync::Arc
};
use exit_future::Signal;
-use log::{debug, error};
+use log::{debug};
use futures::{
- Future, FutureExt, Stream,
+ Future, FutureExt,
future::select,
compat::*,
task::{Spawn, FutureObj, SpawnError},
@@ -32,88 +31,17 @@ use prometheus_endpoint::{
CounterVec, HistogramOpts, HistogramVec, Opts, Registry, U64
};
use sc_client_api::CloneableSpawn;
-use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedSender, TracingUnboundedReceiver};
mod prometheus_future;
/// Type alias for service task executor (usually runtime).
pub type ServiceTaskExecutor = Arc + Send>>) + Send + Sync>;
-/// Type alias for the task scheduler.
-pub type TaskScheduler = TracingUnboundedSender + Send>>>;
-
-/// Helper struct to setup background tasks execution for service.
-pub struct TaskManagerBuilder {
- /// A future that resolves when the service has exited, this is useful to
- /// make sure any internally spawned futures stop when the service does.
- on_exit: exit_future::Exit,
- /// A signal that makes the exit future above resolve, fired on service drop.
- signal: Option,
- /// Sender for futures that must be spawned as background tasks.
- to_spawn_tx: TaskScheduler,
- /// Receiver for futures that must be spawned as background tasks.
- to_spawn_rx: TracingUnboundedReceiver + Send>>>,
- /// Prometheus metrics where to report the stats about tasks.
- metrics: Option,
-}
-
-impl TaskManagerBuilder {
- /// New asynchronous task manager setup.
- ///
- /// If a Prometheus registry is passed, it will be used to report statistics about the
- /// service tasks.
- pub fn new(prometheus_registry: Option<&Registry>) -> Result {
- let (signal, on_exit) = exit_future::signal();
- let (to_spawn_tx, to_spawn_rx) = tracing_unbounded("mpsc_task_manager");
-
- let metrics = prometheus_registry.map(Metrics::register).transpose()?;
-
- Ok(Self {
- on_exit,
- signal: Some(signal),
- to_spawn_tx,
- to_spawn_rx,
- metrics,
- })
- }
-
- /// Get spawn handle.
- ///
- /// Tasks spawned through this handle will get scheduled once
- /// service is up and running.
- pub fn spawn_handle(&self) -> SpawnTaskHandle {
- SpawnTaskHandle {
- on_exit: self.on_exit.clone(),
- sender: self.to_spawn_tx.clone(),
- metrics: self.metrics.clone(),
- }
- }
-
- /// Convert into actual task manager from initial setup.
- pub(crate) fn into_task_manager(self, executor: ServiceTaskExecutor) -> TaskManager {
- let TaskManagerBuilder {
- on_exit,
- signal,
- to_spawn_rx,
- to_spawn_tx,
- metrics,
- } = self;
- TaskManager {
- on_exit,
- signal,
- to_spawn_tx,
- to_spawn_rx,
- executor,
- metrics,
- }
- }
-}
-
/// An handle for spawning tasks in the service.
#[derive(Clone)]
pub struct SpawnTaskHandle {
- sender: TaskScheduler,
on_exit: exit_future::Exit,
+ executor: ServiceTaskExecutor,
metrics: Option,
}
@@ -152,9 +80,7 @@ impl SpawnTaskHandle {
}
};
- if self.sender.unbounded_send(Box::pin(future)).is_err() {
- error!("Failed to send task to spawn over channel");
- }
+ (self.executor)(Box::pin(future));
}
}
@@ -188,11 +114,6 @@ pub struct TaskManager {
on_exit: exit_future::Exit,
/// A signal that makes the exit future above resolve, fired on service drop.
signal: Option,
- /// Sender for futures that must be spawned as background tasks.
- to_spawn_tx: TaskScheduler,
- /// Receiver for futures that must be spawned as background tasks.
- /// Note: please read comment on [`SpawnTaskHandle::spawn`] for why this is a `&'static str`.
- to_spawn_rx: TracingUnboundedReceiver + Send>>>,
/// How to spawn background tasks.
executor: ServiceTaskExecutor,
/// Prometheus metric where to report the polling times.
@@ -200,6 +121,24 @@ pub struct TaskManager {
}
impl TaskManager {
+ /// If a Prometheus registry is passed, it will be used to report statistics about the
+ /// service tasks.
+ pub(super) fn new(
+ executor: ServiceTaskExecutor,
+ prometheus_registry: Option<&Registry>
+ ) -> Result {
+ let (signal, on_exit) = exit_future::signal();
+
+ let metrics = prometheus_registry.map(Metrics::register).transpose()?;
+
+ Ok(Self {
+ on_exit,
+ signal: Some(signal),
+ executor,
+ metrics,
+ })
+ }
+
/// Spawn background/async task, which will be aware on exit signal.
///
/// See also the documentation of [`SpawnTaskHandler::spawn`].
@@ -210,18 +149,11 @@ impl TaskManager {
pub(super) fn spawn_handle(&self) -> SpawnTaskHandle {
SpawnTaskHandle {
on_exit: self.on_exit.clone(),
- sender: self.to_spawn_tx.clone(),
+ executor: self.executor.clone(),
metrics: self.metrics.clone(),
}
}
- /// Process background task receiver.
- pub(super) fn process_receiver(&mut self, cx: &mut Context) {
- while let Poll::Ready(Some(task_to_spawn)) = Pin::new(&mut self.to_spawn_rx).poll_next(cx) {
- (self.executor)(task_to_spawn);
- }
- }
-
/// Clone on exit signal.
pub(super) fn on_exit(&self) -> exit_future::Exit {
self.on_exit.clone()