pvf: Fix missing execution request when retrying preparation (#6537)

* pvf: Add checks for result sender when retrying preparation in tests

* pvf: Fix missing execution request when retrying preparation

* Update comment
This commit is contained in:
Marcin S
2023-01-11 15:01:43 -05:00
committed by GitHub
parent dada832405
commit ae74d33a93
+89 -13
View File
@@ -550,6 +550,10 @@ async fn handle_execute_pvf(
},
)
.await?;
// Add an execution request that will wait to run after this prepare job has
// finished.
awaiting_prepare.add(artifact_id, execution_timeout, params, result_tx);
} else {
let _ = result_tx.send(Err(ValidationError::from(error.clone())));
}
@@ -931,6 +935,13 @@ mod tests {
ValidationHost { to_host_tx }
}
async fn poll_and_recv_result<T>(&mut self, result_rx: oneshot::Receiver<T>) -> T
where
T: Send,
{
run_until(&mut self.run, async { result_rx.await.unwrap() }.boxed()).await
}
async fn poll_and_recv_to_prepare_queue(&mut self) -> prepare::ToQueue {
let to_prepare_queue_rx = &mut self.to_prepare_queue_rx;
run_until(&mut self.run, async { to_prepare_queue_rx.next().await.unwrap() }.boxed())
@@ -991,7 +1002,7 @@ mod tests {
futures::select! {
_ = Delay::new(Duration::from_millis(500)).fuse() => (),
msg = to_sweeper_rx.next().fuse() => {
panic!("the sweeper supposed to be empty, but received: {:?}", msg)
panic!("the sweeper is supposed to be empty, but received: {:?}", msg)
}
}
}
@@ -1311,12 +1322,12 @@ mod tests {
// Test that multiple prechecking requests do not trigger preparation retries if the first one
// failed.
#[tokio::test]
async fn test_precheck_prepare_retry() {
async fn test_precheck_prepare_no_retry() {
let mut test = Builder::default().build();
let mut host = test.host_handle();
// Submit a precheck request that fails.
let (result_tx, _result_rx) = oneshot::channel();
let (result_tx, result_rx) = oneshot::channel();
host.precheck_pvf(Pvf::from_discriminator(1), result_tx).await.unwrap();
// The queue received the prepare request.
@@ -1333,22 +1344,34 @@ mod tests {
.await
.unwrap();
// The result should contain the error.
let result = test.poll_and_recv_result(result_rx).await;
assert_matches!(result, Err(PrepareError::TimedOut));
// Submit another precheck request.
let (result_tx_2, _result_rx_2) = oneshot::channel();
let (result_tx_2, result_rx_2) = oneshot::channel();
host.precheck_pvf(Pvf::from_discriminator(1), result_tx_2).await.unwrap();
// Assert the prepare queue is empty.
test.poll_ensure_to_prepare_queue_is_empty().await;
// The result should contain the original error.
let result = test.poll_and_recv_result(result_rx_2).await;
assert_matches!(result, Err(PrepareError::TimedOut));
// Pause for enough time to reset the cooldown for this failed prepare request.
futures_timer::Delay::new(PREPARE_FAILURE_COOLDOWN).await;
// Submit another precheck request.
let (result_tx_3, _result_rx_3) = oneshot::channel();
let (result_tx_3, result_rx_3) = oneshot::channel();
host.precheck_pvf(Pvf::from_discriminator(1), result_tx_3).await.unwrap();
// Assert the prepare queue is empty - we do not retry for precheck requests.
test.poll_ensure_to_prepare_queue_is_empty().await;
// The result should still contain the original error.
let result = test.poll_and_recv_result(result_rx_3).await;
assert_matches!(result, Err(PrepareError::TimedOut));
}
// Test that multiple execution requests trigger preparation retries if the first one failed due
@@ -1359,7 +1382,7 @@ mod tests {
let mut host = test.host_handle();
// Submit a execute request that fails.
let (result_tx, _result_rx) = oneshot::channel();
let (result_tx, result_rx) = oneshot::channel();
host.execute_pvf(
Pvf::from_discriminator(1),
TEST_EXECUTION_TIMEOUT,
@@ -1384,8 +1407,12 @@ mod tests {
.await
.unwrap();
// Submit another execute request.
let (result_tx_2, _result_rx_2) = oneshot::channel();
// The result should contain the error.
let result = test.poll_and_recv_result(result_rx).await;
assert_matches!(result, Err(ValidationError::InternalError(_)));
// Submit another execute request. We shouldn't try to prepare again, yet.
let (result_tx_2, result_rx_2) = oneshot::channel();
host.execute_pvf(
Pvf::from_discriminator(1),
TEST_EXECUTION_TIMEOUT,
@@ -1399,11 +1426,15 @@ mod tests {
// Assert the prepare queue is empty.
test.poll_ensure_to_prepare_queue_is_empty().await;
// The result should contain the original error.
let result = test.poll_and_recv_result(result_rx_2).await;
assert_matches!(result, Err(ValidationError::InternalError(_)));
// Pause for enough time to reset the cooldown for this failed prepare request.
futures_timer::Delay::new(PREPARE_FAILURE_COOLDOWN).await;
// Submit another execute request.
let (result_tx_3, _result_rx_3) = oneshot::channel();
let (result_tx_3, result_rx_3) = oneshot::channel();
host.execute_pvf(
Pvf::from_discriminator(1),
TEST_EXECUTION_TIMEOUT,
@@ -1419,6 +1450,30 @@ mod tests {
test.poll_and_recv_to_prepare_queue().await,
prepare::ToQueue::Enqueue { .. }
);
test.from_prepare_queue_tx
.send(prepare::FromQueue {
artifact_id: artifact_id(1),
result: Ok(Duration::default()),
})
.await
.unwrap();
// Preparation should have been retried and succeeded this time.
let result_tx_3 = assert_matches!(
test.poll_and_recv_to_execute_queue().await,
execute::ToQueue::Enqueue { result_tx, .. } => result_tx
);
// Send an error for the execution here, just so we can check the result receiver is still
// alive.
result_tx_3
.send(Err(ValidationError::InvalidCandidate(InvalidCandidate::AmbiguousWorkerDeath)))
.unwrap();
assert_matches!(
result_rx_3.now_or_never().unwrap().unwrap(),
Err(ValidationError::InvalidCandidate(InvalidCandidate::AmbiguousWorkerDeath))
);
}
// Test that multiple execution requests don't trigger preparation retries if the first one
@@ -1428,8 +1483,8 @@ mod tests {
let mut test = Builder::default().build();
let mut host = test.host_handle();
// Submit a execute request that fails.
let (result_tx, _result_rx) = oneshot::channel();
// Submit an execute request that fails.
let (result_tx, result_rx) = oneshot::channel();
host.execute_pvf(
Pvf::from_discriminator(1),
TEST_EXECUTION_TIMEOUT,
@@ -1454,8 +1509,15 @@ mod tests {
.await
.unwrap();
// The result should contain the error.
let result = test.poll_and_recv_result(result_rx).await;
assert_matches!(
result,
Err(ValidationError::InvalidCandidate(InvalidCandidate::PrepareError(_)))
);
// Submit another execute request.
let (result_tx_2, _result_rx_2) = oneshot::channel();
let (result_tx_2, result_rx_2) = oneshot::channel();
host.execute_pvf(
Pvf::from_discriminator(1),
TEST_EXECUTION_TIMEOUT,
@@ -1469,11 +1531,18 @@ mod tests {
// Assert the prepare queue is empty.
test.poll_ensure_to_prepare_queue_is_empty().await;
// The result should contain the original error.
let result = test.poll_and_recv_result(result_rx_2).await;
assert_matches!(
result,
Err(ValidationError::InvalidCandidate(InvalidCandidate::PrepareError(_)))
);
// Pause for enough time to reset the cooldown for this failed prepare request.
futures_timer::Delay::new(PREPARE_FAILURE_COOLDOWN).await;
// Submit another execute request.
let (result_tx_3, _result_rx_3) = oneshot::channel();
let (result_tx_3, result_rx_3) = oneshot::channel();
host.execute_pvf(
Pvf::from_discriminator(1),
TEST_EXECUTION_TIMEOUT,
@@ -1486,6 +1555,13 @@ mod tests {
// Assert the prepare queue is empty - we do not retry for prevalidation errors.
test.poll_ensure_to_prepare_queue_is_empty().await;
// The result should still contain the original error.
let result = test.poll_and_recv_result(result_rx_3).await;
assert_matches!(
result,
Err(ValidationError::InvalidCandidate(InvalidCandidate::PrepareError(_)))
);
}
// Test that multiple heads-up requests trigger preparation retries if the first one failed.