mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-05-31 11:01:01 +00:00
Put WIP artifacts next to ready ones (#3057)
* Put WIP artifacts next to ready ones Fixes #3044 * Apply suggestions from code review Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com> Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>
This commit is contained in:
@@ -104,6 +104,7 @@ type Mux = FuturesUnordered<BoxFuture<'static, PoolEvent>>;
|
||||
|
||||
struct Pool {
|
||||
program_path: PathBuf,
|
||||
cache_path: PathBuf,
|
||||
spawn_timeout: Duration,
|
||||
to_pool: mpsc::Receiver<ToPool>,
|
||||
from_pool: mpsc::UnboundedSender<FromPool>,
|
||||
@@ -117,6 +118,7 @@ struct Fatal;
|
||||
async fn run(
|
||||
Pool {
|
||||
program_path,
|
||||
cache_path,
|
||||
spawn_timeout,
|
||||
to_pool,
|
||||
mut from_pool,
|
||||
@@ -141,6 +143,7 @@ async fn run(
|
||||
let to_pool = break_if_fatal!(to_pool.ok_or(Fatal));
|
||||
handle_to_pool(
|
||||
&program_path,
|
||||
&cache_path,
|
||||
spawn_timeout,
|
||||
&mut spawned,
|
||||
&mut mux,
|
||||
@@ -181,6 +184,7 @@ async fn purge_dead(
|
||||
|
||||
fn handle_to_pool(
|
||||
program_path: &Path,
|
||||
cache_path: &Path,
|
||||
spawn_timeout: Duration,
|
||||
spawned: &mut HopSlotMap<Worker, WorkerData>,
|
||||
mux: &mut Mux,
|
||||
@@ -199,8 +203,15 @@ fn handle_to_pool(
|
||||
if let Some(data) = spawned.get_mut(worker) {
|
||||
if let Some(idle) = data.idle.take() {
|
||||
mux.push(
|
||||
start_work_task(worker, idle, code, artifact_path, background_priority)
|
||||
.boxed(),
|
||||
start_work_task(
|
||||
worker,
|
||||
idle,
|
||||
code,
|
||||
cache_path.to_owned(),
|
||||
artifact_path,
|
||||
background_priority
|
||||
)
|
||||
.boxed(),
|
||||
);
|
||||
} else {
|
||||
// idle token is present after spawn and after a job is concluded;
|
||||
@@ -251,10 +262,12 @@ async fn start_work_task(
|
||||
worker: Worker,
|
||||
idle: IdleWorker,
|
||||
code: Arc<Vec<u8>>,
|
||||
cache_path: PathBuf,
|
||||
artifact_path: PathBuf,
|
||||
background_priority: bool,
|
||||
) -> PoolEvent {
|
||||
let outcome = worker::start_work(idle, code, artifact_path, background_priority).await;
|
||||
let outcome =
|
||||
worker::start_work(idle, code, &cache_path, artifact_path, background_priority).await;
|
||||
PoolEvent::StartWork(worker, outcome)
|
||||
}
|
||||
|
||||
@@ -314,6 +327,7 @@ fn reply(from_pool: &mut mpsc::UnboundedSender<FromPool>, m: FromPool) -> Result
|
||||
/// Spins up the pool and returns the future that should be polled to make the pool functional.
|
||||
pub fn start(
|
||||
program_path: PathBuf,
|
||||
cache_path: PathBuf,
|
||||
spawn_timeout: Duration,
|
||||
) -> (
|
||||
mpsc::Sender<ToPool>,
|
||||
@@ -325,6 +339,7 @@ pub fn start(
|
||||
|
||||
let run = run(Pool {
|
||||
program_path,
|
||||
cache_path,
|
||||
spawn_timeout,
|
||||
to_pool: to_pool_rx,
|
||||
from_pool: from_pool_tx,
|
||||
|
||||
@@ -19,7 +19,7 @@ use crate::{
|
||||
artifacts::Artifact,
|
||||
worker_common::{
|
||||
IdleWorker, SpawnErr, WorkerHandle, bytes_to_path, framed_recv, framed_send, path_to_bytes,
|
||||
spawn_with_program_path, tmpfile, worker_event_loop,
|
||||
spawn_with_program_path, tmpfile_in, worker_event_loop,
|
||||
},
|
||||
};
|
||||
use async_std::{
|
||||
@@ -70,6 +70,7 @@ pub enum Outcome {
|
||||
pub async fn start_work(
|
||||
worker: IdleWorker,
|
||||
code: Arc<Vec<u8>>,
|
||||
cache_path: &Path,
|
||||
artifact_path: PathBuf,
|
||||
background_priority: bool,
|
||||
) -> Outcome {
|
||||
@@ -87,63 +88,164 @@ pub async fn start_work(
|
||||
renice(pid, NICENESS_BACKGROUND);
|
||||
}
|
||||
|
||||
if let Err(err) = send_request(&mut stream, code).await {
|
||||
tracing::warn!("failed to send a prepare request to pid={}: {:?}", pid, err);
|
||||
return Outcome::DidntMakeIt;
|
||||
}
|
||||
with_tmp_file(pid, cache_path, |tmp_file| async move {
|
||||
if let Err(err) = send_request(&mut stream, code, &tmp_file).await {
|
||||
tracing::warn!(
|
||||
target: LOG_TARGET,
|
||||
worker_pid = %pid,
|
||||
"failed to send a prepare request: {:?}",
|
||||
err,
|
||||
);
|
||||
return Outcome::DidntMakeIt;
|
||||
}
|
||||
|
||||
// Wait for the result from the worker, keeping in mind that there may be a timeout, the
|
||||
// worker may get killed, or something along these lines.
|
||||
//
|
||||
// In that case we should handle these gracefully by writing the artifact file by ourselves.
|
||||
// We may potentially overwrite the artifact in rare cases where the worker didn't make
|
||||
// it to report back the result.
|
||||
// Wait for the result from the worker, keeping in mind that there may be a timeout, the
|
||||
// worker may get killed, or something along these lines.
|
||||
//
|
||||
// In that case we should handle these gracefully by writing the artifact file by ourselves.
|
||||
// We may potentially overwrite the artifact in rare cases where the worker didn't make
|
||||
// it to report back the result.
|
||||
|
||||
enum Selected {
|
||||
Done,
|
||||
IoErr,
|
||||
Deadline,
|
||||
}
|
||||
enum Selected {
|
||||
Done,
|
||||
IoErr,
|
||||
Deadline,
|
||||
}
|
||||
|
||||
let selected = futures::select! {
|
||||
artifact_path_bytes = framed_recv(&mut stream).fuse() => {
|
||||
match artifact_path_bytes {
|
||||
Ok(bytes) => {
|
||||
if let Some(tmp_path) = bytes_to_path(&bytes) {
|
||||
async_std::fs::rename(tmp_path, &artifact_path)
|
||||
let selected = futures::select! {
|
||||
res = framed_recv(&mut stream).fuse() => {
|
||||
match res {
|
||||
Ok(x) if x == &[1u8] => {
|
||||
tracing::debug!(
|
||||
target: LOG_TARGET,
|
||||
worker_pid = %pid,
|
||||
"promoting WIP artifact {} to {}",
|
||||
tmp_file.display(),
|
||||
artifact_path.display(),
|
||||
);
|
||||
|
||||
async_std::fs::rename(&tmp_file, &artifact_path)
|
||||
.await
|
||||
.map(|_| Selected::Done)
|
||||
.unwrap_or(Selected::IoErr)
|
||||
} else {
|
||||
.unwrap_or_else(|err| {
|
||||
tracing::warn!(
|
||||
target: LOG_TARGET,
|
||||
worker_pid = %pid,
|
||||
"failed to rename the artifact from {} to {}: {:?}",
|
||||
tmp_file.display(),
|
||||
artifact_path.display(),
|
||||
err,
|
||||
);
|
||||
Selected::IoErr
|
||||
})
|
||||
}
|
||||
Ok(response_bytes) => {
|
||||
use sp_core::hexdisplay::HexDisplay;
|
||||
let bound_bytes =
|
||||
&response_bytes[..response_bytes.len().min(4)];
|
||||
tracing::warn!(
|
||||
target: LOG_TARGET,
|
||||
worker_pid = %pid,
|
||||
"received unexpected response from the prepare worker: {}",
|
||||
HexDisplay::from(&bound_bytes),
|
||||
);
|
||||
Selected::IoErr
|
||||
},
|
||||
Err(err) => {
|
||||
tracing::warn!(
|
||||
target: LOG_TARGET,
|
||||
worker_pid = %pid,
|
||||
"failed to recv a prepare response: {:?}",
|
||||
err,
|
||||
);
|
||||
Selected::IoErr
|
||||
}
|
||||
},
|
||||
Err(_) => Selected::IoErr,
|
||||
}
|
||||
},
|
||||
_ = Delay::new(COMPILATION_TIMEOUT).fuse() => Selected::Deadline,
|
||||
};
|
||||
|
||||
match selected {
|
||||
Selected::Done => {
|
||||
renice(pid, NICENESS_FOREGROUND);
|
||||
Outcome::Concluded(IdleWorker { stream, pid })
|
||||
}
|
||||
},
|
||||
_ = Delay::new(COMPILATION_TIMEOUT).fuse() => Selected::Deadline,
|
||||
Selected::IoErr | Selected::Deadline => {
|
||||
let bytes = Artifact::DidntMakeIt.serialize();
|
||||
// best effort: there is nothing we can do here if the write fails.
|
||||
let _ = async_std::fs::write(&artifact_path, &bytes).await;
|
||||
Outcome::DidntMakeIt
|
||||
}
|
||||
}
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
/// Create a temporary file for an artifact at the given cache path and execute the given
|
||||
/// future/closure passing the file path in.
|
||||
///
|
||||
/// The function will try best effort to not leave behind the temporary file.
|
||||
async fn with_tmp_file<F, Fut>(pid: u32, cache_path: &Path, f: F) -> Outcome
|
||||
where
|
||||
Fut: futures::Future<Output = Outcome>,
|
||||
F: FnOnce(PathBuf) -> Fut,
|
||||
{
|
||||
let tmp_file = match tmpfile_in("prepare-artifact-", cache_path).await {
|
||||
Ok(f) => f,
|
||||
Err(err) => {
|
||||
tracing::warn!(
|
||||
target: LOG_TARGET,
|
||||
worker_pid = %pid,
|
||||
"failed to create a temp file for the artifact: {:?}",
|
||||
err,
|
||||
);
|
||||
return Outcome::DidntMakeIt;
|
||||
}
|
||||
};
|
||||
|
||||
match selected {
|
||||
Selected::Done => {
|
||||
renice(pid, NICENESS_FOREGROUND);
|
||||
Outcome::Concluded(IdleWorker { stream, pid })
|
||||
}
|
||||
Selected::IoErr | Selected::Deadline => {
|
||||
let bytes = Artifact::DidntMakeIt.serialize();
|
||||
// best effort: there is nothing we can do here if the write fails.
|
||||
let _ = async_std::fs::write(&artifact_path, &bytes).await;
|
||||
Outcome::DidntMakeIt
|
||||
let outcome = f(tmp_file.clone()).await;
|
||||
|
||||
// The function called above is expected to move `tmp_file` to a new location upon success. However,
|
||||
// the function may as well fail and in that case we should remove the tmp file here.
|
||||
//
|
||||
// In any case, we try to remove the file here so that there are no leftovers. We only report
|
||||
// errors that are different from the `NotFound`.
|
||||
match async_std::fs::remove_file(tmp_file).await {
|
||||
Ok(()) => (),
|
||||
Err(err) if err.kind() == std::io::ErrorKind::NotFound => (),
|
||||
Err(err) => {
|
||||
tracing::warn!(
|
||||
target: LOG_TARGET,
|
||||
worker_pid = %pid,
|
||||
"failed to remove the tmp file: {:?}",
|
||||
err,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
outcome
|
||||
}
|
||||
|
||||
async fn send_request(stream: &mut UnixStream, code: Arc<Vec<u8>>) -> io::Result<()> {
|
||||
framed_send(stream, &*code).await
|
||||
async fn send_request(
|
||||
stream: &mut UnixStream,
|
||||
code: Arc<Vec<u8>>,
|
||||
tmp_file: &Path,
|
||||
) -> io::Result<()> {
|
||||
framed_send(stream, &*code).await?;
|
||||
framed_send(stream, path_to_bytes(tmp_file)).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn recv_request(stream: &mut UnixStream) -> io::Result<Vec<u8>> {
|
||||
framed_recv(stream).await
|
||||
async fn recv_request(stream: &mut UnixStream) -> io::Result<(Vec<u8>, PathBuf)> {
|
||||
let code = framed_recv(stream).await?;
|
||||
let tmp_file = framed_recv(stream).await?;
|
||||
let tmp_file = bytes_to_path(&tmp_file).ok_or_else(|| {
|
||||
io::Error::new(
|
||||
io::ErrorKind::Other,
|
||||
"prepare pvf recv_request: non utf-8 artifact path".to_string(),
|
||||
)
|
||||
})?;
|
||||
Ok((code, tmp_file))
|
||||
}
|
||||
|
||||
pub fn bump_priority(handle: &WorkerHandle) {
|
||||
@@ -173,7 +275,7 @@ fn renice(pid: u32, niceness: i32) {
|
||||
pub fn worker_entrypoint(socket_path: &str) {
|
||||
worker_event_loop("prepare", socket_path, |mut stream| async move {
|
||||
loop {
|
||||
let code = recv_request(&mut stream).await?;
|
||||
let (code, dest) = recv_request(&mut stream).await?;
|
||||
|
||||
tracing::debug!(
|
||||
target: LOG_TARGET,
|
||||
@@ -183,7 +285,6 @@ pub fn worker_entrypoint(socket_path: &str) {
|
||||
let artifact_bytes = prepare_artifact(&code).serialize();
|
||||
|
||||
// Write the serialized artifact into into a temp file.
|
||||
let dest = tmpfile("prepare-artifact-").await?;
|
||||
tracing::debug!(
|
||||
target: LOG_TARGET,
|
||||
worker_pid = %std::process::id(),
|
||||
@@ -192,8 +293,8 @@ pub fn worker_entrypoint(socket_path: &str) {
|
||||
);
|
||||
async_std::fs::write(&dest, &artifact_bytes).await?;
|
||||
|
||||
// Communicate the results back to the host.
|
||||
framed_send(&mut stream, &path_to_bytes(&dest)).await?;
|
||||
// Return back a byte that signals finishing the work.
|
||||
framed_send(&mut stream, &[1u8]).await?;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user