Simplify subsystem jobs (#2037)

* Simplify subsystem jobs

This pr simplifies the subsystem jobs interface. Instead of requiring an
extra message that is used to signal that a job should be ended, a job
now ends when the receiver returns `None`. Besides that it changes the
interface to enforce that messages to a job provide a relay parent.

* Drop ToJobTrait

* Remove FromJob

We always convert this message to FromJobCommand anyway.
This commit is contained in:
Bastian Köcher
2020-11-30 17:01:26 +01:00
committed by GitHub
parent 2d4aa3a42e
commit 536dceb4f6
9 changed files with 246 additions and 686 deletions
+73 -222
View File
@@ -26,7 +26,7 @@
use polkadot_node_subsystem::{
errors::RuntimeApiError,
messages::{AllMessages, RuntimeApiMessage, RuntimeApiRequest, RuntimeApiSender},
messages::{AllMessages, RuntimeApiMessage, RuntimeApiRequest, RuntimeApiSender, BoundToRelayParent},
FromOverseer, SpawnedSubsystem, Subsystem, SubsystemContext, SubsystemError, SubsystemResult,
};
use futures::{channel::{mpsc, oneshot}, prelude::*, select, stream::Stream};
@@ -56,6 +56,7 @@ use std::{
pin::Pin,
task::{Poll, Context},
time::Duration,
fmt,
};
use streamunordered::{StreamUnordered, StreamYield};
use thiserror::Error;
@@ -117,18 +118,11 @@ pub async fn request_from_runtime<RequestBuilder, Response, FromJob>(
) -> Result<RuntimeApiReceiver<Response>, Error>
where
RequestBuilder: FnOnce(RuntimeApiSender<Response>) -> RuntimeApiRequest,
FromJob: TryFrom<AllMessages>,
<FromJob as TryFrom<AllMessages>>::Error: std::fmt::Debug,
FromJob: From<AllMessages>,
{
let (tx, rx) = oneshot::channel();
sender
.send(
AllMessages::RuntimeApi(RuntimeApiMessage::Request(parent, request_builder(tx)))
.try_into()
.map_err(|err| Error::SenderConversion(format!("{:?}", err)))?,
)
.await?;
sender.send(AllMessages::RuntimeApi(RuntimeApiMessage::Request(parent, request_builder(tx))).into()).await?;
Ok(rx)
}
@@ -157,8 +151,7 @@ macro_rules! specialize_requests {
sender: &mut mpsc::Sender<FromJob>,
) -> Result<RuntimeApiReceiver<$return_ty>, Error>
where
FromJob: TryFrom<AllMessages>,
<FromJob as TryFrom<AllMessages>>::Error: std::fmt::Debug,
FromJob: From<AllMessages>,
{
request_from_runtime(parent, sender, |tx| RuntimeApiRequest::$request_variant(
$( $param_name, )* tx
@@ -308,8 +301,7 @@ impl Validator {
mut sender: mpsc::Sender<FromJob>,
) -> Result<Self, Error>
where
FromJob: TryFrom<AllMessages>,
<FromJob as TryFrom<AllMessages>>::Error: std::fmt::Debug,
FromJob: From<AllMessages>,
{
// Note: request_validators and request_session_index_for_child do not and cannot
// run concurrently: they both have a mutable handle to the same sender.
@@ -392,17 +384,6 @@ impl Validator {
}
}
/// ToJob is expected to be an enum declaring the set of messages of interest to a particular job.
///
/// Normally, this will be some subset of `Allmessages`, and a `Stop` variant.
pub trait ToJobTrait: TryFrom<AllMessages> {
/// The `Stop` variant of the ToJob enum.
const STOP: Self;
/// If the message variant contains its relay parent, return it here
fn relay_parent(&self) -> Option<Hash>;
}
struct AbortOnDrop(future::AbortHandle);
impl Drop for AbortOnDrop {
@@ -415,7 +396,6 @@ impl Drop for AbortOnDrop {
struct JobHandle<ToJob> {
_abort_handle: AbortOnDrop,
to_job: mpsc::Sender<ToJob>,
finished: oneshot::Receiver<()>,
}
impl<ToJob> JobHandle<ToJob> {
@@ -425,22 +405,6 @@ impl<ToJob> JobHandle<ToJob> {
}
}
impl<ToJob: ToJobTrait> JobHandle<ToJob> {
/// Stop this job gracefully.
///
/// If it hasn't shut itself down after `JOB_GRACEFUL_STOP_DURATION`, abort it.
async fn stop(mut self) {
// we don't actually care if the message couldn't be sent
if self.to_job.send(ToJob::STOP).await.is_err() {
return;
}
let stop_timer = Delay::new(JOB_GRACEFUL_STOP_DURATION);
future::select(stop_timer, self.finished).await;
}
}
/// This module reexports Prometheus types and defines the [`Metrics`] trait.
pub mod metrics {
/// Reexport Substrate Prometheus types.
@@ -486,16 +450,29 @@ pub enum FromJobCommand {
SpawnBlocking(&'static str, Pin<Box<dyn Future<Output = ()> + Send>>),
}
impl fmt::Debug for FromJobCommand {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
match self {
Self::SendMessage(msg) => write!(fmt, "FromJobCommand::SendMessage({:?})", msg),
Self::Spawn(name, _) => write!(fmt, "FromJobCommand::Spawn({})", name),
Self::SpawnBlocking(name, _) => write!(fmt, "FromJobCommand::SpawnBlocking({})", name),
}
}
}
impl From<AllMessages> for FromJobCommand {
fn from(msg: AllMessages) -> Self {
Self::SendMessage(msg)
}
}
/// This trait governs jobs.
///
/// Jobs are instantiated and killed automatically on appropriate overseer messages.
/// Other messages are passed along to and from the job via the overseer to other
/// subsystems.
/// Other messages are passed along to and from the job via the overseer to other subsystems.
pub trait JobTrait: Unpin {
/// Message type to the job. Typically a subset of AllMessages.
type ToJob: 'static + ToJobTrait + Send;
/// Message type from the job. Typically a subset of AllMessages.
type FromJob: 'static + Into<FromJobCommand> + Send;
/// Message type used to send messages to the job.
type ToJob: 'static + BoundToRelayParent + Send;
/// Job runtime error.
type Error: 'static + std::error::Error + Send;
/// Extra arguments this job needs to run properly.
@@ -512,13 +489,15 @@ pub trait JobTrait: Unpin {
/// Name of the job, i.e. `CandidateBackingJob`
const NAME: &'static str;
/// Run a job for the parent block indicated
/// Run a job for the given relay `parent`.
///
/// The job should be ended when `receiver` returns `None`.
fn run(
parent: Hash,
run_args: Self::RunArgs,
metrics: Self::Metrics,
receiver: mpsc::Receiver<Self::ToJob>,
sender: mpsc::Sender<Self::FromJob>,
sender: mpsc::Sender<FromJobCommand>,
) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send>>;
}
@@ -546,7 +525,7 @@ pub enum JobsError<JobError: 'static + std::error::Error> {
pub struct Jobs<Spawner, Job: JobTrait> {
spawner: Spawner,
running: HashMap<Hash, JobHandle<Job::ToJob>>,
outgoing_msgs: StreamUnordered<mpsc::Receiver<Job::FromJob>>,
outgoing_msgs: StreamUnordered<mpsc::Receiver<FromJobCommand>>,
#[pin]
job: std::marker::PhantomData<Job>,
errors: Option<mpsc::Sender<(Option<Hash>, JobsError<Job::Error>)>>,
@@ -585,7 +564,6 @@ impl<Spawner: SpawnNamed, Job: 'static + JobTrait> Jobs<Spawner, Job> {
fn spawn_job(&mut self, parent_hash: Hash, run_args: Job::RunArgs, metrics: Job::Metrics) -> Result<(), Error> {
let (to_job_tx, to_job_rx) = mpsc::channel(JOB_CHANNEL_CAPACITY);
let (from_job_tx, from_job_rx) = mpsc::channel(JOB_CHANNEL_CAPACITY);
let (finished_tx, finished) = oneshot::channel();
let err_tx = self.errors.clone();
@@ -609,23 +587,13 @@ impl<Spawner: SpawnNamed, Job: 'static + JobTrait> Jobs<Spawner, Job> {
}
});
let future = async move {
// job errors are already handled within the future, meaning
// that any errors here are due to the abortable mechanism.
// failure to abort isn't of interest.
let _ = future.await;
// transmission failure here is only possible if the receiver is closed,
// which means the handle is dropped, which means we don't care anymore
let _ = finished_tx.send(());
};
self.spawner.spawn(Job::NAME, future.boxed());
self.spawner.spawn(Job::NAME, future.map(drop).boxed());
self.outgoing_msgs.push(from_job_rx);
let handle = JobHandle {
_abort_handle: AbortOnDrop(abort_handle),
to_job: to_job_tx,
finished,
};
self.running.insert(parent_hash, handle);
@@ -635,9 +603,7 @@ impl<Spawner: SpawnNamed, Job: 'static + JobTrait> Jobs<Spawner, Job> {
/// Stop the job associated with this `parent_hash`.
pub async fn stop_job(&mut self, parent_hash: Hash) {
if let Some(handle) = self.running.remove(&parent_hash) {
handle.stop().await;
}
self.running.remove(&parent_hash);
}
/// Send a message to the appropriate job for this `parent_hash`.
@@ -656,7 +622,7 @@ where
Spawner: SpawnNamed,
Job: JobTrait,
{
type Item = Job::FromJob;
type Item = FromJobCommand;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
loop {
@@ -706,7 +672,7 @@ where
Context: SubsystemContext,
Job: 'static + JobTrait,
Job::RunArgs: Clone,
Job::ToJob: TryFrom<AllMessages> + TryFrom<<Context as SubsystemContext>::Message> + Sync,
Job::ToJob: From<<Context as SubsystemContext>::Message> + Sync,
{
/// Creates a new `Subsystem`.
pub fn new(spawner: Spawner, run_args: Job::RunArgs, metrics: Job::Metrics) -> Self {
@@ -783,7 +749,7 @@ where
}
}
// if we have a channel on which to forward errors, do so
/// Forward a given error to the higher context using the given error channel.
async fn fwd_err(
hash: Option<Hash>,
err: JobsError<Job::Error>,
@@ -798,7 +764,9 @@ where
}
}
// handle an incoming message. return true if we should break afterwards.
/// Handle an incoming message.
///
/// Returns `true` when this job manager should shutdown.
async fn handle_incoming(
incoming: SubsystemResult<FromOverseer<Context::Message>>,
jobs: &mut Jobs<Spawner, Job>,
@@ -833,44 +801,12 @@ where
}
}
Ok(Signal(Conclude)) => {
// Breaking the loop ends fn run, which drops `jobs`, which immediately drops all ongoing work.
// We can afford to wait a little while to shut them all down properly before doing that.
//
// Forwarding the stream to a drain means we wait until all of the items in the stream
// have completed. Contrast with `into_future`, which turns it into a future of `(head, rest_stream)`.
use futures::sink::drain;
use futures::stream::FuturesUnordered;
use futures::stream::StreamExt;
if let Err(e) = jobs
.running
.drain()
.map(|(_, handle)| handle.stop())
.collect::<FuturesUnordered<_>>()
.map(Ok)
.forward(drain())
.await
{
tracing::error!(
job = Job::NAME,
err = ?e,
"failed to stop a job on conclude signal",
);
let e = Error::from(e);
Self::fwd_err(None, JobsError::Utility(e), err_tx).await;
}
jobs.running.clear();
return true;
}
Ok(Communication { msg }) => {
if let Ok(to_job) = <Job::ToJob>::try_from(msg) {
match to_job.relay_parent() {
Some(hash) => jobs.send_msg(hash, to_job).await,
None => tracing::debug!(
job = Job::NAME,
"trying to send a message to a job without specifying a relay parent",
),
}
jobs.send_msg(to_job.relay_parent(), to_job).await;
}
}
Ok(Signal(BlockFinalized(_))) => {}
@@ -889,11 +825,10 @@ where
// handle a command from a job.
async fn handle_from_job(
outgoing: Option<Job::FromJob>,
outgoing: Option<FromJobCommand>,
ctx: &mut Context,
) -> SubsystemResult<()> {
let cmd: FromJobCommand = outgoing.expect("the Jobs stream never ends; qed").into();
match cmd {
match outgoing.expect("the Jobs stream never ends; qed") {
FromJobCommand::SendMessage(msg) => ctx.send_message(msg).await,
FromJobCommand::Spawn(name, task) => ctx.spawn(name, task).await?,
FromJobCommand::SpawnBlocking(name, task) => ctx.spawn_blocking(name, task).await?,
@@ -907,10 +842,9 @@ impl<Spawner, Context, Job> Subsystem<Context> for JobManager<Spawner, Context,
where
Spawner: SpawnNamed + Send + Clone + Unpin + 'static,
Context: SubsystemContext,
<Context as SubsystemContext>::Message: Into<Job::ToJob>,
Job: 'static + JobTrait + Send,
Job::RunArgs: Clone + Sync,
Job::ToJob: TryFrom<AllMessages> + Sync,
Job::ToJob: From<<Context as SubsystemContext>::Message> + Sync,
Job::Metrics: Sync,
{
fn start(self, ctx: Context) -> SpawnedSubsystem {
@@ -985,7 +919,7 @@ macro_rules! delegated_subsystem {
where
Spawner: Clone + $crate::reexports::SpawnNamed + Send + Unpin,
Context: $crate::reexports::SubsystemContext,
<Context as $crate::reexports::SubsystemContext>::Message: Into<$to_job>,
$to_job: From<<Context as $crate::reexports::SubsystemContext>::Message>,
{
#[doc = "Creates a new "]
#[doc = $subsystem_name]
@@ -1006,7 +940,7 @@ macro_rules! delegated_subsystem {
where
Spawner: $crate::reexports::SpawnNamed + Send + Clone + Unpin + 'static,
Context: $crate::reexports::SubsystemContext,
<Context as $crate::reexports::SubsystemContext>::Message: Into<$to_job>,
$to_job: From<<Context as $crate::reexports::SubsystemContext>::Message>,
{
fn start(self, ctx: Context) -> $crate::reexports::SpawnedSubsystem {
self.manager.start(ctx)
@@ -1061,22 +995,17 @@ impl<F: Future> Future for Timeout<F> {
#[cfg(test)]
mod tests {
use super::{JobManager, JobTrait, JobsError, TimeoutExt, ToJobTrait, FromJobCommand};
use super::*;
use thiserror::Error;
use polkadot_node_subsystem::{
messages::{AllMessages, CandidateSelectionMessage},
ActiveLeavesUpdate, FromOverseer, OverseerSignal, SpawnedSubsystem, Subsystem,
messages::{AllMessages, CandidateSelectionMessage}, ActiveLeavesUpdate, FromOverseer, OverseerSignal,
SpawnedSubsystem,
};
use assert_matches::assert_matches;
use futures::{
channel::mpsc,
executor,
stream::{self, StreamExt},
future, Future, FutureExt, SinkExt,
};
use futures::{channel::mpsc, executor, StreamExt, future, Future, FutureExt, SinkExt};
use polkadot_primitives::v1::Hash;
use polkadot_node_subsystem_test_helpers::{self as test_helpers, make_subsystem_context};
use std::{collections::HashMap, convert::TryFrom, pin::Pin, time::Duration};
use std::{pin::Pin, time::Duration};
// basic usage: in a nutshell, when you want to define a subsystem, just focus on what its jobs do;
// you can leave the subsystem itself to the job manager.
@@ -1087,69 +1016,7 @@ mod tests {
// job structs are constructed within JobTrait::run
// most will want to retain the sender and receiver, as well as whatever other data they like
struct FakeCandidateSelectionJob {
receiver: mpsc::Receiver<ToJob>,
}
// ToJob implementations require the following properties:
//
// - have a Stop variant (to impl ToJobTrait)
// - impl ToJobTrait
// - impl TryFrom<AllMessages>
// - impl From<CandidateSelectionMessage> (from SubsystemContext::Message)
//
// Mostly, they are just a type-safe subset of AllMessages that this job is prepared to receive
enum ToJob {
CandidateSelection(CandidateSelectionMessage),
Stop,
}
impl ToJobTrait for ToJob {
const STOP: Self = ToJob::Stop;
fn relay_parent(&self) -> Option<Hash> {
match self {
Self::CandidateSelection(csm) => csm.relay_parent(),
Self::Stop => None,
}
}
}
impl TryFrom<AllMessages> for ToJob {
type Error = ();
fn try_from(msg: AllMessages) -> Result<Self, Self::Error> {
match msg {
AllMessages::CandidateSelection(csm) => Ok(ToJob::CandidateSelection(csm)),
_ => Err(()),
}
}
}
impl From<CandidateSelectionMessage> for ToJob {
fn from(csm: CandidateSelectionMessage) -> ToJob {
ToJob::CandidateSelection(csm)
}
}
// FromJob must be infallibly convertable into FromJobCommand.
//
// It exists to be a type-safe subset of FromJobCommand that this job is specified to send.
//
// Note: the Clone impl here is not generally required; it's just ueful for this test context because
// we include it in the RunArgs
#[derive(Clone)]
enum FromJob {
Test,
}
impl From<FromJob> for FromJobCommand {
fn from(from_job: FromJob) -> FromJobCommand {
match from_job {
FromJob::Test => FromJobCommand::SendMessage(
AllMessages::CandidateSelection(CandidateSelectionMessage::default())
),
}
}
receiver: mpsc::Receiver<CandidateSelectionMessage>,
}
// Error will mostly be a wrapper to make the try operator more convenient;
@@ -1162,17 +1029,9 @@ mod tests {
}
impl JobTrait for FakeCandidateSelectionJob {
type ToJob = ToJob;
type FromJob = FromJob;
type ToJob = CandidateSelectionMessage;
type Error = Error;
// RunArgs can be anything that a particular job needs supplied from its external context
// in order to create the Job. In this case, they're a hashmap of parents to the mock outputs
// expected from that job.
//
// Note that it's not recommended to use something as heavy as a hashmap in production: the
// RunArgs get cloned so that each job gets its own owned copy. If you need that, wrap it in
// an Arc. Within a testing context, that efficiency is less important.
type RunArgs = HashMap<Hash, Vec<FromJob>>;
type RunArgs = bool;
type Metrics = ();
const NAME: &'static str = "FakeCandidateSelectionJob";
@@ -1181,21 +1040,23 @@ mod tests {
//
// this function is in charge of creating and executing the job's main loop
fn run(
parent: Hash,
mut run_args: Self::RunArgs,
_: Hash,
run_args: Self::RunArgs,
_metrics: Self::Metrics,
receiver: mpsc::Receiver<ToJob>,
mut sender: mpsc::Sender<FromJob>,
receiver: mpsc::Receiver<CandidateSelectionMessage>,
mut sender: mpsc::Sender<FromJobCommand>,
) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send>> {
async move {
let job = FakeCandidateSelectionJob { receiver };
// most jobs will have a request-response cycle at the heart of their run loop.
// however, in this case, we never receive valid messages, so we may as well
// just send all of our (mock) output messages now
let mock_output = run_args.remove(&parent).unwrap_or_default();
let mut stream = stream::iter(mock_output.into_iter().map(Ok));
sender.send_all(&mut stream).await?;
if run_args {
sender.send(FromJobCommand::SendMessage(
CandidateSelectionMessage::Invalid(
Default::default(),
Default::default(),
).into(),
)).await?;
}
// it isn't necessary to break run_loop into its own function,
// but it's convenient to separate the concerns in this way
@@ -1207,12 +1068,12 @@ mod tests {
impl FakeCandidateSelectionJob {
async fn run_loop(mut self) -> Result<(), Error> {
while let Some(msg) = self.receiver.next().await {
match msg {
ToJob::CandidateSelection(_csm) => {
loop {
match self.receiver.next().await {
Some(_csm) => {
unimplemented!("we'd report the collator to the peer set manager here, but that's not implemented yet");
}
ToJob::Stop => break,
None => break,
}
}
@@ -1228,7 +1089,7 @@ mod tests {
type OverseerHandle = test_helpers::TestSubsystemContextHandle<CandidateSelectionMessage>;
fn test_harness<T: Future<Output = ()>>(
run_args: HashMap<Hash, Vec<FromJob>>,
run_args: bool,
test: impl FnOnce(OverseerHandle, mpsc::Receiver<(Option<Hash>, JobsError<Error>)>) -> T,
) {
let _ = env_logger::builder()
@@ -1259,13 +1120,8 @@ mod tests {
#[test]
fn starting_and_stopping_job_works() {
let relay_parent: Hash = [0; 32].into();
let mut run_args = HashMap::new();
let _ = run_args.insert(
relay_parent.clone(),
vec![FromJob::Test],
);
test_harness(run_args, |mut overseer_handle, err_rx| async move {
test_harness(true, |mut overseer_handle, err_rx| async move {
overseer_handle
.send(FromOverseer::Signal(OverseerSignal::ActiveLeaves(
ActiveLeavesUpdate::start_work(relay_parent),
@@ -1293,13 +1149,8 @@ mod tests {
#[test]
fn sending_to_a_non_running_job_do_not_stop_the_subsystem() {
let relay_parent = Hash::repeat_byte(0x01);
let mut run_args = HashMap::new();
let _ = run_args.insert(
relay_parent.clone(),
vec![FromJob::Test],
);
test_harness(run_args, |mut overseer_handle, err_rx| async move {
test_harness(true, |mut overseer_handle, err_rx| async move {
overseer_handle
.send(FromOverseer::Signal(OverseerSignal::ActiveLeaves(
ActiveLeavesUpdate::start_work(relay_parent),
@@ -1334,7 +1185,7 @@ mod tests {
let (context, _) = make_subsystem_context::<CandidateSelectionMessage, _>(pool.clone());
let SpawnedSubsystem { name, .. } =
FakeCandidateSelectionSubsystem::new(pool, HashMap::new(), ()).start(context);
FakeCandidateSelectionSubsystem::new(pool, false, ()).start(context);
assert_eq!(name, "FakeCandidateSelection");
}
}