mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-05-31 13:21:01 +00:00
Let the PVF host kill the worker on timeout (#6381)
* Let the PVF host kill the worker on timeout * Fix comment * Fix inaccurate comments; add missing return statement * Fix a comment * Fix comment
This commit is contained in:
@@ -74,6 +74,8 @@ pub enum Outcome {
|
||||
|
||||
/// Given the idle token of a worker and parameters of work, communicates with the worker and
|
||||
/// returns the outcome.
|
||||
///
|
||||
/// NOTE: Returning the `HardTimeout` or `IoErr` errors will trigger the child process being killed.
|
||||
pub async fn start_work(
|
||||
worker: IdleWorker,
|
||||
artifact: ArtifactPathId,
|
||||
@@ -148,7 +150,7 @@ pub async fn start_work(
|
||||
target: LOG_TARGET,
|
||||
worker_pid = %pid,
|
||||
validation_code_hash = ?artifact.id.code_hash,
|
||||
"execution worker exceeded alloted time for execution",
|
||||
"execution worker exceeded allotted time for execution",
|
||||
);
|
||||
// TODO: This case is not really a hard timeout as the timeout here in the host is
|
||||
// lenient. Should fix this as part of
|
||||
|
||||
@@ -390,7 +390,7 @@ async fn run(
|
||||
from_prepare_queue = from_prepare_queue_rx.next() => {
|
||||
let from_queue = break_if_fatal!(from_prepare_queue.ok_or(Fatal));
|
||||
|
||||
// Note that preparation always succeeds.
|
||||
// Note that the preparation outcome is always reported as concluded.
|
||||
//
|
||||
// That's because the error conditions are written into the artifact and will be
|
||||
// reported at the time of the execution. It potentially, but not necessarily, can
|
||||
|
||||
@@ -294,12 +294,15 @@ fn handle_mux(
|
||||
Ok(())
|
||||
},
|
||||
PoolEvent::StartWork(worker, outcome) => {
|
||||
// If we receive any outcome other than `Concluded`, we attempt to kill the worker
|
||||
// process.
|
||||
match outcome {
|
||||
Outcome::Concluded { worker: idle, result } => {
|
||||
let data = match spawned.get_mut(worker) {
|
||||
None => {
|
||||
// Perhaps the worker was killed meanwhile and the result is no longer
|
||||
// relevant.
|
||||
// relevant. We already send `Rip` when purging if we detect that the
|
||||
// worker is dead.
|
||||
return Ok(())
|
||||
},
|
||||
Some(data) => data,
|
||||
|
||||
@@ -78,6 +78,9 @@ enum Selected {
|
||||
|
||||
/// Given the idle token of a worker and parameters of work, communicates with the worker and
|
||||
/// returns the outcome.
|
||||
///
|
||||
/// NOTE: Returning the `TimedOut` or `DidNotMakeIt` errors will trigger the child process being
|
||||
/// killed.
|
||||
pub async fn start_work(
|
||||
worker: IdleWorker,
|
||||
code: Arc<Vec<u8>>,
|
||||
@@ -149,6 +152,7 @@ pub async fn start_work(
|
||||
},
|
||||
};
|
||||
|
||||
// NOTE: A `TimedOut` or `DidNotMakeIt` error triggers the child process being killed.
|
||||
match selected {
|
||||
// Timed out on the child. This should already be logged by the child.
|
||||
Selected::Done(Err(PrepareError::TimedOut)) => Outcome::TimedOut,
|
||||
@@ -162,6 +166,9 @@ pub async fn start_work(
|
||||
}
|
||||
|
||||
/// Handles the case where we successfully received response bytes on the host from the child.
|
||||
///
|
||||
/// NOTE: Here we know the artifact exists, but is still located in a temporary file which will be
|
||||
/// cleared by `with_tmp_file`.
|
||||
async fn handle_response_bytes(
|
||||
response_bytes: Vec<u8>,
|
||||
pid: u32,
|
||||
@@ -201,9 +208,6 @@ async fn handle_response_bytes(
|
||||
);
|
||||
|
||||
// Return a timeout error.
|
||||
//
|
||||
// NOTE: The artifact exists, but is located in a temporary file which
|
||||
// will be cleared by `with_tmp_file`.
|
||||
return Selected::Deadline
|
||||
}
|
||||
|
||||
|
||||
@@ -199,10 +199,8 @@ where
|
||||
}
|
||||
|
||||
/// Loop that runs in the CPU time monitor thread on prepare and execute jobs. Continuously wakes up
|
||||
/// from sleeping and then either sleeps for the remaining CPU time, or kills the process if we
|
||||
/// exceed the CPU timeout.
|
||||
///
|
||||
/// NOTE: Killed processes are detected and cleaned up in `purge_dead`.
|
||||
/// from sleeping and then either sleeps for the remaining CPU time, or sends back a timeout error
|
||||
/// if we exceed the CPU timeout.
|
||||
///
|
||||
/// NOTE: If the job completes and this thread is still sleeping, it will continue sleeping in the
|
||||
/// background. When it wakes, it will see that the flag has been set and return.
|
||||
@@ -233,7 +231,11 @@ pub async fn cpu_time_monitor_loop(
|
||||
timeout.as_millis(),
|
||||
);
|
||||
|
||||
// Send back a TimedOut error on timeout.
|
||||
// Send back a `TimedOut` error.
|
||||
//
|
||||
// NOTE: This will cause the worker, whether preparation or execution, to be killed by
|
||||
// the host. We do not kill the process here because it would interfere with the proper
|
||||
// handling of this error.
|
||||
let encoded_result = match job_kind {
|
||||
JobKind::Prepare => {
|
||||
let result: Result<(), PrepareError> = Err(PrepareError::TimedOut);
|
||||
@@ -244,8 +246,8 @@ pub async fn cpu_time_monitor_loop(
|
||||
result.encode()
|
||||
},
|
||||
};
|
||||
// If we error there is nothing else we can do here, and we are killing the process,
|
||||
// anyway. The receiving side will just have to time out.
|
||||
// If we error here there is nothing we can do apart from log it. The receiving side
|
||||
// will just have to time out.
|
||||
if let Err(err) = framed_send(&mut stream, encoded_result.as_slice()).await {
|
||||
gum::warn!(
|
||||
target: LOG_TARGET,
|
||||
@@ -255,8 +257,7 @@ pub async fn cpu_time_monitor_loop(
|
||||
);
|
||||
}
|
||||
|
||||
// Kill the process.
|
||||
std::process::exit(1);
|
||||
return
|
||||
}
|
||||
|
||||
// Sleep for the remaining CPU time, plus a bit to account for overhead. Note that the sleep
|
||||
|
||||
Reference in New Issue
Block a user