mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-11 15:21:08 +00:00
Per subsystem CPU usage tracking (#4239)
* SubsystemContext: add subsystem name str Signed-off-by: Andrei Sandu <sandu.andrei@gmail.com> * Overseer builder proc macro changes * initilize SubsystemContext name field. * Add subsystem name in TaskKind::launch_task() Signed-off-by: Andrei Sandu <sandu.andrei@gmail.com> * Update ToOverseer enum Signed-off-by: Andrei Sandu <sandu.andrei@gmail.com> * Assign subsystem names to orphan tasks Signed-off-by: Andrei Sandu <sandu.andrei@gmail.com> * cargo fmt Signed-off-by: Andrei Sandu <andrei-mihail@parity.io> * SubsystemContext: add subsystem name str Signed-off-by: Andrei Sandu <sandu.andrei@gmail.com> * Overseer builder proc macro changes * initilize SubsystemContext name field. * Add subsystem name in TaskKind::launch_task() Signed-off-by: Andrei Sandu <sandu.andrei@gmail.com> * Update ToOverseer enum Signed-off-by: Andrei Sandu <sandu.andrei@gmail.com> * Assign subsystem names to orphan tasks Signed-off-by: Andrei Sandu <sandu.andrei@gmail.com> * cargo fmt Signed-off-by: Andrei Sandu <andrei-mihail@parity.io> * Rebase changes for new spawn() group param Signed-off-by: Andrei Sandu <andrei-mihail@parity.io> * Add subsystem constat in JobTrait Signed-off-by: Andrei Sandu <andrei-mihail@parity.io> * Add subsystem string Signed-off-by: Andrei Sandu <andrei-mihail@parity.io> * Fix tests Signed-off-by: Andrei Sandu <andrei-mihail@parity.io> * Fix spawn() calls Signed-off-by: Andrei Sandu <andrei-mihail@parity.io> * cargo fmt Signed-off-by: Andrei Sandu <andrei-mihail@parity.io> * Fix Signed-off-by: Andrei Sandu <andrei-mihail@parity.io> * Fix tests Signed-off-by: Andrei Sandu <andrei-mihail@parity.io> * fix Signed-off-by: Andrei Sandu <andrei-mihail@parity.io> * Fix more tests Signed-off-by: Andrei Sandu <andrei-mihail@parity.io> * Address PR review feedback #1 Signed-off-by: Andrei Sandu <andrei-mihail@parity.io> * Address PR review round 2 Signed-off-by: Andrei Sandu <andrei-mihail@parity.io> * Fixes - remove JobTrait::Subsystem - fix tests Signed-off-by: Andrei Sandu <andrei-mihail@parity.io> * update Cargo.lock Co-authored-by: Andronik Ordian <write@reusable.software>
This commit is contained in:
Generated
+207
-214
File diff suppressed because it is too large
Load Diff
@@ -291,7 +291,7 @@ async fn handle_new_activations<Context: SubsystemContext>(
|
||||
let mut task_sender = sender.clone();
|
||||
let metrics = metrics.clone();
|
||||
ctx.spawn(
|
||||
"collation generation collation builder",
|
||||
"collation-builder",
|
||||
Box::pin(async move {
|
||||
let persisted_validation_data_hash = validation_data.hash();
|
||||
|
||||
|
||||
@@ -383,8 +383,9 @@ impl Error {
|
||||
fn trace(&self) {
|
||||
match self {
|
||||
// don't spam the log with spurious errors
|
||||
Self::RuntimeApi(_) | Self::Oneshot(_) =>
|
||||
tracing::debug!(target: LOG_TARGET, err = ?self),
|
||||
Self::RuntimeApi(_) | Self::Oneshot(_) => {
|
||||
tracing::debug!(target: LOG_TARGET, err = ?self)
|
||||
},
|
||||
// it's worth reporting otherwise
|
||||
_ => tracing::warn!(target: LOG_TARGET, err = ?self),
|
||||
}
|
||||
|
||||
@@ -659,7 +659,7 @@ impl CandidateBackingJob {
|
||||
}
|
||||
};
|
||||
sender
|
||||
.send_command(FromJobCommand::Spawn("Backing Validation", bg.boxed()))
|
||||
.send_command(FromJobCommand::Spawn("backing-validation", bg.boxed()))
|
||||
.await?;
|
||||
}
|
||||
|
||||
@@ -900,11 +900,13 @@ impl CandidateBackingJob {
|
||||
.await;
|
||||
|
||||
match confirmation_rx.await {
|
||||
Err(oneshot::Canceled) =>
|
||||
tracing::debug!(target: LOG_TARGET, "Dispute coordinator confirmation lost",),
|
||||
Err(oneshot::Canceled) => {
|
||||
tracing::debug!(target: LOG_TARGET, "Dispute coordinator confirmation lost",)
|
||||
},
|
||||
Ok(ImportStatementsResult::ValidImport) => {},
|
||||
Ok(ImportStatementsResult::InvalidImport) =>
|
||||
tracing::warn!(target: LOG_TARGET, "Failed to import statements of validity",),
|
||||
Ok(ImportStatementsResult::InvalidImport) => {
|
||||
tracing::warn!(target: LOG_TARGET, "Failed to import statements of validity",)
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1168,7 +1170,7 @@ impl util::JobTrait for CandidateBackingJob {
|
||||
type RunArgs = SyncCryptoStorePtr;
|
||||
type Metrics = Metrics;
|
||||
|
||||
const NAME: &'static str = "CandidateBackingJob";
|
||||
const NAME: &'static str = "candidate-backing-job";
|
||||
|
||||
fn run<S: SubsystemSender>(
|
||||
parent: Hash,
|
||||
|
||||
@@ -233,7 +233,7 @@ impl JobTrait for BitfieldSigningJob {
|
||||
type RunArgs = SyncCryptoStorePtr;
|
||||
type Metrics = Metrics;
|
||||
|
||||
const NAME: &'static str = "BitfieldSigningJob";
|
||||
const NAME: &'static str = "bitfield-signing-job";
|
||||
|
||||
/// Run a job for the parent block indicated
|
||||
fn run<S: SubsystemSender>(
|
||||
|
||||
@@ -148,7 +148,7 @@ impl JobTrait for ProvisioningJob {
|
||||
type RunArgs = ();
|
||||
type Metrics = Metrics;
|
||||
|
||||
const NAME: &'static str = "ProvisioningJob";
|
||||
const NAME: &'static str = "provisioner-job";
|
||||
|
||||
/// Run a job for the parent block indicated
|
||||
//
|
||||
|
||||
@@ -278,11 +278,21 @@ impl TaskExecutor {
|
||||
}
|
||||
|
||||
impl sp_core::traits::SpawnNamed for TaskExecutor {
|
||||
fn spawn_blocking(&self, _: &'static str, future: futures::future::BoxFuture<'static, ()>) {
|
||||
fn spawn_blocking(
|
||||
&self,
|
||||
_task_name: &'static str,
|
||||
_subsystem_name: Option<&'static str>,
|
||||
future: futures::future::BoxFuture<'static, ()>,
|
||||
) {
|
||||
self.0.spawn_ok(future);
|
||||
}
|
||||
|
||||
fn spawn(&self, _: &'static str, future: futures::future::BoxFuture<'static, ()>) {
|
||||
fn spawn(
|
||||
&self,
|
||||
_task_name: &'static str,
|
||||
_subsystem_name: Option<&'static str>,
|
||||
future: futures::future::BoxFuture<'static, ()>,
|
||||
) {
|
||||
self.0.spawn_ok(future);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -270,7 +270,8 @@ where
|
||||
)
|
||||
}
|
||||
} else {
|
||||
self.spawn_handle.spawn_blocking(API_REQUEST_TASK_NAME, request);
|
||||
self.spawn_handle
|
||||
.spawn_blocking(API_REQUEST_TASK_NAME, Some("runtime-api"), request);
|
||||
self.active_requests.push(receiver);
|
||||
}
|
||||
}
|
||||
@@ -288,7 +289,8 @@ where
|
||||
}
|
||||
|
||||
if let Some((req, recv)) = self.waiting_requests.pop_front() {
|
||||
self.spawn_handle.spawn_blocking(API_REQUEST_TASK_NAME, req);
|
||||
self.spawn_handle
|
||||
.spawn_blocking(API_REQUEST_TASK_NAME, Some("runtime-api"), req);
|
||||
self.active_requests.push(recv);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -112,6 +112,7 @@ impl Jaeger {
|
||||
// Spawn a background task that pulls span information and sends them on the network.
|
||||
spawner.spawn(
|
||||
"jaeger-collector",
|
||||
Some("jaeger"),
|
||||
Box::pin(async move {
|
||||
match async_std::net::UdpSocket::bind("0.0.0.0:0").await {
|
||||
Ok(udp_socket) => loop {
|
||||
|
||||
@@ -197,7 +197,8 @@ impl TestState {
|
||||
// lock ;-)
|
||||
let update_tx = tx.clone();
|
||||
harness.pool.spawn(
|
||||
"Sending active leaves updates",
|
||||
"sending-active-leaves-updates",
|
||||
None,
|
||||
async move {
|
||||
for update in updates {
|
||||
overseer_signal(update_tx.clone(), OverseerSignal::ActiveLeaves(update)).await;
|
||||
@@ -308,7 +309,8 @@ fn to_incoming_req(
|
||||
let (tx, rx): (oneshot::Sender<netconfig::OutgoingResponse>, oneshot::Receiver<_>) =
|
||||
oneshot::channel();
|
||||
executor.spawn(
|
||||
"Message forwarding",
|
||||
"message-forwarding",
|
||||
None,
|
||||
async {
|
||||
let response = rx.await;
|
||||
let payload = response.expect("Unexpected canceled request").result;
|
||||
|
||||
@@ -782,7 +782,7 @@ where
|
||||
awaiting: vec![response_sender],
|
||||
});
|
||||
|
||||
if let Err(e) = ctx.spawn("recovery task", Box::pin(remote)) {
|
||||
if let Err(e) = ctx.spawn("recovery-task", Box::pin(remote)) {
|
||||
tracing::warn!(
|
||||
target: LOG_TARGET,
|
||||
err = ?e,
|
||||
|
||||
@@ -102,12 +102,22 @@ struct Xxx {
|
||||
struct DummySpawner;
|
||||
|
||||
impl SpawnNamed for DummySpawner {
|
||||
fn spawn_blocking(&self, name: &'static str, _future: futures::future::BoxFuture<'static, ()>) {
|
||||
unimplemented!("spawn blocking {}", name)
|
||||
fn spawn_blocking(
|
||||
&self,
|
||||
task_name: &'static str,
|
||||
subsystem_name: Option<&'static str>,
|
||||
_future: futures::future::BoxFuture<'static, ()>,
|
||||
) {
|
||||
unimplemented!("spawn blocking {} {}", task_name, subsystem_name.unwrap_or("default"))
|
||||
}
|
||||
|
||||
fn spawn(&self, name: &'static str, _future: futures::future::BoxFuture<'static, ()>) {
|
||||
unimplemented!("spawn {}", name)
|
||||
fn spawn(
|
||||
&self,
|
||||
task_name: &'static str,
|
||||
subsystem_name: Option<&'static str>,
|
||||
_future: futures::future::BoxFuture<'static, ()>,
|
||||
) {
|
||||
unimplemented!("spawn {} {}", task_name, subsystem_name.unwrap_or("default"))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -337,7 +337,7 @@ pub(crate) fn impl_builder(info: &OverseerInfo) -> proc_macro2::TokenStream {
|
||||
// TODO generate a builder pattern that ensures this
|
||||
// TODO https://github.com/paritytech/polkadot/issues/3427
|
||||
let #subsystem_name = match self. #subsystem_name {
|
||||
FieldInitMethod::Fn(func) => func(handle.clone())?,
|
||||
FieldInitMethod::Fn(func) => func(handle.clone())?,
|
||||
FieldInitMethod::Value(val) => val,
|
||||
FieldInitMethod::Uninitialized =>
|
||||
panic!("All subsystems must exist with the builder pattern."),
|
||||
@@ -349,11 +349,18 @@ pub(crate) fn impl_builder(info: &OverseerInfo) -> proc_macro2::TokenStream {
|
||||
#channel_name_rx, #channel_name_unbounded_rx
|
||||
);
|
||||
let (signal_tx, signal_rx) = #support_crate ::metered::channel(SIGNAL_CHANNEL_CAPACITY);
|
||||
|
||||
// Generate subsystem name based on overseer field name.
|
||||
let mut subsystem_string = String::from(stringify!(#subsystem_name));
|
||||
// Convert owned `snake case` string to a `kebab case` static str.
|
||||
let subsystem_static_str = Box::leak(subsystem_string.replace("_", "-").into_boxed_str());
|
||||
|
||||
let ctx = #subsyste_ctx_name::< #consumes >::new(
|
||||
signal_rx,
|
||||
message_rx,
|
||||
channels_out.clone(),
|
||||
to_overseer_tx.clone(),
|
||||
subsystem_static_str
|
||||
);
|
||||
|
||||
let #subsystem_name: OverseenSubsystem< #consumes > =
|
||||
@@ -364,6 +371,7 @@ pub(crate) fn impl_builder(info: &OverseerInfo) -> proc_macro2::TokenStream {
|
||||
unbounded_meter,
|
||||
ctx,
|
||||
#subsystem_name,
|
||||
subsystem_static_str,
|
||||
&mut running_subsystems,
|
||||
)?;
|
||||
)*
|
||||
@@ -489,22 +497,22 @@ pub(crate) fn impl_task_kind(info: &OverseerInfo) -> proc_macro2::TokenStream {
|
||||
/// Task kind to launch.
|
||||
pub trait TaskKind {
|
||||
/// Spawn a task, it depends on the implementer if this is blocking or not.
|
||||
fn launch_task<S: SpawnNamed>(spawner: &mut S, name: &'static str, future: BoxFuture<'static, ()>);
|
||||
fn launch_task<S: SpawnNamed>(spawner: &mut S, task_name: &'static str, subsystem_name: &'static str, future: BoxFuture<'static, ()>);
|
||||
}
|
||||
|
||||
#[allow(missing_docs)]
|
||||
struct Regular;
|
||||
impl TaskKind for Regular {
|
||||
fn launch_task<S: SpawnNamed>(spawner: &mut S, name: &'static str, future: BoxFuture<'static, ()>) {
|
||||
spawner.spawn(name, future)
|
||||
fn launch_task<S: SpawnNamed>(spawner: &mut S, task_name: &'static str, subsystem_name: &'static str, future: BoxFuture<'static, ()>) {
|
||||
spawner.spawn(task_name, Some(subsystem_name), future)
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(missing_docs)]
|
||||
struct Blocking;
|
||||
impl TaskKind for Blocking {
|
||||
fn launch_task<S: SpawnNamed>(spawner: &mut S, name: &'static str, future: BoxFuture<'static, ()>) {
|
||||
spawner.spawn_blocking(name, future)
|
||||
fn launch_task<S: SpawnNamed>(spawner: &mut S, task_name: &'static str, subsystem_name: &'static str, future: BoxFuture<'static, ()>) {
|
||||
spawner.spawn(task_name, Some(subsystem_name), future)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -517,6 +525,7 @@ pub(crate) fn impl_task_kind(info: &OverseerInfo) -> proc_macro2::TokenStream {
|
||||
unbounded_meter: #support_crate ::metered::Meter,
|
||||
ctx: Ctx,
|
||||
s: SubSys,
|
||||
subsystem_name: &'static str,
|
||||
futures: &mut #support_crate ::FuturesUnordered<BoxFuture<'static, ::std::result::Result<(), #error_ty> >>,
|
||||
) -> ::std::result::Result<OverseenSubsystem<M>, #error_ty >
|
||||
where
|
||||
@@ -540,7 +549,7 @@ pub(crate) fn impl_task_kind(info: &OverseerInfo) -> proc_macro2::TokenStream {
|
||||
let _ = tx.send(());
|
||||
});
|
||||
|
||||
<TK as TaskKind>::launch_task(spawner, name, fut);
|
||||
<TK as TaskKind>::launch_task(spawner, name, subsystem_name, fut);
|
||||
|
||||
futures.push(Box::pin(
|
||||
rx.map(|e| {
|
||||
|
||||
@@ -112,6 +112,7 @@ pub(crate) fn impl_misc(info: &OverseerInfo) -> proc_macro2::TokenStream {
|
||||
>,
|
||||
signals_received: SignalsReceived,
|
||||
pending_incoming: Option<(usize, M)>,
|
||||
name: &'static str
|
||||
}
|
||||
|
||||
impl<M> #subsystem_ctx_name<M> {
|
||||
@@ -121,6 +122,7 @@ pub(crate) fn impl_misc(info: &OverseerInfo) -> proc_macro2::TokenStream {
|
||||
messages: SubsystemIncomingMessages<M>,
|
||||
to_subsystems: ChannelsOut,
|
||||
to_overseer: #support_crate ::metered::UnboundedMeteredSender<#support_crate:: ToOverseer>,
|
||||
name: &'static str
|
||||
) -> Self {
|
||||
let signals_received = SignalsReceived::default();
|
||||
#subsystem_ctx_name {
|
||||
@@ -133,8 +135,13 @@ pub(crate) fn impl_misc(info: &OverseerInfo) -> proc_macro2::TokenStream {
|
||||
to_overseer,
|
||||
signals_received,
|
||||
pending_incoming: None,
|
||||
name
|
||||
}
|
||||
}
|
||||
|
||||
fn name(&self) -> &'static str {
|
||||
self.name
|
||||
}
|
||||
}
|
||||
|
||||
#[#support_crate ::async_trait]
|
||||
@@ -229,6 +236,7 @@ pub(crate) fn impl_misc(info: &OverseerInfo) -> proc_macro2::TokenStream {
|
||||
{
|
||||
self.to_overseer.unbounded_send(#support_crate ::ToOverseer::SpawnJob {
|
||||
name,
|
||||
subsystem: Some(self.name()),
|
||||
s,
|
||||
}).map_err(|_| #support_crate ::OverseerError::TaskSpawn(name))?;
|
||||
Ok(())
|
||||
@@ -239,6 +247,7 @@ pub(crate) fn impl_misc(info: &OverseerInfo) -> proc_macro2::TokenStream {
|
||||
{
|
||||
self.to_overseer.unbounded_send(#support_crate ::ToOverseer::SpawnBlockingJob {
|
||||
name,
|
||||
subsystem: Some(self.name()),
|
||||
s,
|
||||
}).map_err(|_| #support_crate ::OverseerError::TaskSpawn(name))?;
|
||||
Ok(())
|
||||
|
||||
@@ -111,6 +111,8 @@ pub enum ToOverseer {
|
||||
SpawnJob {
|
||||
/// Name of the task to spawn which be shown in jaeger and tracing logs.
|
||||
name: &'static str,
|
||||
/// Subsystem of the task to spawn which be shown in jaeger and tracing logs.
|
||||
subsystem: Option<&'static str>,
|
||||
/// The future to execute.
|
||||
s: BoxFuture<'static, ()>,
|
||||
},
|
||||
@@ -120,6 +122,8 @@ pub enum ToOverseer {
|
||||
SpawnBlockingJob {
|
||||
/// Name of the task to spawn which be shown in jaeger and tracing logs.
|
||||
name: &'static str,
|
||||
/// Subsystem of the task to spawn which be shown in jaeger and tracing logs.
|
||||
subsystem: Option<&'static str>,
|
||||
/// The future to execute.
|
||||
s: BoxFuture<'static, ()>,
|
||||
},
|
||||
@@ -128,8 +132,12 @@ pub enum ToOverseer {
|
||||
impl fmt::Debug for ToOverseer {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
match self {
|
||||
Self::SpawnJob { name, .. } => writeln!(f, "SpawnJob{{ {}, ..}}", name),
|
||||
Self::SpawnBlockingJob { name, .. } => writeln!(f, "SpawnBlockingJob{{ {}, ..}}", name),
|
||||
Self::SpawnJob { name, subsystem, .. } => {
|
||||
writeln!(f, "SpawnJob{{ {}, {} ..}}", name, subsystem.unwrap_or("default"))
|
||||
},
|
||||
Self::SpawnBlockingJob { name, subsystem, .. } => {
|
||||
writeln!(f, "SpawnBlockingJob{{ {}, {} ..}}", name, subsystem.unwrap_or("default"))
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -562,7 +562,10 @@ where
|
||||
|
||||
futures::future::ready(())
|
||||
});
|
||||
overseer.spawner().spawn("metrics_metronome", Box::pin(metronome));
|
||||
overseer
|
||||
.spawner()
|
||||
.spawn("metrics-metronome", Some("overseer"), Box::pin(metronome));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -616,11 +619,11 @@ where
|
||||
},
|
||||
msg = self.to_overseer_rx.select_next_some() => {
|
||||
match msg {
|
||||
ToOverseer::SpawnJob { name, s } => {
|
||||
self.spawn_job(name, s);
|
||||
ToOverseer::SpawnJob { name, subsystem, s } => {
|
||||
self.spawn_job(name, subsystem, s);
|
||||
}
|
||||
ToOverseer::SpawnBlockingJob { name, s } => {
|
||||
self.spawn_blocking_job(name, s);
|
||||
ToOverseer::SpawnBlockingJob { name, subsystem, s } => {
|
||||
self.spawn_blocking_job(name, subsystem, s);
|
||||
}
|
||||
}
|
||||
},
|
||||
@@ -772,11 +775,21 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
fn spawn_job(&mut self, name: &'static str, j: BoxFuture<'static, ()>) {
|
||||
self.spawner.spawn(name, j);
|
||||
fn spawn_job(
|
||||
&mut self,
|
||||
task_name: &'static str,
|
||||
subsystem_name: Option<&'static str>,
|
||||
j: BoxFuture<'static, ()>,
|
||||
) {
|
||||
self.spawner.spawn(task_name, subsystem_name, j);
|
||||
}
|
||||
|
||||
fn spawn_blocking_job(&mut self, name: &'static str, j: BoxFuture<'static, ()>) {
|
||||
self.spawner.spawn_blocking(name, j);
|
||||
fn spawn_blocking_job(
|
||||
&mut self,
|
||||
task_name: &'static str,
|
||||
subsystem_name: Option<&'static str>,
|
||||
j: BoxFuture<'static, ()>,
|
||||
) {
|
||||
self.spawner.spawn_blocking(task_name, subsystem_name, j);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1151,6 +1151,7 @@ fn context_holds_onto_message_until_enough_signals_received() {
|
||||
stream::select(bounded_rx, unbounded_rx),
|
||||
channels_out,
|
||||
to_overseer_tx,
|
||||
"test",
|
||||
);
|
||||
|
||||
assert_eq!(ctx.signals_received.load(), 0);
|
||||
|
||||
@@ -380,7 +380,11 @@ where
|
||||
|
||||
let telemetry = telemetry.map(|(worker, telemetry)| {
|
||||
if let Some(worker) = worker {
|
||||
task_manager.spawn_handle().spawn("telemetry", worker.run());
|
||||
task_manager.spawn_handle().spawn(
|
||||
"telemetry",
|
||||
Some("telemetry"),
|
||||
Box::pin(worker.run()),
|
||||
);
|
||||
}
|
||||
telemetry
|
||||
});
|
||||
@@ -805,6 +809,7 @@ where
|
||||
// Start the offchain workers to have
|
||||
task_manager.spawn_handle().spawn(
|
||||
"offchain-notifications",
|
||||
None,
|
||||
sc_offchain::notification_future(
|
||||
config.role.is_authority(),
|
||||
client.clone(),
|
||||
@@ -904,7 +909,11 @@ where
|
||||
prometheus_registry.clone(),
|
||||
);
|
||||
|
||||
task_manager.spawn_handle().spawn("authority-discovery-worker", worker.run());
|
||||
task_manager.spawn_handle().spawn(
|
||||
"authority-discovery-worker",
|
||||
Some("authority-discovery"),
|
||||
Box::pin(worker.run()),
|
||||
);
|
||||
Some(service)
|
||||
} else {
|
||||
None
|
||||
@@ -950,6 +959,7 @@ where
|
||||
let handle = handle.clone();
|
||||
task_manager.spawn_essential_handle().spawn_blocking(
|
||||
"overseer",
|
||||
None,
|
||||
Box::pin(async move {
|
||||
use futures::{pin_mut, select, FutureExt};
|
||||
|
||||
@@ -1038,7 +1048,7 @@ where
|
||||
};
|
||||
|
||||
let babe = babe::start_babe(babe_config)?;
|
||||
task_manager.spawn_essential_handle().spawn_blocking("babe", babe);
|
||||
task_manager.spawn_essential_handle().spawn_blocking("babe", None, babe);
|
||||
}
|
||||
|
||||
// if the node isn't actively participating in consensus then it doesn't
|
||||
@@ -1063,9 +1073,11 @@ where
|
||||
// Wococo's purpose is to be a testbed for BEEFY, so if it fails we'll
|
||||
// bring the node down with it to make sure it is noticed.
|
||||
if chain_spec.is_wococo() {
|
||||
task_manager.spawn_essential_handle().spawn_blocking("beefy-gadget", gadget);
|
||||
task_manager
|
||||
.spawn_essential_handle()
|
||||
.spawn_blocking("beefy-gadget", None, gadget);
|
||||
} else {
|
||||
task_manager.spawn_handle().spawn_blocking("beefy-gadget", gadget);
|
||||
task_manager.spawn_handle().spawn_blocking("beefy-gadget", None, gadget);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1119,9 +1131,11 @@ where
|
||||
telemetry: telemetry.as_ref().map(|x| x.handle()),
|
||||
};
|
||||
|
||||
task_manager
|
||||
.spawn_essential_handle()
|
||||
.spawn_blocking("grandpa-voter", grandpa::run_grandpa_voter(grandpa_config)?);
|
||||
task_manager.spawn_essential_handle().spawn_blocking(
|
||||
"grandpa-voter",
|
||||
None,
|
||||
grandpa::run_grandpa_voter(grandpa_config)?,
|
||||
);
|
||||
}
|
||||
|
||||
network_starter.start_network();
|
||||
|
||||
@@ -212,7 +212,7 @@ where
|
||||
name: &'static str,
|
||||
s: Pin<Box<dyn Future<Output = ()> + Send>>,
|
||||
) -> SubsystemResult<()> {
|
||||
self.spawn.spawn(name, s);
|
||||
self.spawn.spawn(name, None, s);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -221,7 +221,7 @@ where
|
||||
name: &'static str,
|
||||
s: Pin<Box<dyn Future<Output = ()> + Send>>,
|
||||
) -> SubsystemResult<()> {
|
||||
self.spawn.spawn_blocking(name, s);
|
||||
self.spawn.spawn_blocking(name, None, s);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -396,7 +396,7 @@ mod tests {
|
||||
|
||||
let mut handle = Handle::new(handle);
|
||||
|
||||
spawner.spawn("overseer", overseer.run().then(|_| async { () }).boxed());
|
||||
spawner.spawn("overseer", None, overseer.run().then(|_| async { () }).boxed());
|
||||
|
||||
block_on(handle.send_msg_anon(CollatorProtocolMessage::CollateOn(Default::default())));
|
||||
assert!(matches!(
|
||||
|
||||
@@ -485,7 +485,7 @@ pub trait JobTrait: Unpin + Sized {
|
||||
/// The `delegate_subsystem!` macro should take care of this.
|
||||
type Metrics: 'static + metrics::Metrics + Send;
|
||||
|
||||
/// Name of the job, i.e. `CandidateBackingJob`
|
||||
/// Name of the job, i.e. `candidate-backing-job`
|
||||
const NAME: &'static str;
|
||||
|
||||
/// Run a job for the given relay `parent`.
|
||||
@@ -577,7 +577,11 @@ where
|
||||
Ok(())
|
||||
});
|
||||
|
||||
self.spawner.spawn(Job::NAME, future.map(drop).boxed());
|
||||
self.spawner.spawn(
|
||||
Job::NAME,
|
||||
Some(Job::NAME.strip_suffix("-job").unwrap_or(Job::NAME)),
|
||||
future.map(drop).boxed(),
|
||||
);
|
||||
self.outgoing_msgs.push(from_job_rx);
|
||||
|
||||
let handle = JobHandle { _abort_handle: AbortOnDrop(abort_handle), to_job: to_job_tx };
|
||||
@@ -750,6 +754,6 @@ where
|
||||
Ok(())
|
||||
});
|
||||
|
||||
SpawnedSubsystem { name: Job::NAME.strip_suffix("Job").unwrap_or(Job::NAME), future }
|
||||
SpawnedSubsystem { name: Job::NAME.strip_suffix("-job").unwrap_or(Job::NAME), future }
|
||||
}
|
||||
}
|
||||
|
||||
@@ -62,7 +62,7 @@ impl JobTrait for FakeCollatorProtocolJob {
|
||||
type RunArgs = bool;
|
||||
type Metrics = ();
|
||||
|
||||
const NAME: &'static str = "FakeCollatorProtocolJob";
|
||||
const NAME: &'static str = "fake-collator-protocol-job";
|
||||
|
||||
/// Run a job for the parent block indicated
|
||||
//
|
||||
@@ -199,7 +199,7 @@ fn test_subsystem_impl_and_name_derivation() {
|
||||
|
||||
let SpawnedSubsystem { name, .. } =
|
||||
FakeCollatorProtocolSubsystem::new(pool, false, ()).start(context);
|
||||
assert_eq!(name, "FakeCollatorProtocol");
|
||||
assert_eq!(name, "fake-collator-protocol");
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -186,6 +186,7 @@ impl Collator {
|
||||
let seconded_collations = seconded_collations.clone();
|
||||
spawner.spawn(
|
||||
"adder-collator-seconded",
|
||||
None,
|
||||
async move {
|
||||
if let Ok(res) = recv.await {
|
||||
if !matches!(
|
||||
|
||||
Reference in New Issue
Block a user