diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock index d3d0f6acbc..3a0cc1ebe6 100644 --- a/polkadot/Cargo.lock +++ b/polkadot/Cargo.lock @@ -4891,6 +4891,7 @@ dependencies = [ "assert_matches", "async-trait", "derive_more 0.99.9", + "env_logger", "futures 0.3.5", "futures-timer 3.0.2", "log 0.4.11", diff --git a/polkadot/node/subsystem-util/Cargo.toml b/polkadot/node/subsystem-util/Cargo.toml index 6ed2999213..2c189419cc 100644 --- a/polkadot/node/subsystem-util/Cargo.toml +++ b/polkadot/node/subsystem-util/Cargo.toml @@ -30,3 +30,4 @@ async-trait = "0.1" futures = { version = "0.3.5", features = ["thread-pool"] } parking_lot = "0.10.0" polkadot-node-subsystem-test-helpers = { path = "../subsystem-test-helpers" } +env_logger = "0.7.1" diff --git a/polkadot/node/subsystem-util/src/lib.rs b/polkadot/node/subsystem-util/src/lib.rs index ab77427153..d2ddcb005f 100644 --- a/polkadot/node/subsystem-util/src/lib.rs +++ b/polkadot/node/subsystem-util/src/lib.rs @@ -611,10 +611,13 @@ impl Jobs { } /// Send a message to the appropriate job for this `parent_hash`. + /// Will not return an error if the job is not running. async fn send_msg(&mut self, parent_hash: Hash, msg: Job::ToJob) -> Result<(), Error> { match self.running.get_mut(&parent_hash) { Some(job) => job.send_msg(msg).await?, - None => return Err(Error::JobNotFound(parent_hash)), + None => { + // don't bring down the subsystem, this can happen to due a race condition + }, } Ok(()) } @@ -1140,6 +1143,14 @@ mod tests { run_args: HashMap>, test: impl FnOnce(OverseerHandle, mpsc::Receiver<(Option, JobsError)>) -> T, ) { + let _ = env_logger::builder() + .is_test(true) + .filter( + None, + log::LevelFilter::Trace, + ) + .try_init(); + let pool = sp_core::testing::TaskExecutor::new(); let (context, overseer_handle) = make_subsystem_context(pool.clone()); let (err_tx, err_rx) = mpsc::channel(16); @@ -1148,9 +1159,7 @@ mod tests { let test_future = test(overseer_handle, err_rx); let timeout = Delay::new(Duration::from_secs(2)); - futures::pin_mut!(test_future); - futures::pin_mut!(subsystem); - futures::pin_mut!(timeout); + futures::pin_mut!(subsystem, test_future, timeout); executor::block_on(async move { futures::select! { @@ -1213,6 +1222,29 @@ mod tests { }); } + #[test] + fn sending_to_a_non_running_job_do_not_stop_the_subsystem() { + let run_args = HashMap::new(); + + test_harness(run_args, |mut overseer_handle, err_rx| async move { + // send to a non running job + overseer_handle + .send(FromOverseer::Communication { + msg: Default::default(), + }) + .await; + + // the subsystem is still alive + assert_matches!( + overseer_handle.recv().await, + AllMessages::CandidateSelection(_) + ); + + let errs: Vec<_> = err_rx.collect().await; + assert_eq!(errs.len(), 0); + }); + } + #[test] fn test_subsystem_impl_and_name_derivation() { let pool = sp_core::testing::TaskExecutor::new();