Move artifacts states into memory in PVF validation host (#3907)

* pvf host: store only compiled artifacts on disk

* Correctly handle failed artifacts

* Serialize result of PVF preparation uniquely

* Set the artifact state depending on the result

* Return the result of PVF preparation directly

* Move PrepareError to the error module

* Update doc comments

* Update misleading comment

* Cleanup docs

* Conclude a test job with an error

Co-authored-by: Sergei Shulepov <sergei@parity.io>
This commit is contained in:
Chris Sosnin
2021-10-22 19:37:58 +03:00
committed by GitHub
parent ad33b8749b
commit 182667830f
7 changed files with 200 additions and 137 deletions
+22 -7
View File
@@ -16,6 +16,7 @@
use super::worker::{self, Outcome};
use crate::{
error::PrepareError,
metrics::Metrics,
worker_common::{IdleWorker, WorkerHandle},
LOG_TARGET,
@@ -78,9 +79,16 @@ pub enum FromPool {
/// The given worker was just spawned and is ready to be used.
Spawned(Worker),
/// The given worker either succeeded or failed the given job. Under any circumstances the
/// artifact file has been written. The `bool` says whether the worker ripped.
Concluded(Worker, bool),
/// The given worker either succeeded or failed the given job.
Concluded {
/// A key for retrieving the worker data from the pool.
worker: Worker,
/// Indicates whether the worker process was killed.
rip: bool,
/// [`Ok`] indicates that compiled artifact is successfully stored on disk.
/// Otherwise, an [error](PrepareError) is supplied.
result: Result<(), PrepareError>,
},
/// The given worker ceased to exist.
Rip(Worker),
@@ -295,7 +303,7 @@ fn handle_mux(
},
PoolEvent::StartWork(worker, outcome) => {
match outcome {
Outcome::Concluded(idle) => {
Outcome::Concluded { worker: idle, result } => {
let data = match spawned.get_mut(worker) {
None => {
// Perhaps the worker was killed meanwhile and the result is no longer
@@ -310,7 +318,7 @@ fn handle_mux(
let old = data.idle.replace(idle);
assert_matches!(old, None, "attempt to overwrite an idle worker");
reply(from_pool, FromPool::Concluded(worker, false))?;
reply(from_pool, FromPool::Concluded { worker, rip: false, result })?;
Ok(())
},
@@ -321,9 +329,16 @@ fn handle_mux(
Ok(())
},
Outcome::DidntMakeIt => {
Outcome::DidNotMakeIt => {
if attempt_retire(metrics, spawned, worker) {
reply(from_pool, FromPool::Concluded(worker, true))?;
reply(
from_pool,
FromPool::Concluded {
worker,
rip: true,
result: Err(PrepareError::DidNotMakeIt),
},
)?;
}
Ok(())
+26 -20
View File
@@ -17,7 +17,9 @@
//! A queue that handles requests for PVF preparation.
use super::pool::{self, Worker};
use crate::{artifacts::ArtifactId, metrics::Metrics, Priority, Pvf, LOG_TARGET};
use crate::{
artifacts::ArtifactId, error::PrepareError, metrics::Metrics, Priority, Pvf, LOG_TARGET,
};
use always_assert::{always, never};
use async_std::path::PathBuf;
use futures::{channel::mpsc, stream::StreamExt as _, Future, SinkExt};
@@ -29,7 +31,7 @@ pub enum ToQueue {
/// This schedules preparation of the given PVF.
///
/// Note that it is incorrect to enqueue the same PVF again without first receiving the
/// [`FromQueue::Prepared`] response. In case there is a need to bump the priority, use
/// [`FromQueue`] response. In case there is a need to bump the priority, use
/// [`ToQueue::Amend`].
Enqueue { priority: Priority, pvf: Pvf },
/// Amends the priority for the given [`ArtifactId`] if it is running. If it's not, then it's noop.
@@ -37,9 +39,13 @@ pub enum ToQueue {
}
/// A response from queue.
#[derive(Debug, PartialEq, Eq)]
pub enum FromQueue {
Prepared(ArtifactId),
#[derive(Debug)]
pub struct FromQueue {
/// Identifier of an artifact.
pub(crate) artifact_id: ArtifactId,
/// Outcome of the PVF processing. [`Ok`] indicates that compiled artifact
/// is successfully stored on disk. Otherwise, an [error](PrepareError) is supplied.
pub(crate) result: Result<(), PrepareError>,
}
#[derive(Default)]
@@ -299,7 +305,8 @@ async fn handle_from_pool(queue: &mut Queue, from_pool: pool::FromPool) -> Resul
use pool::FromPool::*;
match from_pool {
Spawned(worker) => handle_worker_spawned(queue, worker).await?,
Concluded(worker, rip) => handle_worker_concluded(queue, worker, rip).await?,
Concluded { worker, rip, result } =>
handle_worker_concluded(queue, worker, rip, result).await?,
Rip(worker) => handle_worker_rip(queue, worker).await?,
}
Ok(())
@@ -320,6 +327,7 @@ async fn handle_worker_concluded(
queue: &mut Queue,
worker: Worker,
rip: bool,
result: Result<(), PrepareError>,
) -> Result<(), Fatal> {
queue.metrics.prepare_concluded();
@@ -377,7 +385,7 @@ async fn handle_worker_concluded(
"prepare worker concluded",
);
reply(&mut queue.from_queue_tx, FromQueue::Prepared(artifact_id))?;
reply(&mut queue.from_queue_tx, FromQueue { artifact_id, result })?;
// Figure out what to do with the worker.
if rip {
@@ -641,12 +649,9 @@ mod tests {
let w = test.workers.insert(());
test.send_from_pool(pool::FromPool::Spawned(w));
test.send_from_pool(pool::FromPool::Concluded(w, false));
test.send_from_pool(pool::FromPool::Concluded { worker: w, rip: false, result: Ok(()) });
assert_eq!(
test.poll_and_recv_from_queue().await,
FromQueue::Prepared(pvf(1).as_artifact_id())
);
assert_eq!(test.poll_and_recv_from_queue().await.artifact_id, pvf(1).as_artifact_id());
}
#[async_std::test]
@@ -671,7 +676,7 @@ mod tests {
assert_matches!(test.poll_and_recv_to_pool().await, pool::ToPool::StartWork { .. });
assert_matches!(test.poll_and_recv_to_pool().await, pool::ToPool::StartWork { .. });
test.send_from_pool(pool::FromPool::Concluded(w1, false));
test.send_from_pool(pool::FromPool::Concluded { worker: w1, rip: false, result: Ok(()) });
assert_matches!(test.poll_and_recv_to_pool().await, pool::ToPool::StartWork { .. });
@@ -704,7 +709,7 @@ mod tests {
// That's a bit silly in this context, but in production there will be an entire pool up
// to the `soft_capacity` of workers and it doesn't matter which one to cull. Either way,
// we just check that edge case of an edge case works.
test.send_from_pool(pool::FromPool::Concluded(w1, false));
test.send_from_pool(pool::FromPool::Concluded { worker: w1, rip: false, result: Ok(()) });
assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Kill(w1));
}
@@ -749,15 +754,12 @@ mod tests {
assert_matches!(test.poll_and_recv_to_pool().await, pool::ToPool::StartWork { .. });
// Conclude worker 1 and rip it.
test.send_from_pool(pool::FromPool::Concluded(w1, true));
test.send_from_pool(pool::FromPool::Concluded { worker: w1, rip: true, result: Ok(()) });
// Since there is still work, the queue requested one extra worker to spawn to handle the
// remaining enqueued work items.
assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
assert_eq!(
test.poll_and_recv_from_queue().await,
FromQueue::Prepared(pvf(1).as_artifact_id())
);
assert_eq!(test.poll_and_recv_from_queue().await.artifact_id, pvf(1).as_artifact_id());
}
#[async_std::test]
@@ -773,7 +775,11 @@ mod tests {
assert_matches!(test.poll_and_recv_to_pool().await, pool::ToPool::StartWork { .. });
test.send_from_pool(pool::FromPool::Concluded(w1, true));
test.send_from_pool(pool::FromPool::Concluded {
worker: w1,
rip: true,
result: Err(PrepareError::DidNotMakeIt),
});
test.poll_ensure_to_pool_is_empty().await;
}
+80 -69
View File
@@ -15,7 +15,8 @@
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use crate::{
artifacts::Artifact,
artifacts::CompiledArtifact,
error::PrepareError,
worker_common::{
bytes_to_path, framed_recv, framed_send, path_to_bytes, spawn_with_program_path,
tmpfile_in, worker_event_loop, IdleWorker, SpawnErr, WorkerHandle,
@@ -29,6 +30,8 @@ use async_std::{
};
use futures::FutureExt as _;
use futures_timer::Delay;
use parity_scale_codec::{Decode, Encode};
use sp_core::hexdisplay::HexDisplay;
use std::{sync::Arc, time::Duration};
const NICENESS_BACKGROUND: i32 = 10;
@@ -48,7 +51,7 @@ pub async fn spawn(
pub enum Outcome {
/// The worker has finished the work assigned to it.
Concluded(IdleWorker),
Concluded { worker: IdleWorker, result: Result<(), PrepareError> },
/// The host tried to reach the worker but failed. This is most likely because the worked was
/// killed by the system.
Unreachable,
@@ -59,7 +62,7 @@ pub enum Outcome {
/// the artifact).
///
/// This doesn't return an idle worker instance, thus this worker is no longer usable.
DidntMakeIt,
DidNotMakeIt,
}
/// Given the idle token of a worker and parameters of work, communicates with the worker and
@@ -99,13 +102,11 @@ pub async fn start_work(
// Wait for the result from the worker, keeping in mind that there may be a timeout, the
// worker may get killed, or something along these lines.
//
// In that case we should handle these gracefully by writing the artifact file by ourselves.
// We may potentially overwrite the artifact in rare cases where the worker didn't make
// it to report back the result.
// In that case we should propagate the error to the pool.
#[derive(Debug)]
enum Selected {
Done,
Done(Result<(), PrepareError>),
IoErr,
Deadline,
}
@@ -113,41 +114,48 @@ pub async fn start_work(
let selected = futures::select! {
res = framed_recv(&mut stream).fuse() => {
match res {
Ok(x) if x == &[1u8] => {
tracing::debug!(
target: LOG_TARGET,
worker_pid = %pid,
"promoting WIP artifact {} to {}",
tmp_file.display(),
artifact_path.display(),
);
async_std::fs::rename(&tmp_file, &artifact_path)
.await
.map(|_| Selected::Done)
.unwrap_or_else(|err| {
tracing::warn!(
Ok(response_bytes) => {
// By convention we expect encoded `Result<(), PrepareError>`.
if let Ok(result) =
<Result<(), PrepareError>>::decode(&mut response_bytes.clone().as_slice())
{
if result.is_ok() {
tracing::debug!(
target: LOG_TARGET,
worker_pid = %pid,
"failed to rename the artifact from {} to {}: {:?}",
"promoting WIP artifact {} to {}",
tmp_file.display(),
artifact_path.display(),
err,
);
Selected::IoErr
})
}
Ok(response_bytes) => {
use sp_core::hexdisplay::HexDisplay;
let bound_bytes =
&response_bytes[..response_bytes.len().min(4)];
tracing::warn!(
target: LOG_TARGET,
worker_pid = %pid,
"received unexpected response from the prepare worker: {}",
HexDisplay::from(&bound_bytes),
);
Selected::IoErr
async_std::fs::rename(&tmp_file, &artifact_path)
.await
.map(|_| Selected::Done(result))
.unwrap_or_else(|err| {
tracing::warn!(
target: LOG_TARGET,
worker_pid = %pid,
"failed to rename the artifact from {} to {}: {:?}",
tmp_file.display(),
artifact_path.display(),
err,
);
Selected::IoErr
})
} else {
Selected::Done(result)
}
} else {
// We received invalid bytes from the worker.
let bound_bytes = &response_bytes[..response_bytes.len().min(4)];
tracing::warn!(
target: LOG_TARGET,
worker_pid = %pid,
"received unexpected response from the prepare worker: {}",
HexDisplay::from(&bound_bytes),
);
Selected::IoErr
}
},
Err(err) => {
tracing::warn!(
@@ -164,24 +172,11 @@ pub async fn start_work(
};
match selected {
Selected::Done => {
Selected::Done(result) => {
renice(pid, NICENESS_FOREGROUND);
Outcome::Concluded(IdleWorker { stream, pid })
},
Selected::IoErr | Selected::Deadline => {
let bytes = Artifact::DidntMakeIt.serialize();
// best effort: there is nothing we can do here if the write fails.
if let Err(err) = async_std::fs::write(&artifact_path, &bytes).await {
tracing::warn!(
target: LOG_TARGET,
worker_pid = %pid,
"preparation didn't make it, because of `{:?}`: {:?}",
selected,
err,
);
}
Outcome::DidntMakeIt
Outcome::Concluded { worker: IdleWorker { stream, pid }, result }
},
Selected::IoErr | Selected::Deadline => Outcome::DidNotMakeIt,
}
})
.await
@@ -205,7 +200,7 @@ where
"failed to create a temp file for the artifact: {:?}",
err,
);
return Outcome::DidntMakeIt
return Outcome::DidNotMakeIt
},
};
@@ -288,31 +283,47 @@ pub fn worker_entrypoint(socket_path: &str) {
worker_pid = %std::process::id(),
"worker: preparing artifact",
);
let artifact_bytes = prepare_artifact(&code).serialize();
// Write the serialized artifact into into a temp file.
tracing::debug!(
target: LOG_TARGET,
worker_pid = %std::process::id(),
"worker: writing artifact to {}",
dest.display(),
);
async_std::fs::write(&dest, &artifact_bytes).await?;
let result = match prepare_artifact(&code) {
Err(err) => {
// Serialized error will be written into the socket.
Err(err)
},
Ok(compiled_artifact) => {
// Write the serialized artifact into a temp file.
// PVF host only keeps artifacts statuses in its memory,
// successfully compiled code gets stored on the disk (and
// consequently deserialized by execute-workers). The prepare
// worker is only required to send an empty `Ok` to the pool
// to indicate the success.
// Return back a byte that signals finishing the work.
framed_send(&mut stream, &[1u8]).await?;
let artifact_bytes = compiled_artifact.encode();
tracing::debug!(
target: LOG_TARGET,
worker_pid = %std::process::id(),
"worker: writing artifact to {}",
dest.display(),
);
async_std::fs::write(&dest, &artifact_bytes).await?;
Ok(())
},
};
framed_send(&mut stream, result.encode().as_slice()).await?;
}
});
}
fn prepare_artifact(code: &[u8]) -> Artifact {
fn prepare_artifact(code: &[u8]) -> Result<CompiledArtifact, PrepareError> {
let blob = match crate::executor_intf::prevalidate(code) {
Err(err) => return Artifact::PrevalidationErr(format!("{:?}", err)),
Err(err) => return Err(PrepareError::Prevalidation(format!("{:?}", err))),
Ok(b) => b,
};
match crate::executor_intf::prepare(blob) {
Ok(compiled_artifact) => Artifact::Compiled { compiled_artifact },
Err(err) => Artifact::PreparationErr(format!("{:?}", err)),
Ok(compiled_artifact) => Ok(CompiledArtifact::new(compiled_artifact)),
Err(err) => Err(PrepareError::Preparation(format!("{:?}", err))),
}
}