diff --git a/polkadot/node/subsystem-test-helpers/src/lib.rs b/polkadot/node/subsystem-test-helpers/src/lib.rs index 5b32482899..4d5be56cdf 100644 --- a/polkadot/node/subsystem-test-helpers/src/lib.rs +++ b/polkadot/node/subsystem-test-helpers/src/lib.rs @@ -275,16 +275,13 @@ pub fn subsystem_test_harness( let overseer = overseer_factory(handle); let test = test_factory(context); - let timeout = Delay::new(Duration::from_secs(2)); - - futures::pin_mut!(overseer, test, timeout); + futures::pin_mut!(overseer, test); futures::executor::block_on(async move { - futures::select! { - _ = overseer.fuse() => (), - _ = test.fuse() => (), - _ = timeout.fuse() => panic!("test timed out instead of completing"), - } + future::join(overseer, test) + .timeout(Duration::from_secs(2)) + .await + .expect("test timed out instead of completing") }); } diff --git a/polkadot/node/subsystem-util/src/lib.rs b/polkadot/node/subsystem-util/src/lib.rs index d2ddcb005f..0cd7a05c31 100644 --- a/polkadot/node/subsystem-util/src/lib.rs +++ b/polkadot/node/subsystem-util/src/lib.rs @@ -643,12 +643,17 @@ where fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context) -> task::Poll> { // pin-project the outgoing messages - self.project().outgoing_msgs.poll_next(cx).map(|opt| { + let result = self.project().outgoing_msgs.poll_next(cx).map(|opt| { opt.and_then(|(stream_yield, _)| match stream_yield { StreamYield::Item(msg) => Some(msg), StreamYield::Finished(_) => None, }) - }) + }); + // we don't want the stream to end if the jobs are empty at some point + match result { + task::Poll::Ready(None) => task::Poll::Pending, + otherwise => otherwise, + } } } @@ -731,7 +736,7 @@ where loop { select! { incoming = ctx.recv().fuse() => if Self::handle_incoming(incoming, &mut jobs, &run_args, &metrics, &mut err_tx).await { break }, - outgoing = jobs.next().fuse() => if Self::handle_outgoing(outgoing, &mut ctx, &mut err_tx).await { break }, + outgoing = jobs.next().fuse() => Self::handle_outgoing(outgoing, &mut ctx, &mut err_tx).await, complete => break, } } @@ -841,21 +846,16 @@ where false } - // handle an outgoing message. return true if we should break afterwards. + // handle an outgoing message. async fn handle_outgoing( outgoing: Option, ctx: &mut Context, err_tx: &mut Option, JobsError)>>, - ) -> bool { - match outgoing { - Some(msg) => { - if let Err(e) = ctx.send_message(msg.into()).await { - Self::fwd_err(None, Error::from(e).into(), err_tx).await; - } - } - None => return true, + ) { + let msg = outgoing.expect("the Jobs stream never ends; qed"); + if let Err(e) = ctx.send_message(msg.into()).await { + Self::fwd_err(None, Error::from(e).into(), err_tx).await; } - false } } @@ -985,11 +985,10 @@ mod tests { channel::mpsc, executor, stream::{self, StreamExt}, - Future, FutureExt, SinkExt, + future, Future, FutureExt, SinkExt, }; - use futures_timer::Delay; use polkadot_primitives::v1::Hash; - use polkadot_node_subsystem_test_helpers::{self as test_helpers, make_subsystem_context}; + use polkadot_node_subsystem_test_helpers::{self as test_helpers, make_subsystem_context, TimeoutExt as _}; use std::{collections::HashMap, convert::TryFrom, pin::Pin, time::Duration}; // basic usage: in a nutshell, when you want to define a subsystem, just focus on what its jobs do; @@ -1157,16 +1156,14 @@ mod tests { let subsystem = FakeCandidateSelectionSubsystem::run(context, run_args, (), pool, Some(err_tx)); let test_future = test(overseer_handle, err_rx); - let timeout = Delay::new(Duration::from_secs(2)); - futures::pin_mut!(subsystem, test_future, timeout); + futures::pin_mut!(subsystem, test_future); executor::block_on(async move { - futures::select! { - _ = test_future.fuse() => (), - _ = subsystem.fuse() => (), - _ = timeout.fuse() => panic!("test timed out instead of completing"), - } + future::join(subsystem, test_future) + .timeout(Duration::from_secs(2)) + .await + .expect("test timed out instead of completing") }); } @@ -1195,6 +1192,10 @@ mod tests { ))) .await; + overseer_handle + .send(FromOverseer::Signal(OverseerSignal::Conclude)) + .await; + let errs: Vec<_> = err_rx.collect().await; assert_eq!(errs.len(), 0); }); @@ -1224,12 +1225,23 @@ mod tests { #[test] fn sending_to_a_non_running_job_do_not_stop_the_subsystem() { - let run_args = HashMap::new(); + let relay_parent = Hash::repeat_byte(0x01); + let mut run_args = HashMap::new(); + run_args.insert( + relay_parent.clone(), + vec![FromJob::Test], + ); test_harness(run_args, |mut overseer_handle, err_rx| async move { + overseer_handle + .send(FromOverseer::Signal(OverseerSignal::ActiveLeaves( + ActiveLeavesUpdate::start_work(relay_parent), + ))) + .await; + // send to a non running job overseer_handle - .send(FromOverseer::Communication { + .send(FromOverseer::Communication { msg: Default::default(), }) .await; @@ -1240,6 +1252,10 @@ mod tests { AllMessages::CandidateSelection(_) ); + overseer_handle + .send(FromOverseer::Signal(OverseerSignal::Conclude)) + .await; + let errs: Vec<_> = err_rx.collect().await; assert_eq!(errs.len(), 0); });