jobs: don't early exit when there are no jobs (#1621)

* jobs: don't early exit when there are no jobs

* utils: fix merged test

* utils: less verbose

* utils: add an assert subsystem is running

* utils: use TimeoutExt from test-helpers

* test-helpers: use TimeoutExt
This commit is contained in:
Andronik Ordian
2020-08-21 16:45:39 +02:00
committed by GitHub
parent 44354e717b
commit e899a3f844
2 changed files with 46 additions and 33 deletions
@@ -275,16 +275,13 @@ pub fn subsystem_test_harness<M, OverseerFactory, Overseer, TestFactory, Test>(
let overseer = overseer_factory(handle); let overseer = overseer_factory(handle);
let test = test_factory(context); let test = test_factory(context);
let timeout = Delay::new(Duration::from_secs(2)); futures::pin_mut!(overseer, test);
futures::pin_mut!(overseer, test, timeout);
futures::executor::block_on(async move { futures::executor::block_on(async move {
futures::select! { future::join(overseer, test)
_ = overseer.fuse() => (), .timeout(Duration::from_secs(2))
_ = test.fuse() => (), .await
_ = timeout.fuse() => panic!("test timed out instead of completing"), .expect("test timed out instead of completing")
}
}); });
} }
+41 -25
View File
@@ -643,12 +643,17 @@ where
fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context) -> task::Poll<Option<Self::Item>> { fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context) -> task::Poll<Option<Self::Item>> {
// pin-project the outgoing messages // pin-project the outgoing messages
self.project().outgoing_msgs.poll_next(cx).map(|opt| { let result = self.project().outgoing_msgs.poll_next(cx).map(|opt| {
opt.and_then(|(stream_yield, _)| match stream_yield { opt.and_then(|(stream_yield, _)| match stream_yield {
StreamYield::Item(msg) => Some(msg), StreamYield::Item(msg) => Some(msg),
StreamYield::Finished(_) => None, StreamYield::Finished(_) => None,
}) })
}) });
// we don't want the stream to end if the jobs are empty at some point
match result {
task::Poll::Ready(None) => task::Poll::Pending,
otherwise => otherwise,
}
} }
} }
@@ -731,7 +736,7 @@ where
loop { loop {
select! { select! {
incoming = ctx.recv().fuse() => if Self::handle_incoming(incoming, &mut jobs, &run_args, &metrics, &mut err_tx).await { break }, incoming = ctx.recv().fuse() => if Self::handle_incoming(incoming, &mut jobs, &run_args, &metrics, &mut err_tx).await { break },
outgoing = jobs.next().fuse() => if Self::handle_outgoing(outgoing, &mut ctx, &mut err_tx).await { break }, outgoing = jobs.next().fuse() => Self::handle_outgoing(outgoing, &mut ctx, &mut err_tx).await,
complete => break, complete => break,
} }
} }
@@ -841,21 +846,16 @@ where
false false
} }
// handle an outgoing message. return true if we should break afterwards. // handle an outgoing message.
async fn handle_outgoing( async fn handle_outgoing(
outgoing: Option<Job::FromJob>, outgoing: Option<Job::FromJob>,
ctx: &mut Context, ctx: &mut Context,
err_tx: &mut Option<mpsc::Sender<(Option<Hash>, JobsError<Job::Error>)>>, err_tx: &mut Option<mpsc::Sender<(Option<Hash>, JobsError<Job::Error>)>>,
) -> bool { ) {
match outgoing { let msg = outgoing.expect("the Jobs stream never ends; qed");
Some(msg) => { if let Err(e) = ctx.send_message(msg.into()).await {
if let Err(e) = ctx.send_message(msg.into()).await { Self::fwd_err(None, Error::from(e).into(), err_tx).await;
Self::fwd_err(None, Error::from(e).into(), err_tx).await;
}
}
None => return true,
} }
false
} }
} }
@@ -985,11 +985,10 @@ mod tests {
channel::mpsc, channel::mpsc,
executor, executor,
stream::{self, StreamExt}, stream::{self, StreamExt},
Future, FutureExt, SinkExt, future, Future, FutureExt, SinkExt,
}; };
use futures_timer::Delay;
use polkadot_primitives::v1::Hash; use polkadot_primitives::v1::Hash;
use polkadot_node_subsystem_test_helpers::{self as test_helpers, make_subsystem_context}; use polkadot_node_subsystem_test_helpers::{self as test_helpers, make_subsystem_context, TimeoutExt as _};
use std::{collections::HashMap, convert::TryFrom, pin::Pin, time::Duration}; use std::{collections::HashMap, convert::TryFrom, pin::Pin, time::Duration};
// basic usage: in a nutshell, when you want to define a subsystem, just focus on what its jobs do; // basic usage: in a nutshell, when you want to define a subsystem, just focus on what its jobs do;
@@ -1157,16 +1156,14 @@ mod tests {
let subsystem = FakeCandidateSelectionSubsystem::run(context, run_args, (), pool, Some(err_tx)); let subsystem = FakeCandidateSelectionSubsystem::run(context, run_args, (), pool, Some(err_tx));
let test_future = test(overseer_handle, err_rx); let test_future = test(overseer_handle, err_rx);
let timeout = Delay::new(Duration::from_secs(2));
futures::pin_mut!(subsystem, test_future, timeout); futures::pin_mut!(subsystem, test_future);
executor::block_on(async move { executor::block_on(async move {
futures::select! { future::join(subsystem, test_future)
_ = test_future.fuse() => (), .timeout(Duration::from_secs(2))
_ = subsystem.fuse() => (), .await
_ = timeout.fuse() => panic!("test timed out instead of completing"), .expect("test timed out instead of completing")
}
}); });
} }
@@ -1195,6 +1192,10 @@ mod tests {
))) )))
.await; .await;
overseer_handle
.send(FromOverseer::Signal(OverseerSignal::Conclude))
.await;
let errs: Vec<_> = err_rx.collect().await; let errs: Vec<_> = err_rx.collect().await;
assert_eq!(errs.len(), 0); assert_eq!(errs.len(), 0);
}); });
@@ -1224,12 +1225,23 @@ mod tests {
#[test] #[test]
fn sending_to_a_non_running_job_do_not_stop_the_subsystem() { fn sending_to_a_non_running_job_do_not_stop_the_subsystem() {
let run_args = HashMap::new(); let relay_parent = Hash::repeat_byte(0x01);
let mut run_args = HashMap::new();
run_args.insert(
relay_parent.clone(),
vec![FromJob::Test],
);
test_harness(run_args, |mut overseer_handle, err_rx| async move { test_harness(run_args, |mut overseer_handle, err_rx| async move {
overseer_handle
.send(FromOverseer::Signal(OverseerSignal::ActiveLeaves(
ActiveLeavesUpdate::start_work(relay_parent),
)))
.await;
// send to a non running job // send to a non running job
overseer_handle overseer_handle
.send(FromOverseer::Communication { .send(FromOverseer::Communication {
msg: Default::default(), msg: Default::default(),
}) })
.await; .await;
@@ -1240,6 +1252,10 @@ mod tests {
AllMessages::CandidateSelection(_) AllMessages::CandidateSelection(_)
); );
overseer_handle
.send(FromOverseer::Signal(OverseerSignal::Conclude))
.await;
let errs: Vec<_> = err_rx.collect().await; let errs: Vec<_> = err_rx.collect().await;
assert_eq!(errs.len(), 0); assert_eq!(errs.len(), 0);
}); });