mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-05-31 13:21:01 +00:00
PVF: Minor refactor in workers code (#7012)
* Move version check to `worker_event_loop`
* More minor refactors
  - More consistent use of `format_invalid` and `format_internal`.
  - Fix a doc error.
  - Fix some poorly-named local variables.
This commit is contained in:
@@ -29,12 +29,11 @@ pub enum PrepareError {
|
|||||||
Prevalidation(String),
|
Prevalidation(String),
|
||||||
/// Compilation failed for the given PVF.
|
/// Compilation failed for the given PVF.
|
||||||
Preparation(String),
|
Preparation(String),
|
||||||
/// An unexpected panic has occured in the preparation worker.
|
/// An unexpected panic has occurred in the preparation worker.
|
||||||
Panic(String),
|
Panic(String),
|
||||||
/// Failed to prepare the PVF due to the time limit.
|
/// Failed to prepare the PVF due to the time limit.
|
||||||
TimedOut,
|
TimedOut,
|
||||||
/// An IO error occurred while receiving the result from the worker process. This state is reported by the
|
/// An IO error occurred. This state is reported by either the validation host or by the worker.
|
||||||
/// validation host (not by the worker).
|
|
||||||
IoErr(String),
|
IoErr(String),
|
||||||
/// The temporary file for the artifact could not be created at the given cache path. This state is reported by the
|
/// The temporary file for the artifact could not be created at the given cache path. This state is reported by the
|
||||||
/// validation host (not by the worker).
|
/// validation host (not by the worker).
|
||||||
|
|||||||
@@ -261,6 +261,13 @@ impl Response {
|
|||||||
Self::InvalidCandidate(format!("{}: {}", ctx, msg))
|
Self::InvalidCandidate(format!("{}: {}", ctx, msg))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
fn format_internal(ctx: &'static str, msg: &str) -> Self {
|
||||||
|
if msg.is_empty() {
|
||||||
|
Self::InternalError(ctx.to_string())
|
||||||
|
} else {
|
||||||
|
Self::InternalError(format!("{}: {}", ctx, msg))
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// The entrypoint that the spawned execute worker should start with. The `socket_path` specifies
|
/// The entrypoint that the spawned execute worker should start with. The `socket_path` specifies
|
||||||
@@ -268,19 +275,8 @@ impl Response {
|
|||||||
/// is checked against the worker version. A mismatch results in immediate worker termination.
|
/// is checked against the worker version. A mismatch results in immediate worker termination.
|
||||||
/// `None` is used for tests and in other situations when version check is not necessary.
|
/// `None` is used for tests and in other situations when version check is not necessary.
|
||||||
pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) {
|
pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) {
|
||||||
worker_event_loop("execute", socket_path, |rt_handle, mut stream| async move {
|
worker_event_loop("execute", socket_path, node_version, |rt_handle, mut stream| async move {
|
||||||
let worker_pid = std::process::id();
|
let worker_pid = std::process::id();
|
||||||
if let Some(version) = node_version {
|
|
||||||
if version != env!("SUBSTRATE_CLI_IMPL_VERSION") {
|
|
||||||
gum::error!(
|
|
||||||
target: LOG_TARGET,
|
|
||||||
%worker_pid,
|
|
||||||
"Node and worker version mismatch, node needs restarting, forcing shutdown",
|
|
||||||
);
|
|
||||||
crate::kill_parent_node_in_emergency();
|
|
||||||
return Err(io::Error::new(io::ErrorKind::Unsupported, "Version mismatch"))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let handshake = recv_handshake(&mut stream).await?;
|
let handshake = recv_handshake(&mut stream).await?;
|
||||||
let executor = Arc::new(Executor::new(handshake.executor_params).map_err(|e| {
|
let executor = Arc::new(Executor::new(handshake.executor_params).map_err(|e| {
|
||||||
@@ -301,7 +297,7 @@ pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) {
|
|||||||
let cpu_time_start = ProcessTime::now();
|
let cpu_time_start = ProcessTime::now();
|
||||||
|
|
||||||
// Spawn a new thread that runs the CPU time monitor.
|
// Spawn a new thread that runs the CPU time monitor.
|
||||||
let thread_fut = rt_handle
|
let cpu_time_monitor_fut = rt_handle
|
||||||
.spawn_blocking(move || {
|
.spawn_blocking(move || {
|
||||||
cpu_time_monitor_loop(cpu_time_start, execution_timeout, finished_rx)
|
cpu_time_monitor_loop(cpu_time_start, execution_timeout, finished_rx)
|
||||||
})
|
})
|
||||||
@@ -313,14 +309,14 @@ pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) {
|
|||||||
})
|
})
|
||||||
.fuse();
|
.fuse();
|
||||||
|
|
||||||
pin_mut!(thread_fut);
|
pin_mut!(cpu_time_monitor_fut);
|
||||||
pin_mut!(execute_fut);
|
pin_mut!(execute_fut);
|
||||||
|
|
||||||
let response = select_biased! {
|
let response = select_biased! {
|
||||||
// If this future is not selected, the join handle is dropped and the thread will
|
// If this future is not selected, the join handle is dropped and the thread will
|
||||||
// finish in the background.
|
// finish in the background.
|
||||||
join_res = thread_fut => {
|
cpu_time_monitor_res = cpu_time_monitor_fut => {
|
||||||
match join_res {
|
match cpu_time_monitor_res {
|
||||||
Ok(Some(cpu_time_elapsed)) => {
|
Ok(Some(cpu_time_elapsed)) => {
|
||||||
// Log if we exceed the timeout and the other thread hasn't finished.
|
// Log if we exceed the timeout and the other thread hasn't finished.
|
||||||
gum::warn!(
|
gum::warn!(
|
||||||
@@ -333,12 +329,12 @@ pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) {
|
|||||||
Response::TimedOut
|
Response::TimedOut
|
||||||
},
|
},
|
||||||
Ok(None) => Response::InternalError("error communicating over finished channel".into()),
|
Ok(None) => Response::InternalError("error communicating over finished channel".into()),
|
||||||
Err(e) => Response::InternalError(format!("{}", e)),
|
Err(e) => Response::format_internal("cpu time monitor thread error", &e.to_string()),
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
execute_res = execute_fut => {
|
execute_res = execute_fut => {
|
||||||
let _ = finished_tx.send(());
|
let _ = finished_tx.send(());
|
||||||
execute_res.unwrap_or_else(|e| Response::InternalError(format!("{}", e)))
|
execute_res.unwrap_or_else(|e| Response::format_internal("execute thread error", &e.to_string()))
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -367,7 +363,7 @@ fn validate_using_artifact(
|
|||||||
|
|
||||||
let result_descriptor = match ValidationResult::decode(&mut &descriptor_bytes[..]) {
|
let result_descriptor = match ValidationResult::decode(&mut &descriptor_bytes[..]) {
|
||||||
Err(err) =>
|
Err(err) =>
|
||||||
return Response::InvalidCandidate(format!("validation result decoding failed: {}", err)),
|
return Response::format_invalid("validation result decoding failed", &err.to_string()),
|
||||||
Ok(r) => r,
|
Ok(r) => r,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
@@ -114,7 +114,6 @@ pub use pvf::PvfPrepData;
|
|||||||
|
|
||||||
pub use host::{start, Config, ValidationHost};
|
pub use host::{start, Config, ValidationHost};
|
||||||
pub use metrics::Metrics;
|
pub use metrics::Metrics;
|
||||||
pub(crate) use worker_common::kill_parent_node_in_emergency;
|
|
||||||
pub use worker_common::JOB_TIMEOUT_WALL_CLOCK_FACTOR;
|
pub use worker_common::JOB_TIMEOUT_WALL_CLOCK_FACTOR;
|
||||||
|
|
||||||
pub use execute::worker_entrypoint as execute_worker_entrypoint;
|
pub use execute::worker_entrypoint as execute_worker_entrypoint;
|
||||||
|
|||||||
@@ -351,19 +351,8 @@ async fn recv_response(stream: &mut UnixStream, pid: u32) -> io::Result<PrepareR
|
|||||||
/// 7. Send the result of preparation back to the host. If any error occurred in the above steps, we
|
/// 7. Send the result of preparation back to the host. If any error occurred in the above steps, we
|
||||||
/// send that in the `PrepareResult`.
|
/// send that in the `PrepareResult`.
|
||||||
pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) {
|
pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) {
|
||||||
worker_event_loop("prepare", socket_path, |rt_handle, mut stream| async move {
|
worker_event_loop("prepare", socket_path, node_version, |rt_handle, mut stream| async move {
|
||||||
let worker_pid = std::process::id();
|
let worker_pid = std::process::id();
|
||||||
if let Some(version) = node_version {
|
|
||||||
if version != env!("SUBSTRATE_CLI_IMPL_VERSION") {
|
|
||||||
gum::error!(
|
|
||||||
target: LOG_TARGET,
|
|
||||||
%worker_pid,
|
|
||||||
"Node and worker version mismatch, node needs restarting, forcing shutdown",
|
|
||||||
);
|
|
||||||
crate::kill_parent_node_in_emergency();
|
|
||||||
return Err(io::Error::new(io::ErrorKind::Unsupported, "Version mismatch"))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
let (pvf, dest) = recv_request(&mut stream).await?;
|
let (pvf, dest) = recv_request(&mut stream).await?;
|
||||||
|
|||||||
@@ -171,18 +171,35 @@ pub async fn tmpfile(prefix: &str) -> io::Result<PathBuf> {
|
|||||||
tmpfile_in(prefix, &temp_dir).await
|
tmpfile_in(prefix, &temp_dir).await
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn worker_event_loop<F, Fut>(debug_id: &'static str, socket_path: &str, mut event_loop: F)
|
pub fn worker_event_loop<F, Fut>(
|
||||||
where
|
debug_id: &'static str,
|
||||||
|
socket_path: &str,
|
||||||
|
node_version: Option<&str>,
|
||||||
|
mut event_loop: F,
|
||||||
|
) where
|
||||||
F: FnMut(Handle, UnixStream) -> Fut,
|
F: FnMut(Handle, UnixStream) -> Fut,
|
||||||
Fut: futures::Future<Output = io::Result<Never>>,
|
Fut: futures::Future<Output = io::Result<Never>>,
|
||||||
{
|
{
|
||||||
gum::debug!(
|
let worker_pid = std::process::id();
|
||||||
target: LOG_TARGET,
|
gum::debug!(target: LOG_TARGET, %worker_pid, "starting pvf worker ({})", debug_id);
|
||||||
worker_pid = %std::process::id(),
|
|
||||||
"starting pvf worker ({})",
|
|
||||||
debug_id,
|
|
||||||
);
|
|
||||||
|
|
||||||
|
// Check for a mismatch between the node and worker versions.
|
||||||
|
if let Some(version) = node_version {
|
||||||
|
if version != env!("SUBSTRATE_CLI_IMPL_VERSION") {
|
||||||
|
gum::error!(
|
||||||
|
target: LOG_TARGET,
|
||||||
|
%worker_pid,
|
||||||
|
"Node and worker version mismatch, node needs restarting, forcing shutdown",
|
||||||
|
);
|
||||||
|
kill_parent_node_in_emergency();
|
||||||
|
let err: io::Result<Never> =
|
||||||
|
Err(io::Error::new(io::ErrorKind::Unsupported, "Version mismatch"));
|
||||||
|
gum::debug!(target: LOG_TARGET, %worker_pid, "quitting pvf worker({}): {:?}", debug_id, err);
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Run the main worker loop.
|
||||||
let rt = Runtime::new().expect("Creates tokio runtime. If this panics the worker will die and the host will detect that and deal with it.");
|
let rt = Runtime::new().expect("Creates tokio runtime. If this panics the worker will die and the host will detect that and deal with it.");
|
||||||
let handle = rt.handle();
|
let handle = rt.handle();
|
||||||
let err = rt
|
let err = rt
|
||||||
@@ -197,13 +214,7 @@ where
|
|||||||
// It's never `Ok` because it's `Ok(Never)`.
|
// It's never `Ok` because it's `Ok(Never)`.
|
||||||
.unwrap_err();
|
.unwrap_err();
|
||||||
|
|
||||||
gum::debug!(
|
gum::debug!(target: LOG_TARGET, %worker_pid, "quitting pvf worker ({}): {:?}", debug_id, err);
|
||||||
target: LOG_TARGET,
|
|
||||||
worker_pid = %std::process::id(),
|
|
||||||
"quitting pvf worker ({}): {:?}",
|
|
||||||
debug_id,
|
|
||||||
err,
|
|
||||||
);
|
|
||||||
|
|
||||||
// We don't want tokio to wait for the tasks to finish. We want to bring down the worker as fast
|
// We don't want tokio to wait for the tasks to finish. We want to bring down the worker as fast
|
||||||
// as possible and not wait for stalled validation to finish. This isn't strictly necessary now,
|
// as possible and not wait for stalled validation to finish. This isn't strictly necessary now,
|
||||||
@@ -422,7 +433,7 @@ pub async fn framed_recv(r: &mut (impl AsyncRead + Unpin)) -> io::Result<Vec<u8>
|
|||||||
/// get closed by the OS and other workers receive error on socket read and also exit. Preparation
|
/// get closed by the OS and other workers receive error on socket read and also exit. Preparation
|
||||||
/// jobs are written to the temporary files that are renamed to real artifacts on the node side, so
|
/// jobs are written to the temporary files that are renamed to real artifacts on the node side, so
|
||||||
/// no leftover artifacts are possible.
|
/// no leftover artifacts are possible.
|
||||||
pub(crate) fn kill_parent_node_in_emergency() {
|
fn kill_parent_node_in_emergency() {
|
||||||
unsafe {
|
unsafe {
|
||||||
// SAFETY: `getpid()` never fails but may return "no-parent" (0) or "parent-init" (1) in
|
// SAFETY: `getpid()` never fails but may return "no-parent" (0) or "parent-init" (1) in
|
||||||
// some corner cases, which is checked. `kill()` never fails.
|
// some corner cases, which is checked. `kill()` never fails.
|
||||||
|
|||||||
Reference in New Issue
Block a user