PVF preparation: do not conflate errors (#6384)

* PVF preparation: do not conflate errors

+ Adds some more granularity to the prepare errors.
+ Better distinguish whether errors occur on the host side or the worker.
+ Do not kill the worker if the error happened on the host side.
+ Do not retry preparation if the error was `Panic`.
+ Removes unnecessary indirection with `Selected` type.

* Add missing docs, resolve TODOs

* Address review comments and remove TODOs

* Fix error in CI

* Undo unnecessary change

* Update couple of comments

* Don't return error for stream shutdown

* Update node/core/pvf/src/worker_common.rs
This commit is contained in:
Marcin S
2022-12-20 08:32:12 -05:00
committed by GitHub
parent fcc26d42e4
commit e0a0475a05
8 changed files with 173 additions and 108 deletions
+58 -26
View File
@@ -22,7 +22,6 @@ use crate::{
LOG_TARGET,
};
use always_assert::never;
use assert_matches::assert_matches;
use async_std::path::{Path, PathBuf};
use futures::{
channel::mpsc, future::BoxFuture, stream::FuturesUnordered, Future, FutureExt, StreamExt,
@@ -232,7 +231,7 @@ fn handle_to_pool(
// items concluded;
// thus idle token is Some;
// qed.
never!("unexpected abscence of the idle token in prepare pool");
never!("unexpected absence of the idle token in prepare pool");
}
} else {
// That's a relatively normal situation since the queue may send `start_work` and
@@ -294,29 +293,28 @@ fn handle_mux(
Ok(())
},
PoolEvent::StartWork(worker, outcome) => {
// If we receive any outcome other than `Concluded`, we attempt to kill the worker
// process.
// If we receive an outcome that the worker is unreachable or that an error occurred on
// the worker, we attempt to kill the worker process.
match outcome {
Outcome::Concluded { worker: idle, result } => {
let data = match spawned.get_mut(worker) {
None => {
// Perhaps the worker was killed meanwhile and the result is no longer
// relevant. We already send `Rip` when purging if we detect that the
// worker is dead.
return Ok(())
},
Some(data) => data,
};
// We just replace the idle worker that was loaned from this option during
// the work starting.
let old = data.idle.replace(idle);
assert_matches!(old, None, "attempt to overwrite an idle worker");
reply(from_pool, FromPool::Concluded { worker, rip: false, result })?;
Ok(())
},
Outcome::Concluded { worker: idle, result } =>
handle_concluded_no_rip(from_pool, spawned, worker, idle, result),
// Return `Concluded`, but do not kill the worker since the error was on the host side.
Outcome::CreateTmpFileErr { worker: idle, err } => handle_concluded_no_rip(
from_pool,
spawned,
worker,
idle,
Err(PrepareError::CreateTmpFileErr(err)),
),
// Return `Concluded`, but do not kill the worker since the error was on the host side.
Outcome::RenameTmpFileErr { worker: idle, result: _, err } =>
handle_concluded_no_rip(
from_pool,
spawned,
worker,
idle,
Err(PrepareError::RenameTmpFileErr(err)),
),
Outcome::Unreachable => {
if attempt_retire(metrics, spawned, worker) {
reply(from_pool, FromPool::Rip(worker))?;
@@ -324,14 +322,14 @@ fn handle_mux(
Ok(())
},
Outcome::DidNotMakeIt => {
Outcome::IoErr => {
if attempt_retire(metrics, spawned, worker) {
reply(
from_pool,
FromPool::Concluded {
worker,
rip: true,
result: Err(PrepareError::DidNotMakeIt),
result: Err(PrepareError::IoErr),
},
)?;
}
@@ -380,6 +378,40 @@ fn attempt_retire(
}
}
/// Handles the case where we received a response. There potentially was an error, but not the fault
/// of the worker as far as we know, so the worker should not be killed.
///
/// This function tries to put the idle worker back into the pool and then replies with
/// `FromPool::Concluded` with `rip: false`.
fn handle_concluded_no_rip(
from_pool: &mut mpsc::UnboundedSender<FromPool>,
spawned: &mut HopSlotMap<Worker, WorkerData>,
worker: Worker,
idle: IdleWorker,
result: PrepareResult,
) -> Result<(), Fatal> {
let data = match spawned.get_mut(worker) {
None => {
// Perhaps the worker was killed meanwhile and the result is no longer relevant. We
// already send `Rip` when purging if we detect that the worker is dead.
return Ok(())
},
Some(data) => data,
};
// We just replace the idle worker that was loaned from this option during
// the work starting.
let old = data.idle.replace(idle);
never!(
old.is_some(),
"old idle worker was taken out when starting work; we only replace it here; qed"
);
reply(from_pool, FromPool::Concluded { worker, rip: false, result })?;
Ok(())
}
/// Spins up the pool and returns the future that should be polled to make the pool functional.
pub fn start(
metrics: Metrics,
+1 -1
View File
@@ -761,7 +761,7 @@ mod tests {
test.send_from_pool(pool::FromPool::Concluded {
worker: w1,
rip: true,
result: Err(PrepareError::DidNotMakeIt),
result: Err(PrepareError::IoErr),
});
test.poll_ensure_to_pool_is_empty().await;
}
+34 -36
View File
@@ -59,28 +59,26 @@ pub enum Outcome {
/// The host tried to reach the worker but failed. This is most likely because the worked was
/// killed by the system.
Unreachable,
/// The temporary file for the artifact could not be created at the given cache path.
CreateTmpFileErr { worker: IdleWorker, err: String },
/// The response from the worker is received, but the file cannot be renamed (moved) to the
/// final destination location.
RenameTmpFileErr { worker: IdleWorker, result: PrepareResult, err: String },
/// The worker failed to finish the job until the given deadline.
///
/// The worker is no longer usable and should be killed.
TimedOut,
/// The execution was interrupted abruptly and the worker is not available anymore.
/// An IO error occurred while receiving the result from the worker process.
///
/// This doesn't return an idle worker instance, thus this worker is no longer usable.
DidNotMakeIt,
}
#[derive(Debug)]
enum Selected {
Done(PrepareResult),
IoErr,
Deadline,
}
/// Given the idle token of a worker and parameters of work, communicates with the worker and
/// returns the outcome.
///
/// NOTE: Returning the `TimedOut` or `DidNotMakeIt` errors will trigger the child process being
/// killed.
/// NOTE: Returning the `TimedOut`, `IoErr` or `Unreachable` outcomes will trigger the child process
/// being killed.
pub async fn start_work(
worker: IdleWorker,
code: Arc<Vec<u8>>,
@@ -97,7 +95,7 @@ pub async fn start_work(
artifact_path.display(),
);
with_tmp_file(pid, cache_path, |tmp_file| async move {
with_tmp_file(stream.clone(), pid, cache_path, |tmp_file| async move {
if let Err(err) = send_request(&mut stream, code, &tmp_file, preparation_timeout).await {
gum::warn!(
target: LOG_TARGET,
@@ -120,10 +118,11 @@ pub async fn start_work(
let timeout = preparation_timeout * JOB_TIMEOUT_WALL_CLOCK_FACTOR;
let result = async_std::future::timeout(timeout, framed_recv(&mut stream)).await;
let selected = match result {
match result {
// Received bytes from worker within the time limit.
Ok(Ok(response_bytes)) =>
handle_response_bytes(
IdleWorker { stream, pid },
response_bytes,
pid,
tmp_file,
@@ -139,7 +138,7 @@ pub async fn start_work(
"failed to recv a prepare response: {:?}",
err,
);
Selected::IoErr
Outcome::IoErr
},
Err(_) => {
// Timed out here on the host.
@@ -148,18 +147,8 @@ pub async fn start_work(
worker_pid = %pid,
"did not recv a prepare response within the time limit",
);
Selected::Deadline
Outcome::TimedOut
},
};
// NOTE: A `TimedOut` or `DidNotMakeIt` error triggers the child process being killed.
match selected {
// Timed out on the child. This should already be logged by the child.
Selected::Done(Err(PrepareError::TimedOut)) => Outcome::TimedOut,
Selected::Done(result) =>
Outcome::Concluded { worker: IdleWorker { stream, pid }, result },
Selected::Deadline => Outcome::TimedOut,
Selected::IoErr => Outcome::DidNotMakeIt,
}
})
.await
@@ -170,12 +159,13 @@ pub async fn start_work(
/// NOTE: Here we know the artifact exists, but is still located in a temporary file which will be
/// cleared by `with_tmp_file`.
async fn handle_response_bytes(
worker: IdleWorker,
response_bytes: Vec<u8>,
pid: u32,
tmp_file: PathBuf,
artifact_path: PathBuf,
preparation_timeout: Duration,
) -> Selected {
) -> Outcome {
// By convention we expect encoded `PrepareResult`.
let result = match PrepareResult::decode(&mut response_bytes.as_slice()) {
Ok(result) => result,
@@ -188,12 +178,14 @@ async fn handle_response_bytes(
"received unexpected response from the prepare worker: {}",
HexDisplay::from(&bound_bytes),
);
return Selected::IoErr
return Outcome::IoErr
},
};
let cpu_time_elapsed = match result {
Ok(result) => result,
Err(_) => return Selected::Done(result),
// Timed out on the child. This should already be logged by the child.
Err(PrepareError::TimedOut) => return Outcome::TimedOut,
Err(_) => return Outcome::Concluded { worker, result },
};
if cpu_time_elapsed > preparation_timeout {
@@ -208,7 +200,10 @@ async fn handle_response_bytes(
);
// Return a timeout error.
return Selected::Deadline
//
// NOTE: The artifact exists, but is located in a temporary file which
// will be cleared by `with_tmp_file`.
return Outcome::TimedOut
}
gum::debug!(
@@ -219,10 +214,9 @@ async fn handle_response_bytes(
artifact_path.display(),
);
async_std::fs::rename(&tmp_file, &artifact_path)
.await
.map(|_| Selected::Done(result))
.unwrap_or_else(|err| {
match async_std::fs::rename(&tmp_file, &artifact_path).await {
Ok(_) => Outcome::Concluded { worker, result },
Err(err) => {
gum::warn!(
target: LOG_TARGET,
worker_pid = %pid,
@@ -231,15 +225,16 @@ async fn handle_response_bytes(
artifact_path.display(),
err,
);
Selected::IoErr
})
Outcome::RenameTmpFileErr { worker, result, err: format!("{:?}", err) }
},
}
}
/// Create a temporary file for an artifact at the given cache path and execute the given
/// future/closure passing the file path in.
///
/// The function will try best effort to not leave behind the temporary file.
async fn with_tmp_file<F, Fut>(pid: u32, cache_path: &Path, f: F) -> Outcome
async fn with_tmp_file<F, Fut>(stream: UnixStream, pid: u32, cache_path: &Path, f: F) -> Outcome
where
Fut: futures::Future<Output = Outcome>,
F: FnOnce(PathBuf) -> Fut,
@@ -253,7 +248,10 @@ where
"failed to create a temp file for the artifact: {:?}",
err,
);
return Outcome::DidNotMakeIt
return Outcome::CreateTmpFileErr {
worker: IdleWorker { stream, pid },
err: format!("{:?}", err),
}
},
};