mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-09 20:11:09 +00:00
Remove TaskManagerBuilder (#5725)
* Remove TaskManagerBuilder * Clean up use declaration fo SpawnTaskHandle Co-Authored-By: Pierre Krieger <pierre.krieger1708@gmail.com> Co-authored-by: Pierre Krieger <pierre.krieger1708@gmail.com>
This commit is contained in:
@@ -15,7 +15,7 @@
|
||||
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
use crate::{Service, NetworkStatus, NetworkState, error::Error, DEFAULT_PROTOCOL_ID, MallocSizeOfWasm};
|
||||
use crate::{TaskManagerBuilder, start_rpc_servers, build_network_future, TransactionPoolAdapter};
|
||||
use crate::{start_rpc_servers, build_network_future, TransactionPoolAdapter, TaskManager};
|
||||
use crate::status_sinks;
|
||||
use crate::config::{Configuration, KeystoreConfig, PrometheusConfig};
|
||||
use crate::metrics::MetricsService;
|
||||
@@ -81,7 +81,7 @@ pub struct ServiceBuilder<TBl, TRtApi, TCl, TFchr, TSc, TImpQu, TFprb, TFpp,
|
||||
config: Configuration,
|
||||
pub (crate) client: Arc<TCl>,
|
||||
backend: Arc<Backend>,
|
||||
tasks_builder: TaskManagerBuilder,
|
||||
task_manager: TaskManager,
|
||||
keystore: Arc<RwLock<Keystore>>,
|
||||
fetcher: Option<TFchr>,
|
||||
select_chain: Option<TSc>,
|
||||
@@ -145,7 +145,7 @@ type TFullParts<TBl, TRtApi, TExecDisp> = (
|
||||
TFullClient<TBl, TRtApi, TExecDisp>,
|
||||
Arc<TFullBackend<TBl>>,
|
||||
Arc<RwLock<sc_keystore::Store>>,
|
||||
TaskManagerBuilder,
|
||||
TaskManager,
|
||||
);
|
||||
|
||||
/// Creates a new full client for the given config.
|
||||
@@ -172,9 +172,9 @@ fn new_full_parts<TBl, TRtApi, TExecDisp>(
|
||||
KeystoreConfig::InMemory => Keystore::new_in_memory(),
|
||||
};
|
||||
|
||||
let tasks_builder = {
|
||||
let task_manager = {
|
||||
let registry = config.prometheus_config.as_ref().map(|cfg| &cfg.registry);
|
||||
TaskManagerBuilder::new(registry)?
|
||||
TaskManager::new(config.task_executor.clone(), registry)?
|
||||
};
|
||||
|
||||
let executor = NativeExecutor::<TExecDisp>::new(
|
||||
@@ -213,12 +213,12 @@ fn new_full_parts<TBl, TRtApi, TExecDisp>(
|
||||
fork_blocks,
|
||||
bad_blocks,
|
||||
extensions,
|
||||
Box::new(tasks_builder.spawn_handle()),
|
||||
Box::new(task_manager.spawn_handle()),
|
||||
config.prometheus_config.as_ref().map(|config| config.registry.clone()),
|
||||
)?
|
||||
};
|
||||
|
||||
Ok((client, backend, keystore, tasks_builder))
|
||||
Ok((client, backend, keystore, task_manager))
|
||||
}
|
||||
|
||||
impl ServiceBuilder<(), (), (), (), (), (), (), (), (), (), ()> {
|
||||
@@ -238,7 +238,7 @@ impl ServiceBuilder<(), (), (), (), (), (), (), (), (), (), ()> {
|
||||
(),
|
||||
TFullBackend<TBl>,
|
||||
>, Error> {
|
||||
let (client, backend, keystore, tasks_builder) = new_full_parts(&config)?;
|
||||
let (client, backend, keystore, task_manager) = new_full_parts(&config)?;
|
||||
|
||||
let client = Arc::new(client);
|
||||
|
||||
@@ -247,7 +247,7 @@ impl ServiceBuilder<(), (), (), (), (), (), (), (), (), (), ()> {
|
||||
client,
|
||||
backend,
|
||||
keystore,
|
||||
tasks_builder,
|
||||
task_manager,
|
||||
fetcher: None,
|
||||
select_chain: None,
|
||||
import_queue: (),
|
||||
@@ -277,9 +277,9 @@ impl ServiceBuilder<(), (), (), (), (), (), (), (), (), (), ()> {
|
||||
(),
|
||||
TLightBackend<TBl>,
|
||||
>, Error> {
|
||||
let tasks_builder = {
|
||||
let task_manager = {
|
||||
let registry = config.prometheus_config.as_ref().map(|cfg| &cfg.registry);
|
||||
TaskManagerBuilder::new(registry)?
|
||||
TaskManager::new(config.task_executor.clone(), registry)?
|
||||
};
|
||||
|
||||
let keystore = match &config.keystore {
|
||||
@@ -311,7 +311,7 @@ impl ServiceBuilder<(), (), (), (), (), (), (), (), (), (), ()> {
|
||||
sc_client::light::new_fetch_checker::<_, TBl, _>(
|
||||
light_blockchain.clone(),
|
||||
executor.clone(),
|
||||
Box::new(tasks_builder.spawn_handle()),
|
||||
Box::new(task_manager.spawn_handle()),
|
||||
),
|
||||
);
|
||||
let fetcher = Arc::new(sc_network::config::OnDemand::new(fetch_checker));
|
||||
@@ -321,7 +321,7 @@ impl ServiceBuilder<(), (), (), (), (), (), (), (), (), (), ()> {
|
||||
backend.clone(),
|
||||
config.chain_spec.as_storage_builder(),
|
||||
executor,
|
||||
Box::new(tasks_builder.spawn_handle()),
|
||||
Box::new(task_manager.spawn_handle()),
|
||||
config.prometheus_config.as_ref().map(|config| config.registry.clone()),
|
||||
)?);
|
||||
|
||||
@@ -329,7 +329,7 @@ impl ServiceBuilder<(), (), (), (), (), (), (), (), (), (), ()> {
|
||||
config,
|
||||
client,
|
||||
backend,
|
||||
tasks_builder,
|
||||
task_manager,
|
||||
keystore,
|
||||
fetcher: Some(fetcher.clone()),
|
||||
select_chain: None,
|
||||
@@ -402,7 +402,7 @@ impl<TBl, TRtApi, TCl, TFchr, TSc, TImpQu, TFprb, TFpp, TExPool, TRpc, Backend>
|
||||
config: self.config,
|
||||
client: self.client,
|
||||
backend: self.backend,
|
||||
tasks_builder: self.tasks_builder,
|
||||
task_manager: self.task_manager,
|
||||
keystore: self.keystore,
|
||||
fetcher: self.fetcher,
|
||||
select_chain,
|
||||
@@ -445,7 +445,7 @@ impl<TBl, TRtApi, TCl, TFchr, TSc, TImpQu, TFprb, TFpp, TExPool, TRpc, Backend>
|
||||
config: self.config,
|
||||
client: self.client,
|
||||
backend: self.backend,
|
||||
tasks_builder: self.tasks_builder,
|
||||
task_manager: self.task_manager,
|
||||
keystore: self.keystore,
|
||||
fetcher: self.fetcher,
|
||||
select_chain: self.select_chain,
|
||||
@@ -483,7 +483,7 @@ impl<TBl, TRtApi, TCl, TFchr, TSc, TImpQu, TFprb, TFpp, TExPool, TRpc, Backend>
|
||||
config: self.config,
|
||||
client: self.client,
|
||||
backend: self.backend,
|
||||
tasks_builder: self.tasks_builder,
|
||||
task_manager: self.task_manager,
|
||||
keystore: self.keystore,
|
||||
fetcher: self.fetcher,
|
||||
select_chain: self.select_chain,
|
||||
@@ -545,7 +545,7 @@ impl<TBl, TRtApi, TCl, TFchr, TSc, TImpQu, TFprb, TFpp, TExPool, TRpc, Backend>
|
||||
config: self.config,
|
||||
client: self.client,
|
||||
backend: self.backend,
|
||||
tasks_builder: self.tasks_builder,
|
||||
task_manager: self.task_manager,
|
||||
keystore: self.keystore,
|
||||
fetcher: self.fetcher,
|
||||
select_chain: self.select_chain,
|
||||
@@ -606,7 +606,7 @@ impl<TBl, TRtApi, TCl, TFchr, TSc, TImpQu, TFprb, TFpp, TExPool, TRpc, Backend>
|
||||
Ok(ServiceBuilder {
|
||||
config: self.config,
|
||||
client: self.client,
|
||||
tasks_builder: self.tasks_builder,
|
||||
task_manager: self.task_manager,
|
||||
backend: self.backend,
|
||||
keystore: self.keystore,
|
||||
fetcher: self.fetcher,
|
||||
@@ -635,7 +635,7 @@ impl<TBl, TRtApi, TCl, TFchr, TSc, TImpQu, TFprb, TFpp, TExPool, TRpc, Backend>
|
||||
config: self.config,
|
||||
client: self.client,
|
||||
backend: self.backend,
|
||||
tasks_builder: self.tasks_builder,
|
||||
task_manager: self.task_manager,
|
||||
keystore: self.keystore,
|
||||
fetcher: self.fetcher,
|
||||
select_chain: self.select_chain,
|
||||
@@ -745,7 +745,7 @@ ServiceBuilder<
|
||||
marker: _,
|
||||
mut config,
|
||||
client,
|
||||
tasks_builder,
|
||||
task_manager,
|
||||
fetcher: on_demand,
|
||||
backend,
|
||||
keystore,
|
||||
@@ -789,7 +789,7 @@ ServiceBuilder<
|
||||
imports_external_transactions: !matches!(config.role, Role::Light),
|
||||
pool: transaction_pool.clone(),
|
||||
client: client.clone(),
|
||||
executor: tasks_builder.spawn_handle(),
|
||||
executor: task_manager.spawn_handle(),
|
||||
});
|
||||
|
||||
let protocol_id = {
|
||||
@@ -811,7 +811,7 @@ ServiceBuilder<
|
||||
let network_params = sc_network::config::Params {
|
||||
role: config.role.clone(),
|
||||
executor: {
|
||||
let spawn_handle = tasks_builder.spawn_handle();
|
||||
let spawn_handle = task_manager.spawn_handle();
|
||||
Some(Box::new(move |fut| {
|
||||
spawn_handle.spawn("libp2p-node", fut);
|
||||
}))
|
||||
@@ -845,7 +845,7 @@ ServiceBuilder<
|
||||
_ => None,
|
||||
};
|
||||
|
||||
let spawn_handle = tasks_builder.spawn_handle();
|
||||
let spawn_handle = task_manager.spawn_handle();
|
||||
|
||||
// Spawn background tasks which were stacked during the
|
||||
// service building.
|
||||
@@ -857,7 +857,7 @@ ServiceBuilder<
|
||||
// block notifications
|
||||
let txpool = Arc::downgrade(&transaction_pool);
|
||||
let offchain = offchain_workers.as_ref().map(Arc::downgrade);
|
||||
let notifications_spawn_handle = tasks_builder.spawn_handle();
|
||||
let notifications_spawn_handle = task_manager.spawn_handle();
|
||||
let network_state_info: Arc<dyn NetworkStateInfo + Send + Sync> = network.clone();
|
||||
let is_validator = config.role.is_authority();
|
||||
|
||||
@@ -1013,7 +1013,7 @@ ServiceBuilder<
|
||||
chain_type: chain_spec.chain_type().clone(),
|
||||
};
|
||||
|
||||
let subscriptions = sc_rpc::Subscriptions::new(Arc::new(tasks_builder.spawn_handle()));
|
||||
let subscriptions = sc_rpc::Subscriptions::new(Arc::new(task_manager.spawn_handle()));
|
||||
|
||||
let (chain, state, child_state) = if let (Some(remote_backend), Some(on_demand)) =
|
||||
(remote_backend.as_ref(), on_demand.as_ref()) {
|
||||
@@ -1145,7 +1145,7 @@ ServiceBuilder<
|
||||
|
||||
Ok(Service {
|
||||
client,
|
||||
task_manager: tasks_builder.into_task_manager(config.task_executor),
|
||||
task_manager,
|
||||
network,
|
||||
network_status_sinks,
|
||||
select_chain,
|
||||
|
||||
@@ -74,7 +74,7 @@ pub use std::{ops::Deref, result::Result, sync::Arc};
|
||||
#[doc(hidden)]
|
||||
pub use sc_network::config::{FinalityProofProvider, OnDemand, BoxFinalityProofRequestBuilder};
|
||||
pub use sc_tracing::TracingReceiver;
|
||||
pub use task_manager::{TaskManagerBuilder, SpawnTaskHandle};
|
||||
pub use task_manager::SpawnTaskHandle;
|
||||
use task_manager::TaskManager;
|
||||
|
||||
const DEFAULT_PROTOCOL_ID: &str = "sup";
|
||||
@@ -304,8 +304,6 @@ impl<TBl, TCl, TSc, TNetStatus, TNet, TTxPool, TOc> Future for
|
||||
}
|
||||
}
|
||||
|
||||
this.task_manager.process_receiver(cx);
|
||||
|
||||
// The service future never ends.
|
||||
Poll::Pending
|
||||
}
|
||||
|
||||
@@ -15,13 +15,12 @@
|
||||
|
||||
use std::{
|
||||
pin::Pin,
|
||||
result::Result, sync::Arc,
|
||||
task::{Poll, Context},
|
||||
result::Result, sync::Arc
|
||||
};
|
||||
use exit_future::Signal;
|
||||
use log::{debug, error};
|
||||
use log::{debug};
|
||||
use futures::{
|
||||
Future, FutureExt, Stream,
|
||||
Future, FutureExt,
|
||||
future::select,
|
||||
compat::*,
|
||||
task::{Spawn, FutureObj, SpawnError},
|
||||
@@ -32,88 +31,17 @@ use prometheus_endpoint::{
|
||||
CounterVec, HistogramOpts, HistogramVec, Opts, Registry, U64
|
||||
};
|
||||
use sc_client_api::CloneableSpawn;
|
||||
use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedSender, TracingUnboundedReceiver};
|
||||
|
||||
mod prometheus_future;
|
||||
|
||||
/// Type alias for service task executor (usually runtime).
|
||||
pub type ServiceTaskExecutor = Arc<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Send + Sync>;
|
||||
|
||||
/// Type alias for the task scheduler.
|
||||
pub type TaskScheduler = TracingUnboundedSender<Pin<Box<dyn Future<Output = ()> + Send>>>;
|
||||
|
||||
/// Helper struct to setup background tasks execution for service.
|
||||
pub struct TaskManagerBuilder {
|
||||
/// A future that resolves when the service has exited, this is useful to
|
||||
/// make sure any internally spawned futures stop when the service does.
|
||||
on_exit: exit_future::Exit,
|
||||
/// A signal that makes the exit future above resolve, fired on service drop.
|
||||
signal: Option<Signal>,
|
||||
/// Sender for futures that must be spawned as background tasks.
|
||||
to_spawn_tx: TaskScheduler,
|
||||
/// Receiver for futures that must be spawned as background tasks.
|
||||
to_spawn_rx: TracingUnboundedReceiver<Pin<Box<dyn Future<Output = ()> + Send>>>,
|
||||
/// Prometheus metrics where to report the stats about tasks.
|
||||
metrics: Option<Metrics>,
|
||||
}
|
||||
|
||||
impl TaskManagerBuilder {
|
||||
/// New asynchronous task manager setup.
|
||||
///
|
||||
/// If a Prometheus registry is passed, it will be used to report statistics about the
|
||||
/// service tasks.
|
||||
pub fn new(prometheus_registry: Option<&Registry>) -> Result<Self, PrometheusError> {
|
||||
let (signal, on_exit) = exit_future::signal();
|
||||
let (to_spawn_tx, to_spawn_rx) = tracing_unbounded("mpsc_task_manager");
|
||||
|
||||
let metrics = prometheus_registry.map(Metrics::register).transpose()?;
|
||||
|
||||
Ok(Self {
|
||||
on_exit,
|
||||
signal: Some(signal),
|
||||
to_spawn_tx,
|
||||
to_spawn_rx,
|
||||
metrics,
|
||||
})
|
||||
}
|
||||
|
||||
/// Get spawn handle.
|
||||
///
|
||||
/// Tasks spawned through this handle will get scheduled once
|
||||
/// service is up and running.
|
||||
pub fn spawn_handle(&self) -> SpawnTaskHandle {
|
||||
SpawnTaskHandle {
|
||||
on_exit: self.on_exit.clone(),
|
||||
sender: self.to_spawn_tx.clone(),
|
||||
metrics: self.metrics.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Convert into actual task manager from initial setup.
|
||||
pub(crate) fn into_task_manager(self, executor: ServiceTaskExecutor) -> TaskManager {
|
||||
let TaskManagerBuilder {
|
||||
on_exit,
|
||||
signal,
|
||||
to_spawn_rx,
|
||||
to_spawn_tx,
|
||||
metrics,
|
||||
} = self;
|
||||
TaskManager {
|
||||
on_exit,
|
||||
signal,
|
||||
to_spawn_tx,
|
||||
to_spawn_rx,
|
||||
executor,
|
||||
metrics,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// An handle for spawning tasks in the service.
|
||||
#[derive(Clone)]
|
||||
pub struct SpawnTaskHandle {
|
||||
sender: TaskScheduler,
|
||||
on_exit: exit_future::Exit,
|
||||
executor: ServiceTaskExecutor,
|
||||
metrics: Option<Metrics>,
|
||||
}
|
||||
|
||||
@@ -152,9 +80,7 @@ impl SpawnTaskHandle {
|
||||
}
|
||||
};
|
||||
|
||||
if self.sender.unbounded_send(Box::pin(future)).is_err() {
|
||||
error!("Failed to send task to spawn over channel");
|
||||
}
|
||||
(self.executor)(Box::pin(future));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -188,11 +114,6 @@ pub struct TaskManager {
|
||||
on_exit: exit_future::Exit,
|
||||
/// A signal that makes the exit future above resolve, fired on service drop.
|
||||
signal: Option<Signal>,
|
||||
/// Sender for futures that must be spawned as background tasks.
|
||||
to_spawn_tx: TaskScheduler,
|
||||
/// Receiver for futures that must be spawned as background tasks.
|
||||
/// Note: please read comment on [`SpawnTaskHandle::spawn`] for why this is a `&'static str`.
|
||||
to_spawn_rx: TracingUnboundedReceiver<Pin<Box<dyn Future<Output = ()> + Send>>>,
|
||||
/// How to spawn background tasks.
|
||||
executor: ServiceTaskExecutor,
|
||||
/// Prometheus metric where to report the polling times.
|
||||
@@ -200,6 +121,24 @@ pub struct TaskManager {
|
||||
}
|
||||
|
||||
impl TaskManager {
|
||||
/// If a Prometheus registry is passed, it will be used to report statistics about the
|
||||
/// service tasks.
|
||||
pub(super) fn new(
|
||||
executor: ServiceTaskExecutor,
|
||||
prometheus_registry: Option<&Registry>
|
||||
) -> Result<Self, PrometheusError> {
|
||||
let (signal, on_exit) = exit_future::signal();
|
||||
|
||||
let metrics = prometheus_registry.map(Metrics::register).transpose()?;
|
||||
|
||||
Ok(Self {
|
||||
on_exit,
|
||||
signal: Some(signal),
|
||||
executor,
|
||||
metrics,
|
||||
})
|
||||
}
|
||||
|
||||
/// Spawn background/async task, which will be aware on exit signal.
|
||||
///
|
||||
/// See also the documentation of [`SpawnTaskHandler::spawn`].
|
||||
@@ -210,18 +149,11 @@ impl TaskManager {
|
||||
pub(super) fn spawn_handle(&self) -> SpawnTaskHandle {
|
||||
SpawnTaskHandle {
|
||||
on_exit: self.on_exit.clone(),
|
||||
sender: self.to_spawn_tx.clone(),
|
||||
executor: self.executor.clone(),
|
||||
metrics: self.metrics.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Process background task receiver.
|
||||
pub(super) fn process_receiver(&mut self, cx: &mut Context) {
|
||||
while let Poll::Ready(Some(task_to_spawn)) = Pin::new(&mut self.to_spawn_rx).poll_next(cx) {
|
||||
(self.executor)(task_to_spawn);
|
||||
}
|
||||
}
|
||||
|
||||
/// Clone on exit signal.
|
||||
pub(super) fn on_exit(&self) -> exit_future::Exit {
|
||||
self.on_exit.clone()
|
||||
|
||||
Reference in New Issue
Block a user