From 20ab68270f2793376cf5ac3047d2aaa818724880 Mon Sep 17 00:00:00 2001 From: Sergei Shulepov Date: Fri, 21 May 2021 09:53:03 +0200 Subject: [PATCH] Put WIP artifacts next to ready ones (#3057) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Put WIP artifacts next to ready ones Fixes #3044 * Apply suggestions from code review Co-authored-by: Bastian Köcher Co-authored-by: Bastian Köcher --- polkadot/node/core/pvf/src/host.rs | 3 +- polkadot/node/core/pvf/src/prepare/pool.rs | 21 +- polkadot/node/core/pvf/src/prepare/worker.rs | 193 ++++++++++++++----- polkadot/node/core/pvf/src/worker_common.rs | 21 +- 4 files changed, 180 insertions(+), 58 deletions(-) diff --git a/polkadot/node/core/pvf/src/host.rs b/polkadot/node/core/pvf/src/host.rs index 4197acccb3..d740622ea1 100644 --- a/polkadot/node/core/pvf/src/host.rs +++ b/polkadot/node/core/pvf/src/host.rs @@ -155,7 +155,8 @@ pub fn start(config: Config) -> (ValidationHost, impl Future) { let validation_host = ValidationHost { to_host_tx }; let (to_prepare_pool, from_prepare_pool, run_prepare_pool) = prepare::start_pool( - config.prepare_worker_program_path.to_owned(), + config.prepare_worker_program_path.clone(), + config.cache_path.clone(), config.prepare_worker_spawn_timeout, ); diff --git a/polkadot/node/core/pvf/src/prepare/pool.rs b/polkadot/node/core/pvf/src/prepare/pool.rs index 1e3a4eb8ee..618c71e253 100644 --- a/polkadot/node/core/pvf/src/prepare/pool.rs +++ b/polkadot/node/core/pvf/src/prepare/pool.rs @@ -104,6 +104,7 @@ type Mux = FuturesUnordered>; struct Pool { program_path: PathBuf, + cache_path: PathBuf, spawn_timeout: Duration, to_pool: mpsc::Receiver, from_pool: mpsc::UnboundedSender, @@ -117,6 +118,7 @@ struct Fatal; async fn run( Pool { program_path, + cache_path, spawn_timeout, to_pool, mut from_pool, @@ -141,6 +143,7 @@ async fn run( let to_pool = break_if_fatal!(to_pool.ok_or(Fatal)); handle_to_pool( &program_path, + &cache_path, spawn_timeout, &mut spawned, &mut mux, @@ -181,6 +184,7 @@ async fn purge_dead( fn handle_to_pool( program_path: &Path, + cache_path: &Path, spawn_timeout: Duration, spawned: &mut HopSlotMap, mux: &mut Mux, @@ -199,8 +203,15 @@ fn handle_to_pool( if let Some(data) = spawned.get_mut(worker) { if let Some(idle) = data.idle.take() { mux.push( - start_work_task(worker, idle, code, artifact_path, background_priority) - .boxed(), + start_work_task( + worker, + idle, + code, + cache_path.to_owned(), + artifact_path, + background_priority + ) + .boxed(), ); } else { // idle token is present after spawn and after a job is concluded; @@ -251,10 +262,12 @@ async fn start_work_task( worker: Worker, idle: IdleWorker, code: Arc>, + cache_path: PathBuf, artifact_path: PathBuf, background_priority: bool, ) -> PoolEvent { - let outcome = worker::start_work(idle, code, artifact_path, background_priority).await; + let outcome = + worker::start_work(idle, code, &cache_path, artifact_path, background_priority).await; PoolEvent::StartWork(worker, outcome) } @@ -314,6 +327,7 @@ fn reply(from_pool: &mut mpsc::UnboundedSender, m: FromPool) -> Result /// Spins up the pool and returns the future that should be polled to make the pool functional. pub fn start( program_path: PathBuf, + cache_path: PathBuf, spawn_timeout: Duration, ) -> ( mpsc::Sender, @@ -325,6 +339,7 @@ pub fn start( let run = run(Pool { program_path, + cache_path, spawn_timeout, to_pool: to_pool_rx, from_pool: from_pool_tx, diff --git a/polkadot/node/core/pvf/src/prepare/worker.rs b/polkadot/node/core/pvf/src/prepare/worker.rs index 9a6bdb1f89..307396b01a 100644 --- a/polkadot/node/core/pvf/src/prepare/worker.rs +++ b/polkadot/node/core/pvf/src/prepare/worker.rs @@ -19,7 +19,7 @@ use crate::{ artifacts::Artifact, worker_common::{ IdleWorker, SpawnErr, WorkerHandle, bytes_to_path, framed_recv, framed_send, path_to_bytes, - spawn_with_program_path, tmpfile, worker_event_loop, + spawn_with_program_path, tmpfile_in, worker_event_loop, }, }; use async_std::{ @@ -70,6 +70,7 @@ pub enum Outcome { pub async fn start_work( worker: IdleWorker, code: Arc>, + cache_path: &Path, artifact_path: PathBuf, background_priority: bool, ) -> Outcome { @@ -87,63 +88,164 @@ pub async fn start_work( renice(pid, NICENESS_BACKGROUND); } - if let Err(err) = send_request(&mut stream, code).await { - tracing::warn!("failed to send a prepare request to pid={}: {:?}", pid, err); - return Outcome::DidntMakeIt; - } + with_tmp_file(pid, cache_path, |tmp_file| async move { + if let Err(err) = send_request(&mut stream, code, &tmp_file).await { + tracing::warn!( + target: LOG_TARGET, + worker_pid = %pid, + "failed to send a prepare request: {:?}", + err, + ); + return Outcome::DidntMakeIt; + } - // Wait for the result from the worker, keeping in mind that there may be a timeout, the - // worker may get killed, or something along these lines. - // - // In that case we should handle these gracefully by writing the artifact file by ourselves. - // We may potentially overwrite the artifact in rare cases where the worker didn't make - // it to report back the result. + // Wait for the result from the worker, keeping in mind that there may be a timeout, the + // worker may get killed, or something along these lines. + // + // In that case we should handle these gracefully by writing the artifact file by ourselves. + // We may potentially overwrite the artifact in rare cases where the worker didn't make + // it to report back the result. - enum Selected { - Done, - IoErr, - Deadline, - } + enum Selected { + Done, + IoErr, + Deadline, + } - let selected = futures::select! { - artifact_path_bytes = framed_recv(&mut stream).fuse() => { - match artifact_path_bytes { - Ok(bytes) => { - if let Some(tmp_path) = bytes_to_path(&bytes) { - async_std::fs::rename(tmp_path, &artifact_path) + let selected = futures::select! { + res = framed_recv(&mut stream).fuse() => { + match res { + Ok(x) if x == &[1u8] => { + tracing::debug!( + target: LOG_TARGET, + worker_pid = %pid, + "promoting WIP artifact {} to {}", + tmp_file.display(), + artifact_path.display(), + ); + + async_std::fs::rename(&tmp_file, &artifact_path) .await .map(|_| Selected::Done) - .unwrap_or(Selected::IoErr) - } else { + .unwrap_or_else(|err| { + tracing::warn!( + target: LOG_TARGET, + worker_pid = %pid, + "failed to rename the artifact from {} to {}: {:?}", + tmp_file.display(), + artifact_path.display(), + err, + ); + Selected::IoErr + }) + } + Ok(response_bytes) => { + use sp_core::hexdisplay::HexDisplay; + let bound_bytes = + &response_bytes[..response_bytes.len().min(4)]; + tracing::warn!( + target: LOG_TARGET, + worker_pid = %pid, + "received unexpected response from the prepare worker: {}", + HexDisplay::from(&bound_bytes), + ); + Selected::IoErr + }, + Err(err) => { + tracing::warn!( + target: LOG_TARGET, + worker_pid = %pid, + "failed to recv a prepare response: {:?}", + err, + ); Selected::IoErr } - }, - Err(_) => Selected::IoErr, + } + }, + _ = Delay::new(COMPILATION_TIMEOUT).fuse() => Selected::Deadline, + }; + + match selected { + Selected::Done => { + renice(pid, NICENESS_FOREGROUND); + Outcome::Concluded(IdleWorker { stream, pid }) } - }, - _ = Delay::new(COMPILATION_TIMEOUT).fuse() => Selected::Deadline, + Selected::IoErr | Selected::Deadline => { + let bytes = Artifact::DidntMakeIt.serialize(); + // best effort: there is nothing we can do here if the write fails. + let _ = async_std::fs::write(&artifact_path, &bytes).await; + Outcome::DidntMakeIt + } + } + }) + .await +} + +/// Create a temporary file for an artifact at the given cache path and execute the given +/// future/closure passing the file path in. +/// +/// The function will try best effort to not leave behind the temporary file. +async fn with_tmp_file(pid: u32, cache_path: &Path, f: F) -> Outcome +where + Fut: futures::Future, + F: FnOnce(PathBuf) -> Fut, +{ + let tmp_file = match tmpfile_in("prepare-artifact-", cache_path).await { + Ok(f) => f, + Err(err) => { + tracing::warn!( + target: LOG_TARGET, + worker_pid = %pid, + "failed to create a temp file for the artifact: {:?}", + err, + ); + return Outcome::DidntMakeIt; + } }; - match selected { - Selected::Done => { - renice(pid, NICENESS_FOREGROUND); - Outcome::Concluded(IdleWorker { stream, pid }) - } - Selected::IoErr | Selected::Deadline => { - let bytes = Artifact::DidntMakeIt.serialize(); - // best effort: there is nothing we can do here if the write fails. - let _ = async_std::fs::write(&artifact_path, &bytes).await; - Outcome::DidntMakeIt + let outcome = f(tmp_file.clone()).await; + + // The function called above is expected to move `tmp_file` to a new location upon success. However, + // the function may as well fail and in that case we should remove the tmp file here. + // + // In any case, we try to remove the file here so that there are no leftovers. We only report + // errors that are different from the `NotFound`. + match async_std::fs::remove_file(tmp_file).await { + Ok(()) => (), + Err(err) if err.kind() == std::io::ErrorKind::NotFound => (), + Err(err) => { + tracing::warn!( + target: LOG_TARGET, + worker_pid = %pid, + "failed to remove the tmp file: {:?}", + err, + ); } } + + outcome } -async fn send_request(stream: &mut UnixStream, code: Arc>) -> io::Result<()> { - framed_send(stream, &*code).await +async fn send_request( + stream: &mut UnixStream, + code: Arc>, + tmp_file: &Path, +) -> io::Result<()> { + framed_send(stream, &*code).await?; + framed_send(stream, path_to_bytes(tmp_file)).await?; + Ok(()) } -async fn recv_request(stream: &mut UnixStream) -> io::Result> { - framed_recv(stream).await +async fn recv_request(stream: &mut UnixStream) -> io::Result<(Vec, PathBuf)> { + let code = framed_recv(stream).await?; + let tmp_file = framed_recv(stream).await?; + let tmp_file = bytes_to_path(&tmp_file).ok_or_else(|| { + io::Error::new( + io::ErrorKind::Other, + "prepare pvf recv_request: non utf-8 artifact path".to_string(), + ) + })?; + Ok((code, tmp_file)) } pub fn bump_priority(handle: &WorkerHandle) { @@ -173,7 +275,7 @@ fn renice(pid: u32, niceness: i32) { pub fn worker_entrypoint(socket_path: &str) { worker_event_loop("prepare", socket_path, |mut stream| async move { loop { - let code = recv_request(&mut stream).await?; + let (code, dest) = recv_request(&mut stream).await?; tracing::debug!( target: LOG_TARGET, @@ -183,7 +285,6 @@ pub fn worker_entrypoint(socket_path: &str) { let artifact_bytes = prepare_artifact(&code).serialize(); // Write the serialized artifact into into a temp file. - let dest = tmpfile("prepare-artifact-").await?; tracing::debug!( target: LOG_TARGET, worker_pid = %std::process::id(), @@ -192,8 +293,8 @@ pub fn worker_entrypoint(socket_path: &str) { ); async_std::fs::write(&dest, &artifact_bytes).await?; - // Communicate the results back to the host. - framed_send(&mut stream, &path_to_bytes(&dest)).await?; + // Return back a byte that signals finishing the work. + framed_send(&mut stream, &[1u8]).await?; } }); } diff --git a/polkadot/node/core/pvf/src/worker_common.rs b/polkadot/node/core/pvf/src/worker_common.rs index 6818baa18f..46d0b730b9 100644 --- a/polkadot/node/core/pvf/src/worker_common.rs +++ b/polkadot/node/core/pvf/src/worker_common.rs @@ -85,13 +85,12 @@ where result } -/// Returns a path under the location for temporary files. The file name will start with the given -/// prefix. +/// Returns a path under the given `dir`. The file name will start with the given prefix. /// /// There is only a certain number of retries. If exceeded this function will give up and return an /// error. -pub async fn tmpfile(prefix: &str) -> io::Result { - fn tmppath(prefix: &str) -> PathBuf { +pub async fn tmpfile_in(prefix: &str, dir: &Path) -> io::Result { + fn tmppath(prefix: &str, dir: &Path) -> PathBuf { use rand::distributions::Alphanumeric; const DESCRIMINATOR_LEN: usize = 10; @@ -107,15 +106,15 @@ pub async fn tmpfile(prefix: &str) -> io::Result { let s = std::str::from_utf8(&buf) .expect("the string is collected from a valid utf-8 sequence; qed"); - let mut temp_dir = PathBuf::from(std::env::temp_dir()); - temp_dir.push(s); - temp_dir + let mut file = dir.to_owned(); + file.push(s); + file } const NUM_RETRIES: usize = 50; for _ in 0..NUM_RETRIES { - let candidate_path = tmppath(prefix); + let candidate_path = tmppath(prefix, dir); if !candidate_path.exists().await { return Ok(candidate_path) } @@ -126,6 +125,12 @@ pub async fn tmpfile(prefix: &str) -> io::Result { ) } +/// The same as [`tmpfile_in`], but uses [`std::env::temp_dir`] as the directory. +pub async fn tmpfile(prefix: &str) -> io::Result { + let temp_dir = PathBuf::from(std::env::temp_dir()); + tmpfile_in(prefix, &temp_dir).await +} + pub fn worker_event_loop(debug_id: &'static str, socket_path: &str, mut event_loop: F) where F: FnMut(UnixStream) -> Fut,