diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock index 9bfe0b0d9e..98079e4fff 100644 --- a/polkadot/Cargo.lock +++ b/polkadot/Cargo.lock @@ -4457,7 +4457,6 @@ dependencies = [ "polkadot-node-primitives", "polkadot-node-subsystem", "polkadot-primitives", - "polkadot-subsystem-test-helpers", "sc-network", "sp-core", "sp-runtime", @@ -4498,7 +4497,6 @@ dependencies = [ "polkadot-node-subsystem", "polkadot-primitives", "polkadot-statement-table", - "polkadot-subsystem-test-helpers", "sc-client-api", "sc-keystore", "sp-api", @@ -4549,12 +4547,14 @@ dependencies = [ name = "polkadot-node-subsystem" version = "0.1.0" dependencies = [ + "assert_matches", "async-trait", "derive_more 0.99.9", "futures 0.3.5", "futures-timer 3.0.2", "log 0.4.8", "parity-scale-codec", + "parking_lot 0.10.2", "pin-project", "polkadot-node-primitives", "polkadot-primitives", @@ -4615,7 +4615,6 @@ dependencies = [ "polkadot-node-primitives", "polkadot-node-subsystem", "polkadot-primitives", - "polkadot-subsystem-test-helpers", "sc-network", "sp-core", "sp-runtime", @@ -4970,7 +4969,6 @@ dependencies = [ "polkadot-node-primitives", "polkadot-node-subsystem", "polkadot-primitives", - "polkadot-subsystem-test-helpers", "sp-core", "sp-keyring", "sp-runtime", @@ -4987,17 +4985,6 @@ dependencies = [ "sp-core", ] -[[package]] -name = "polkadot-subsystem-test-helpers" -version = "0.1.0" -dependencies = [ - "async-trait", - "futures 0.3.5", - "parking_lot 0.10.2", - "polkadot-node-subsystem", - "sp-core", -] - [[package]] name = "polkadot-test-runtime" version = "0.8.17" diff --git a/polkadot/Cargo.toml b/polkadot/Cargo.toml index 40adacde41..e0125209f2 100644 --- a/polkadot/Cargo.toml +++ b/polkadot/Cargo.toml @@ -52,7 +52,6 @@ members = [ "node/service", "node/core/backing", "node/subsystem", - "node/test-helpers/subsystem", "node/test-service", "parachain/test-parachains", diff --git a/polkadot/node/core/backing/Cargo.toml b/polkadot/node/core/backing/Cargo.toml index 6c8155e3cf..a660df3c31 100644 --- a/polkadot/node/core/backing/Cargo.toml +++ b/polkadot/node/core/backing/Cargo.toml @@ -22,5 +22,5 @@ bitvec = { version = "0.17.4", default-features = false, features = ["alloc"] } sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master" } futures = { version = "0.3.5", features = ["thread-pool"] } -subsystem-test = { package = "polkadot-subsystem-test-helpers", path = "../../test-helpers/subsystem" } assert_matches = "1.3.0" +polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem", features = [ "test-helpers" ] } diff --git a/polkadot/node/core/backing/src/lib.rs b/polkadot/node/core/backing/src/lib.rs index 260cd2eacc..3dd31e5e48 100644 --- a/polkadot/node/core/backing/src/lib.rs +++ b/polkadot/node/core/backing/src/lib.rs @@ -745,7 +745,7 @@ where /// Run this subsystem pub async fn run(ctx: Context, keystore: KeyStorePtr, spawner: Spawner) { - >::run(ctx, keystore, spawner).await + >::run(ctx, keystore, spawner, None).await } } @@ -895,13 +895,13 @@ mod tests { } struct TestHarness { - virtual_overseer: subsystem_test::TestSubsystemContextHandle, + virtual_overseer: polkadot_subsystem::test_helpers::TestSubsystemContextHandle, } fn test_harness>(keystore: KeyStorePtr, test: impl FnOnce(TestHarness) -> T) { let pool = sp_core::testing::SpawnBlockingExecutor::new(); - let (context, virtual_overseer) = subsystem_test::make_subsystem_context(pool.clone()); + let (context, virtual_overseer) = polkadot_subsystem::test_helpers::make_subsystem_context(pool.clone()); let subsystem = CandidateBackingSubsystem::run(context, keystore, pool.clone()); @@ -959,7 +959,7 @@ mod tests { // Tests that the subsystem performs actions that are requied on startup. async fn test_startup( - virtual_overseer: &mut subsystem_test::TestSubsystemContextHandle, + virtual_overseer: &mut polkadot_subsystem::test_helpers::TestSubsystemContextHandle, test_state: &TestState, ) { // Start work on some new parent. diff --git a/polkadot/node/network/bridge/Cargo.toml b/polkadot/node/network/bridge/Cargo.toml index 49e84b8597..3bd4b6702b 100644 --- a/polkadot/node/network/bridge/Cargo.toml +++ b/polkadot/node/network/bridge/Cargo.toml @@ -18,6 +18,5 @@ polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsys [dev-dependencies] parking_lot = "0.10.0" -subsystem-test = { package = "polkadot-subsystem-test-helpers", path = "../../test-helpers/subsystem" } assert_matches = "1.3.0" sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" } diff --git a/polkadot/node/network/bridge/src/lib.rs b/polkadot/node/network/bridge/src/lib.rs index f2dbcd0aac..85606d07cc 100644 --- a/polkadot/node/network/bridge/src/lib.rs +++ b/polkadot/node/network/bridge/src/lib.rs @@ -531,7 +531,7 @@ mod tests { use assert_matches::assert_matches; use polkadot_subsystem::messages::{StatementDistributionMessage, BitfieldDistributionMessage}; - use subsystem_test::{SingleItemSink, SingleItemStream}; + use polkadot_subsystem::test_helpers::{SingleItemSink, SingleItemStream}; // The subsystem's view of the network - only supports a single call to `event_stream`. struct TestNetwork { @@ -550,7 +550,7 @@ mod tests { TestNetwork, TestNetworkHandle, ) { - let (net_tx, net_rx) = subsystem_test::single_item_sink(); + let (net_tx, net_rx) = polkadot_subsystem::test_helpers::single_item_sink(); let (action_tx, action_rx) = mpsc::unbounded(); ( @@ -631,13 +631,13 @@ mod tests { struct TestHarness { network_handle: TestNetworkHandle, - virtual_overseer: subsystem_test::TestSubsystemContextHandle, + virtual_overseer: polkadot_subsystem::test_helpers::TestSubsystemContextHandle, } fn test_harness>(test: impl FnOnce(TestHarness) -> T) { let pool = sp_core::testing::SpawnBlockingExecutor::new(); let (network, network_handle) = new_test_network(); - let (context, virtual_overseer) = subsystem_test::make_subsystem_context(pool); + let (context, virtual_overseer) = polkadot_subsystem::test_helpers::make_subsystem_context(pool); let network_bridge = run_network( network, diff --git a/polkadot/node/network/pov-distribution/Cargo.toml b/polkadot/node/network/pov-distribution/Cargo.toml index 232e84f7eb..672e697c26 100644 --- a/polkadot/node/network/pov-distribution/Cargo.toml +++ b/polkadot/node/network/pov-distribution/Cargo.toml @@ -18,6 +18,6 @@ polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsys [dev-dependencies] parking_lot = "0.10.0" -subsystem-test = { package = "polkadot-subsystem-test-helpers", path = "../../test-helpers/subsystem" } assert_matches = "1.3.0" sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" } +polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem", features = [ "test-helpers" ] } diff --git a/polkadot/node/network/pov-distribution/src/lib.rs b/polkadot/node/network/pov-distribution/src/lib.rs index 8a7c0fa7a5..aa37cfdc3f 100644 --- a/polkadot/node/network/pov-distribution/src/lib.rs +++ b/polkadot/node/network/pov-distribution/src/lib.rs @@ -620,7 +620,7 @@ mod tests { }; let pool = sp_core::testing::SpawnBlockingExecutor::new(); - let (mut ctx, mut handle) = subsystem_test::make_subsystem_context(pool); + let (mut ctx, mut handle) = polkadot_subsystem::test_helpers::make_subsystem_context(pool); let mut descriptor = CandidateDescriptor::default(); descriptor.pov_hash = pov_hash; @@ -700,7 +700,7 @@ mod tests { }; let pool = sp_core::testing::SpawnBlockingExecutor::new(); - let (mut ctx, mut handle) = subsystem_test::make_subsystem_context(pool); + let (mut ctx, mut handle) = polkadot_subsystem::test_helpers::make_subsystem_context(pool); let mut descriptor = CandidateDescriptor::default(); descriptor.pov_hash = pov_hash; @@ -778,7 +778,7 @@ mod tests { }; let pool = sp_core::testing::SpawnBlockingExecutor::new(); - let (mut ctx, mut handle) = subsystem_test::make_subsystem_context(pool); + let (mut ctx, mut handle) = polkadot_subsystem::test_helpers::make_subsystem_context(pool); executor::block_on(async move { handle_network_update( @@ -850,7 +850,7 @@ mod tests { }; let pool = sp_core::testing::SpawnBlockingExecutor::new(); - let (mut ctx, mut handle) = subsystem_test::make_subsystem_context(pool); + let (mut ctx, mut handle) = polkadot_subsystem::test_helpers::make_subsystem_context(pool); executor::block_on(async move { // Peer A answers our request before peer B. @@ -938,7 +938,7 @@ mod tests { }; let pool = sp_core::testing::SpawnBlockingExecutor::new(); - let (mut ctx, mut handle) = subsystem_test::make_subsystem_context(pool); + let (mut ctx, mut handle) = polkadot_subsystem::test_helpers::make_subsystem_context(pool); executor::block_on(async move { // Peer A answers our request: right relay parent, awaited hash, wrong PoV. @@ -1001,7 +1001,7 @@ mod tests { }; let pool = sp_core::testing::SpawnBlockingExecutor::new(); - let (mut ctx, mut handle) = subsystem_test::make_subsystem_context(pool); + let (mut ctx, mut handle) = polkadot_subsystem::test_helpers::make_subsystem_context(pool); executor::block_on(async move { // Peer A answers our request: right relay parent, awaited hash, wrong PoV. @@ -1062,7 +1062,7 @@ mod tests { }; let pool = sp_core::testing::SpawnBlockingExecutor::new(); - let (mut ctx, mut handle) = subsystem_test::make_subsystem_context(pool); + let (mut ctx, mut handle) = polkadot_subsystem::test_helpers::make_subsystem_context(pool); executor::block_on(async move { // Peer A answers our request: right relay parent, awaited hash, wrong PoV. @@ -1120,7 +1120,7 @@ mod tests { }; let pool = sp_core::testing::SpawnBlockingExecutor::new(); - let (mut ctx, mut handle) = subsystem_test::make_subsystem_context(pool); + let (mut ctx, mut handle) = polkadot_subsystem::test_helpers::make_subsystem_context(pool); executor::block_on(async move { let max_plausibly_awaited = n_validators * 2; @@ -1205,7 +1205,7 @@ mod tests { }; let pool = sp_core::testing::SpawnBlockingExecutor::new(); - let (mut ctx, mut handle) = subsystem_test::make_subsystem_context(pool); + let (mut ctx, mut handle) = polkadot_subsystem::test_helpers::make_subsystem_context(pool); executor::block_on(async move { let pov_hash = make_pov(vec![1, 2, 3]).hash(); @@ -1267,7 +1267,7 @@ mod tests { }; let pool = sp_core::testing::SpawnBlockingExecutor::new(); - let (mut ctx, mut handle) = subsystem_test::make_subsystem_context(pool); + let (mut ctx, mut handle) = polkadot_subsystem::test_helpers::make_subsystem_context(pool); executor::block_on(async move { let pov_hash = make_pov(vec![1, 2, 3]).hash(); @@ -1344,7 +1344,7 @@ mod tests { }; let pool = sp_core::testing::SpawnBlockingExecutor::new(); - let (mut ctx, mut handle) = subsystem_test::make_subsystem_context(pool); + let (mut ctx, mut handle) = polkadot_subsystem::test_helpers::make_subsystem_context(pool); executor::block_on(async move { handle_network_update( @@ -1427,7 +1427,7 @@ mod tests { }; let pool = sp_core::testing::SpawnBlockingExecutor::new(); - let (mut ctx, mut handle) = subsystem_test::make_subsystem_context(pool); + let (mut ctx, mut handle) = polkadot_subsystem::test_helpers::make_subsystem_context(pool); executor::block_on(async move { handle_network_update( diff --git a/polkadot/node/network/statement-distribution/Cargo.toml b/polkadot/node/network/statement-distribution/Cargo.toml index 955830ef20..307178201c 100644 --- a/polkadot/node/network/statement-distribution/Cargo.toml +++ b/polkadot/node/network/statement-distribution/Cargo.toml @@ -21,7 +21,7 @@ indexmap = "1.4.0" [dev-dependencies] parking_lot = "0.10.0" -subsystem-test = { package = "polkadot-subsystem-test-helpers", path = "../../test-helpers/subsystem" } +polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem", features = ["test-helpers"] } assert_matches = "1.3.0" sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" } diff --git a/polkadot/node/network/statement-distribution/src/lib.rs b/polkadot/node/network/statement-distribution/src/lib.rs index 67ac4c6b64..ae13b62791 100644 --- a/polkadot/node/network/statement-distribution/src/lib.rs +++ b/polkadot/node/network/statement-distribution/src/lib.rs @@ -1213,7 +1213,7 @@ mod tests { }; let pool = sp_core::testing::SpawnBlockingExecutor::new(); - let (mut ctx, mut handle) = subsystem_test::make_subsystem_context(pool); + let (mut ctx, mut handle) = polkadot_subsystem::test_helpers::make_subsystem_context(pool); let peer = PeerId::random(); executor::block_on(async move { @@ -1305,7 +1305,7 @@ mod tests { ].into_iter().collect(); let pool = sp_core::testing::SpawnBlockingExecutor::new(); - let (mut ctx, mut handle) = subsystem_test::make_subsystem_context(pool); + let (mut ctx, mut handle) = polkadot_subsystem::test_helpers::make_subsystem_context(pool); executor::block_on(async move { let statement = { diff --git a/polkadot/node/subsystem/Cargo.toml b/polkadot/node/subsystem/Cargo.toml index 188e7cbfa7..701a197c2f 100644 --- a/polkadot/node/subsystem/Cargo.toml +++ b/polkadot/node/subsystem/Cargo.toml @@ -13,6 +13,7 @@ futures-timer = "3.0.2" keystore = { package = "sc-keystore", git = "https://github.com/paritytech/substrate", branch = "master" } log = "0.4.8" parity-scale-codec = "1.3.0" +parking_lot = { version = "0.10.0", optional = true } pin-project = "0.4.22" polkadot-node-primitives = { path = "../primitives" } polkadot-primitives = { path = "../../primitives" } @@ -20,3 +21,12 @@ polkadot-statement-table = { path = "../../statement-table" } sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" } streamunordered = "0.5.1" + +[dev-dependencies] +assert_matches = "1.3.0" +async-trait = "0.1" +futures = { version = "0.3.5", features = ["thread-pool"] } +parking_lot = "0.10.0" + +[features] +test-helpers = [ "parking_lot" ] diff --git a/polkadot/node/subsystem/src/lib.rs b/polkadot/node/subsystem/src/lib.rs index 8637db45a2..430a8418d9 100644 --- a/polkadot/node/subsystem/src/lib.rs +++ b/polkadot/node/subsystem/src/lib.rs @@ -35,6 +35,8 @@ use crate::messages::AllMessages; pub mod messages; pub mod util; +#[cfg(any(test, feature = "test-helpers"))] +pub mod test_helpers; /// Signals sent by an overseer to a subsystem. #[derive(PartialEq, Clone, Debug)] @@ -71,7 +73,7 @@ pub enum FromOverseer { /// * Subsystems dying when they are not expected to /// * Subsystems not dying when they are told to die /// * etc. -#[derive(Debug)] +#[derive(Debug, PartialEq, Eq)] pub struct SubsystemError; impl From for SubsystemError { diff --git a/polkadot/node/subsystem/src/messages.rs b/polkadot/node/subsystem/src/messages.rs index d3c630cb56..2040b41348 100644 --- a/polkadot/node/subsystem/src/messages.rs +++ b/polkadot/node/subsystem/src/messages.rs @@ -408,4 +408,10 @@ pub enum AllMessages { AvailabilityStore(AvailabilityStoreMessage), /// Message for the network bridge subsystem. NetworkBridge(NetworkBridgeMessage), + /// Test message + /// + /// This variant is only valid while testing, but makes the process of testing the + /// subsystem job manager much simpler. + #[cfg(test)] + Test(String), } diff --git a/polkadot/node/test-helpers/subsystem/src/lib.rs b/polkadot/node/subsystem/src/test_helpers.rs similarity index 97% rename from polkadot/node/test-helpers/subsystem/src/lib.rs rename to polkadot/node/subsystem/src/test_helpers.rs index 5fa7f0b9ec..256a928b43 100644 --- a/polkadot/node/test-helpers/subsystem/src/lib.rs +++ b/polkadot/node/subsystem/src/test_helpers.rs @@ -16,8 +16,8 @@ //! Utilities for testing subsystems. -use polkadot_subsystem::{SubsystemContext, FromOverseer, SubsystemResult, SubsystemError}; -use polkadot_subsystem::messages::AllMessages; +use crate::{SubsystemContext, FromOverseer, SubsystemResult, SubsystemError}; +use crate::messages::AllMessages; use futures::prelude::*; use futures::channel::mpsc; diff --git a/polkadot/node/subsystem/src/util.rs b/polkadot/node/subsystem/src/util.rs index 94bb95c26d..9964a1e8dd 100644 --- a/polkadot/node/subsystem/src/util.rs +++ b/polkadot/node/subsystem/src/util.rs @@ -22,7 +22,7 @@ use crate::{ messages::{AllMessages, RuntimeApiMessage, RuntimeApiRequest, SchedulerRoster}, - FromOverseer, SpawnedSubsystem, Subsystem, SubsystemContext, SubsystemResult, + FromOverseer, SpawnedSubsystem, Subsystem, SubsystemContext, SubsystemError, SubsystemResult, }; use futures::{ channel::{mpsc, oneshot}, @@ -67,12 +67,20 @@ pub enum Error { /// Attempted to send on a MPSC channel which has been canceled #[from] Mpsc(mpsc::SendError), + /// A subsystem error + #[from] + Subsystem(SubsystemError), + /// The type system wants this even though it doesn't make sense + #[from] + Infallible(std::convert::Infallible), /// Attempted to convert from an AllMessages to a FromJob, and failed. SenderConversion(String), /// The local node is not a validator. NotAValidator, /// The desired job is not present in the jobs list. JobNotFound(Hash), + /// Already forwarding errors to another sender + AlreadyForwarding, } /// Request some data from the `RuntimeApi`. @@ -262,7 +270,7 @@ pub trait ToJobTrait: TryFrom { } /// A JobHandle manages a particular job for a subsystem. -pub struct JobHandle { +struct JobHandle { abort_handle: future::AbortHandle, to_job: mpsc::Sender, finished: oneshot::Receiver<()>, @@ -271,23 +279,23 @@ pub struct JobHandle { impl JobHandle { /// Send a message to the job. - pub async fn send_msg(&mut self, msg: ToJob) -> Result<(), Error> { + async fn send_msg(&mut self, msg: ToJob) -> Result<(), Error> { self.to_job.send(msg).await.map_err(Into::into) } - - /// Abort the job without waiting for a graceful shutdown - pub fn abort(self) { - self.abort_handle.abort(); - } } impl JobHandle { /// Stop this job gracefully. /// /// If it hasn't shut itself down after `JOB_GRACEFUL_STOP_DURATION`, abort it. - pub async fn stop(mut self) { + async fn stop(mut self) { // we don't actually care if the message couldn't be sent - let _ = self.to_job.send(ToJob::STOP).await; + if let Err(_) = self.to_job.send(ToJob::STOP).await { + // no need to wait further here: the job is either stalled or + // disconnected, and in either case, we can just abort it immediately + self.abort_handle.abort(); + return; + } let stop_timer = Delay::new(JOB_GRACEFUL_STOP_DURATION); match future::select(stop_timer, self.finished).await { @@ -310,7 +318,7 @@ pub trait JobTrait: Unpin { /// Message type from the job. Typically a subset of AllMessages. type FromJob: 'static + Into + Send; /// Job runtime error. - type Error: std::fmt::Debug; + type Error: 'static + std::fmt::Debug + Send; /// Extra arguments this job needs to run properly. /// /// If no extra information is needed, it is perfectly acceptable to set it to `()`. @@ -323,8 +331,8 @@ pub trait JobTrait: Unpin { fn run( parent: Hash, run_args: Self::RunArgs, - rx_to: mpsc::Receiver, - tx_from: mpsc::Sender, + receiver: mpsc::Receiver, + sender: mpsc::Sender, ) -> Pin> + Send>>; /// Handle a message which has no relay parent, and therefore can't be dispatched to a particular job @@ -342,6 +350,18 @@ pub trait JobTrait: Unpin { } } +/// Error which can be returned by the jobs manager +/// +/// Wraps the utility error type and the job-specific error +#[derive(Debug, derive_more::From)] +pub enum JobsError { + /// utility error + #[from] + Utility(Error), + /// internal job error + Job(JobError), +} + /// Jobs manager for a subsystem /// /// - Spawns new jobs for a given relay-parent on demand. @@ -356,9 +376,10 @@ pub struct Jobs { #[pin] outgoing_msgs: StreamUnordered>, job: std::marker::PhantomData, + errors: Option, JobsError)>>, } -impl Jobs { +impl Jobs { /// Create a new Jobs manager which handles spawning appropriate jobs. pub fn new(spawner: Spawner) -> Self { Self { @@ -366,15 +387,31 @@ impl Jobs { running: HashMap::new(), outgoing_msgs: StreamUnordered::new(), job: std::marker::PhantomData, + errors: None, } } + /// Monitor errors which may occur during handling of a spawned job. + /// + /// By default, an error in a job is simply logged. Once this is called, + /// the error is forwarded onto the provided channel. + /// + /// Errors if the error channel already exists. + pub fn forward_errors(&mut self, tx: mpsc::Sender<(Option, JobsError)>) -> Result<(), Error> { + if self.errors.is_some() { return Err(Error::AlreadyForwarding) } + self.errors = Some(tx); + Ok(()) + } + /// Spawn a new job for this `parent_hash`, with whatever args are appropriate. fn spawn_job(&mut self, parent_hash: Hash, run_args: Job::RunArgs) -> Result<(), Error> { let (to_job_tx, to_job_rx) = mpsc::channel(JOB_CHANNEL_CAPACITY); let (from_job_tx, from_job_rx) = mpsc::channel(JOB_CHANNEL_CAPACITY); let (finished_tx, finished) = oneshot::channel(); + // clone the error transmitter to move into the future + let err_tx = self.errors.clone(); + let (future, abort_handle) = future::abortable(async move { if let Err(e) = Job::run(parent_hash, run_args, to_job_rx, from_job_tx).await { log::error!( @@ -383,12 +420,26 @@ impl Jobs { parent_hash, e, ); + + if let Some(mut err_tx) = err_tx { + // if we can't send the notification of error on the error channel, then + // there's no point trying to propagate this error onto the channel too + // all we can do is warn that error propagatio has failed + if let Err(e) = err_tx.send((Some(parent_hash), JobsError::Job(e))).await { + log::warn!("failed to forward error: {:?}", e); + } + } } }); - // discard output + // the spawn mechanism requires that the spawned future has no output let future = async move { + // job errors are already handled within the future, meaning + // that any errors here are due to the abortable mechanism. + // failure to abort isn't of interest. let _ = future.await; + // transmission failure here is only possible if the receiver is closed, + // which means the handle is dropped, which means we don't care anymore let _ = finished_tx.send(()); }; self.spawner.spawn(Job::NAME, future.boxed()); @@ -472,13 +523,14 @@ pub struct JobManager { run_args: Job::RunArgs, context: std::marker::PhantomData, job: std::marker::PhantomData, + errors: Option, JobsError)>>, } impl JobManager where Spawner: SpawnNamed + Clone + Send + Unpin, Context: SubsystemContext, - Job: JobTrait, + Job: 'static + JobTrait, Job::RunArgs: Clone, Job::ToJob: TryFrom + TryFrom<::Message> + Sync, { @@ -489,9 +541,22 @@ where run_args, context: std::marker::PhantomData, job: std::marker::PhantomData, + errors: None, } } + /// Monitor errors which may occur during handling of a spawned job. + /// + /// By default, an error in a job is simply logged. Once this is called, + /// the error is forwarded onto the provided channel. + /// + /// Errors if the error channel already exists. + pub fn forward_errors(&mut self, tx: mpsc::Sender<(Option, JobsError)>) -> Result<(), Error> { + if self.errors.is_some() { return Err(Error::AlreadyForwarding) } + self.errors = Some(tx); + Ok(()) + } + /// Run this subsystem /// /// Conceptually, this is very simple: it just loops forever. @@ -500,23 +565,41 @@ where /// - On other incoming messages, if they can be converted into Job::ToJob and /// include a hash, then they're forwarded to the appropriate individual job. /// - On outgoing messages from the jobs, it forwards them to the overseer. - pub async fn run(mut ctx: Context, run_args: Job::RunArgs, spawner: Spawner) { + /// + /// If `err_tx` is not `None`, errors are forwarded onto that channel as they occur. + /// Otherwise, most are logged and then discarded. + pub async fn run(mut ctx: Context, run_args: Job::RunArgs, spawner: Spawner, mut err_tx: Option, JobsError)>>) { let mut jobs = Jobs::new(spawner.clone()); + if let Some(ref err_tx) = err_tx { + jobs.forward_errors(err_tx.clone()).expect("we never call this twice in this context; qed"); + } loop { select! { - incoming = ctx.recv().fuse() => if Self::handle_incoming(incoming, &mut jobs, &run_args).await { break }, - outgoing = jobs.next().fuse() => if Self::handle_outgoing(outgoing, &mut ctx).await { break }, + incoming = ctx.recv().fuse() => if Self::handle_incoming(incoming, &mut jobs, &run_args, &mut err_tx).await { break }, + outgoing = jobs.next().fuse() => if Self::handle_outgoing(outgoing, &mut ctx, &mut err_tx).await { break }, complete => break, } } } + // if we have a channel on which to forward errors, do so + async fn fwd_err(hash: Option, err: JobsError, err_tx: &mut Option, JobsError)>>) { + if let Some(err_tx) = err_tx { + // if we can't send on the error transmission channel, we can't do anything useful about it + // still, we can at least log the failure + if let Err(e) = err_tx.send((hash, err)).await { + log::warn!("failed to forward error: {:?}", e); + } + } + } + // handle an incoming message. return true if we should break afterwards. async fn handle_incoming( incoming: SubsystemResult>, jobs: &mut Jobs, run_args: &Job::RunArgs, + err_tx: &mut Option, JobsError)>> ) -> bool { use crate::FromOverseer::{Communication, Signal}; use crate::OverseerSignal::{Conclude, StartWork, StopWork}; @@ -525,12 +608,14 @@ where Ok(Signal(StartWork(hash))) => { if let Err(e) = jobs.spawn_job(hash, run_args.clone()) { log::error!("Failed to spawn a job: {:?}", e); + Self::fwd_err(Some(hash), e.into(), err_tx).await; return true; } } Ok(Signal(StopWork(hash))) => { if let Err(e) = jobs.stop_job(hash).await { log::error!("Failed to stop a job: {:?}", e); + Self::fwd_err(Some(hash), e.into(), err_tx).await; return true; } } @@ -540,17 +625,21 @@ where // // Forwarding the stream to a drain means we wait until all of the items in the stream // have completed. Contrast with `into_future`, which turns it into a future of `(head, rest_stream)`. + use futures::sink::drain; use futures::stream::StreamExt; use futures::stream::FuturesUnordered; - let unordered = jobs.running + if let Err(e) = jobs.running .drain() .map(|(_, handle)| handle.stop()) - .collect::>(); - // now wait for all the futures to complete; collect a vector of their results - // this is strictly less efficient than draining them into oblivion, but this compiles, and that doesn't - // https://github.com/paritytech/polkadot/pull/1376#pullrequestreview-446488645 - let _ = async move { unordered.collect::>() }.await; + .collect::>() + .map(Ok) + .forward(drain()) + .await + { + log::error!("failed to stop all jobs on conclude signal: {:?}", e); + Self::fwd_err(None, Error::from(e).into(), err_tx).await; + } return true; } @@ -560,12 +649,14 @@ where Some(hash) => { if let Err(err) = jobs.send_msg(hash, to_job).await { log::error!("Failed to send a message to a job: {:?}", err); + Self::fwd_err(Some(hash), err.into(), err_tx).await; return true; } } None => { if let Err(err) = Job::handle_unanchored_msg(to_job) { log::error!("Failed to handle unhashed message: {:?}", err); + Self::fwd_err(None, JobsError::Job(err), err_tx).await; return true; } } @@ -574,6 +665,7 @@ where } Err(err) => { log::error!("error receiving message from subsystem context: {:?}", err); + Self::fwd_err(None, Error::from(err).into(), err_tx).await; return true; } } @@ -581,11 +673,12 @@ where } // handle an outgoing message. return true if we should break afterwards. - async fn handle_outgoing(outgoing: Option, ctx: &mut Context) -> bool { + async fn handle_outgoing(outgoing: Option, ctx: &mut Context, err_tx: &mut Option, JobsError)>>) -> bool { match outgoing { Some(msg) => { - // discard errors when sending the message upstream - let _ = ctx.send_message(msg.into()).await; + if let Err(e) = ctx.send_message(msg.into()).await { + Self::fwd_err(None, Error::from(e).into(), err_tx).await; + } } None => return true, } @@ -598,16 +691,18 @@ where Spawner: SpawnNamed + Send + Clone + Unpin + 'static, Context: SubsystemContext, ::Message: Into, - Job: JobTrait + Send, + Job: 'static + JobTrait + Send, Job::RunArgs: Clone + Sync, Job::ToJob: TryFrom + Sync, { fn start(self, ctx: Context) -> SpawnedSubsystem { let spawner = self.spawner.clone(); let run_args = self.run_args.clone(); + let errors = self.errors; + let future = Box::pin(async move { - Self::run(ctx, run_args, spawner).await; + Self::run(ctx, run_args, spawner, errors).await; }); SpawnedSubsystem { @@ -616,3 +711,242 @@ where } } } + +#[cfg(test)] +mod tests { + use assert_matches::assert_matches; + use crate::{ + messages::{AllMessages, CandidateSelectionMessage}, + test_helpers::{self, make_subsystem_context}, + util::{ + self, + JobsError, + JobManager, + JobTrait, + ToJobTrait, + }, + FromOverseer, + OverseerSignal, + }; + use futures::{ + channel::mpsc, + executor, + Future, + FutureExt, + stream::{self, StreamExt}, + SinkExt, + }; + use futures_timer::Delay; + use polkadot_primitives::v1::Hash; + use std::{ + collections::HashMap, + convert::TryFrom, + pin::Pin, + time::Duration, + }; + + // basic usage: in a nutshell, when you want to define a subsystem, just focus on what its jobs do; + // you can leave the subsystem itself to the job manager. + + // for purposes of demonstration, we're going to whip up a fake subsystem. + // this will 'select' candidates which are pre-loaded in the job + + // job structs are constructed within JobTrait::run + // most will want to retain the sender and receiver, as well as whatever other data they like + struct FakeCandidateSelectionJob { + receiver: mpsc::Receiver, + } + + // ToJob implementations require the following properties: + // + // - have a Stop variant (to impl ToJobTrait) + // - impl ToJobTrait + // - impl TryFrom + // - impl From (from SubsystemContext::Message) + // + // Mostly, they are just a type-safe subset of AllMessages that this job is prepared to receive + enum ToJob { + CandidateSelection(CandidateSelectionMessage), + Stop, + } + + impl ToJobTrait for ToJob { + const STOP: Self = ToJob::Stop; + + fn relay_parent(&self) -> Option { + match self { + Self::CandidateSelection(csm) => csm.relay_parent(), + Self::Stop => None, + } + } + } + + impl TryFrom for ToJob { + type Error = (); + + fn try_from(msg: AllMessages) -> Result { + match msg { + AllMessages::CandidateSelection(csm) => Ok(ToJob::CandidateSelection(csm)), + _ => Err(()) + } + } + } + + impl From for ToJob { + fn from(csm: CandidateSelectionMessage) -> ToJob { + ToJob::CandidateSelection(csm) + } + } + + // FromJob must be infallibly convertable into AllMessages. + // + // It exists to be a type-safe subset of AllMessages that this job is specified to send. + // + // Note: the Clone impl here is not generally required; it's just ueful for this test context because + // we include it in the RunArgs + #[derive(Clone)] + enum FromJob { + Test(String), + } + + impl From for AllMessages { + fn from(from_job: FromJob) -> AllMessages { + match from_job { + FromJob::Test(s) => AllMessages::Test(s), + } + } + } + + // Error will mostly be a wrapper to make the try operator more convenient; + // deriving From implementations for most variants is recommended. + // It must implement Debug for logging. + #[derive(Debug, derive_more::From)] + enum Error { + #[from] + Sending(mpsc::SendError) + } + + impl JobTrait for FakeCandidateSelectionJob { + type ToJob = ToJob; + type FromJob = FromJob; + type Error = Error; + // RunArgs can be anything that a particular job needs supplied from its external context + // in order to create the Job. In this case, they're a hashmap of parents to the mock outputs + // expected from that job. + // + // Note that it's not recommended to use something as heavy as a hashmap in production: the + // RunArgs get cloned so that each job gets its own owned copy. If you need that, wrap it in + // an Arc. Within a testing context, that efficiency is less important. + type RunArgs = HashMap>; + + const NAME: &'static str = "FakeCandidateSelectionJob"; + + /// Run a job for the parent block indicated + // + // this function is in charge of creating and executing the job's main loop + fn run( + parent: Hash, + mut run_args: Self::RunArgs, + receiver: mpsc::Receiver, + mut sender: mpsc::Sender, + ) -> Pin> + Send>> { + async move { + let job = FakeCandidateSelectionJob { + receiver, + }; + + // most jobs will have a request-response cycle at the heart of their run loop. + // however, in this case, we never receive valid messages, so we may as well + // just send all of our (mock) output messages now + let mock_output = run_args.remove(&parent).unwrap_or_default(); + let mut stream = stream::iter(mock_output.into_iter().map(Ok)); + sender.send_all(&mut stream).await?; + + // it isn't necessary to break run_loop into its own function, + // but it's convenient to separate the concerns in this way + job.run_loop().await + }.boxed() + } + } + + impl FakeCandidateSelectionJob { + async fn run_loop(mut self) -> Result<(), Error> { + while let Some(msg) = self.receiver.next().await { + match msg { + ToJob::CandidateSelection(_csm) => { + unimplemented!("we'd report the collator to the peer set manager here, but that's not implemented yet"); + } + ToJob::Stop => break, + } + } + + Ok(()) + } + } + + // with the job defined, it's straightforward to get a subsystem implementation. + type FakeCandidateSelectionSubsystem = JobManager; + + // this type lets us pretend to be the overseer + type OverseerHandle = test_helpers::TestSubsystemContextHandle; + + fn test_harness>(run_args: HashMap>, test: impl FnOnce(OverseerHandle, mpsc::Receiver<(Option, JobsError)>) -> T) { + let pool = sp_core::testing::SpawnBlockingExecutor::new(); + let (context, overseer_handle) = make_subsystem_context(pool.clone()); + let (err_tx, err_rx) = mpsc::channel(16); + + let subsystem = FakeCandidateSelectionSubsystem::run(context, run_args, pool, Some(err_tx)); + let test_future = test(overseer_handle, err_rx); + let timeout = Delay::new(Duration::from_secs(2)); + + futures::pin_mut!(test_future); + futures::pin_mut!(subsystem); + futures::pin_mut!(timeout); + + executor::block_on(async move { + futures::select! { + _ = test_future.fuse() => (), + _ = subsystem.fuse() => (), + _ = timeout.fuse() => panic!("test timed out instead of completing"), + } + }); + } + + #[test] + fn starting_and_stopping_job_works() { + let relay_parent: Hash = [0; 32].into(); + let mut run_args = HashMap::new(); + let test_message = format!("greetings from {}", relay_parent); + run_args.insert(relay_parent.clone(), vec![FromJob::Test(test_message.clone())]); + + test_harness(run_args, |mut overseer_handle, err_rx| async move { + overseer_handle.send(FromOverseer::Signal(OverseerSignal::StartWork(relay_parent))).await; + assert_matches!( + overseer_handle.recv().await, + AllMessages::Test(msg) if msg == test_message + ); + overseer_handle.send(FromOverseer::Signal(OverseerSignal::StopWork(relay_parent))).await; + + let errs: Vec<_> = err_rx.collect().await; + assert_eq!(errs.len(), 0); + }); + } + + #[test] + fn stopping_non_running_job_fails() { + let relay_parent: Hash = [0; 32].into(); + let run_args = HashMap::new(); + + test_harness(run_args, |mut overseer_handle, err_rx| async move { + overseer_handle.send(FromOverseer::Signal(OverseerSignal::StopWork(relay_parent))).await; + + let errs: Vec<_> = err_rx.collect().await; + assert_eq!(errs.len(), 1); + assert_eq!(errs[0].0, Some(relay_parent)); + assert_matches!( + errs[0].1, + JobsError::Utility(util::Error::JobNotFound(match_relay_parent)) if relay_parent == match_relay_parent + ); + }); + } +} diff --git a/polkadot/node/test-helpers/subsystem/Cargo.toml b/polkadot/node/test-helpers/subsystem/Cargo.toml deleted file mode 100644 index 8175a5a6b1..0000000000 --- a/polkadot/node/test-helpers/subsystem/Cargo.toml +++ /dev/null @@ -1,13 +0,0 @@ -[package] -name = "polkadot-subsystem-test-helpers" -version = "0.1.0" -authors = ["Parity Technologies "] -edition = "2018" -description = "Helpers for testing subsystems" - -[dependencies] -futures = "0.3.5" -async-trait = "0.1" -polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" } -parking_lot = "0.10.0" -sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }