allow jobs to spawn sub-tasks (#2030)

* allow jobs to spawn sub-tasks

* fix fallout in subsytems
This commit is contained in:
Robert Habermeier
2020-11-28 15:12:43 -05:00
committed by GitHub
parent 0c84214814
commit d6307a4978
5 changed files with 55 additions and 33 deletions
+5 -4
View File
@@ -52,6 +52,7 @@ use polkadot_node_subsystem_util::{
request_from_runtime, request_from_runtime,
Validator, Validator,
delegated_subsystem, delegated_subsystem,
FromJobCommand,
metrics::{self, prometheus}, metrics::{self, prometheus},
}; };
use statement_table::{ use statement_table::{
@@ -197,9 +198,9 @@ enum FromJob {
StatementDistribution(StatementDistributionMessage), StatementDistribution(StatementDistributionMessage),
} }
impl From<FromJob> for AllMessages { impl From<FromJob> for FromJobCommand {
fn from(f: FromJob) -> Self { fn from(f: FromJob) -> FromJobCommand {
match f { FromJobCommand::SendMessage(match f {
FromJob::AvailabilityStore(msg) => AllMessages::AvailabilityStore(msg), FromJob::AvailabilityStore(msg) => AllMessages::AvailabilityStore(msg),
FromJob::RuntimeApiMessage(msg) => AllMessages::RuntimeApi(msg), FromJob::RuntimeApiMessage(msg) => AllMessages::RuntimeApi(msg),
FromJob::CandidateValidation(msg) => AllMessages::CandidateValidation(msg), FromJob::CandidateValidation(msg) => AllMessages::CandidateValidation(msg),
@@ -207,7 +208,7 @@ impl From<FromJob> for AllMessages {
FromJob::StatementDistribution(msg) => AllMessages::StatementDistribution(msg), FromJob::StatementDistribution(msg) => AllMessages::StatementDistribution(msg),
FromJob::PoVDistribution(msg) => AllMessages::PoVDistribution(msg), FromJob::PoVDistribution(msg) => AllMessages::PoVDistribution(msg),
FromJob::Provisioner(msg) => AllMessages::Provisioner(msg), FromJob::Provisioner(msg) => AllMessages::Provisioner(msg),
} })
} }
} }
@@ -30,7 +30,7 @@ use polkadot_node_subsystem::{
errors::RuntimeApiError, errors::RuntimeApiError,
}; };
use polkadot_node_subsystem_util::{ use polkadot_node_subsystem_util::{
self as util, JobManager, JobTrait, ToJobTrait, Validator, self as util, JobManager, JobTrait, ToJobTrait, Validator, FromJobCommand,
metrics::{self, prometheus}, metrics::{self, prometheus},
}; };
use polkadot_primitives::v1::{AvailabilityBitfield, CoreState, Hash, ValidatorIndex}; use polkadot_primitives::v1::{AvailabilityBitfield, CoreState, Hash, ValidatorIndex};
@@ -90,14 +90,14 @@ pub enum FromJob {
RuntimeApi(RuntimeApiMessage), RuntimeApi(RuntimeApiMessage),
} }
impl From<FromJob> for AllMessages { impl From<FromJob> for FromJobCommand {
fn from(from_job: FromJob) -> AllMessages { fn from(from_job: FromJob) -> FromJobCommand {
match from_job { FromJobCommand::SendMessage(match from_job {
FromJob::AvailabilityStore(asm) => AllMessages::AvailabilityStore(asm), FromJob::AvailabilityStore(asm) => AllMessages::AvailabilityStore(asm),
FromJob::BitfieldDistribution(bdm) => AllMessages::BitfieldDistribution(bdm), FromJob::BitfieldDistribution(bdm) => AllMessages::BitfieldDistribution(bdm),
FromJob::CandidateBacking(cbm) => AllMessages::CandidateBacking(cbm), FromJob::CandidateBacking(cbm) => AllMessages::CandidateBacking(cbm),
FromJob::RuntimeApi(ram) => AllMessages::RuntimeApi(ram), FromJob::RuntimeApi(ram) => AllMessages::RuntimeApi(ram),
} })
} }
} }
@@ -30,7 +30,7 @@ use polkadot_node_subsystem::{
}, },
}; };
use polkadot_node_subsystem_util::{ use polkadot_node_subsystem_util::{
self as util, delegated_subsystem, JobTrait, ToJobTrait, self as util, delegated_subsystem, JobTrait, ToJobTrait, FromJobCommand,
metrics::{self, prometheus}, metrics::{self, prometheus},
}; };
use polkadot_primitives::v1::{CandidateReceipt, CollatorId, Hash, Id as ParaId, PoV}; use polkadot_primitives::v1::{CandidateReceipt, CollatorId, Hash, Id as ParaId, PoV};
@@ -89,12 +89,12 @@ enum FromJob {
Collator(CollatorProtocolMessage), Collator(CollatorProtocolMessage),
} }
impl From<FromJob> for AllMessages { impl From<FromJob> for FromJobCommand {
fn from(from_job: FromJob) -> AllMessages { fn from(from_job: FromJob) -> FromJobCommand {
match from_job { FromJobCommand::SendMessage(match from_job {
FromJob::Backing(msg) => AllMessages::CandidateBacking(msg), FromJob::Backing(msg) => AllMessages::CandidateBacking(msg),
FromJob::Collator(msg) => AllMessages::CollatorProtocol(msg), FromJob::Collator(msg) => AllMessages::CollatorProtocol(msg),
} })
} }
} }
+5 -5
View File
@@ -33,7 +33,7 @@ use polkadot_node_subsystem::{
}; };
use polkadot_node_subsystem_util::{ use polkadot_node_subsystem_util::{
self as util, self as util,
delegated_subsystem, delegated_subsystem, FromJobCommand,
request_availability_cores, request_persisted_validation_data, JobTrait, ToJobTrait, request_availability_cores, request_persisted_validation_data, JobTrait, ToJobTrait,
metrics::{self, prometheus}, metrics::{self, prometheus},
}; };
@@ -98,12 +98,12 @@ enum FromJob {
Runtime(RuntimeApiMessage), Runtime(RuntimeApiMessage),
} }
impl From<FromJob> for AllMessages { impl From<FromJob> for FromJobCommand {
fn from(from_job: FromJob) -> AllMessages { fn from(from_job: FromJob) -> FromJobCommand {
match from_job { FromJobCommand::SendMessage(match from_job {
FromJob::ChainApi(cam) => AllMessages::ChainApi(cam), FromJob::ChainApi(cam) => AllMessages::ChainApi(cam),
FromJob::Runtime(ram) => AllMessages::RuntimeApi(ram), FromJob::Runtime(ram) => AllMessages::RuntimeApi(ram),
} })
} }
} }
+35 -14
View File
@@ -476,6 +476,16 @@ pub mod metrics {
} }
} }
/// Commands from a job to the broader subsystem.
pub enum FromJobCommand {
/// Send a message to another subsystem.
SendMessage(AllMessages),
/// Spawn a child task on the executor.
Spawn(&'static str, Pin<Box<dyn Future<Output = ()> + Send>>),
/// Spawn a blocking child task on the executor's dedicated thread pool.
SpawnBlocking(&'static str, Pin<Box<dyn Future<Output = ()> + Send>>),
}
/// This trait governs jobs. /// This trait governs jobs.
/// ///
/// Jobs are instantiated and killed automatically on appropriate overseer messages. /// Jobs are instantiated and killed automatically on appropriate overseer messages.
@@ -485,7 +495,7 @@ pub trait JobTrait: Unpin {
/// Message type to the job. Typically a subset of AllMessages. /// Message type to the job. Typically a subset of AllMessages.
type ToJob: 'static + ToJobTrait + Send; type ToJob: 'static + ToJobTrait + Send;
/// Message type from the job. Typically a subset of AllMessages. /// Message type from the job. Typically a subset of AllMessages.
type FromJob: 'static + Into<AllMessages> + Send; type FromJob: 'static + Into<FromJobCommand> + Send;
/// Job runtime error. /// Job runtime error.
type Error: 'static + std::error::Error + Send; type Error: 'static + std::error::Error + Send;
/// Extra arguments this job needs to run properly. /// Extra arguments this job needs to run properly.
@@ -752,8 +762,11 @@ where
).await { ).await {
break break
}, },
outgoing = jobs.next().fuse() => outgoing = jobs.next().fuse() => {
Self::handle_outgoing(outgoing, &mut ctx).await, if let Err(e) = Self::handle_from_job(outgoing, &mut ctx).await {
tracing::warn!(err = ?e, "failed to handle command from job");
}
}
complete => break, complete => break,
} }
} }
@@ -863,13 +876,19 @@ where
false false
} }
// handle an outgoing message. // handle a command from a job.
async fn handle_outgoing( async fn handle_from_job(
outgoing: Option<Job::FromJob>, outgoing: Option<Job::FromJob>,
ctx: &mut Context, ctx: &mut Context,
) { ) -> SubsystemResult<()> {
let msg = outgoing.expect("the Jobs stream never ends; qed"); let cmd: FromJobCommand = outgoing.expect("the Jobs stream never ends; qed").into();
ctx.send_message(msg.into()).await; match cmd {
FromJobCommand::SendMessage(msg) => ctx.send_message(msg).await,
FromJobCommand::Spawn(name, task) => ctx.spawn(name, task).await?,
FromJobCommand::SpawnBlocking(name, task) => ctx.spawn_blocking(name, task).await?,
}
Ok(())
} }
} }
@@ -1031,7 +1050,7 @@ impl<F: Future> Future for Timeout<F> {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::{JobManager, JobTrait, JobsError, TimeoutExt, ToJobTrait}; use super::{JobManager, JobTrait, JobsError, TimeoutExt, ToJobTrait, FromJobCommand};
use thiserror::Error; use thiserror::Error;
use polkadot_node_subsystem::{ use polkadot_node_subsystem::{
messages::{AllMessages, CandidateSelectionMessage}, messages::{AllMessages, CandidateSelectionMessage},
@@ -1101,9 +1120,9 @@ mod tests {
} }
} }
// FromJob must be infallibly convertable into AllMessages. // FromJob must be infallibly convertable into FromJobCommand.
// //
// It exists to be a type-safe subset of AllMessages that this job is specified to send. // It exists to be a type-safe subset of FromJobCommand that this job is specified to send.
// //
// Note: the Clone impl here is not generally required; it's just ueful for this test context because // Note: the Clone impl here is not generally required; it's just ueful for this test context because
// we include it in the RunArgs // we include it in the RunArgs
@@ -1112,10 +1131,12 @@ mod tests {
Test, Test,
} }
impl From<FromJob> for AllMessages { impl From<FromJob> for FromJobCommand {
fn from(from_job: FromJob) -> AllMessages { fn from(from_job: FromJob) -> FromJobCommand {
match from_job { match from_job {
FromJob::Test => AllMessages::CandidateSelection(CandidateSelectionMessage::default()), FromJob::Test => FromJobCommand::SendMessage(
AllMessages::CandidateSelection(CandidateSelectionMessage::default())
),
} }
} }
} }