mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-13 17:31:05 +00:00
Check spawned worker version vs node version before PVF preparation (#6861)
* Check spawned worker version vs node version before PVF preparation * Address discussions * Propagate errors and shutdown preparation and execution pipelines properly * Add logs; Fix execution worker checks * Revert "Propagate errors and shutdown preparation and execution pipelines properly" This reverts commit b96cc3160ff58db5ff001d8ca0bfea9bd4bdd0f2. * Don't try to shut down; report the condition and exit worker * Get rid of `VersionMismatch` preparation error * Merge master * Add docs; Fix tests * Update Cargo.lock * Kill again, but only the main node process * Move unsafe code to a common safe function * Fix libc dependency error on MacOS * pvf spawning: Add some logging, add a small integration test * Minor fixes * Restart CI --------- Co-authored-by: Marcin S <marcin@realemail.net>
This commit is contained in:
@@ -47,9 +47,13 @@ pub async fn spawn(
|
||||
executor_params: ExecutorParams,
|
||||
spawn_timeout: Duration,
|
||||
) -> Result<(IdleWorker, WorkerHandle), SpawnErr> {
|
||||
let (mut idle_worker, worker_handle) =
|
||||
spawn_with_program_path("execute", program_path, &["execute-worker"], spawn_timeout)
|
||||
.await?;
|
||||
let (mut idle_worker, worker_handle) = spawn_with_program_path(
|
||||
"execute",
|
||||
program_path,
|
||||
&["execute-worker", "--node-impl-version", env!("SUBSTRATE_CLI_IMPL_VERSION")],
|
||||
spawn_timeout,
|
||||
)
|
||||
.await?;
|
||||
send_handshake(&mut idle_worker.stream, Handshake { executor_params })
|
||||
.await
|
||||
.map_err(|error| {
|
||||
@@ -260,11 +264,25 @@ impl Response {
|
||||
}
|
||||
|
||||
/// The entrypoint that the spawned execute worker should start with. The `socket_path` specifies
|
||||
/// the path to the socket used to communicate with the host.
|
||||
pub fn worker_entrypoint(socket_path: &str) {
|
||||
/// the path to the socket used to communicate with the host. The `node_version`, if `Some`,
|
||||
/// is checked against the worker version. A mismatch results in immediate worker termination.
|
||||
/// `None` is used for tests and in other situations when version check is not necessary.
|
||||
pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) {
|
||||
worker_event_loop("execute", socket_path, |rt_handle, mut stream| async move {
|
||||
let handshake = recv_handshake(&mut stream).await?;
|
||||
let worker_pid = std::process::id();
|
||||
if let Some(version) = node_version {
|
||||
if version != env!("SUBSTRATE_CLI_IMPL_VERSION") {
|
||||
gum::error!(
|
||||
target: LOG_TARGET,
|
||||
%worker_pid,
|
||||
"Node and worker version mismatch, node needs restarting, forcing shutdown",
|
||||
);
|
||||
crate::kill_parent_node_in_emergency();
|
||||
return Err(io::Error::new(io::ErrorKind::Unsupported, "Version mismatch"))
|
||||
}
|
||||
}
|
||||
|
||||
let handshake = recv_handshake(&mut stream).await?;
|
||||
let executor = Arc::new(Executor::new(handshake.executor_params).map_err(|e| {
|
||||
io::Error::new(io::ErrorKind::Other, format!("cannot create executor: {}", e))
|
||||
})?);
|
||||
@@ -273,7 +291,7 @@ pub fn worker_entrypoint(socket_path: &str) {
|
||||
let (artifact_path, params, execution_timeout) = recv_request(&mut stream).await?;
|
||||
gum::debug!(
|
||||
target: LOG_TARGET,
|
||||
worker_pid = %std::process::id(),
|
||||
%worker_pid,
|
||||
"worker: validating artifact {}",
|
||||
artifact_path.display(),
|
||||
);
|
||||
@@ -307,7 +325,7 @@ pub fn worker_entrypoint(socket_path: &str) {
|
||||
// Log if we exceed the timeout and the other thread hasn't finished.
|
||||
gum::warn!(
|
||||
target: LOG_TARGET,
|
||||
worker_pid = %std::process::id(),
|
||||
%worker_pid,
|
||||
"execute job took {}ms cpu time, exceeded execute timeout {}ms",
|
||||
cpu_time_elapsed.as_millis(),
|
||||
execution_timeout.as_millis(),
|
||||
|
||||
@@ -114,6 +114,7 @@ pub use pvf::PvfPrepData;
|
||||
|
||||
pub use host::{start, Config, ValidationHost};
|
||||
pub use metrics::Metrics;
|
||||
pub(crate) use worker_common::kill_parent_node_in_emergency;
|
||||
pub use worker_common::JOB_TIMEOUT_WALL_CLOCK_FACTOR;
|
||||
|
||||
pub use execute::worker_entrypoint as execute_worker_entrypoint;
|
||||
|
||||
@@ -52,7 +52,13 @@ pub async fn spawn(
|
||||
program_path: &Path,
|
||||
spawn_timeout: Duration,
|
||||
) -> Result<(IdleWorker, WorkerHandle), SpawnErr> {
|
||||
spawn_with_program_path("prepare", program_path, &["prepare-worker"], spawn_timeout).await
|
||||
spawn_with_program_path(
|
||||
"prepare",
|
||||
program_path,
|
||||
&["prepare-worker", "--node-impl-version", env!("SUBSTRATE_CLI_IMPL_VERSION")],
|
||||
spawn_timeout,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
pub enum Outcome {
|
||||
@@ -321,7 +327,9 @@ async fn recv_response(stream: &mut UnixStream, pid: u32) -> io::Result<PrepareR
|
||||
}
|
||||
|
||||
/// The entrypoint that the spawned prepare worker should start with. The `socket_path` specifies
|
||||
/// the path to the socket used to communicate with the host.
|
||||
/// the path to the socket used to communicate with the host. The `node_version`, if `Some`,
|
||||
/// is checked against the worker version. A mismatch results in immediate worker termination.
|
||||
/// `None` is used for tests and in other situations when version check is not necessary.
|
||||
///
|
||||
/// # Flow
|
||||
///
|
||||
@@ -342,10 +350,22 @@ async fn recv_response(stream: &mut UnixStream, pid: u32) -> io::Result<PrepareR
|
||||
///
|
||||
/// 7. Send the result of preparation back to the host. If any error occurred in the above steps, we
|
||||
/// send that in the `PrepareResult`.
|
||||
pub fn worker_entrypoint(socket_path: &str) {
|
||||
pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) {
|
||||
worker_event_loop("prepare", socket_path, |rt_handle, mut stream| async move {
|
||||
let worker_pid = std::process::id();
|
||||
if let Some(version) = node_version {
|
||||
if version != env!("SUBSTRATE_CLI_IMPL_VERSION") {
|
||||
gum::error!(
|
||||
target: LOG_TARGET,
|
||||
%worker_pid,
|
||||
"Node and worker version mismatch, node needs restarting, forcing shutdown",
|
||||
);
|
||||
crate::kill_parent_node_in_emergency();
|
||||
return Err(io::Error::new(io::ErrorKind::Unsupported, "Version mismatch"))
|
||||
}
|
||||
}
|
||||
|
||||
loop {
|
||||
let worker_pid = std::process::id();
|
||||
let (pvf, dest) = recv_request(&mut stream).await?;
|
||||
gum::debug!(
|
||||
target: LOG_TARGET,
|
||||
|
||||
@@ -61,22 +61,34 @@ macro_rules! decl_puppet_worker_main {
|
||||
$crate::sp_tracing::try_init_simple();
|
||||
|
||||
let args = std::env::args().collect::<Vec<_>>();
|
||||
if args.len() < 2 {
|
||||
if args.len() < 3 {
|
||||
panic!("wrong number of arguments");
|
||||
}
|
||||
|
||||
let mut version = None;
|
||||
let mut socket_path: &str = "";
|
||||
|
||||
for i in 2..args.len() {
|
||||
match args[i].as_ref() {
|
||||
"--socket-path" => socket_path = args[i + 1].as_str(),
|
||||
"--node-version" => version = Some(args[i + 1].as_str()),
|
||||
_ => (),
|
||||
}
|
||||
}
|
||||
|
||||
let subcommand = &args[1];
|
||||
match subcommand.as_ref() {
|
||||
"exit" => {
|
||||
std::process::exit(1);
|
||||
},
|
||||
"sleep" => {
|
||||
std::thread::sleep(std::time::Duration::from_secs(5));
|
||||
},
|
||||
"prepare-worker" => {
|
||||
let socket_path = &args[2];
|
||||
$crate::prepare_worker_entrypoint(socket_path);
|
||||
$crate::prepare_worker_entrypoint(&socket_path, version);
|
||||
},
|
||||
"execute-worker" => {
|
||||
let socket_path = &args[2];
|
||||
$crate::execute_worker_entrypoint(socket_path);
|
||||
$crate::execute_worker_entrypoint(&socket_path, version);
|
||||
},
|
||||
other => panic!("unknown subcommand: {}", other),
|
||||
}
|
||||
|
||||
@@ -61,6 +61,8 @@ pub async fn spawn_with_program_path(
|
||||
gum::warn!(
|
||||
target: LOG_TARGET,
|
||||
%debug_id,
|
||||
?program_path,
|
||||
?extra_args,
|
||||
"cannot bind unix socket: {:?}",
|
||||
err,
|
||||
);
|
||||
@@ -68,10 +70,12 @@ pub async fn spawn_with_program_path(
|
||||
})?;
|
||||
|
||||
let handle =
|
||||
WorkerHandle::spawn(program_path, extra_args, socket_path).map_err(|err| {
|
||||
WorkerHandle::spawn(&program_path, extra_args, socket_path).map_err(|err| {
|
||||
gum::warn!(
|
||||
target: LOG_TARGET,
|
||||
%debug_id,
|
||||
?program_path,
|
||||
?extra_args,
|
||||
"cannot spawn a worker: {:?}",
|
||||
err,
|
||||
);
|
||||
@@ -84,6 +88,8 @@ pub async fn spawn_with_program_path(
|
||||
gum::warn!(
|
||||
target: LOG_TARGET,
|
||||
%debug_id,
|
||||
?program_path,
|
||||
?extra_args,
|
||||
"cannot accept a worker: {:?}",
|
||||
err,
|
||||
);
|
||||
@@ -92,6 +98,14 @@ pub async fn spawn_with_program_path(
|
||||
Ok((IdleWorker { stream, pid: handle.id() }, handle))
|
||||
}
|
||||
_ = Delay::new(spawn_timeout).fuse() => {
|
||||
gum::warn!(
|
||||
target: LOG_TARGET,
|
||||
%debug_id,
|
||||
?program_path,
|
||||
?extra_args,
|
||||
?spawn_timeout,
|
||||
"spawning and connecting to socket timed out",
|
||||
);
|
||||
Err(SpawnErr::AcceptTimeout)
|
||||
}
|
||||
}
|
||||
@@ -162,6 +176,13 @@ where
|
||||
F: FnMut(Handle, UnixStream) -> Fut,
|
||||
Fut: futures::Future<Output = io::Result<Never>>,
|
||||
{
|
||||
gum::debug!(
|
||||
target: LOG_TARGET,
|
||||
worker_pid = %std::process::id(),
|
||||
"starting pvf worker ({})",
|
||||
debug_id,
|
||||
);
|
||||
|
||||
let rt = Runtime::new().expect("Creates tokio runtime. If this panics the worker will die and the host will detect that and deal with it.");
|
||||
let handle = rt.handle();
|
||||
let err = rt
|
||||
@@ -179,7 +200,7 @@ where
|
||||
gum::debug!(
|
||||
target: LOG_TARGET,
|
||||
worker_pid = %std::process::id(),
|
||||
"pvf worker ({}): {:?}",
|
||||
"quitting pvf worker ({}): {:?}",
|
||||
debug_id,
|
||||
err,
|
||||
);
|
||||
@@ -280,6 +301,7 @@ impl WorkerHandle {
|
||||
) -> io::Result<Self> {
|
||||
let mut child = process::Command::new(program.as_ref())
|
||||
.args(extra_args)
|
||||
.arg("--socket-path")
|
||||
.arg(socket_path.as_ref().as_os_str())
|
||||
.stdout(std::process::Stdio::piped())
|
||||
.kill_on_drop(true)
|
||||
@@ -393,3 +415,20 @@ pub async fn framed_recv(r: &mut (impl AsyncRead + Unpin)) -> io::Result<Vec<u8>
|
||||
r.read_exact(&mut buf).await?;
|
||||
Ok(buf)
|
||||
}
|
||||
|
||||
/// In case of node and worker version mismatch (as a result of in-place upgrade), send `SIGKILL`
|
||||
/// to the node to tear it down and prevent it from raising disputes on valid candidates. Node
|
||||
/// restart should be handled by the node owner. As node exits, unix sockets opened to workers
|
||||
/// get closed by the OS and other workers receive error on socket read and also exit. Preparation
|
||||
/// jobs are written to the temporary files that are renamed to real artifacts on the node side, so
|
||||
/// no leftover artifacts are possible.
|
||||
pub(crate) fn kill_parent_node_in_emergency() {
|
||||
unsafe {
|
||||
// SAFETY: `getpid()` never fails but may return "no-parent" (0) or "parent-init" (1) in
|
||||
// some corner cases, which is checked. `kill()` never fails.
|
||||
let ppid = libc::getppid();
|
||||
if ppid > 1 {
|
||||
libc::kill(ppid, libc::SIGKILL);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user