Retry failed PVF prepare jobs (#6213)

This commit is contained in:
Marcin S
2022-11-08 08:30:14 -05:00
committed by GitHub
parent 6cc91f9187
commit 63c6f184cb
2 changed files with 388 additions and 55 deletions
+15 -3
View File
@@ -103,10 +103,22 @@ pub enum ArtifactState {
last_time_needed: SystemTime,
},
/// A task to prepare this artifact is scheduled.
Preparing { waiting_for_response: Vec<PrepareResultSender> },
Preparing {
/// List of result senders that are waiting for a response.
waiting_for_response: Vec<PrepareResultSender>,
/// The number of times this artifact has failed to prepare.
num_failures: u32,
},
/// The code couldn't be compiled due to an error. Such artifacts
/// never reach the executor and stay in the host's memory.
FailedToProcess(PrepareError),
FailedToProcess {
/// Keep track of the last time that processing this artifact failed.
last_time_failed: SystemTime,
/// The number of times this artifact has failed to prepare.
num_failures: u32,
/// The last error encountered for preparation.
error: PrepareError,
},
}
/// A container of all known artifact ids and their states.
@@ -150,7 +162,7 @@ impl Artifacts {
// See the precondition.
always!(self
.artifacts
.insert(artifact_id, ArtifactState::Preparing { waiting_for_response })
.insert(artifact_id, ArtifactState::Preparing { waiting_for_response, num_failures: 0 })
.is_none());
}
+373 -52
View File
@@ -22,6 +22,7 @@
use crate::{
artifacts::{ArtifactId, ArtifactPathId, ArtifactState, Artifacts},
error::PrepareError,
execute,
metrics::Metrics,
prepare, PrepareResult, Priority, Pvf, ValidationError, LOG_TARGET,
@@ -49,6 +50,16 @@ pub const PRECHECK_PREPARATION_TIMEOUT: Duration = Duration::from_secs(60);
// NOTE: If you change this make sure to fix the buckets of `pvf_preparation_time` metric.
pub const LENIENT_PREPARATION_TIMEOUT: Duration = Duration::from_secs(360);
/// The time period after which a failed preparation artifact is considered ready to be retried.
/// Note that we will only retry if another request comes in after this cooldown has passed.
#[cfg(not(test))]
pub const PREPARE_FAILURE_COOLDOWN: Duration = Duration::from_secs(15 * 60);
#[cfg(test)]
pub const PREPARE_FAILURE_COOLDOWN: Duration = Duration::from_millis(200);
/// The number of times we will retry failed prepare jobs.
pub const NUM_PREPARE_RETRIES: u32 = 5;
/// An alias to not spell the type for the oneshot sender for the PVF execution result.
pub(crate) type ResultSender = oneshot::Sender<Result<ValidationResult, ValidationError>>;
@@ -97,7 +108,13 @@ impl ValidationHost {
result_tx: ResultSender,
) -> Result<(), String> {
self.to_host_tx
.send(ToHost::ExecutePvf { pvf, execution_timeout, params, priority, result_tx })
.send(ToHost::ExecutePvf(ExecutePvfInputs {
pvf,
execution_timeout,
params,
priority,
result_tx,
}))
.await
.map_err(|_| "the inner loop hung up".to_string())
}
@@ -117,20 +134,17 @@ impl ValidationHost {
}
enum ToHost {
PrecheckPvf {
pvf: Pvf,
result_tx: PrepareResultSender,
},
ExecutePvf {
pvf: Pvf,
execution_timeout: Duration,
params: Vec<u8>,
priority: Priority,
result_tx: ResultSender,
},
HeadsUp {
active_pvfs: Vec<Pvf>,
},
PrecheckPvf { pvf: Pvf, result_tx: PrepareResultSender },
ExecutePvf(ExecutePvfInputs),
HeadsUp { active_pvfs: Vec<Pvf> },
}
/// The payload of a [`ToHost::ExecutePvf`] message.
///
/// Bundles everything `ValidationHost::execute_pvf` forwards to the host loop so that
/// `handle_execute_pvf` can take a single argument and destructure it.
struct ExecutePvfInputs {
/// The PVF to execute; its artifact id is derived via `pvf.as_artifact_id()`.
pvf: Pvf,
/// The timeout for the execution itself.
execution_timeout: Duration,
/// The raw parameters passed along with the execution request.
params: Vec<u8>,
/// The priority used when a prepare job has to be enqueued for this request.
priority: Priority,
/// The oneshot sender on which the execution result (or a validation error) is reported.
result_tx: ResultSender,
}
/// Configuration for the validation host.
@@ -361,6 +375,8 @@ async fn run(
Some(to_host) => to_host,
};
// If the artifact failed before, it could be re-scheduled for preparation here if
// the preparation failure cooldown has elapsed.
break_if_fatal!(handle_to_host(
&cache_path,
&mut artifacts,
@@ -377,9 +393,9 @@ async fn run(
// Note that preparation always succeeds.
//
// That's because the error conditions are written into the artifact and will be
// reported at the time of the execution. It potentially, but not necessarily,
// can be scheduled as a result of this function call, in case there are pending
// executions.
// reported at the time of the execution. It potentially, but not necessarily, can
// be scheduled for execution as a result of this function call, in case there are
// pending executions.
//
// We could be eager in terms of reporting and plumb the result from the preparation
// worker but we don't for the sake of simplicity.
@@ -407,24 +423,19 @@ async fn handle_to_host(
ToHost::PrecheckPvf { pvf, result_tx } => {
handle_precheck_pvf(artifacts, prepare_queue, pvf, result_tx).await?;
},
ToHost::ExecutePvf { pvf, execution_timeout, params, priority, result_tx } => {
ToHost::ExecutePvf(inputs) => {
handle_execute_pvf(
cache_path,
artifacts,
prepare_queue,
execute_queue,
awaiting_prepare,
pvf,
execution_timeout,
params,
priority,
result_tx,
inputs,
)
.await?;
},
ToHost::HeadsUp { active_pvfs } => {
handle_heads_up(artifacts, prepare_queue, active_pvfs).await?;
},
ToHost::HeadsUp { active_pvfs } =>
handle_heads_up(artifacts, prepare_queue, active_pvfs).await?,
}
Ok(())
@@ -432,8 +443,9 @@ async fn handle_to_host(
/// Handles PVF prechecking requests.
///
/// This tries to prepare the PVF by compiling the WASM blob within a given timeout
/// ([`PRECHECK_PREPARATION_TIMEOUT`]).
/// This tries to prepare the PVF by compiling the WASM blob within a given timeout ([`PRECHECK_PREPARATION_TIMEOUT`]).
///
/// If the prepare job failed previously, we may retry it under certain conditions.
async fn handle_precheck_pvf(
artifacts: &mut Artifacts,
prepare_queue: &mut mpsc::Sender<prepare::ToQueue>,
@@ -448,10 +460,12 @@ async fn handle_precheck_pvf(
*last_time_needed = SystemTime::now();
let _ = result_sender.send(Ok(()));
},
ArtifactState::Preparing { waiting_for_response } =>
ArtifactState::Preparing { waiting_for_response, num_failures: _ } =>
waiting_for_response.push(result_sender),
ArtifactState::FailedToProcess(result) => {
let _ = result_sender.send(PrepareResult::Err(result.clone()));
ArtifactState::FailedToProcess { error, .. } => {
// Do not retry failed preparation if another pre-check request comes in. We do not retry pre-checking,
// anyway.
let _ = result_sender.send(PrepareResult::Err(error.clone()));
},
}
} else {
@@ -471,22 +485,22 @@ async fn handle_precheck_pvf(
/// Handles PVF execution.
///
/// This will first try to prepare the PVF, if a prepared artifact does not already exist. If there
/// is already a preparation job, we coalesce the two preparation jobs. When preparing for
/// execution, we use a more lenient timeout ([`LENIENT_PREPARATION_TIMEOUT`]) than when
/// prechecking.
/// This will try to prepare the PVF, if a prepared artifact does not already exist. If there is already a
/// preparation job, we coalesce the two preparation jobs.
///
/// If the prepare job failed previously, we may retry it under certain conditions.
///
/// When preparing for execution, we use a more lenient timeout ([`LENIENT_PREPARATION_TIMEOUT`])
/// than when prechecking.
async fn handle_execute_pvf(
cache_path: &Path,
artifacts: &mut Artifacts,
prepare_queue: &mut mpsc::Sender<prepare::ToQueue>,
execute_queue: &mut mpsc::Sender<execute::ToQueue>,
awaiting_prepare: &mut AwaitingPrepare,
pvf: Pvf,
execution_timeout: Duration,
params: Vec<u8>,
priority: Priority,
result_tx: ResultSender,
inputs: ExecutePvfInputs,
) -> Result<(), Fatal> {
let ExecutePvfInputs { pvf, execution_timeout, params, priority, result_tx } = inputs;
let artifact_id = pvf.as_artifact_id();
if let Some(state) = artifacts.artifact_state_mut(&artifact_id) {
@@ -494,6 +508,7 @@ async fn handle_execute_pvf(
ArtifactState::Prepared { last_time_needed } => {
*last_time_needed = SystemTime::now();
// This artifact has already been prepared, send it to the execute queue.
send_execute(
execute_queue,
execute::ToQueue::Enqueue {
@@ -505,11 +520,29 @@ async fn handle_execute_pvf(
)
.await?;
},
ArtifactState::Preparing { waiting_for_response: _ } => {
ArtifactState::Preparing { .. } => {
awaiting_prepare.add(artifact_id, execution_timeout, params, result_tx);
},
ArtifactState::FailedToProcess(error) => {
let _ = result_tx.send(Err(ValidationError::from(error.clone())));
ArtifactState::FailedToProcess { last_time_failed, num_failures, error } => {
if can_retry_prepare_after_failure(*last_time_failed, *num_failures, error) {
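// If we are allowed to retry the failed prepare job, change the state to
// Preparing and re-queue this job.
// NOTE(review): `result_tx`, `params` and `execution_timeout` are not used on this
// path, so the request that triggered the retry appears to be dropped instead of
// being added to `awaiting_prepare` — confirm whether it should wait for the retry.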
*state = ArtifactState::Preparing {
waiting_for_response: Vec::new(),
num_failures: *num_failures,
};
send_prepare(
prepare_queue,
prepare::ToQueue::Enqueue {
priority,
pvf,
preparation_timeout: LENIENT_PREPARATION_TIMEOUT,
},
)
.await?;
} else {
let _ = result_tx.send(Err(ValidationError::from(error.clone())));
}
},
}
} else {
@@ -526,6 +559,7 @@ async fn handle_execute_pvf(
)
.await?;
// Add an execution request that will wait to run after this prepare job has finished.
awaiting_prepare.add(artifact_id, execution_timeout, params, result_tx);
}
@@ -546,10 +580,28 @@ async fn handle_heads_up(
ArtifactState::Prepared { last_time_needed, .. } => {
*last_time_needed = now;
},
ArtifactState::Preparing { waiting_for_response: _ } => {
ArtifactState::Preparing { .. } => {
// The artifact is already being prepared, so we don't need to do anything.
},
ArtifactState::FailedToProcess(_) => {},
ArtifactState::FailedToProcess { last_time_failed, num_failures, error } => {
if can_retry_prepare_after_failure(*last_time_failed, *num_failures, error) {
// If we are allowed to retry the failed prepare job, change the state to
// Preparing and re-queue this job.
*state = ArtifactState::Preparing {
waiting_for_response: vec![],
num_failures: *num_failures,
};
send_prepare(
prepare_queue,
prepare::ToQueue::Enqueue {
priority: Priority::Normal,
pvf: active_pvf,
preparation_timeout: LENIENT_PREPARATION_TIMEOUT,
},
)
.await?;
}
},
}
} else {
// It's not in the artifacts, so we need to enqueue a job to prepare it.
@@ -599,20 +651,26 @@ async fn handle_prepare_done(
never!("the artifact is already prepared: {:?}", artifact_id);
return Ok(())
},
Some(ArtifactState::FailedToProcess(_)) => {
Some(ArtifactState::FailedToProcess { .. }) => {
// The reasoning is similar to the above, the artifact cannot be
// processed at this point.
never!("the artifact is already processed unsuccessfully: {:?}", artifact_id);
return Ok(())
},
Some(state @ ArtifactState::Preparing { waiting_for_response: _ }) => state,
Some(state @ ArtifactState::Preparing { .. }) => state,
};
if let ArtifactState::Preparing { waiting_for_response } = state {
let num_failures = if let ArtifactState::Preparing { waiting_for_response, num_failures } =
state
{
for result_sender in waiting_for_response.drain(..) {
let _ = result_sender.send(result.clone());
}
}
num_failures
} else {
never!("The reasoning is similar to the above, the artifact can only be preparing at this point; qed");
return Ok(())
};
// It's finally time to dispatch all the execution requests that were waiting for this artifact
// to be prepared.
@@ -644,7 +702,11 @@ async fn handle_prepare_done(
*state = match result {
Ok(()) => ArtifactState::Prepared { last_time_needed: SystemTime::now() },
Err(error) => ArtifactState::FailedToProcess(error.clone()),
Err(error) => ArtifactState::FailedToProcess {
last_time_failed: SystemTime::now(),
num_failures: *num_failures + 1,
error: error.clone(),
},
};
Ok(())
@@ -707,6 +769,24 @@ async fn sweeper_task(mut sweeper_rx: mpsc::Receiver<PathBuf>) {
}
}
/// Check if the conditions to retry a prepare job have been met.
fn can_retry_prepare_after_failure(
last_time_failed: SystemTime,
num_failures: u32,
error: &PrepareError,
) -> bool {
use PrepareError::*;
match error {
// Gracefully returned an error, so it will probably be reproducible. Don't retry.
Prevalidation(_) | Preparation(_) => false,
// Retry if the retry cooldown has elapsed and if we have already retried less than
// `NUM_PREPARE_RETRIES` times.
Panic(_) | TimedOut | DidNotMakeIt =>
SystemTime::now() >= last_time_failed + PREPARE_FAILURE_COOLDOWN &&
num_failures <= NUM_PREPARE_RETRIES,
}
}
/// A stream that yields a pulse continuously at a given interval.
fn pulse_every(interval: std::time::Duration) -> impl futures::Stream<Item = ()> {
futures::stream::unfold(interval, {
@@ -834,6 +914,25 @@ mod tests {
.await
}
/// Asserts that nothing is sent to the prepare queue for a short while.
///
/// Hands `run_until` the host future (`self.run`) together with a future that races a
/// 500 ms delay against the next message on `to_prepare_queue_rx`. If a message arrives
/// before the delay fires, the test panics.
async fn poll_ensure_to_prepare_queue_is_empty(&mut self) {
use futures_timer::Delay;
// Borrow the receiver separately so it can be used inside the async block while
// `self.run` is mutably borrowed by `run_until`.
let to_prepare_queue_rx = &mut self.to_prepare_queue_rx;
run_until(
&mut self.run,
async {
futures::select! {
// The delay won the race: nothing was enqueued, which is the expected outcome.
_ = Delay::new(Duration::from_millis(500)).fuse() => (),
_ = to_prepare_queue_rx.next().fuse() => {
panic!("the prepare queue is supposed to be empty")
}
}
}
.boxed(),
)
.await
}
async fn poll_ensure_to_execute_queue_is_empty(&mut self) {
use futures_timer::Delay;
@@ -844,7 +943,7 @@ mod tests {
futures::select! {
_ = Delay::new(Duration::from_millis(500)).fuse() => (),
_ = to_execute_queue_rx.next().fuse() => {
panic!("the execute queue supposed to be empty")
panic!("the execute queue is supposed to be empty")
}
}
}
@@ -1168,6 +1267,228 @@ mod tests {
}
}
// Test that repeated prechecking requests never trigger a preparation retry after the first
// preparation failed, neither before nor after the failure cooldown has elapsed.
#[async_std::test]
async fn test_precheck_prepare_retry() {
let mut test = Builder::default().build();
let mut host = test.host_handle();
// Submit a precheck request; its preparation is made to fail below.
let (result_tx, _result_rx) = oneshot::channel();
host.precheck_pvf(Pvf::from_discriminator(1), result_tx).await.unwrap();
// The queue received the prepare request.
assert_matches!(
test.poll_and_recv_to_prepare_queue().await,
prepare::ToQueue::Enqueue { .. }
);
// Report a `TimedOut` failure for the prepare job. This error kind is eligible for a retry
// in general, so the absence of a retry below is specific to prechecking.
test.from_prepare_queue_tx
.send(prepare::FromQueue {
artifact_id: artifact_id(1),
result: Err(PrepareError::TimedOut),
})
.await
.unwrap();
// Submit another precheck request right after the failure.
let (result_tx_2, _result_rx_2) = oneshot::channel();
host.precheck_pvf(Pvf::from_discriminator(1), result_tx_2).await.unwrap();
// Assert the prepare queue is empty: no new prepare job was enqueued.
test.poll_ensure_to_prepare_queue_is_empty().await;
// Wait for the failure cooldown to elapse.
futures_timer::Delay::new(PREPARE_FAILURE_COOLDOWN).await;
// Submit another precheck request.
let (result_tx_3, _result_rx_3) = oneshot::channel();
host.precheck_pvf(Pvf::from_discriminator(1), result_tx_3).await.unwrap();
// Assert the prepare queue is empty - we do not retry for precheck requests.
test.poll_ensure_to_prepare_queue_is_empty().await;
}
// Test that a later execution request triggers a preparation retry if the first preparation
// failed due to a potentially non-reproducible error, but only after the failure cooldown has
// elapsed.
#[async_std::test]
async fn test_execute_prepare_retry() {
let mut test = Builder::default().build();
let mut host = test.host_handle();
// Submit an execute request; its preparation is made to fail below.
let (result_tx, _result_rx) = oneshot::channel();
host.execute_pvf(
Pvf::from_discriminator(1),
TEST_EXECUTION_TIMEOUT,
b"pvf".to_vec(),
Priority::Critical,
result_tx,
)
.await
.unwrap();
// The queue received the prepare request.
assert_matches!(
test.poll_and_recv_to_prepare_queue().await,
prepare::ToQueue::Enqueue { .. }
);
// Report a `TimedOut` failure for the prepare job; this error kind is eligible for a retry.
test.from_prepare_queue_tx
.send(prepare::FromQueue {
artifact_id: artifact_id(1),
result: Err(PrepareError::TimedOut),
})
.await
.unwrap();
// Submit another execute request right after the failure.
let (result_tx_2, _result_rx_2) = oneshot::channel();
host.execute_pvf(
Pvf::from_discriminator(1),
TEST_EXECUTION_TIMEOUT,
b"pvf".to_vec(),
Priority::Critical,
result_tx_2,
)
.await
.unwrap();
// Assert the prepare queue is empty: no retry is expected this soon after the failure.
test.poll_ensure_to_prepare_queue_is_empty().await;
// Wait for the failure cooldown to elapse.
futures_timer::Delay::new(PREPARE_FAILURE_COOLDOWN).await;
// Submit another execute request.
let (result_tx_3, _result_rx_3) = oneshot::channel();
host.execute_pvf(
Pvf::from_discriminator(1),
TEST_EXECUTION_TIMEOUT,
b"pvf".to_vec(),
Priority::Critical,
result_tx_3,
)
.await
.unwrap();
// Assert the prepare queue contains the request: the prepare job was re-enqueued.
assert_matches!(
test.poll_and_recv_to_prepare_queue().await,
prepare::ToQueue::Enqueue { .. }
);
}
// Test that later execution requests never trigger a preparation retry if the first
// preparation failed due to a reproducible error (e.g. Prevalidation), even after the failure
// cooldown has elapsed.
#[async_std::test]
async fn test_execute_prepare_no_retry() {
let mut test = Builder::default().build();
let mut host = test.host_handle();
// Submit an execute request; its preparation is made to fail below.
let (result_tx, _result_rx) = oneshot::channel();
host.execute_pvf(
Pvf::from_discriminator(1),
TEST_EXECUTION_TIMEOUT,
b"pvf".to_vec(),
Priority::Critical,
result_tx,
)
.await
.unwrap();
// The queue received the prepare request.
assert_matches!(
test.poll_and_recv_to_prepare_queue().await,
prepare::ToQueue::Enqueue { .. }
);
// Report a `Prevalidation` failure for the prepare job; this error kind is never retried.
test.from_prepare_queue_tx
.send(prepare::FromQueue {
artifact_id: artifact_id(1),
result: Err(PrepareError::Prevalidation("reproducible error".into())),
})
.await
.unwrap();
// Submit another execute request right after the failure.
let (result_tx_2, _result_rx_2) = oneshot::channel();
host.execute_pvf(
Pvf::from_discriminator(1),
TEST_EXECUTION_TIMEOUT,
b"pvf".to_vec(),
Priority::Critical,
result_tx_2,
)
.await
.unwrap();
// Assert the prepare queue is empty: no new prepare job was enqueued.
test.poll_ensure_to_prepare_queue_is_empty().await;
// Wait for the failure cooldown to elapse.
futures_timer::Delay::new(PREPARE_FAILURE_COOLDOWN).await;
// Submit another execute request.
let (result_tx_3, _result_rx_3) = oneshot::channel();
host.execute_pvf(
Pvf::from_discriminator(1),
TEST_EXECUTION_TIMEOUT,
b"pvf".to_vec(),
Priority::Critical,
result_tx_3,
)
.await
.unwrap();
// Assert the prepare queue is empty - we do not retry for prevalidation errors.
test.poll_ensure_to_prepare_queue_is_empty().await;
}
// Test that a later heads-up request triggers a preparation retry if the first preparation
// failed, but only after the failure cooldown has elapsed.
#[async_std::test]
async fn test_heads_up_prepare_retry() {
let mut test = Builder::default().build();
let mut host = test.host_handle();
// Submit a heads-up request; its preparation is made to fail below.
host.heads_up(vec![Pvf::from_discriminator(1)]).await.unwrap();
// The queue received the prepare request.
assert_matches!(
test.poll_and_recv_to_prepare_queue().await,
prepare::ToQueue::Enqueue { .. }
);
// Report a `TimedOut` failure for the prepare job; this error kind is eligible for a retry.
test.from_prepare_queue_tx
.send(prepare::FromQueue {
artifact_id: artifact_id(1),
result: Err(PrepareError::TimedOut),
})
.await
.unwrap();
// Submit another heads-up request right after the failure.
host.heads_up(vec![Pvf::from_discriminator(1)]).await.unwrap();
// Assert the prepare queue is empty: no retry is expected this soon after the failure.
test.poll_ensure_to_prepare_queue_is_empty().await;
// Wait for the failure cooldown to elapse.
futures_timer::Delay::new(PREPARE_FAILURE_COOLDOWN).await;
// Submit another heads-up request.
host.heads_up(vec![Pvf::from_discriminator(1)]).await.unwrap();
// Assert the prepare queue contains the request: the prepare job was re-enqueued.
assert_matches!(
test.poll_and_recv_to_prepare_queue().await,
prepare::ToQueue::Enqueue { .. }
);
}
#[async_std::test]
async fn cancellation() {
let mut test = Builder::default().build();