From d6307a49780ffcd7407a353fba59675d7017bbb7 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Sat, 28 Nov 2020 15:12:43 -0500 Subject: [PATCH] allow jobs to spawn sub-tasks (#2030) * allow jobs to spawn sub-tasks * fix fallout in subsytems --- polkadot/node/core/backing/src/lib.rs | 9 ++-- .../node/core/bitfield-signing/src/lib.rs | 10 ++-- .../node/core/candidate-selection/src/lib.rs | 10 ++-- polkadot/node/core/provisioner/src/lib.rs | 10 ++-- polkadot/node/subsystem-util/src/lib.rs | 49 +++++++++++++------ 5 files changed, 55 insertions(+), 33 deletions(-) diff --git a/polkadot/node/core/backing/src/lib.rs b/polkadot/node/core/backing/src/lib.rs index 91ffbed9e5..000c121c42 100644 --- a/polkadot/node/core/backing/src/lib.rs +++ b/polkadot/node/core/backing/src/lib.rs @@ -52,6 +52,7 @@ use polkadot_node_subsystem_util::{ request_from_runtime, Validator, delegated_subsystem, + FromJobCommand, metrics::{self, prometheus}, }; use statement_table::{ @@ -197,9 +198,9 @@ enum FromJob { StatementDistribution(StatementDistributionMessage), } -impl From for AllMessages { - fn from(f: FromJob) -> Self { - match f { +impl From for FromJobCommand { + fn from(f: FromJob) -> FromJobCommand { + FromJobCommand::SendMessage(match f { FromJob::AvailabilityStore(msg) => AllMessages::AvailabilityStore(msg), FromJob::RuntimeApiMessage(msg) => AllMessages::RuntimeApi(msg), FromJob::CandidateValidation(msg) => AllMessages::CandidateValidation(msg), @@ -207,7 +208,7 @@ impl From for AllMessages { FromJob::StatementDistribution(msg) => AllMessages::StatementDistribution(msg), FromJob::PoVDistribution(msg) => AllMessages::PoVDistribution(msg), FromJob::Provisioner(msg) => AllMessages::Provisioner(msg), - } + }) } } diff --git a/polkadot/node/core/bitfield-signing/src/lib.rs b/polkadot/node/core/bitfield-signing/src/lib.rs index 6bbd4f48ea..139fed6ddc 100644 --- a/polkadot/node/core/bitfield-signing/src/lib.rs +++ b/polkadot/node/core/bitfield-signing/src/lib.rs @@ -30,7 +30,7 @@ use polkadot_node_subsystem::{ errors::RuntimeApiError, }; use polkadot_node_subsystem_util::{ - self as util, JobManager, JobTrait, ToJobTrait, Validator, + self as util, JobManager, JobTrait, ToJobTrait, Validator, FromJobCommand, metrics::{self, prometheus}, }; use polkadot_primitives::v1::{AvailabilityBitfield, CoreState, Hash, ValidatorIndex}; @@ -90,14 +90,14 @@ pub enum FromJob { RuntimeApi(RuntimeApiMessage), } -impl From for AllMessages { - fn from(from_job: FromJob) -> AllMessages { - match from_job { +impl From for FromJobCommand { + fn from(from_job: FromJob) -> FromJobCommand { + FromJobCommand::SendMessage(match from_job { FromJob::AvailabilityStore(asm) => AllMessages::AvailabilityStore(asm), FromJob::BitfieldDistribution(bdm) => AllMessages::BitfieldDistribution(bdm), FromJob::CandidateBacking(cbm) => AllMessages::CandidateBacking(cbm), FromJob::RuntimeApi(ram) => AllMessages::RuntimeApi(ram), - } + }) } } diff --git a/polkadot/node/core/candidate-selection/src/lib.rs b/polkadot/node/core/candidate-selection/src/lib.rs index 64697a69a5..dc2692431c 100644 --- a/polkadot/node/core/candidate-selection/src/lib.rs +++ b/polkadot/node/core/candidate-selection/src/lib.rs @@ -30,7 +30,7 @@ use polkadot_node_subsystem::{ }, }; use polkadot_node_subsystem_util::{ - self as util, delegated_subsystem, JobTrait, ToJobTrait, + self as util, delegated_subsystem, JobTrait, ToJobTrait, FromJobCommand, metrics::{self, prometheus}, }; use polkadot_primitives::v1::{CandidateReceipt, CollatorId, Hash, Id as ParaId, PoV}; @@ -89,12 +89,12 @@ enum FromJob { Collator(CollatorProtocolMessage), } -impl From for AllMessages { - fn from(from_job: FromJob) -> AllMessages { - match from_job { +impl From for FromJobCommand { + fn from(from_job: FromJob) -> FromJobCommand { + FromJobCommand::SendMessage(match from_job { FromJob::Backing(msg) => AllMessages::CandidateBacking(msg), FromJob::Collator(msg) => AllMessages::CollatorProtocol(msg), - } + }) } } diff --git a/polkadot/node/core/provisioner/src/lib.rs b/polkadot/node/core/provisioner/src/lib.rs index 1a33c3fdb0..91655951cc 100644 --- a/polkadot/node/core/provisioner/src/lib.rs +++ b/polkadot/node/core/provisioner/src/lib.rs @@ -33,7 +33,7 @@ use polkadot_node_subsystem::{ }; use polkadot_node_subsystem_util::{ self as util, - delegated_subsystem, + delegated_subsystem, FromJobCommand, request_availability_cores, request_persisted_validation_data, JobTrait, ToJobTrait, metrics::{self, prometheus}, }; @@ -98,12 +98,12 @@ enum FromJob { Runtime(RuntimeApiMessage), } -impl From for AllMessages { - fn from(from_job: FromJob) -> AllMessages { - match from_job { +impl From for FromJobCommand { + fn from(from_job: FromJob) -> FromJobCommand { + FromJobCommand::SendMessage(match from_job { FromJob::ChainApi(cam) => AllMessages::ChainApi(cam), FromJob::Runtime(ram) => AllMessages::RuntimeApi(ram), - } + }) } } diff --git a/polkadot/node/subsystem-util/src/lib.rs b/polkadot/node/subsystem-util/src/lib.rs index e8794e090d..30cd107463 100644 --- a/polkadot/node/subsystem-util/src/lib.rs +++ b/polkadot/node/subsystem-util/src/lib.rs @@ -476,6 +476,16 @@ pub mod metrics { } } +/// Commands from a job to the broader subsystem. +pub enum FromJobCommand { + /// Send a message to another subsystem. + SendMessage(AllMessages), + /// Spawn a child task on the executor. + Spawn(&'static str, Pin + Send>>), + /// Spawn a blocking child task on the executor's dedicated thread pool. + SpawnBlocking(&'static str, Pin + Send>>), +} + /// This trait governs jobs. /// /// Jobs are instantiated and killed automatically on appropriate overseer messages. @@ -485,7 +495,7 @@ pub trait JobTrait: Unpin { /// Message type to the job. Typically a subset of AllMessages. type ToJob: 'static + ToJobTrait + Send; /// Message type from the job. Typically a subset of AllMessages. - type FromJob: 'static + Into + Send; + type FromJob: 'static + Into + Send; /// Job runtime error. type Error: 'static + std::error::Error + Send; /// Extra arguments this job needs to run properly. @@ -752,8 +762,11 @@ where ).await { break }, - outgoing = jobs.next().fuse() => - Self::handle_outgoing(outgoing, &mut ctx).await, + outgoing = jobs.next().fuse() => { + if let Err(e) = Self::handle_from_job(outgoing, &mut ctx).await { + tracing::warn!(err = ?e, "failed to handle command from job"); + } + } complete => break, } } @@ -863,13 +876,19 @@ where false } - // handle an outgoing message. - async fn handle_outgoing( + // handle a command from a job. + async fn handle_from_job( outgoing: Option, ctx: &mut Context, - ) { - let msg = outgoing.expect("the Jobs stream never ends; qed"); - ctx.send_message(msg.into()).await; + ) -> SubsystemResult<()> { + let cmd: FromJobCommand = outgoing.expect("the Jobs stream never ends; qed").into(); + match cmd { + FromJobCommand::SendMessage(msg) => ctx.send_message(msg).await, + FromJobCommand::Spawn(name, task) => ctx.spawn(name, task).await?, + FromJobCommand::SpawnBlocking(name, task) => ctx.spawn_blocking(name, task).await?, + } + + Ok(()) } } @@ -1031,7 +1050,7 @@ impl Future for Timeout { #[cfg(test)] mod tests { - use super::{JobManager, JobTrait, JobsError, TimeoutExt, ToJobTrait}; + use super::{JobManager, JobTrait, JobsError, TimeoutExt, ToJobTrait, FromJobCommand}; use thiserror::Error; use polkadot_node_subsystem::{ messages::{AllMessages, CandidateSelectionMessage}, @@ -1101,9 +1120,9 @@ mod tests { } } - // FromJob must be infallibly convertable into AllMessages. + // FromJob must be infallibly convertable into FromJobCommand. // - // It exists to be a type-safe subset of AllMessages that this job is specified to send. + // It exists to be a type-safe subset of FromJobCommand that this job is specified to send. // // Note: the Clone impl here is not generally required; it's just ueful for this test context because // we include it in the RunArgs @@ -1112,10 +1131,12 @@ mod tests { Test, } - impl From for AllMessages { - fn from(from_job: FromJob) -> AllMessages { + impl From for FromJobCommand { + fn from(from_job: FromJob) -> FromJobCommand { match from_job { - FromJob::Test => AllMessages::CandidateSelection(CandidateSelectionMessage::default()), + FromJob::Test => FromJobCommand::SendMessage( + AllMessages::CandidateSelection(CandidateSelectionMessage::default()) + ), } } }