Add test suite and minor refinements to the utility subsystem (#1403)

* get conclude signal working properly; don't allocate a vector

* wip: add test suite / example / explanation for using utility subsystem

Unfortunately, the test fails right now for reasons which seem
very odd. Just have to keep poking at it.

* explicitly import everything

* fix subsystem-util test

The root problem here was two-fold:

- there was a circular dependency from subsystem -> test-helpers/subsystem ->
  subsystem
- cfg(test) doesn't propagate between crates

The solution: move the subsystem test helpers into a sub-module
within subsystem. Publicly export them from the previous location
so no other code breaks.

Doing this has an additional benefit: it ensures that no production
code can ever accidentally use the subsystem helpers, as they are compile-
gated on cfg(test).

* fully commit to moving test helpers into a subsystem module

* add some more tests

* get rid of log tests in favor of real error forwarding

It's not obvious whether we'll ever really want to chase down
these errors outside a testing context, but having the capability
won't hurt.

* fix issue which caused test to hang on osx

* only require that job errors are PartialEq when testing

also fix polkadot-node-core-backing tests

* get rid of any notion of partialeq

* rethink testing

Combine tests of starting and stopping job: leaving a test executor
with a job running was pretty clearly the cause of the sometimes-hang.

Also, add a timeout so tests _can't_ hang anymore; they just fail
after a while.

* rename fwd_errors -> forward_errors

* warn on error propagation failure

* fix unused import leftover from merge

* derive eq for subsystemerror
This commit is contained in:
Peter Goodspeed-Niklaus
2020-07-21 02:35:14 +02:00
committed by GitHub
parent 9aadb5d499
commit 5cfcc8446c
16 changed files with 412 additions and 88 deletions
+2 -15
View File
@@ -4457,7 +4457,6 @@ dependencies = [
"polkadot-node-primitives",
"polkadot-node-subsystem",
"polkadot-primitives",
"polkadot-subsystem-test-helpers",
"sc-network",
"sp-core",
"sp-runtime",
@@ -4498,7 +4497,6 @@ dependencies = [
"polkadot-node-subsystem",
"polkadot-primitives",
"polkadot-statement-table",
"polkadot-subsystem-test-helpers",
"sc-client-api",
"sc-keystore",
"sp-api",
@@ -4549,12 +4547,14 @@ dependencies = [
name = "polkadot-node-subsystem"
version = "0.1.0"
dependencies = [
"assert_matches",
"async-trait",
"derive_more 0.99.9",
"futures 0.3.5",
"futures-timer 3.0.2",
"log 0.4.8",
"parity-scale-codec",
"parking_lot 0.10.2",
"pin-project",
"polkadot-node-primitives",
"polkadot-primitives",
@@ -4615,7 +4615,6 @@ dependencies = [
"polkadot-node-primitives",
"polkadot-node-subsystem",
"polkadot-primitives",
"polkadot-subsystem-test-helpers",
"sc-network",
"sp-core",
"sp-runtime",
@@ -4970,7 +4969,6 @@ dependencies = [
"polkadot-node-primitives",
"polkadot-node-subsystem",
"polkadot-primitives",
"polkadot-subsystem-test-helpers",
"sp-core",
"sp-keyring",
"sp-runtime",
@@ -4987,17 +4985,6 @@ dependencies = [
"sp-core",
]
[[package]]
name = "polkadot-subsystem-test-helpers"
version = "0.1.0"
dependencies = [
"async-trait",
"futures 0.3.5",
"parking_lot 0.10.2",
"polkadot-node-subsystem",
"sp-core",
]
[[package]]
name = "polkadot-test-runtime"
version = "0.8.17"
-1
View File
@@ -52,7 +52,6 @@ members = [
"node/service",
"node/core/backing",
"node/subsystem",
"node/test-helpers/subsystem",
"node/test-service",
"parachain/test-parachains",
+1 -1
View File
@@ -22,5 +22,5 @@ bitvec = { version = "0.17.4", default-features = false, features = ["alloc"] }
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master" }
futures = { version = "0.3.5", features = ["thread-pool"] }
subsystem-test = { package = "polkadot-subsystem-test-helpers", path = "../../test-helpers/subsystem" }
assert_matches = "1.3.0"
polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem", features = [ "test-helpers" ] }
+4 -4
View File
@@ -745,7 +745,7 @@ where
/// Run this subsystem
pub async fn run(ctx: Context, keystore: KeyStorePtr, spawner: Spawner) {
<Manager<Spawner, Context>>::run(ctx, keystore, spawner).await
<Manager<Spawner, Context>>::run(ctx, keystore, spawner, None).await
}
}
@@ -895,13 +895,13 @@ mod tests {
}
struct TestHarness {
virtual_overseer: subsystem_test::TestSubsystemContextHandle<CandidateBackingMessage>,
virtual_overseer: polkadot_subsystem::test_helpers::TestSubsystemContextHandle<CandidateBackingMessage>,
}
fn test_harness<T: Future<Output=()>>(keystore: KeyStorePtr, test: impl FnOnce(TestHarness) -> T) {
let pool = sp_core::testing::SpawnBlockingExecutor::new();
let (context, virtual_overseer) = subsystem_test::make_subsystem_context(pool.clone());
let (context, virtual_overseer) = polkadot_subsystem::test_helpers::make_subsystem_context(pool.clone());
let subsystem = CandidateBackingSubsystem::run(context, keystore, pool.clone());
@@ -959,7 +959,7 @@ mod tests {
// Tests that the subsystem performs actions that are requied on startup.
async fn test_startup(
virtual_overseer: &mut subsystem_test::TestSubsystemContextHandle<CandidateBackingMessage>,
virtual_overseer: &mut polkadot_subsystem::test_helpers::TestSubsystemContextHandle<CandidateBackingMessage>,
test_state: &TestState,
) {
// Start work on some new parent.
-1
View File
@@ -18,6 +18,5 @@ polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsys
[dev-dependencies]
parking_lot = "0.10.0"
subsystem-test = { package = "polkadot-subsystem-test-helpers", path = "../../test-helpers/subsystem" }
assert_matches = "1.3.0"
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
+4 -4
View File
@@ -531,7 +531,7 @@ mod tests {
use assert_matches::assert_matches;
use polkadot_subsystem::messages::{StatementDistributionMessage, BitfieldDistributionMessage};
use subsystem_test::{SingleItemSink, SingleItemStream};
use polkadot_subsystem::test_helpers::{SingleItemSink, SingleItemStream};
// The subsystem's view of the network - only supports a single call to `event_stream`.
struct TestNetwork {
@@ -550,7 +550,7 @@ mod tests {
TestNetwork,
TestNetworkHandle,
) {
let (net_tx, net_rx) = subsystem_test::single_item_sink();
let (net_tx, net_rx) = polkadot_subsystem::test_helpers::single_item_sink();
let (action_tx, action_rx) = mpsc::unbounded();
(
@@ -631,13 +631,13 @@ mod tests {
struct TestHarness {
network_handle: TestNetworkHandle,
virtual_overseer: subsystem_test::TestSubsystemContextHandle<NetworkBridgeMessage>,
virtual_overseer: polkadot_subsystem::test_helpers::TestSubsystemContextHandle<NetworkBridgeMessage>,
}
fn test_harness<T: Future<Output=()>>(test: impl FnOnce(TestHarness) -> T) {
let pool = sp_core::testing::SpawnBlockingExecutor::new();
let (network, network_handle) = new_test_network();
let (context, virtual_overseer) = subsystem_test::make_subsystem_context(pool);
let (context, virtual_overseer) = polkadot_subsystem::test_helpers::make_subsystem_context(pool);
let network_bridge = run_network(
network,
@@ -18,6 +18,6 @@ polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsys
[dev-dependencies]
parking_lot = "0.10.0"
subsystem-test = { package = "polkadot-subsystem-test-helpers", path = "../../test-helpers/subsystem" }
assert_matches = "1.3.0"
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem", features = [ "test-helpers" ] }
@@ -620,7 +620,7 @@ mod tests {
};
let pool = sp_core::testing::SpawnBlockingExecutor::new();
let (mut ctx, mut handle) = subsystem_test::make_subsystem_context(pool);
let (mut ctx, mut handle) = polkadot_subsystem::test_helpers::make_subsystem_context(pool);
let mut descriptor = CandidateDescriptor::default();
descriptor.pov_hash = pov_hash;
@@ -700,7 +700,7 @@ mod tests {
};
let pool = sp_core::testing::SpawnBlockingExecutor::new();
let (mut ctx, mut handle) = subsystem_test::make_subsystem_context(pool);
let (mut ctx, mut handle) = polkadot_subsystem::test_helpers::make_subsystem_context(pool);
let mut descriptor = CandidateDescriptor::default();
descriptor.pov_hash = pov_hash;
@@ -778,7 +778,7 @@ mod tests {
};
let pool = sp_core::testing::SpawnBlockingExecutor::new();
let (mut ctx, mut handle) = subsystem_test::make_subsystem_context(pool);
let (mut ctx, mut handle) = polkadot_subsystem::test_helpers::make_subsystem_context(pool);
executor::block_on(async move {
handle_network_update(
@@ -850,7 +850,7 @@ mod tests {
};
let pool = sp_core::testing::SpawnBlockingExecutor::new();
let (mut ctx, mut handle) = subsystem_test::make_subsystem_context(pool);
let (mut ctx, mut handle) = polkadot_subsystem::test_helpers::make_subsystem_context(pool);
executor::block_on(async move {
// Peer A answers our request before peer B.
@@ -938,7 +938,7 @@ mod tests {
};
let pool = sp_core::testing::SpawnBlockingExecutor::new();
let (mut ctx, mut handle) = subsystem_test::make_subsystem_context(pool);
let (mut ctx, mut handle) = polkadot_subsystem::test_helpers::make_subsystem_context(pool);
executor::block_on(async move {
// Peer A answers our request: right relay parent, awaited hash, wrong PoV.
@@ -1001,7 +1001,7 @@ mod tests {
};
let pool = sp_core::testing::SpawnBlockingExecutor::new();
let (mut ctx, mut handle) = subsystem_test::make_subsystem_context(pool);
let (mut ctx, mut handle) = polkadot_subsystem::test_helpers::make_subsystem_context(pool);
executor::block_on(async move {
// Peer A answers our request: right relay parent, awaited hash, wrong PoV.
@@ -1062,7 +1062,7 @@ mod tests {
};
let pool = sp_core::testing::SpawnBlockingExecutor::new();
let (mut ctx, mut handle) = subsystem_test::make_subsystem_context(pool);
let (mut ctx, mut handle) = polkadot_subsystem::test_helpers::make_subsystem_context(pool);
executor::block_on(async move {
// Peer A answers our request: right relay parent, awaited hash, wrong PoV.
@@ -1120,7 +1120,7 @@ mod tests {
};
let pool = sp_core::testing::SpawnBlockingExecutor::new();
let (mut ctx, mut handle) = subsystem_test::make_subsystem_context(pool);
let (mut ctx, mut handle) = polkadot_subsystem::test_helpers::make_subsystem_context(pool);
executor::block_on(async move {
let max_plausibly_awaited = n_validators * 2;
@@ -1205,7 +1205,7 @@ mod tests {
};
let pool = sp_core::testing::SpawnBlockingExecutor::new();
let (mut ctx, mut handle) = subsystem_test::make_subsystem_context(pool);
let (mut ctx, mut handle) = polkadot_subsystem::test_helpers::make_subsystem_context(pool);
executor::block_on(async move {
let pov_hash = make_pov(vec![1, 2, 3]).hash();
@@ -1267,7 +1267,7 @@ mod tests {
};
let pool = sp_core::testing::SpawnBlockingExecutor::new();
let (mut ctx, mut handle) = subsystem_test::make_subsystem_context(pool);
let (mut ctx, mut handle) = polkadot_subsystem::test_helpers::make_subsystem_context(pool);
executor::block_on(async move {
let pov_hash = make_pov(vec![1, 2, 3]).hash();
@@ -1344,7 +1344,7 @@ mod tests {
};
let pool = sp_core::testing::SpawnBlockingExecutor::new();
let (mut ctx, mut handle) = subsystem_test::make_subsystem_context(pool);
let (mut ctx, mut handle) = polkadot_subsystem::test_helpers::make_subsystem_context(pool);
executor::block_on(async move {
handle_network_update(
@@ -1427,7 +1427,7 @@ mod tests {
};
let pool = sp_core::testing::SpawnBlockingExecutor::new();
let (mut ctx, mut handle) = subsystem_test::make_subsystem_context(pool);
let (mut ctx, mut handle) = polkadot_subsystem::test_helpers::make_subsystem_context(pool);
executor::block_on(async move {
handle_network_update(
@@ -21,7 +21,7 @@ indexmap = "1.4.0"
[dev-dependencies]
parking_lot = "0.10.0"
subsystem-test = { package = "polkadot-subsystem-test-helpers", path = "../../test-helpers/subsystem" }
polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem", features = ["test-helpers"] }
assert_matches = "1.3.0"
sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
@@ -1213,7 +1213,7 @@ mod tests {
};
let pool = sp_core::testing::SpawnBlockingExecutor::new();
let (mut ctx, mut handle) = subsystem_test::make_subsystem_context(pool);
let (mut ctx, mut handle) = polkadot_subsystem::test_helpers::make_subsystem_context(pool);
let peer = PeerId::random();
executor::block_on(async move {
@@ -1305,7 +1305,7 @@ mod tests {
].into_iter().collect();
let pool = sp_core::testing::SpawnBlockingExecutor::new();
let (mut ctx, mut handle) = subsystem_test::make_subsystem_context(pool);
let (mut ctx, mut handle) = polkadot_subsystem::test_helpers::make_subsystem_context(pool);
executor::block_on(async move {
let statement = {
+10
View File
@@ -13,6 +13,7 @@ futures-timer = "3.0.2"
keystore = { package = "sc-keystore", git = "https://github.com/paritytech/substrate", branch = "master" }
log = "0.4.8"
parity-scale-codec = "1.3.0"
parking_lot = { version = "0.10.0", optional = true }
pin-project = "0.4.22"
polkadot-node-primitives = { path = "../primitives" }
polkadot-primitives = { path = "../../primitives" }
@@ -20,3 +21,12 @@ polkadot-statement-table = { path = "../../statement-table" }
sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
streamunordered = "0.5.1"
[dev-dependencies]
assert_matches = "1.3.0"
async-trait = "0.1"
futures = { version = "0.3.5", features = ["thread-pool"] }
parking_lot = "0.10.0"
[features]
test-helpers = [ "parking_lot" ]
+3 -1
View File
@@ -35,6 +35,8 @@ use crate::messages::AllMessages;
pub mod messages;
pub mod util;
#[cfg(any(test, feature = "test-helpers"))]
pub mod test_helpers;
/// Signals sent by an overseer to a subsystem.
#[derive(PartialEq, Clone, Debug)]
@@ -71,7 +73,7 @@ pub enum FromOverseer<M> {
/// * Subsystems dying when they are not expected to
/// * Subsystems not dying when they are told to die
/// * etc.
#[derive(Debug)]
#[derive(Debug, PartialEq, Eq)]
pub struct SubsystemError;
impl From<mpsc::SendError> for SubsystemError {
+6
View File
@@ -408,4 +408,10 @@ pub enum AllMessages {
AvailabilityStore(AvailabilityStoreMessage),
/// Message for the network bridge subsystem.
NetworkBridge(NetworkBridgeMessage),
/// Test message
///
/// This variant is only valid while testing, but makes the process of testing the
/// subsystem job manager much simpler.
#[cfg(test)]
Test(String),
}
@@ -16,8 +16,8 @@
//! Utilities for testing subsystems.
use polkadot_subsystem::{SubsystemContext, FromOverseer, SubsystemResult, SubsystemError};
use polkadot_subsystem::messages::AllMessages;
use crate::{SubsystemContext, FromOverseer, SubsystemResult, SubsystemError};
use crate::messages::AllMessages;
use futures::prelude::*;
use futures::channel::mpsc;
+364 -30
View File
@@ -22,7 +22,7 @@
use crate::{
messages::{AllMessages, RuntimeApiMessage, RuntimeApiRequest, SchedulerRoster},
FromOverseer, SpawnedSubsystem, Subsystem, SubsystemContext, SubsystemResult,
FromOverseer, SpawnedSubsystem, Subsystem, SubsystemContext, SubsystemError, SubsystemResult,
};
use futures::{
channel::{mpsc, oneshot},
@@ -67,12 +67,20 @@ pub enum Error {
/// Attempted to send on a MPSC channel which has been canceled
#[from]
Mpsc(mpsc::SendError),
/// A subsystem error
#[from]
Subsystem(SubsystemError),
/// The type system wants this even though it doesn't make sense
#[from]
Infallible(std::convert::Infallible),
/// Attempted to convert from an AllMessages to a FromJob, and failed.
SenderConversion(String),
/// The local node is not a validator.
NotAValidator,
/// The desired job is not present in the jobs list.
JobNotFound(Hash),
/// Already forwarding errors to another sender
AlreadyForwarding,
}
/// Request some data from the `RuntimeApi`.
@@ -262,7 +270,7 @@ pub trait ToJobTrait: TryFrom<AllMessages> {
}
/// A JobHandle manages a particular job for a subsystem.
pub struct JobHandle<ToJob> {
struct JobHandle<ToJob> {
abort_handle: future::AbortHandle,
to_job: mpsc::Sender<ToJob>,
finished: oneshot::Receiver<()>,
@@ -271,23 +279,23 @@ pub struct JobHandle<ToJob> {
impl<ToJob> JobHandle<ToJob> {
/// Send a message to the job.
pub async fn send_msg(&mut self, msg: ToJob) -> Result<(), Error> {
async fn send_msg(&mut self, msg: ToJob) -> Result<(), Error> {
self.to_job.send(msg).await.map_err(Into::into)
}
/// Abort the job without waiting for a graceful shutdown
pub fn abort(self) {
self.abort_handle.abort();
}
}
impl<ToJob: ToJobTrait> JobHandle<ToJob> {
/// Stop this job gracefully.
///
/// If it hasn't shut itself down after `JOB_GRACEFUL_STOP_DURATION`, abort it.
pub async fn stop(mut self) {
async fn stop(mut self) {
// we don't actually care if the message couldn't be sent
let _ = self.to_job.send(ToJob::STOP).await;
if let Err(_) = self.to_job.send(ToJob::STOP).await {
// no need to wait further here: the job is either stalled or
// disconnected, and in either case, we can just abort it immediately
self.abort_handle.abort();
return;
}
let stop_timer = Delay::new(JOB_GRACEFUL_STOP_DURATION);
match future::select(stop_timer, self.finished).await {
@@ -310,7 +318,7 @@ pub trait JobTrait: Unpin {
/// Message type from the job. Typically a subset of AllMessages.
type FromJob: 'static + Into<AllMessages> + Send;
/// Job runtime error.
type Error: std::fmt::Debug;
type Error: 'static + std::fmt::Debug + Send;
/// Extra arguments this job needs to run properly.
///
/// If no extra information is needed, it is perfectly acceptable to set it to `()`.
@@ -323,8 +331,8 @@ pub trait JobTrait: Unpin {
fn run(
parent: Hash,
run_args: Self::RunArgs,
rx_to: mpsc::Receiver<Self::ToJob>,
tx_from: mpsc::Sender<Self::FromJob>,
receiver: mpsc::Receiver<Self::ToJob>,
sender: mpsc::Sender<Self::FromJob>,
) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send>>;
/// Handle a message which has no relay parent, and therefore can't be dispatched to a particular job
@@ -342,6 +350,18 @@ pub trait JobTrait: Unpin {
}
}
/// Error which can be returned by the jobs manager
///
/// Wraps the utility error type and the job-specific error
#[derive(Debug, derive_more::From)]
pub enum JobsError<JobError> {
/// utility error
#[from]
Utility(Error),
/// internal job error
Job(JobError),
}
/// Jobs manager for a subsystem
///
/// - Spawns new jobs for a given relay-parent on demand.
@@ -356,9 +376,10 @@ pub struct Jobs<Spawner, Job: JobTrait> {
#[pin]
outgoing_msgs: StreamUnordered<mpsc::Receiver<Job::FromJob>>,
job: std::marker::PhantomData<Job>,
errors: Option<mpsc::Sender<(Option<Hash>, JobsError<Job::Error>)>>,
}
impl<Spawner: SpawnNamed, Job: JobTrait> Jobs<Spawner, Job> {
impl<Spawner: SpawnNamed, Job: 'static + JobTrait> Jobs<Spawner, Job> {
/// Create a new Jobs manager which handles spawning appropriate jobs.
pub fn new(spawner: Spawner) -> Self {
Self {
@@ -366,15 +387,31 @@ impl<Spawner: SpawnNamed, Job: JobTrait> Jobs<Spawner, Job> {
running: HashMap::new(),
outgoing_msgs: StreamUnordered::new(),
job: std::marker::PhantomData,
errors: None,
}
}
/// Monitor errors which may occur during handling of a spawned job.
///
/// By default, an error in a job is simply logged. Once this is called,
/// the error is forwarded onto the provided channel.
///
/// Errors if the error channel already exists.
pub fn forward_errors(&mut self, tx: mpsc::Sender<(Option<Hash>, JobsError<Job::Error>)>) -> Result<(), Error> {
if self.errors.is_some() { return Err(Error::AlreadyForwarding) }
self.errors = Some(tx);
Ok(())
}
/// Spawn a new job for this `parent_hash`, with whatever args are appropriate.
fn spawn_job(&mut self, parent_hash: Hash, run_args: Job::RunArgs) -> Result<(), Error> {
let (to_job_tx, to_job_rx) = mpsc::channel(JOB_CHANNEL_CAPACITY);
let (from_job_tx, from_job_rx) = mpsc::channel(JOB_CHANNEL_CAPACITY);
let (finished_tx, finished) = oneshot::channel();
// clone the error transmitter to move into the future
let err_tx = self.errors.clone();
let (future, abort_handle) = future::abortable(async move {
if let Err(e) = Job::run(parent_hash, run_args, to_job_rx, from_job_tx).await {
log::error!(
@@ -383,12 +420,26 @@ impl<Spawner: SpawnNamed, Job: JobTrait> Jobs<Spawner, Job> {
parent_hash,
e,
);
if let Some(mut err_tx) = err_tx {
// if we can't send the notification of error on the error channel, then
// there's no point trying to propagate this error onto the channel too
// all we can do is warn that error propagatio has failed
if let Err(e) = err_tx.send((Some(parent_hash), JobsError::Job(e))).await {
log::warn!("failed to forward error: {:?}", e);
}
}
}
});
// discard output
// the spawn mechanism requires that the spawned future has no output
let future = async move {
// job errors are already handled within the future, meaning
// that any errors here are due to the abortable mechanism.
// failure to abort isn't of interest.
let _ = future.await;
// transmission failure here is only possible if the receiver is closed,
// which means the handle is dropped, which means we don't care anymore
let _ = finished_tx.send(());
};
self.spawner.spawn(Job::NAME, future.boxed());
@@ -472,13 +523,14 @@ pub struct JobManager<Spawner, Context, Job: JobTrait> {
run_args: Job::RunArgs,
context: std::marker::PhantomData<Context>,
job: std::marker::PhantomData<Job>,
errors: Option<mpsc::Sender<(Option<Hash>, JobsError<Job::Error>)>>,
}
impl<Spawner, Context, Job> JobManager<Spawner, Context, Job>
where
Spawner: SpawnNamed + Clone + Send + Unpin,
Context: SubsystemContext,
Job: JobTrait,
Job: 'static + JobTrait,
Job::RunArgs: Clone,
Job::ToJob: TryFrom<AllMessages> + TryFrom<<Context as SubsystemContext>::Message> + Sync,
{
@@ -489,9 +541,22 @@ where
run_args,
context: std::marker::PhantomData,
job: std::marker::PhantomData,
errors: None,
}
}
/// Monitor errors which may occur during handling of a spawned job.
///
/// By default, an error in a job is simply logged. Once this is called,
/// the error is forwarded onto the provided channel.
///
/// Errors if the error channel already exists.
pub fn forward_errors(&mut self, tx: mpsc::Sender<(Option<Hash>, JobsError<Job::Error>)>) -> Result<(), Error> {
if self.errors.is_some() { return Err(Error::AlreadyForwarding) }
self.errors = Some(tx);
Ok(())
}
/// Run this subsystem
///
/// Conceptually, this is very simple: it just loops forever.
@@ -500,23 +565,41 @@ where
/// - On other incoming messages, if they can be converted into Job::ToJob and
/// include a hash, then they're forwarded to the appropriate individual job.
/// - On outgoing messages from the jobs, it forwards them to the overseer.
pub async fn run(mut ctx: Context, run_args: Job::RunArgs, spawner: Spawner) {
///
/// If `err_tx` is not `None`, errors are forwarded onto that channel as they occur.
/// Otherwise, most are logged and then discarded.
pub async fn run(mut ctx: Context, run_args: Job::RunArgs, spawner: Spawner, mut err_tx: Option<mpsc::Sender<(Option<Hash>, JobsError<Job::Error>)>>) {
let mut jobs = Jobs::new(spawner.clone());
if let Some(ref err_tx) = err_tx {
jobs.forward_errors(err_tx.clone()).expect("we never call this twice in this context; qed");
}
loop {
select! {
incoming = ctx.recv().fuse() => if Self::handle_incoming(incoming, &mut jobs, &run_args).await { break },
outgoing = jobs.next().fuse() => if Self::handle_outgoing(outgoing, &mut ctx).await { break },
incoming = ctx.recv().fuse() => if Self::handle_incoming(incoming, &mut jobs, &run_args, &mut err_tx).await { break },
outgoing = jobs.next().fuse() => if Self::handle_outgoing(outgoing, &mut ctx, &mut err_tx).await { break },
complete => break,
}
}
}
// if we have a channel on which to forward errors, do so
async fn fwd_err(hash: Option<Hash>, err: JobsError<Job::Error>, err_tx: &mut Option<mpsc::Sender<(Option<Hash>, JobsError<Job::Error>)>>) {
if let Some(err_tx) = err_tx {
// if we can't send on the error transmission channel, we can't do anything useful about it
// still, we can at least log the failure
if let Err(e) = err_tx.send((hash, err)).await {
log::warn!("failed to forward error: {:?}", e);
}
}
}
// handle an incoming message. return true if we should break afterwards.
async fn handle_incoming(
incoming: SubsystemResult<FromOverseer<Context::Message>>,
jobs: &mut Jobs<Spawner, Job>,
run_args: &Job::RunArgs,
err_tx: &mut Option<mpsc::Sender<(Option<Hash>, JobsError<Job::Error>)>>
) -> bool {
use crate::FromOverseer::{Communication, Signal};
use crate::OverseerSignal::{Conclude, StartWork, StopWork};
@@ -525,12 +608,14 @@ where
Ok(Signal(StartWork(hash))) => {
if let Err(e) = jobs.spawn_job(hash, run_args.clone()) {
log::error!("Failed to spawn a job: {:?}", e);
Self::fwd_err(Some(hash), e.into(), err_tx).await;
return true;
}
}
Ok(Signal(StopWork(hash))) => {
if let Err(e) = jobs.stop_job(hash).await {
log::error!("Failed to stop a job: {:?}", e);
Self::fwd_err(Some(hash), e.into(), err_tx).await;
return true;
}
}
@@ -540,17 +625,21 @@ where
//
// Forwarding the stream to a drain means we wait until all of the items in the stream
// have completed. Contrast with `into_future`, which turns it into a future of `(head, rest_stream)`.
use futures::sink::drain;
use futures::stream::StreamExt;
use futures::stream::FuturesUnordered;
let unordered = jobs.running
if let Err(e) = jobs.running
.drain()
.map(|(_, handle)| handle.stop())
.collect::<FuturesUnordered<_>>();
// now wait for all the futures to complete; collect a vector of their results
// this is strictly less efficient than draining them into oblivion, but this compiles, and that doesn't
// https://github.com/paritytech/polkadot/pull/1376#pullrequestreview-446488645
let _ = async move { unordered.collect::<Vec<_>>() }.await;
.collect::<FuturesUnordered<_>>()
.map(Ok)
.forward(drain())
.await
{
log::error!("failed to stop all jobs on conclude signal: {:?}", e);
Self::fwd_err(None, Error::from(e).into(), err_tx).await;
}
return true;
}
@@ -560,12 +649,14 @@ where
Some(hash) => {
if let Err(err) = jobs.send_msg(hash, to_job).await {
log::error!("Failed to send a message to a job: {:?}", err);
Self::fwd_err(Some(hash), err.into(), err_tx).await;
return true;
}
}
None => {
if let Err(err) = Job::handle_unanchored_msg(to_job) {
log::error!("Failed to handle unhashed message: {:?}", err);
Self::fwd_err(None, JobsError::Job(err), err_tx).await;
return true;
}
}
@@ -574,6 +665,7 @@ where
}
Err(err) => {
log::error!("error receiving message from subsystem context: {:?}", err);
Self::fwd_err(None, Error::from(err).into(), err_tx).await;
return true;
}
}
@@ -581,11 +673,12 @@ where
}
// handle an outgoing message. return true if we should break afterwards.
async fn handle_outgoing(outgoing: Option<Job::FromJob>, ctx: &mut Context) -> bool {
async fn handle_outgoing(outgoing: Option<Job::FromJob>, ctx: &mut Context, err_tx: &mut Option<mpsc::Sender<(Option<Hash>, JobsError<Job::Error>)>>) -> bool {
match outgoing {
Some(msg) => {
// discard errors when sending the message upstream
let _ = ctx.send_message(msg.into()).await;
if let Err(e) = ctx.send_message(msg.into()).await {
Self::fwd_err(None, Error::from(e).into(), err_tx).await;
}
}
None => return true,
}
@@ -598,16 +691,18 @@ where
Spawner: SpawnNamed + Send + Clone + Unpin + 'static,
Context: SubsystemContext,
<Context as SubsystemContext>::Message: Into<Job::ToJob>,
Job: JobTrait + Send,
Job: 'static + JobTrait + Send,
Job::RunArgs: Clone + Sync,
Job::ToJob: TryFrom<AllMessages> + Sync,
{
fn start(self, ctx: Context) -> SpawnedSubsystem {
let spawner = self.spawner.clone();
let run_args = self.run_args.clone();
let errors = self.errors;
let future = Box::pin(async move {
Self::run(ctx, run_args, spawner).await;
Self::run(ctx, run_args, spawner, errors).await;
});
SpawnedSubsystem {
@@ -616,3 +711,242 @@ where
}
}
}
#[cfg(test)]
mod tests {
use assert_matches::assert_matches;
use crate::{
messages::{AllMessages, CandidateSelectionMessage},
test_helpers::{self, make_subsystem_context},
util::{
self,
JobsError,
JobManager,
JobTrait,
ToJobTrait,
},
FromOverseer,
OverseerSignal,
};
use futures::{
channel::mpsc,
executor,
Future,
FutureExt,
stream::{self, StreamExt},
SinkExt,
};
use futures_timer::Delay;
use polkadot_primitives::v1::Hash;
use std::{
collections::HashMap,
convert::TryFrom,
pin::Pin,
time::Duration,
};
// basic usage: in a nutshell, when you want to define a subsystem, just focus on what its jobs do;
// you can leave the subsystem itself to the job manager.
// for purposes of demonstration, we're going to whip up a fake subsystem.
// this will 'select' candidates which are pre-loaded in the job
// job structs are constructed within JobTrait::run
// most will want to retain the sender and receiver, as well as whatever other data they like
struct FakeCandidateSelectionJob {
receiver: mpsc::Receiver<ToJob>,
}
// ToJob implementations require the following properties:
//
// - have a Stop variant (to impl ToJobTrait)
// - impl ToJobTrait
// - impl TryFrom<AllMessages>
// - impl From<CandidateSelectionMessage> (from SubsystemContext::Message)
//
// Mostly, they are just a type-safe subset of AllMessages that this job is prepared to receive
enum ToJob {
CandidateSelection(CandidateSelectionMessage),
Stop,
}
impl ToJobTrait for ToJob {
const STOP: Self = ToJob::Stop;
fn relay_parent(&self) -> Option<Hash> {
match self {
Self::CandidateSelection(csm) => csm.relay_parent(),
Self::Stop => None,
}
}
}
impl TryFrom<AllMessages> for ToJob {
type Error = ();
fn try_from(msg: AllMessages) -> Result<Self, Self::Error> {
match msg {
AllMessages::CandidateSelection(csm) => Ok(ToJob::CandidateSelection(csm)),
_ => Err(())
}
}
}
impl From<CandidateSelectionMessage> for ToJob {
fn from(csm: CandidateSelectionMessage) -> ToJob {
ToJob::CandidateSelection(csm)
}
}
// FromJob must be infallibly convertable into AllMessages.
//
// It exists to be a type-safe subset of AllMessages that this job is specified to send.
//
// Note: the Clone impl here is not generally required; it's just ueful for this test context because
// we include it in the RunArgs
#[derive(Clone)]
enum FromJob {
Test(String),
}
impl From<FromJob> for AllMessages {
fn from(from_job: FromJob) -> AllMessages {
match from_job {
FromJob::Test(s) => AllMessages::Test(s),
}
}
}
// Error will mostly be a wrapper to make the try operator more convenient;
// deriving From implementations for most variants is recommended.
// It must implement Debug for logging.
#[derive(Debug, derive_more::From)]
enum Error {
#[from]
Sending(mpsc::SendError)
}
impl JobTrait for FakeCandidateSelectionJob {
type ToJob = ToJob;
type FromJob = FromJob;
type Error = Error;
// RunArgs can be anything that a particular job needs supplied from its external context
// in order to create the Job. In this case, they're a hashmap of parents to the mock outputs
// expected from that job.
//
// Note that it's not recommended to use something as heavy as a hashmap in production: the
// RunArgs get cloned so that each job gets its own owned copy. If you need that, wrap it in
// an Arc. Within a testing context, that efficiency is less important.
type RunArgs = HashMap<Hash, Vec<FromJob>>;
const NAME: &'static str = "FakeCandidateSelectionJob";
/// Run a job for the parent block indicated
//
// this function is in charge of creating and executing the job's main loop
fn run(
parent: Hash,
mut run_args: Self::RunArgs,
receiver: mpsc::Receiver<ToJob>,
mut sender: mpsc::Sender<FromJob>,
) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send>> {
async move {
let job = FakeCandidateSelectionJob {
receiver,
};
// most jobs will have a request-response cycle at the heart of their run loop.
// however, in this case, we never receive valid messages, so we may as well
// just send all of our (mock) output messages now
let mock_output = run_args.remove(&parent).unwrap_or_default();
let mut stream = stream::iter(mock_output.into_iter().map(Ok));
sender.send_all(&mut stream).await?;
// it isn't necessary to break run_loop into its own function,
// but it's convenient to separate the concerns in this way
job.run_loop().await
}.boxed()
}
}
impl FakeCandidateSelectionJob {
async fn run_loop(mut self) -> Result<(), Error> {
while let Some(msg) = self.receiver.next().await {
match msg {
ToJob::CandidateSelection(_csm) => {
unimplemented!("we'd report the collator to the peer set manager here, but that's not implemented yet");
}
ToJob::Stop => break,
}
}
Ok(())
}
}
// with the job defined, it's straightforward to get a subsystem implementation.
type FakeCandidateSelectionSubsystem<Spawner, Context> = JobManager<Spawner, Context, FakeCandidateSelectionJob>;
// this type lets us pretend to be the overseer
type OverseerHandle = test_helpers::TestSubsystemContextHandle<CandidateSelectionMessage>;
fn test_harness<T: Future<Output=()>>(run_args: HashMap<Hash, Vec<FromJob>>, test: impl FnOnce(OverseerHandle, mpsc::Receiver<(Option<Hash>, JobsError<Error>)>) -> T) {
let pool = sp_core::testing::SpawnBlockingExecutor::new();
let (context, overseer_handle) = make_subsystem_context(pool.clone());
let (err_tx, err_rx) = mpsc::channel(16);
let subsystem = FakeCandidateSelectionSubsystem::run(context, run_args, pool, Some(err_tx));
let test_future = test(overseer_handle, err_rx);
let timeout = Delay::new(Duration::from_secs(2));
futures::pin_mut!(test_future);
futures::pin_mut!(subsystem);
futures::pin_mut!(timeout);
executor::block_on(async move {
futures::select! {
_ = test_future.fuse() => (),
_ = subsystem.fuse() => (),
_ = timeout.fuse() => panic!("test timed out instead of completing"),
}
});
}
#[test]
fn starting_and_stopping_job_works() {
let relay_parent: Hash = [0; 32].into();
let mut run_args = HashMap::new();
let test_message = format!("greetings from {}", relay_parent);
run_args.insert(relay_parent.clone(), vec![FromJob::Test(test_message.clone())]);
test_harness(run_args, |mut overseer_handle, err_rx| async move {
overseer_handle.send(FromOverseer::Signal(OverseerSignal::StartWork(relay_parent))).await;
assert_matches!(
overseer_handle.recv().await,
AllMessages::Test(msg) if msg == test_message
);
overseer_handle.send(FromOverseer::Signal(OverseerSignal::StopWork(relay_parent))).await;
let errs: Vec<_> = err_rx.collect().await;
assert_eq!(errs.len(), 0);
});
}
#[test]
fn stopping_non_running_job_fails() {
let relay_parent: Hash = [0; 32].into();
let run_args = HashMap::new();
test_harness(run_args, |mut overseer_handle, err_rx| async move {
overseer_handle.send(FromOverseer::Signal(OverseerSignal::StopWork(relay_parent))).await;
let errs: Vec<_> = err_rx.collect().await;
assert_eq!(errs.len(), 1);
assert_eq!(errs[0].0, Some(relay_parent));
assert_matches!(
errs[0].1,
JobsError::Utility(util::Error::JobNotFound(match_relay_parent)) if relay_parent == match_relay_parent
);
});
}
}
@@ -1,13 +0,0 @@
[package]
name = "polkadot-subsystem-test-helpers"
version = "0.1.0"
authors = ["Parity Technologies <admin@parity.io>"]
edition = "2018"
description = "Helpers for testing subsystems"
[dependencies]
futures = "0.3.5"
async-trait = "0.1"
polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" }
parking_lot = "0.10.0"
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }