utils: handle race condition gracefully (#1583)

* utils: handle race condition gracefully

* utils: add a test

* update Cargo.lock

* utils: remove a warning

* utils: init logger in tests

* utils: update the outdated comment

* util: wait for both subsystem and test_future to finish

* Revert "util: wait for both subsystem and test_future to finish"

This reverts commit 075b39242cd7200e3f4c24ea42f702c3124ce41c.
This commit is contained in:
Andronik Ordian
2020-08-21 15:30:18 +02:00
committed by GitHub
parent cc19f13468
commit 44354e717b
3 changed files with 38 additions and 4 deletions
+36 -4
View File
@@ -611,10 +611,13 @@ impl<Spawner: SpawnNamed, Job: 'static + JobTrait> Jobs<Spawner, Job> {
}
/// Send a message to the appropriate job for this `parent_hash`.
/// Will not return an error if the job is not running.
async fn send_msg(&mut self, parent_hash: Hash, msg: Job::ToJob) -> Result<(), Error> {
match self.running.get_mut(&parent_hash) {
Some(job) => job.send_msg(msg).await?,
None => return Err(Error::JobNotFound(parent_hash)),
None => {
// don't bring down the subsystem, this can happen to due a race condition
},
}
Ok(())
}
@@ -1140,6 +1143,14 @@ mod tests {
run_args: HashMap<Hash, Vec<FromJob>>,
test: impl FnOnce(OverseerHandle, mpsc::Receiver<(Option<Hash>, JobsError<Error>)>) -> T,
) {
let _ = env_logger::builder()
.is_test(true)
.filter(
None,
log::LevelFilter::Trace,
)
.try_init();
let pool = sp_core::testing::TaskExecutor::new();
let (context, overseer_handle) = make_subsystem_context(pool.clone());
let (err_tx, err_rx) = mpsc::channel(16);
@@ -1148,9 +1159,7 @@ mod tests {
let test_future = test(overseer_handle, err_rx);
let timeout = Delay::new(Duration::from_secs(2));
futures::pin_mut!(test_future);
futures::pin_mut!(subsystem);
futures::pin_mut!(timeout);
futures::pin_mut!(subsystem, test_future, timeout);
executor::block_on(async move {
futures::select! {
@@ -1213,6 +1222,29 @@ mod tests {
});
}
#[test]
fn sending_to_a_non_running_job_do_not_stop_the_subsystem() {
let run_args = HashMap::new();
test_harness(run_args, |mut overseer_handle, err_rx| async move {
// send to a non running job
overseer_handle
.send(FromOverseer::Communication {
msg: Default::default(),
})
.await;
// the subsystem is still alive
assert_matches!(
overseer_handle.recv().await,
AllMessages::CandidateSelection(_)
);
let errs: Vec<_> = err_rx.collect().await;
assert_eq!(errs.len(), 0);
});
}
#[test]
fn test_subsystem_impl_and_name_derivation() {
let pool = sp_core::testing::TaskExecutor::new();