PVF: Add back socket path parameter, use tmp socket path (#1780)

This commit is contained in:
Marcin S
2023-10-05 18:37:54 +02:00
committed by GitHub
parent 86955eef90
commit 51c0c24213
5 changed files with 110 additions and 65 deletions
@@ -18,7 +18,7 @@
pub mod security;
use crate::{worker_dir, SecurityStatus, LOG_TARGET};
use crate::{SecurityStatus, LOG_TARGET};
use cpu_time::ProcessTime;
use futures::never::Never;
use std::{
@@ -115,6 +115,7 @@ macro_rules! decl_worker_main {
},
}
let mut socket_path = None;
let mut worker_dir_path = None;
let mut node_version = None;
let mut can_enable_landlock = false;
@@ -123,6 +124,10 @@ macro_rules! decl_worker_main {
let mut i = 2;
while i < args.len() {
match args[i].as_ref() {
"--socket-path" => {
socket_path = Some(args[i + 1].as_str());
i += 1
},
"--worker-dir-path" => {
worker_dir_path = Some(args[i + 1].as_str());
i += 1
@@ -138,16 +143,24 @@ macro_rules! decl_worker_main {
}
i += 1;
}
let socket_path = socket_path.expect("the --socket-path argument is required");
let worker_dir_path =
worker_dir_path.expect("the --worker-dir-path argument is required");
let socket_path = std::path::Path::new(socket_path).to_owned();
let worker_dir_path = std::path::Path::new(worker_dir_path).to_owned();
let security_status = $crate::SecurityStatus {
can_enable_landlock,
can_unshare_user_namespace_and_change_root,
};
$entrypoint(worker_dir_path, node_version, Some($worker_version), security_status);
$entrypoint(
socket_path,
worker_dir_path,
node_version,
Some($worker_version),
security_status,
);
}
};
}
@@ -177,6 +190,7 @@ impl fmt::Display for WorkerKind {
// the version that this crate was compiled with.
pub fn worker_event_loop<F, Fut>(
worker_kind: WorkerKind,
socket_path: PathBuf,
#[cfg_attr(not(target_os = "linux"), allow(unused_mut))] mut worker_dir_path: PathBuf,
node_version: Option<&str>,
worker_version: Option<&str>,
@@ -190,6 +204,7 @@ pub fn worker_event_loop<F, Fut>(
gum::debug!(
target: LOG_TARGET,
%worker_pid,
?socket_path,
?worker_dir_path,
?security_status,
"starting pvf worker ({})",
@@ -237,12 +252,9 @@ pub fn worker_event_loop<F, Fut>(
}
// Connect to the socket.
let socket_path = worker_dir::socket(&worker_dir_path);
let stream = || -> std::io::Result<UnixStream> {
let stream = UnixStream::connect(&socket_path)?;
// Remove the socket here. We don't also need to do this on the host-side; on failed
// rendezvous, the host will delete the whole worker dir.
std::fs::remove_file(&socket_path)?;
let _ = std::fs::remove_file(&socket_path);
Ok(stream)
}();
let stream = match stream {
@@ -20,7 +20,6 @@ use std::path::{Path, PathBuf};
const WORKER_EXECUTE_ARTIFACT_NAME: &str = "artifact";
const WORKER_PREPARE_TMP_ARTIFACT_NAME: &str = "tmp-artifact";
const WORKER_SOCKET_NAME: &str = "socket";
pub fn execute_artifact(worker_dir_path: &Path) -> PathBuf {
worker_dir_path.join(WORKER_EXECUTE_ARTIFACT_NAME)
@@ -29,7 +28,3 @@ pub fn execute_artifact(worker_dir_path: &Path) -> PathBuf {
pub fn prepare_tmp_artifact(worker_dir_path: &Path) -> PathBuf {
worker_dir_path.join(WORKER_PREPARE_TMP_ARTIFACT_NAME)
}
pub fn socket(worker_dir_path: &Path) -> PathBuf {
worker_dir_path.join(WORKER_SOCKET_NAME)
}
@@ -111,6 +111,8 @@ fn send_response(stream: &mut UnixStream, response: Response) -> io::Result<()>
///
/// # Parameters
///
/// - `socket_path`: specifies the path to the socket used to communicate with the host.
///
/// - `worker_dir_path`: specifies the path to the worker-specific temporary directory.
///
/// - `node_version`: if `Some`, is checked against the `worker_version`. A mismatch results in
@@ -121,6 +123,7 @@ fn send_response(stream: &mut UnixStream, response: Response) -> io::Result<()>
///
/// - `security_status`: contains the detected status of security features.
pub fn worker_entrypoint(
socket_path: PathBuf,
worker_dir_path: PathBuf,
node_version: Option<&str>,
worker_version: Option<&str>,
@@ -128,6 +131,7 @@ pub fn worker_entrypoint(
) {
worker_event_loop(
WorkerKind::Execute,
socket_path,
worker_dir_path,
node_version,
worker_version,
@@ -87,6 +87,8 @@ fn send_response(stream: &mut UnixStream, result: PrepareResult) -> io::Result<(
///
/// # Parameters
///
/// - `socket_path`: specifies the path to the socket used to communicate with the host.
///
/// - `worker_dir_path`: specifies the path to the worker-specific temporary directory.
///
/// - `node_version`: if `Some`, is checked against the `worker_version`. A mismatch results in
@@ -116,6 +118,7 @@ fn send_response(stream: &mut UnixStream, result: PrepareResult) -> io::Result<(
/// 7. Send the result of preparation back to the host. If any error occurred in the above steps, we
/// send that in the `PrepareResult`.
pub fn worker_entrypoint(
socket_path: PathBuf,
worker_dir_path: PathBuf,
node_version: Option<&str>,
worker_version: Option<&str>,
@@ -123,6 +126,7 @@ pub fn worker_entrypoint(
) {
worker_event_loop(
WorkerKind::Prepare,
socket_path,
worker_dir_path,
node_version,
worker_version,
+84 -54
View File
@@ -20,7 +20,7 @@ use crate::LOG_TARGET;
use futures::FutureExt as _;
use futures_timer::Delay;
use pin_project::pin_project;
use polkadot_node_core_pvf_common::{worker_dir, SecurityStatus};
use polkadot_node_core_pvf_common::SecurityStatus;
use rand::Rng;
use std::{
fmt, mem,
@@ -67,71 +67,99 @@ pub async fn spawn_with_program_path(
) -> Result<(IdleWorker, WorkerHandle), SpawnErr> {
let program_path = program_path.into();
let worker_dir = WorkerDir::new(debug_id, cache_path).await?;
let socket_path = worker_dir::socket(&worker_dir.path);
let extra_args: Vec<String> = extra_args.iter().map(|arg| arg.to_string()).collect();
let listener = UnixListener::bind(&socket_path).map_err(|err| {
gum::warn!(
target: LOG_TARGET,
%debug_id,
?program_path,
?extra_args,
?worker_dir,
?socket_path,
"cannot bind unix socket: {:?}",
err,
);
SpawnErr::Bind
})?;
with_transient_socket_path(debug_id, |socket_path| {
let socket_path = socket_path.to_owned();
let handle = WorkerHandle::spawn(&program_path, &extra_args, &worker_dir.path, security_status)
.map_err(|err| {
gum::warn!(
target: LOG_TARGET,
%debug_id,
?program_path,
?extra_args,
?worker_dir.path,
?socket_path,
"cannot spawn a worker: {:?}",
err,
);
SpawnErr::ProcessSpawn
})?;
let worker_dir_path = worker_dir.path.clone();
futures::select! {
accept_result = listener.accept().fuse() => {
let (stream, _) = accept_result.map_err(|err| {
async move {
let listener = UnixListener::bind(&socket_path).map_err(|err| {
gum::warn!(
target: LOG_TARGET,
%debug_id,
?program_path,
?extra_args,
?worker_dir_path,
?worker_dir,
?socket_path,
"cannot accept a worker: {:?}",
"cannot bind unix socket: {:?}",
err,
);
SpawnErr::Accept
SpawnErr::Bind
})?;
Ok((IdleWorker { stream, pid: handle.id(), worker_dir }, handle))
let handle = WorkerHandle::spawn(
&program_path,
&extra_args,
&socket_path,
&worker_dir.path,
security_status,
)
.map_err(|err| {
gum::warn!(
target: LOG_TARGET,
%debug_id,
?program_path,
?extra_args,
?worker_dir.path,
?socket_path,
"cannot spawn a worker: {:?}",
err,
);
SpawnErr::ProcessSpawn
})?;
let worker_dir_path = worker_dir.path.clone();
futures::select! {
accept_result = listener.accept().fuse() => {
let (stream, _) = accept_result.map_err(|err| {
gum::warn!(
target: LOG_TARGET,
%debug_id,
?program_path,
?extra_args,
?worker_dir_path,
?socket_path,
"cannot accept a worker: {:?}",
err,
);
SpawnErr::Accept
})?;
Ok((IdleWorker { stream, pid: handle.id(), worker_dir }, handle))
}
_ = Delay::new(spawn_timeout).fuse() => {
gum::warn!(
target: LOG_TARGET,
%debug_id,
?program_path,
?extra_args,
?worker_dir_path,
?socket_path,
?spawn_timeout,
"spawning and connecting to socket timed out",
);
Err(SpawnErr::AcceptTimeout)
}
}
}
_ = Delay::new(spawn_timeout).fuse() => {
gum::warn!(
target: LOG_TARGET,
%debug_id,
?program_path,
?extra_args,
?worker_dir_path,
?socket_path,
?spawn_timeout,
"spawning and connecting to socket timed out",
);
Err(SpawnErr::AcceptTimeout)
}
}
})
.await
}
async fn with_transient_socket_path<T, F, Fut>(debug_id: &'static str, f: F) -> Result<T, SpawnErr>
where
F: FnOnce(&Path) -> Fut,
Fut: futures::Future<Output = Result<T, SpawnErr>> + 'static,
{
let socket_path = tmppath(&format!("pvf-host-{}", debug_id))
.await
.map_err(|_| SpawnErr::TmpPath)?;
let result = f(&socket_path).await;
// Best effort to remove the socket file. Under normal circumstances the socket will be removed
// by the worker. We make sure that it is removed here, just in case a failed rendezvous.
let _ = tokio::fs::remove_file(socket_path).await;
result
}
/// Returns a path under the given `dir`. The path name will start with the given prefix.
@@ -169,7 +197,6 @@ pub async fn tmppath_in(prefix: &str, dir: &Path) -> io::Result<PathBuf> {
}
/// The same as [`tmppath_in`], but uses [`std::env::temp_dir`] as the directory.
#[cfg(test)]
pub async fn tmppath(prefix: &str) -> io::Result<PathBuf> {
let temp_dir = PathBuf::from(std::env::temp_dir());
tmppath_in(prefix, &temp_dir).await
@@ -234,6 +261,7 @@ impl WorkerHandle {
fn spawn(
program: impl AsRef<Path>,
extra_args: &[String],
socket_path: impl AsRef<Path>,
worker_dir_path: impl AsRef<Path>,
security_status: SecurityStatus,
) -> io::Result<Self> {
@@ -257,6 +285,8 @@ impl WorkerHandle {
}
let mut child = command
.args(extra_args)
.arg("--socket-path")
.arg(socket_path.as_ref().as_os_str())
.arg("--worker-dir-path")
.arg(worker_dir_path.as_ref().as_os_str())
.args(&security_args)