New PVF validation host (#2710)

* Implement PVF validation host

* WIP: Diener

* Increase the alloted compilation time

* Add more comments

* Minor clean up

* Apply suggestions from code review

Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>

* Fix pruning artifact removal

* Fix formatting and newlines

* Fix the thread pool

* Update node/core/pvf/src/executor_intf.rs

Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>

* Remove redundant test declaration

* Don't convert the path into an intermediate string

* Try to workaround the test failure

* Use the puppet_worker trick again

* Fix a blip

* Move `ensure_wasmtime_version` under the tests mod

* Add a macro for puppet_workers

* fix build for not real-overseer

* Rename the puppet worker for adder collator

* play it safe with the name of adder puppet worker

* Typo: triggered

* Add more comments

* Do not kill exec worker on every error

* Plumb Duration for timeouts

* typo: critical

* Add proofs

* Clean unused imports

* Revert "WIP: Diener"

This reverts commit b9f54e513366c7a6dfdd117ac19fbdc46b900b4d.

* Sync version of wasmtime

* Update cargo.lock

* Update Substrate

* Merge fixes still

* Update wasmtime version in test

* bastifmt

Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>

* Squash spaces

* Trailing new line for testing.rs

* Remove controversial code

* comment about biasing

* Fix suggestion

* Add comments

* make it more clear why unwrap_err

* tmpfile retry

* proper proofs for claim_idle

* Remove mutex from ValidationHost

* Add some more logging

* Extract exec timeout into a constant

* Add some clarifying logging

* Use blake2_256

* Clean up the merge

Specifically the leftovers after removing real-overseer

* Update parachain/test-parachains/adder/collator/Cargo.toml

Co-authored-by: Andronik Ordian <write@reusable.software>

Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>
Co-authored-by: Andronik Ordian <write@reusable.software>
This commit is contained in:
Sergei Shulepov
2021-04-09 01:09:56 +03:00
committed by GitHub
parent 896ec8dbc3
commit 59b4d6511f
43 changed files with 5108 additions and 1991 deletions
+311
View File
@@ -0,0 +1,311 @@
// Copyright 2021 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use crate::LOG_TARGET;
use always_assert::always;
use async_std::{
io,
path::{Path, PathBuf},
};
use polkadot_core_primitives::Hash;
use std::{
collections::HashMap,
time::{Duration, SystemTime},
};
use parity_scale_codec::{Encode, Decode};
use futures::StreamExt;
/// A final product of preparation process. Contains either a ready to run compiled artifact or
/// a description what went wrong.
#[derive(Encode, Decode)]
pub enum Artifact {
/// During the prevalidation stage of preparation an issue was found with the PVF.
PrevalidationErr(String),
/// Compilation failed for the given PVF.
PreparationErr(String),
/// This state indicates that the process assigned to prepare the artifact wasn't responsible
/// or were killed. This state is reported by the validation host (not by the worker).
DidntMakeIt,
/// The PVF passed all the checks and is ready for execution.
Compiled { compiled_artifact: Vec<u8> },
}
impl Artifact {
/// Serializes this struct into a byte buffer.
pub fn serialize(&self) -> Vec<u8> {
self.encode()
}
/// Deserialize the given byte buffer to an artifact.
pub fn deserialize(mut bytes: &[u8]) -> Result<Self, String> {
Artifact::decode(&mut bytes).map_err(|e| format!("{:?}", e))
}
}
/// Identifier of an artifact. Right now it only encodes a code hash of the PVF. But if we get to
/// multiple engine implementations the artifact ID should include the engine type as well.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ArtifactId {
code_hash: Hash,
}
impl ArtifactId {
const PREFIX: &'static str = "wasmtime_1_";
/// Creates a new artifact ID with the given hash.
pub fn new(code_hash: Hash) -> Self {
Self { code_hash }
}
/// Tries to recover the artifact id from the given file name.
pub fn from_file_name(file_name: &str) -> Option<Self> {
use std::str::FromStr as _;
let file_name = file_name.strip_prefix(Self::PREFIX)?;
let code_hash = Hash::from_str(file_name).ok()?;
Some(Self { code_hash })
}
/// Returns the expected path to this artifact given the root of the cache.
pub fn path(&self, cache_path: &Path) -> PathBuf {
let file_name = format!("{}{}", Self::PREFIX, self.code_hash.to_string());
cache_path.join(file_name)
}
}
pub enum ArtifactState {
/// The artifact is ready to be used by the executor.
///
/// That means that the artifact should be accessible through the path obtained by the artifact
/// id (unless, it was removed externally).
Prepared {
/// The time when the artifact was the last time needed.
///
/// This is updated when we get the heads up for this artifact or when we just discover
/// this file.
last_time_needed: SystemTime,
},
/// A task to prepare this artifact is scheduled.
Preparing,
}
/// A container of all known artifact ids and their states.
pub struct Artifacts {
artifacts: HashMap<ArtifactId, ArtifactState>,
}
impl Artifacts {
/// Scan the given cache root for the artifacts.
///
/// The recognized artifacts will be filled in the table and unrecognized will be removed.
pub async fn new(cache_path: &Path) -> Self {
// Make sure that the cache path directory and all it's parents are created.
let _ = async_std::fs::create_dir_all(cache_path).await;
let artifacts = match scan_for_known_artifacts(cache_path).await {
Ok(a) => a,
Err(err) => {
tracing::warn!(
target: LOG_TARGET,
"unable to seed the artifacts in memory cache: {:?}. Starting with a clean one",
err,
);
HashMap::new()
}
};
Self { artifacts }
}
#[cfg(test)]
pub(crate) fn empty() -> Self {
Self {
artifacts: HashMap::new(),
}
}
/// Returns the state of the given artifact by its ID.
pub fn artifact_state_mut(&mut self, artifact_id: &ArtifactId) -> Option<&mut ArtifactState> {
self.artifacts.get_mut(artifact_id)
}
/// Inform the table about the artifact with the given ID. The state will be set to "preparing".
///
/// This function must be used only for brand new artifacts and should never be used for
/// replacing existing ones.
pub fn insert_preparing(&mut self, artifact_id: ArtifactId) {
// See the precondition.
always!(self
.artifacts
.insert(artifact_id, ArtifactState::Preparing)
.is_none());
}
/// Insert an artifact with the given ID as "prepared".
///
/// This function must be used only for brand new artifacts and should never be used for
/// replacing existing ones.
#[cfg(test)]
pub fn insert_prepared(&mut self, artifact_id: ArtifactId, last_time_needed: SystemTime) {
// See the precondition.
always!(self
.artifacts
.insert(artifact_id, ArtifactState::Prepared { last_time_needed })
.is_none());
}
/// Remove and retrive the artifacts from the table that are older than the supplied Time-To-Live.
pub fn prune(&mut self, artifact_ttl: Duration) -> Vec<ArtifactId> {
let now = SystemTime::now();
let mut to_remove = vec![];
for (k, v) in self.artifacts.iter() {
if let ArtifactState::Prepared {
last_time_needed, ..
} = *v {
if now
.duration_since(last_time_needed)
.map(|age| age > artifact_ttl)
.unwrap_or(false)
{
to_remove.push(k.clone());
}
}
}
for artifact in &to_remove {
self.artifacts.remove(artifact);
}
to_remove
}
}
/// Goes over all files in the given directory, collecting all recognizable artifacts. All files
/// that do not look like artifacts are removed.
///
/// All recognized artifacts will be created with the current datetime.
async fn scan_for_known_artifacts(
cache_path: &Path,
) -> io::Result<HashMap<ArtifactId, ArtifactState>> {
let mut result = HashMap::new();
let now = SystemTime::now();
let mut dir = async_std::fs::read_dir(cache_path).await?;
while let Some(res) = dir.next().await {
let entry = res?;
if entry.file_type().await?.is_dir() {
tracing::debug!(
target: LOG_TARGET,
"{} is a dir, and dirs do not belong to us. Removing",
entry.path().display(),
);
let _ = async_std::fs::remove_dir_all(entry.path()).await;
}
let path = entry.path();
let file_name = match path.file_name() {
None => {
// A file without a file name? Weird, just skip it.
continue;
}
Some(file_name) => file_name,
};
let file_name = match file_name.to_str() {
None => {
tracing::debug!(
target: LOG_TARGET,
"{} is not utf-8. Removing",
path.display(),
);
let _ = async_std::fs::remove_file(&path).await;
continue;
}
Some(file_name) => file_name,
};
let artifact_id = match ArtifactId::from_file_name(file_name) {
None => {
tracing::debug!(
target: LOG_TARGET,
"{} is not a recognized artifact. Removing",
path.display(),
);
let _ = async_std::fs::remove_file(&path).await;
continue;
}
Some(artifact_id) => artifact_id,
};
// A sanity check so that we really can access the artifact through the artifact id.
if artifact_id.path(cache_path).is_file().await {
result.insert(
artifact_id,
ArtifactState::Prepared {
last_time_needed: now,
},
);
} else {
tracing::warn!(
target: LOG_TARGET,
"{} is not accessible by artifact_id {:?}",
cache_path.display(),
artifact_id,
);
}
}
Ok(result)
}
#[cfg(test)]
mod tests {
use super::ArtifactId;
#[test]
fn ensure_wasmtime_version() {
assert_eq!(
wasmtime_jit::VERSION,
"0.24.0",
"wasmtime version is updated. Check the prefix.",
);
// If the version bump is significant, change `ArtifactId::PREFIX`.
//
// If in doubt bump it. This will lead to removal of the existing artifacts in the on-disk cache
// and recompilation.
}
#[test]
fn from_file_name() {
assert!(ArtifactId::from_file_name("").is_none());
assert!(ArtifactId::from_file_name("junk").is_none());
assert_eq!(
ArtifactId::from_file_name(
"wasmtime_1_0x0022800000000000000000000000000000000000000000000000000000000000"
),
Some(ArtifactId::new(
hex_literal::hex![
"0022800000000000000000000000000000000000000000000000000000000000"
]
.into()
)),
);
}
}
+56
View File
@@ -0,0 +1,56 @@
// Copyright 2021 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
/// A error raised during validation of the candidate.
#[derive(Debug, Clone)]
pub enum ValidationError {
/// The error was raised because the candidate is invalid.
InvalidCandidate(InvalidCandidate),
/// This error is raised due to inability to serve the request.
InternalError(String),
}
/// A description of an error raised during executing a PVF and can be attributed to the combination
/// of the candidate [`polkadot_parachain::primitives::ValidationParams`] and the PVF.
#[derive(Debug, Clone)]
pub enum InvalidCandidate {
/// The failure is reported by the worker. The string contains the error message.
///
/// This also includes the errors reported by the preparation pipeline.
WorkerReportedError(String),
/// The worker has died during validation of a candidate. That may fall in one of the following
/// categories, which we cannot distinguish programmatically:
///
/// (a) Some sort of transient glitch caused the worker process to abort. An example would be that
/// the host machine ran out of free memory and the OOM killer started killing the processes,
/// and in order to save the parent it will "sacrifice child" first.
///
/// (b) The candidate triggered a code path that has lead to the process death. For example,
/// the PVF found a way to consume unbounded amount of resources and then it either exceeded
/// an rlimit (if set) or, again, invited OOM killer. Another possibility is a bug in
/// wasmtime allowed the PVF to gain control over the execution worker.
///
/// We attribute such an event to an invalid candidate in either case.
///
/// The rationale for this is that a glitch may lead to unfair rejecting candidate by a single
/// validator. If the glitch is somewhat more persistant the validator will reject all candidate
/// thrown at it and hopefully the operator notices it by decreased reward performance of the
/// validator. On the other hand, if the worker died because of (b) we would have better chances
/// to stop the attack.
AmbigiousWorkerDeath,
/// PVF execution (compilation is not included) took more time than was allotted.
HardTimeout,
}
+27
View File
@@ -0,0 +1,27 @@
// Copyright 2021 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Execution part of the pipeline.
//!
//! The validation host [runs the queue][`start`] communicating with it by sending [`ToQueue`]
//! messages. The queue will spawn workers in new processes. Those processes should jump to
//! [`worker_entrypoint`].
mod queue;
mod worker;
pub use queue::{ToQueue, start};
pub use worker::worker_entrypoint;
+344
View File
@@ -0,0 +1,344 @@
// Copyright 2021 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! A queue that handles requests for PVF execution.
use crate::{
worker_common::{IdleWorker, WorkerHandle},
host::ResultSender,
LOG_TARGET, InvalidCandidate, ValidationError,
};
use super::worker::Outcome;
use std::{collections::VecDeque, fmt, time::Duration};
use futures::{
Future, FutureExt,
channel::mpsc,
future::BoxFuture,
stream::{FuturesUnordered, StreamExt as _},
};
use async_std::path::PathBuf;
use slotmap::HopSlotMap;
slotmap::new_key_type! { struct Worker; }
#[derive(Debug)]
pub enum ToQueue {
Enqueue {
artifact_path: PathBuf,
params: Vec<u8>,
result_tx: ResultSender,
},
}
struct ExecuteJob {
artifact_path: PathBuf,
params: Vec<u8>,
result_tx: ResultSender,
}
struct WorkerData {
idle: Option<IdleWorker>,
handle: WorkerHandle,
}
impl fmt::Debug for WorkerData {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "WorkerData(pid={})", self.handle.id())
}
}
struct Workers {
/// The registry of running workers.
running: HopSlotMap<Worker, WorkerData>,
/// The number of spawning but not yet spawned workers.
spawn_inflight: usize,
/// The maximum number of workers queue can have at once.
capacity: usize,
}
impl Workers {
fn can_afford_one_more(&self) -> bool {
self.spawn_inflight + self.running.len() < self.capacity
}
fn find_available(&self) -> Option<Worker> {
self.running
.iter()
.find_map(|d| if d.1.idle.is_some() { Some(d.0) } else { None })
}
/// Find the associated data by the worker token and extract it's [`IdleWorker`] token.
///
/// Returns `None` if either worker is not recognized or idle token is absent.
fn claim_idle(&mut self, worker: Worker) -> Option<IdleWorker> {
self
.running
.get_mut(worker)?
.idle
.take()
}
}
enum QueueEvent {
Spawn((IdleWorker, WorkerHandle)),
StartWork(Worker, Outcome, ResultSender),
}
type Mux = FuturesUnordered<BoxFuture<'static, QueueEvent>>;
struct Queue {
/// The receiver that receives messages to the pool.
to_queue_rx: mpsc::Receiver<ToQueue>,
program_path: PathBuf,
spawn_timeout: Duration,
/// The queue of jobs that are waiting for a worker to pick up.
queue: VecDeque<ExecuteJob>,
workers: Workers,
mux: Mux,
}
impl Queue {
fn new(
program_path: PathBuf,
worker_capacity: usize,
spawn_timeout: Duration,
to_queue_rx: mpsc::Receiver<ToQueue>,
) -> Self {
Self {
program_path,
spawn_timeout,
to_queue_rx,
queue: VecDeque::new(),
mux: Mux::new(),
workers: Workers {
running: HopSlotMap::with_capacity_and_key(10),
spawn_inflight: 0,
capacity: worker_capacity,
},
}
}
async fn run(mut self) {
loop {
futures::select! {
to_queue = self.to_queue_rx.next() => {
if let Some(to_queue) = to_queue {
handle_to_queue(&mut self, to_queue);
} else {
break;
}
}
ev = self.mux.select_next_some() => handle_mux(&mut self, ev).await,
}
purge_dead(&mut self.workers).await;
}
}
}
async fn purge_dead(workers: &mut Workers) {
let mut to_remove = vec![];
for (worker, data) in workers.running.iter_mut() {
if futures::poll!(&mut data.handle).is_ready() {
// a resolved future means that the worker has terminated. Weed it out.
to_remove.push(worker);
}
}
for w in to_remove {
let _ = workers.running.remove(w);
}
}
fn handle_to_queue(queue: &mut Queue, to_queue: ToQueue) {
let ToQueue::Enqueue {
artifact_path,
params,
result_tx,
} = to_queue;
let job = ExecuteJob {
artifact_path,
params,
result_tx,
};
if let Some(available) = queue.workers.find_available() {
assign(queue, available, job);
} else {
if queue.workers.can_afford_one_more() {
spawn_extra_worker(queue);
}
queue.queue.push_back(job);
}
}
async fn handle_mux(queue: &mut Queue, event: QueueEvent) {
match event {
QueueEvent::Spawn((idle, handle)) => {
queue.workers.spawn_inflight -= 1;
let worker = queue.workers.running.insert(WorkerData {
idle: Some(idle),
handle,
});
if let Some(job) = queue.queue.pop_front() {
assign(queue, worker, job);
}
}
QueueEvent::StartWork(worker, outcome, result_tx) => {
handle_job_finish(queue, worker, outcome, result_tx);
}
}
}
/// If there are pending jobs in the queue, schedules the next of them onto the just freed up
/// worker. Otherwise, puts back into the available workers list.
fn handle_job_finish(queue: &mut Queue, worker: Worker, outcome: Outcome, result_tx: ResultSender) {
let (idle_worker, result) = match outcome {
Outcome::Ok {
result_descriptor,
duration_ms,
idle_worker,
} => {
// TODO: propagate the soft timeout
drop(duration_ms);
(Some(idle_worker), Ok(result_descriptor))
}
Outcome::InvalidCandidate { err, idle_worker } => (
Some(idle_worker),
Err(ValidationError::InvalidCandidate(
InvalidCandidate::WorkerReportedError(err),
)),
),
Outcome::InternalError { err, idle_worker } => (
Some(idle_worker),
Err(ValidationError::InternalError(err)),
),
Outcome::HardTimeout => (
None,
Err(ValidationError::InvalidCandidate(
InvalidCandidate::HardTimeout,
)),
),
Outcome::IoErr => (
None,
Err(ValidationError::InvalidCandidate(
InvalidCandidate::AmbigiousWorkerDeath,
)),
),
};
// First we send the result. It may fail due the other end of the channel being dropped, that's
// legitimate and we don't treat that as an error.
let _ = result_tx.send(result);
// Then, we should deal with the worker:
//
// - if the `idle_worker` token was returned we should either schedule the next task or just put
// it back so that the next incoming job will be able to claim it
//
// - if the `idle_worker` token was consumed, all the metadata pertaining to that worker should
// be removed.
if let Some(idle_worker) = idle_worker {
if let Some(data) = queue.workers.running.get_mut(worker) {
data.idle = Some(idle_worker);
if let Some(job) = queue.queue.pop_front() {
assign(queue, worker, job);
}
}
} else {
// Note it's possible that the worker was purged already by `purge_dead`
queue.workers.running.remove(worker);
if !queue.queue.is_empty() {
// The worker has died and we still have work we have to do. Request an extra worker.
//
// That can potentially overshoot, but that should be OK.
spawn_extra_worker(queue);
}
}
}
fn spawn_extra_worker(queue: &mut Queue) {
queue
.mux
.push(spawn_worker_task(queue.program_path.clone(), queue.spawn_timeout).boxed());
queue.workers.spawn_inflight += 1;
}
async fn spawn_worker_task(program_path: PathBuf, spawn_timeout: Duration) -> QueueEvent {
use futures_timer::Delay;
loop {
match super::worker::spawn(&program_path, spawn_timeout).await {
Ok((idle, handle)) => break QueueEvent::Spawn((idle, handle)),
Err(err) => {
tracing::warn!(
target: LOG_TARGET,
"failed to spawn an execute worker: {:?}",
err,
);
// Assume that the failure intermittent and retry after a delay.
Delay::new(Duration::from_secs(3)).await;
}
}
}
}
/// Ask the given worker to perform the given job.
///
/// The worker must be running and idle.
fn assign(queue: &mut Queue, worker: Worker, job: ExecuteJob) {
let idle = queue
.workers
.claim_idle(worker)
.expect(
"this caller must supply a worker which is idle and running;
thus claim_idle cannot return None;
qed."
);
queue.mux.push(
async move {
let outcome = super::worker::start_work(idle, job.artifact_path, job.params).await;
QueueEvent::StartWork(worker, outcome, job.result_tx)
}
.boxed(),
);
}
pub fn start(
program_path: PathBuf,
worker_capacity: usize,
spawn_timeout: Duration,
) -> (mpsc::Sender<ToQueue>, impl Future<Output = ()>) {
let (to_queue_tx, to_queue_rx) = mpsc::channel(20);
let run = Queue::new(
program_path,
worker_capacity,
spawn_timeout,
to_queue_rx,
)
.run();
(to_queue_tx, run)
}
@@ -0,0 +1,272 @@
// Copyright 2021 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use crate::{
artifacts::Artifact,
LOG_TARGET,
executor_intf::TaskExecutor,
worker_common::{
IdleWorker, SpawnErr, WorkerHandle, bytes_to_path, framed_recv, framed_send, path_to_bytes,
spawn_with_program_path, worker_event_loop,
},
};
use std::time::{Duration, Instant};
use async_std::{
io,
os::unix::net::UnixStream,
path::{Path, PathBuf},
};
use futures::FutureExt;
use futures_timer::Delay;
use polkadot_parachain::primitives::ValidationResult;
use parity_scale_codec::{Encode, Decode};
const EXECUTION_TIMEOUT: Duration = Duration::from_secs(3);
/// Spawns a new worker with the given program path that acts as the worker and the spawn timeout.
///
/// The program should be able to handle `<program-path> execute-worker <socket-path>` invocation.
pub async fn spawn(
program_path: &Path,
spawn_timeout: Duration,
) -> Result<(IdleWorker, WorkerHandle), SpawnErr> {
spawn_with_program_path(
"execute",
program_path,
&["execute-worker"],
spawn_timeout,
)
.await
}
/// Outcome of PVF execution.
pub enum Outcome {
/// PVF execution completed successfully and the result is returned. The worker is ready for
/// another job.
Ok {
result_descriptor: ValidationResult,
duration_ms: u64,
idle_worker: IdleWorker,
},
/// The candidate validation failed. It may be for example because the preparation process
/// produced an error or the wasm execution triggered a trap.
InvalidCandidate {
err: String,
idle_worker: IdleWorker,
},
/// An internal error happened during the validation. Such an error is most likely related to
/// some transient glitch.
InternalError {
err: String,
idle_worker: IdleWorker,
},
/// The execution time exceeded the hard limit. The worker is terminated.
HardTimeout,
/// An I/O error happened during communication with the worker. This may mean that the worker
/// process already died. The token is not returned in any case.
IoErr,
}
/// Given the idle token of a worker and parameters of work, communicates with the worker and
/// returns the outcome.
pub async fn start_work(
worker: IdleWorker,
artifact_path: PathBuf,
validation_params: Vec<u8>,
) -> Outcome {
let IdleWorker { mut stream, pid } = worker;
tracing::debug!(
target: LOG_TARGET,
worker_pid = %pid,
"starting execute for {}",
artifact_path.display(),
);
if send_request(&mut stream, &artifact_path, &validation_params).await.is_err() {
return Outcome::IoErr;
}
let response = futures::select! {
response = recv_response(&mut stream).fuse() => {
match response {
Err(_err) => return Outcome::IoErr,
Ok(response) => response,
}
},
_ = Delay::new(EXECUTION_TIMEOUT).fuse() => return Outcome::HardTimeout,
};
match response {
Response::Ok {
result_descriptor,
duration_ms,
} => Outcome::Ok {
result_descriptor,
duration_ms,
idle_worker: IdleWorker { stream, pid },
},
Response::InvalidCandidate(err) => Outcome::InvalidCandidate {
err,
idle_worker: IdleWorker { stream, pid },
},
Response::InternalError(err) => Outcome::InternalError {
err,
idle_worker: IdleWorker { stream, pid },
},
}
}
async fn send_request(
stream: &mut UnixStream,
artifact_path: &Path,
validation_params: &[u8],
) -> io::Result<()> {
framed_send(stream, path_to_bytes(artifact_path)).await?;
framed_send(stream, validation_params).await
}
async fn recv_request(stream: &mut UnixStream) -> io::Result<(PathBuf, Vec<u8>)> {
let artifact_path = framed_recv(stream).await?;
let artifact_path = bytes_to_path(&artifact_path).ok_or_else(|| {
io::Error::new(
io::ErrorKind::Other,
"execute pvf recv_request: non utf-8 artifact path".to_string(),
)
})?;
let params = framed_recv(stream).await?;
Ok((artifact_path, params))
}
async fn send_response(stream: &mut UnixStream, response: Response) -> io::Result<()> {
framed_send(stream, &response.encode()).await
}
async fn recv_response(stream: &mut UnixStream) -> io::Result<Response> {
let response_bytes = framed_recv(stream).await?;
Response::decode(&mut &response_bytes[..]).map_err(|e| {
io::Error::new(
io::ErrorKind::Other,
format!("execute pvf recv_response: decode error: {:?}", e),
)
})
}
#[derive(Encode, Decode)]
enum Response {
Ok {
result_descriptor: ValidationResult,
duration_ms: u64,
},
InvalidCandidate(String),
InternalError(String),
}
impl Response {
fn format_invalid(ctx: &'static str, msg: &str) -> Self {
if msg.is_empty() {
Self::InvalidCandidate(ctx.to_string())
} else {
Self::InvalidCandidate(format!("{}: {}", ctx, msg))
}
}
}
/// The entrypoint that the spawned execute worker should start with. The socket_path specifies
/// the path to the socket used to communicate with the host.
pub fn worker_entrypoint(socket_path: &str) {
worker_event_loop("execute", socket_path, |mut stream| async move {
let executor = TaskExecutor::new().map_err(|e| {
io::Error::new(
io::ErrorKind::Other,
format!("cannot create task executor: {}", e),
)
})?;
loop {
let (artifact_path, params) = recv_request(&mut stream).await?;
tracing::debug!(
target: LOG_TARGET,
worker_pid = %std::process::id(),
"worker: validating artifact {}",
artifact_path.display(),
);
let response = validate_using_artifact(&artifact_path, &params, &executor).await;
send_response(&mut stream, response).await?;
}
});
}
async fn validate_using_artifact(
artifact_path: &Path,
params: &[u8],
spawner: &TaskExecutor,
) -> Response {
let artifact_bytes = match async_std::fs::read(artifact_path).await {
Err(e) => {
return Response::InternalError(format!(
"failed to read the artifact at {}: {:?}",
artifact_path.display(),
e,
))
}
Ok(b) => b,
};
let artifact = match Artifact::deserialize(&artifact_bytes) {
Err(e) => return Response::InternalError(format!("artifact deserialization: {:?}", e)),
Ok(a) => a,
};
let compiled_artifact = match &artifact {
Artifact::PrevalidationErr(msg) => {
return Response::format_invalid("prevalidation", msg);
}
Artifact::PreparationErr(msg) => {
return Response::format_invalid("preparation", msg);
}
Artifact::DidntMakeIt => {
return Response::format_invalid("preparation timeout", "");
}
Artifact::Compiled { compiled_artifact } => compiled_artifact,
};
let validation_started_at = Instant::now();
let descriptor_bytes =
match crate::executor_intf::execute(compiled_artifact, params, spawner.clone()) {
Err(err) => {
return Response::format_invalid("execute", &err.to_string());
}
Ok(d) => d,
};
let duration_ms = validation_started_at.elapsed().as_millis() as u64;
let result_descriptor = match ValidationResult::decode(&mut &descriptor_bytes[..]) {
Err(err) => {
return Response::InvalidCandidate(format!(
"validation result decoding failed: {}",
err
))
}
Ok(r) => r,
};
Response::Ok {
result_descriptor,
duration_ms,
}
}
+239
View File
@@ -0,0 +1,239 @@
// Copyright 2021 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Interface to the Substrate Executor
use std::any::{TypeId, Any};
use sc_executor_common::{
runtime_blob::RuntimeBlob,
wasm_runtime::{InvokeMethod, WasmModule as _},
};
use sc_executor_wasmtime::{Config, Semantics};
use sp_core::{
storage::{ChildInfo, TrackedStorageKey},
};
use sp_wasm_interface::HostFunctions as _;
const CONFIG: Config = Config {
// TODO: Make sure we don't use more than 1GB: https://github.com/paritytech/polkadot/issues/699
heap_pages: 1024,
allow_missing_func_imports: true,
cache_path: None,
semantics: Semantics {
fast_instance_reuse: false,
stack_depth_metering: false,
},
};
/// Runs the prevaldation on the given code. Returns a [`RuntimeBlob`] if it succeeds.
pub fn prevalidate(code: &[u8]) -> Result<RuntimeBlob, sc_executor_common::error::WasmError> {
let blob = RuntimeBlob::new(code)?;
// It's assumed this function will take care of any prevalidation logic
// that needs to be done.
//
// Do nothing for now.
Ok(blob)
}
/// Runs preparation on the given runtime blob. If successful, it returns a serialized compiled
/// artifact which can then be used to pass into [`execute`].
pub fn prepare(blob: RuntimeBlob) -> Result<Vec<u8>, sc_executor_common::error::WasmError> {
sc_executor_wasmtime::prepare_runtime_artifact(blob, &CONFIG.semantics)
}
/// Executes the given PVF in the form of a compiled artifact and returns the result of execution
/// upon success.
pub fn execute(
compiled_artifact: &[u8],
params: &[u8],
spawner: impl sp_core::traits::SpawnNamed + 'static,
) -> Result<Vec<u8>, sc_executor_common::error::Error> {
let mut extensions = sp_externalities::Extensions::new();
extensions.register(sp_core::traits::TaskExecutorExt::new(spawner));
let mut ext = ValidationExternalities(extensions);
sc_executor::with_externalities_safe(&mut ext, || {
let runtime = sc_executor_wasmtime::create_runtime(
sc_executor_wasmtime::CodeSupplyMode::Artifact { compiled_artifact },
CONFIG,
HostFunctions::host_functions(),
)?;
runtime
.new_instance()?
.call(InvokeMethod::Export("validate_block"), params)
})?
}
type HostFunctions = sp_io::SubstrateHostFunctions;
/// The validation externalities that will panic on any storage related access.
struct ValidationExternalities(sp_externalities::Extensions);
impl sp_externalities::Externalities for ValidationExternalities {
fn storage(&self, _: &[u8]) -> Option<Vec<u8>> {
panic!("storage: unsupported feature for parachain validation")
}
fn storage_hash(&self, _: &[u8]) -> Option<Vec<u8>> {
panic!("storage_hash: unsupported feature for parachain validation")
}
fn child_storage_hash(&self, _: &ChildInfo, _: &[u8]) -> Option<Vec<u8>> {
panic!("child_storage_hash: unsupported feature for parachain validation")
}
fn child_storage(&self, _: &ChildInfo, _: &[u8]) -> Option<Vec<u8>> {
panic!("child_storage: unsupported feature for parachain validation")
}
fn kill_child_storage(&mut self, _: &ChildInfo, _: Option<u32>) -> (bool, u32) {
panic!("kill_child_storage: unsupported feature for parachain validation")
}
fn clear_prefix(&mut self, _: &[u8]) {
panic!("clear_prefix: unsupported feature for parachain validation")
}
fn clear_child_prefix(&mut self, _: &ChildInfo, _: &[u8]) {
panic!("clear_child_prefix: unsupported feature for parachain validation")
}
fn place_storage(&mut self, _: Vec<u8>, _: Option<Vec<u8>>) {
panic!("place_storage: unsupported feature for parachain validation")
}
fn place_child_storage(&mut self, _: &ChildInfo, _: Vec<u8>, _: Option<Vec<u8>>) {
panic!("place_child_storage: unsupported feature for parachain validation")
}
fn storage_root(&mut self) -> Vec<u8> {
panic!("storage_root: unsupported feature for parachain validation")
}
fn child_storage_root(&mut self, _: &ChildInfo) -> Vec<u8> {
panic!("child_storage_root: unsupported feature for parachain validation")
}
fn storage_changes_root(&mut self, _: &[u8]) -> Result<Option<Vec<u8>>, ()> {
panic!("storage_changes_root: unsupported feature for parachain validation")
}
fn next_child_storage_key(&self, _: &ChildInfo, _: &[u8]) -> Option<Vec<u8>> {
panic!("next_child_storage_key: unsupported feature for parachain validation")
}
fn next_storage_key(&self, _: &[u8]) -> Option<Vec<u8>> {
panic!("next_storage_key: unsupported feature for parachain validation")
}
fn storage_append(&mut self, _key: Vec<u8>, _value: Vec<u8>) {
panic!("storage_append: unsupported feature for parachain validation")
}
fn storage_start_transaction(&mut self) {
panic!("storage_start_transaction: unsupported feature for parachain validation")
}
fn storage_rollback_transaction(&mut self) -> Result<(), ()> {
panic!("storage_rollback_transaction: unsupported feature for parachain validation")
}
fn storage_commit_transaction(&mut self) -> Result<(), ()> {
panic!("storage_commit_transaction: unsupported feature for parachain validation")
}
fn wipe(&mut self) {
panic!("wipe: unsupported feature for parachain validation")
}
fn commit(&mut self) {
panic!("commit: unsupported feature for parachain validation")
}
fn read_write_count(&self) -> (u32, u32, u32, u32) {
panic!("read_write_count: unsupported feature for parachain validation")
}
fn reset_read_write_count(&mut self) {
panic!("reset_read_write_count: unsupported feature for parachain validation")
}
fn get_whitelist(&self) -> Vec<TrackedStorageKey> {
panic!("get_whitelist: unsupported feature for parachain validation")
}
fn set_whitelist(&mut self, _: Vec<TrackedStorageKey>) {
panic!("set_whitelist: unsupported feature for parachain validation")
}
fn set_offchain_storage(&mut self, _: &[u8], _: std::option::Option<&[u8]>) {
panic!("set_offchain_storage: unsupported feature for parachain validation")
}
}
impl sp_externalities::ExtensionStore for ValidationExternalities {
fn extension_by_type_id(&mut self, type_id: TypeId) -> Option<&mut dyn Any> {
self.0.get_mut(type_id)
}
fn register_extension_with_type_id(
&mut self,
type_id: TypeId,
extension: Box<dyn sp_externalities::Extension>,
) -> Result<(), sp_externalities::Error> {
self.0.register_with_type_id(type_id, extension)
}
fn deregister_extension_by_type_id(
&mut self,
type_id: TypeId,
) -> Result<(), sp_externalities::Error> {
if self.0.deregister(type_id) {
Ok(())
} else {
Err(sp_externalities::Error::ExtensionIsNotRegistered(type_id))
}
}
}
/// An implementation of `SpawnNamed` on top of a futures' thread pool.
///
/// This is a light handle meaning it will only clone the handle not create a new thread pool.
#[derive(Clone)]
pub(crate) struct TaskExecutor(futures::executor::ThreadPool);
impl TaskExecutor {
pub(crate) fn new() -> Result<Self, String> {
futures::executor::ThreadPoolBuilder::new()
.pool_size(4)
.name_prefix("pvf-task-executor")
.create()
.map_err(|e| e.to_string())
.map(Self)
}
}
impl sp_core::traits::SpawnNamed for TaskExecutor {
fn spawn_blocking(&self, _: &'static str, future: futures::future::BoxFuture<'static, ()>) {
self.0.spawn_ok(future);
}
fn spawn(&self, _: &'static str, future: futures::future::BoxFuture<'static, ()>) {
self.0.spawn_ok(future);
}
}
File diff suppressed because it is too large Load Diff
+100
View File
@@ -0,0 +1,100 @@
// Copyright 2021 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
#![warn(missing_docs)]
//! A crate that implements PVF validation host.
//!
//! This crate provides a simple API. You first [`start`] the validation host, which gives you the
//! [handle][`ValidationHost`] and the future you need to poll.
//!
//! Then using the handle the client can send two types of requests:
//!
//! (a) PVF execution. This accepts the PVF [params][`polkadot_parachain::primitives::ValidationParams`]
//! and the PVF [code][`Pvf`], prepares (verifies and compiles) the code, and then executes PVF
//! with the params.
//!
//! (b) Heads up. This request allows to signal that the given PVF may be needed soon and that it
//! should be prepared for execution.
//!
//! The preparation results are cached for some time after they either used or was signalled in heads up.
//! All requests that depends on preparation of the same PVF are bundled together and will be executed
//! as soon as the artifact is prepared.
//!
//! # Priority
//!
//! PVF execution requests can specify the [priority][`Priority`] with which the given request should
//! be handled. Different priority levels have different effects. This is discussed below.
//!
//! Preparation started by a heads up signal always starts in with the background priority. If there
//! is already a request for that PVF preparation under way the priority is inherited. If after heads
//! up, a new PVF execution request comes in with a higher priority, then the original task's priority
//! will be adjusted to match the new one if it's larger.
//!
//! Priority can never go down, only up.
//!
//! # Under the hood
//!
//! Under the hood, the validation host is built using a bunch of communicating processes, not
//! dissimilar to actors. Each of such "processes" is a future task that contains an event loop that
//! processes incoming messages, potentially delegating sub-tasks to other "processes".
//!
//! Two of these processes are queues. The first one is for preparation jobs and the second one is for
//! execution. Both of the queues are backed by separate pools of workers of different kind.
//!
//! Preparation workers handle preparation requests by preverifying and instrumenting PVF wasm code,
//! and then passing it into the compiler, to prepare the artifact.
//!
//! Artifact is a final product of preparation. If the preparation succeeded, then the artifact will
//! contain the compiled code usable for quick execution by a worker later on.
//!
//! If the preparation failed, then the worker will still write the artifact with the error message.
//! We save the artifact with the error so that we don't try to prepare the artifacts that are broken
//! repeatedly.
//!
//! The artifact is saved on disk and is also tracked by an in memory table. This in memory table
//! doesn't contain the artifact contents though, only a flag that the given artifact is compiled.
//!
//! The execute workers will be fed by the requests from the execution queue, which is basically a
//! combination of a path to the compiled artifact and the
//! [params][`polkadot_parachain::primitives::ValidationParams`].
//!
//! Each fixed interval of time a pruning task will run. This task will remove all artifacts that
//! weren't used or received a heads up signal for a while.
mod artifacts;
mod error;
mod execute;
mod executor_intf;
mod host;
mod prepare;
mod priority;
mod pvf;
mod worker_common;
#[doc(hidden)]
pub mod testing;
pub use error::{ValidationError, InvalidCandidate};
pub use priority::Priority;
pub use pvf::Pvf;
pub use host::{start, Config, ValidationHost};
pub use execute::worker_entrypoint as execute_worker_entrypoint;
pub use prepare::worker_entrypoint as prepare_worker_entrypoint;
const LOG_TARGET: &str = "parachain::pvf";
+31
View File
@@ -0,0 +1,31 @@
// Copyright 2021 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Preparation part of pipeline
//!
//! The validation host spins up two processes: the queue (by running [`start_queue`]) and the pool
//! (by running [`start_pool`]).
//!
//! The pool will spawn workers in new processes and those should execute pass control to
//! [`worker_entrypoint`].
mod pool;
mod queue;
mod worker;
pub use queue::{ToQueue, FromQueue, start as start_queue};
pub use pool::start as start_pool;
pub use worker::worker_entrypoint;
+336
View File
@@ -0,0 +1,336 @@
// Copyright 2021 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use crate::{
worker_common::{IdleWorker, WorkerHandle},
LOG_TARGET,
};
use super::{
worker::{self, Outcome},
};
use std::{fmt, sync::Arc, task::Poll, time::Duration};
use async_std::path::{Path, PathBuf};
use futures::{
Future, FutureExt, StreamExt, channel::mpsc, future::BoxFuture, stream::FuturesUnordered,
};
use slotmap::HopSlotMap;
use assert_matches::assert_matches;
use always_assert::never;
slotmap::new_key_type! { pub struct Worker; }
/// Messages that the pool handles.
#[derive(Debug, PartialEq, Eq)]
pub enum ToPool {
/// Request a new worker to spawn.
///
/// This request won't fail in case if the worker cannot be created. Instead, we consider
/// the failures transient and we try to spawn a worker after a delay.
///
/// [`FromPool::Spawned`] will be returned as soon as the worker is spawned.
///
/// The client should anticipate a [`FromPool::Rip`] message, in case the spawned worker was
/// stopped for some reason.
Spawn,
/// Kill the given worker. No-op if the given worker is not running.
///
/// [`FromPool::Rip`] won't be sent in this case. However, the client should be prepared to
/// receive [`FromPool::Rip`] nonetheless, since the worker may be have been ripped before
/// this message is processed.
Kill(Worker),
/// If the given worker was started with the background priority, then it will be raised up to
/// normal priority. Otherwise, it's no-op.
BumpPriority(Worker),
/// Request the given worker to start working on the given code.
///
/// Once the job either succeeded or failed, a [`FromPool::Concluded`] message will be sent back.
///
/// This should not be sent again until the concluded message is received.
StartWork {
worker: Worker,
code: Arc<Vec<u8>>,
artifact_path: PathBuf,
background_priority: bool,
},
}
/// A message sent from pool to its client.
#[derive(Debug)]
pub enum FromPool {
/// The given worker was just spawned and is ready to be used.
Spawned(Worker),
/// The given worker either succeeded or failed the given job. Under any circumstances the
/// artifact file has been written. The bool says whether the worker ripped.
Concluded(Worker, bool),
/// The given worker ceased to exist.
Rip(Worker),
}
struct WorkerData {
idle: Option<IdleWorker>,
handle: WorkerHandle,
}
impl fmt::Debug for WorkerData {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "WorkerData(pid={})", self.handle.id())
}
}
enum PoolEvent {
Spawn(IdleWorker, WorkerHandle),
StartWork(Worker, Outcome),
}
type Mux = FuturesUnordered<BoxFuture<'static, PoolEvent>>;
struct Pool {
program_path: PathBuf,
spawn_timeout: Duration,
to_pool: mpsc::Receiver<ToPool>,
from_pool: mpsc::UnboundedSender<FromPool>,
spawned: HopSlotMap<Worker, WorkerData>,
mux: Mux,
}
/// A fatal error that warrants stopping the event loop of the pool.
struct Fatal;
async fn run(
Pool {
program_path,
spawn_timeout,
to_pool,
mut from_pool,
mut spawned,
mut mux,
}: Pool,
) {
macro_rules! break_if_fatal {
($expr:expr) => {
match $expr {
Err(Fatal) => break,
Ok(v) => v,
}
};
}
let mut to_pool = to_pool.fuse();
loop {
futures::select! {
to_pool = to_pool.next() => {
let to_pool = break_if_fatal!(to_pool.ok_or(Fatal));
handle_to_pool(
&program_path,
spawn_timeout,
&mut spawned,
&mut mux,
to_pool,
)
}
ev = mux.select_next_some() => break_if_fatal!(handle_mux(&mut from_pool, &mut spawned, ev)),
}
break_if_fatal!(purge_dead(&mut from_pool, &mut spawned).await);
}
}
async fn purge_dead(
from_pool: &mut mpsc::UnboundedSender<FromPool>,
spawned: &mut HopSlotMap<Worker, WorkerData>,
) -> Result<(), Fatal> {
let mut to_remove = vec![];
for (worker, data) in spawned.iter_mut() {
if data.idle.is_none() {
// The idle token is missing, meaning this worker is now occupied: skip it. This is
// because the worker process is observed by the work task and should it reach the
// deadline or be terminated it will be handled by the corresponding mux event.
continue;
}
if let Poll::Ready(()) = futures::poll!(&mut data.handle) {
// a resolved future means that the worker has terminated. Weed it out.
to_remove.push(worker);
}
}
for w in to_remove {
let _ = spawned.remove(w);
reply(from_pool, FromPool::Rip(w))?;
}
Ok(())
}
fn handle_to_pool(
program_path: &Path,
spawn_timeout: Duration,
spawned: &mut HopSlotMap<Worker, WorkerData>,
mux: &mut Mux,
to_pool: ToPool,
) {
match to_pool {
ToPool::Spawn => {
mux.push(spawn_worker_task(program_path.to_owned(), spawn_timeout).boxed());
}
ToPool::StartWork {
worker,
code,
artifact_path,
background_priority,
} => {
if let Some(data) = spawned.get_mut(worker) {
if let Some(idle) = data.idle.take() {
mux.push(
start_work_task(worker, idle, code, artifact_path, background_priority)
.boxed(),
);
} else {
// idle token is present after spawn and after a job is concluded;
// the precondition for `StartWork` is it should be sent only if all previous work
// items concluded;
// thus idle token is Some;
// qed.
never!("unexpected abscence of the idle token in prepare pool");
}
} else {
// That's a relatively normal situation since the queue may send `start_work` and
// before receiving it the pool would report that the worker died.
}
}
ToPool::Kill(worker) => {
// It may be absent if it were previously already removed by `purge_dead`.
let _ = spawned.remove(worker);
}
ToPool::BumpPriority(worker) => {
if let Some(data) = spawned.get(worker) {
worker::bump_priority(&data.handle);
}
}
}
}
async fn spawn_worker_task(program_path: PathBuf, spawn_timeout: Duration) -> PoolEvent {
use futures_timer::Delay;
loop {
match worker::spawn(&program_path, spawn_timeout).await {
Ok((idle, handle)) => break PoolEvent::Spawn(idle, handle),
Err(err) => {
tracing::warn!(
target: LOG_TARGET,
"failed to spawn a prepare worker: {:?}",
err,
);
// Assume that the failure intermittent and retry after a delay.
Delay::new(Duration::from_secs(3)).await;
}
}
}
}
async fn start_work_task(
worker: Worker,
idle: IdleWorker,
code: Arc<Vec<u8>>,
artifact_path: PathBuf,
background_priority: bool,
) -> PoolEvent {
let outcome = worker::start_work(idle, code, artifact_path, background_priority).await;
PoolEvent::StartWork(worker, outcome)
}
fn handle_mux(
from_pool: &mut mpsc::UnboundedSender<FromPool>,
spawned: &mut HopSlotMap<Worker, WorkerData>,
event: PoolEvent,
) -> Result<(), Fatal> {
match event {
PoolEvent::Spawn(idle, handle) => {
let worker = spawned.insert(WorkerData {
idle: Some(idle),
handle,
});
reply(from_pool, FromPool::Spawned(worker))?;
Ok(())
}
PoolEvent::StartWork(worker, outcome) => {
match outcome {
Outcome::Concluded(idle) => {
let data = match spawned.get_mut(worker) {
None => {
// Perhaps the worker was killed meanwhile and the result is no longer
// relevant.
return Ok(());
}
Some(data) => data,
};
// We just replace the idle worker that was loaned from this option during
// the work starting.
let old = data.idle.replace(idle);
assert_matches!(old, None, "attempt to overwrite an idle worker");
reply(from_pool, FromPool::Concluded(worker, false))?;
Ok(())
}
Outcome::DidntMakeIt => {
if let Some(_data) = spawned.remove(worker) {
reply(from_pool, FromPool::Concluded(worker, true))?;
}
Ok(())
}
}
}
}
}
fn reply(from_pool: &mut mpsc::UnboundedSender<FromPool>, m: FromPool) -> Result<(), Fatal> {
from_pool.unbounded_send(m).map_err(|_| Fatal)
}
/// Spins up the pool and returns the future that should be polled to make the pool functional.
pub fn start(
program_path: PathBuf,
spawn_timeout: Duration,
) -> (
mpsc::Sender<ToPool>,
mpsc::UnboundedReceiver<FromPool>,
impl Future<Output = ()>,
) {
let (to_pool_tx, to_pool_rx) = mpsc::channel(10);
let (from_pool_tx, from_pool_rx) = mpsc::unbounded();
let run = run(Pool {
program_path,
spawn_timeout,
to_pool: to_pool_rx,
from_pool: from_pool_tx,
spawned: HopSlotMap::with_capacity_and_key(20),
mux: Mux::new(),
});
(to_pool_tx, from_pool_rx, run)
}
+894
View File
@@ -0,0 +1,894 @@
// Copyright 2021 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! A queue that handles requests for PVF preparation.
use super::{
pool::{self, Worker},
};
use crate::{LOG_TARGET, Priority, Pvf, artifacts::ArtifactId};
use futures::{Future, SinkExt, channel::mpsc, stream::StreamExt as _};
use std::collections::{HashMap, VecDeque};
use async_std::path::PathBuf;
use always_assert::{always, never};
/// A request to pool.
#[derive(Debug)]
pub enum ToQueue {
/// This schedules preparation of the given PVF.
///
/// Note that it is incorrect to enqueue the same PVF again without first receiving the
/// [`FromQueue::Prepared`] response. In case there is a need to bump the priority, use
/// [`ToQueue::Amend`].
Enqueue { priority: Priority, pvf: Pvf },
/// Amends the priority for the given [`ArtifactId`] if it is running. If it's not, then it's noop.
Amend {
priority: Priority,
artifact_id: ArtifactId,
},
}
/// A response from queue.
#[derive(Debug, PartialEq, Eq)]
pub enum FromQueue {
Prepared(ArtifactId),
}
#[derive(Default)]
struct Limits {
/// The maximum number of workers this pool can ever host. This is expected to be a small
/// number, e.g. within a dozen.
hard_capacity: usize,
/// The number of workers we want aim to have. If there is a critical job and we are already
/// at `soft_capacity`, we are allowed to grow up to `hard_capacity`. Thus this should be equal
/// or smaller than `hard_capacity`.
soft_capacity: usize,
}
impl Limits {
/// Returns `true` if the queue is allowed to request one more worker.
fn can_afford_one_more(&self, spawned_num: usize, critical: bool) -> bool {
let cap = if critical {
self.hard_capacity
} else {
self.soft_capacity
};
spawned_num < cap
}
/// Offer the worker back to the pool. The passed worker ID must be considered unusable unless
/// it wasn't taken by the pool, in which case it will be returned as `Some`.
fn should_cull(&mut self, spawned_num: usize) -> bool {
spawned_num > self.soft_capacity
}
}
slotmap::new_key_type! { pub struct Job; }
struct JobData {
/// The priority of this job. Can be bumped.
priority: Priority,
pvf: Pvf,
worker: Option<Worker>,
}
#[derive(Default)]
struct WorkerData {
job: Option<Job>,
}
impl WorkerData {
fn is_idle(&self) -> bool {
self.job.is_none()
}
}
/// A queue structured like this is prone to starving, however, we don't care that much since we expect
/// there is going to be a limited number of critical jobs and we don't really care if background starve.
#[derive(Default)]
struct Unscheduled {
background: VecDeque<Job>,
normal: VecDeque<Job>,
critical: VecDeque<Job>,
}
impl Unscheduled {
fn queue_mut(&mut self, prio: Priority) -> &mut VecDeque<Job> {
match prio {
Priority::Background => &mut self.background,
Priority::Normal => &mut self.normal,
Priority::Critical => &mut self.critical,
}
}
fn add(&mut self, prio: Priority, job: Job) {
self.queue_mut(prio).push_back(job);
}
fn readd(&mut self, prio: Priority, job: Job) {
self.queue_mut(prio).push_front(job);
}
fn is_empty(&self) -> bool {
self.background.is_empty() && self.normal.is_empty() && self.critical.is_empty()
}
fn next(&mut self) -> Option<Job> {
let mut check = |prio: Priority| self.queue_mut(prio).pop_front();
check(Priority::Critical)
.or_else(|| check(Priority::Normal))
.or_else(|| check(Priority::Background))
}
}
struct Queue {
to_queue_rx: mpsc::Receiver<ToQueue>,
from_queue_tx: mpsc::UnboundedSender<FromQueue>,
to_pool_tx: mpsc::Sender<pool::ToPool>,
from_pool_rx: mpsc::UnboundedReceiver<pool::FromPool>,
cache_path: PathBuf,
limits: Limits,
jobs: slotmap::SlotMap<Job, JobData>,
/// A mapping from artifact id to a job.
artifact_id_to_job: HashMap<ArtifactId, Job>,
/// The registry of all workers.
workers: slotmap::SparseSecondaryMap<Worker, WorkerData>,
/// The number of workers requested to spawn but not yet spawned.
spawn_inflight: usize,
/// The jobs that are not yet scheduled. These are waiting until the next `poll` where they are
/// processed all at once.
unscheduled: Unscheduled,
}
/// A fatal error that warrants stopping the queue.
struct Fatal;
impl Queue {
fn new(
soft_capacity: usize,
hard_capacity: usize,
cache_path: PathBuf,
to_queue_rx: mpsc::Receiver<ToQueue>,
from_queue_tx: mpsc::UnboundedSender<FromQueue>,
to_pool_tx: mpsc::Sender<pool::ToPool>,
from_pool_rx: mpsc::UnboundedReceiver<pool::FromPool>,
) -> Self {
Self {
to_queue_rx,
from_queue_tx,
to_pool_tx,
from_pool_rx,
cache_path,
spawn_inflight: 0,
limits: Limits {
hard_capacity,
soft_capacity,
},
jobs: slotmap::SlotMap::with_key(),
unscheduled: Unscheduled::default(),
artifact_id_to_job: HashMap::new(),
workers: slotmap::SparseSecondaryMap::new(),
}
}
async fn run(mut self) {
macro_rules! break_if_fatal {
($expr:expr) => {
if let Err(Fatal) = $expr {
break;
}
};
}
loop {
// biased to make it behave deterministically for tests.
futures::select_biased! {
to_queue = self.to_queue_rx.select_next_some() =>
break_if_fatal!(handle_to_queue(&mut self, to_queue).await),
from_pool = self.from_pool_rx.select_next_some() =>
break_if_fatal!(handle_from_pool(&mut self, from_pool).await),
}
}
}
}
async fn handle_to_queue(queue: &mut Queue, to_queue: ToQueue) -> Result<(), Fatal> {
match to_queue {
ToQueue::Enqueue { priority, pvf } => {
handle_enqueue(queue, priority, pvf).await?;
}
ToQueue::Amend {
priority,
artifact_id,
} => {
handle_amend(queue, priority, artifact_id).await?;
}
}
Ok(())
}
async fn handle_enqueue(queue: &mut Queue, priority: Priority, pvf: Pvf) -> Result<(), Fatal> {
let artifact_id = pvf.as_artifact_id();
if never!(
queue.artifact_id_to_job.contains_key(&artifact_id),
"second Enqueue sent for a known artifact"
) {
// This function is called in response to a `Enqueue` message;
// Precondtion for `Enqueue` is that it is sent only once for a PVF;
// Thus this should always be `false`;
// qed.
tracing::warn!(
target: LOG_TARGET,
"duplicate `enqueue` command received for {:?}",
artifact_id,
);
return Ok(());
}
let job = queue.jobs.insert(JobData {
priority,
pvf,
worker: None,
});
queue.artifact_id_to_job.insert(artifact_id, job);
if let Some(available) = find_idle_worker(queue) {
// This may seem not fair (w.r.t priority) on the first glance, but it should be. This is
// because as soon as a worker finishes with the job it's immediatelly given the next one.
assign(queue, available, job).await?;
} else {
spawn_extra_worker(queue, priority.is_critical()).await?;
queue.unscheduled.add(priority, job);
}
Ok(())
}
fn find_idle_worker(queue: &mut Queue) -> Option<Worker> {
queue
.workers
.iter()
.filter(|(_, data)| data.is_idle())
.map(|(k, _)| k)
.next()
}
async fn handle_amend(
queue: &mut Queue,
priority: Priority,
artifact_id: ArtifactId,
) -> Result<(), Fatal> {
if let Some(&job) = queue.artifact_id_to_job.get(&artifact_id) {
let mut job_data: &mut JobData = &mut queue.jobs[job];
if job_data.priority < priority {
// The new priority is higher. We should do two things:
// - if the worker was already spawned with the background prio and the new one is not
// (it's already the case, if we are in this branch but we still do the check for
// clarity), then we should tell the pool to bump the priority for the worker.
//
// - save the new priority in the job.
if let Some(worker) = job_data.worker {
if job_data.priority.is_background() && !priority.is_background() {
send_pool(&mut queue.to_pool_tx, pool::ToPool::BumpPriority(worker)).await?;
}
}
job_data.priority = priority;
}
}
Ok(())
}
async fn handle_from_pool(queue: &mut Queue, from_pool: pool::FromPool) -> Result<(), Fatal> {
use pool::FromPool::*;
match from_pool {
Spawned(worker) => handle_worker_spawned(queue, worker).await?,
Concluded(worker, rip) => handle_worker_concluded(queue, worker, rip).await?,
Rip(worker) => handle_worker_rip(queue, worker).await?,
}
Ok(())
}
async fn handle_worker_spawned(queue: &mut Queue, worker: Worker) -> Result<(), Fatal> {
queue.workers.insert(worker, WorkerData::default());
queue.spawn_inflight -= 1;
if let Some(job) = queue.unscheduled.next() {
assign(queue, worker, job).await?;
}
Ok(())
}
async fn handle_worker_concluded(
queue: &mut Queue,
worker: Worker,
rip: bool,
) -> Result<(), Fatal> {
macro_rules! never_none {
($expr:expr) => {
match $expr {
Some(v) => v,
None => {
// Precondition of calling this is that the $expr is never none;
// Assume the conditions holds, then this never is not hit;
// qed.
never!("never_none, {}", stringify!($expr));
return Ok(());
}
}
};
}
// Find out on which artifact was the worker working.
// workers are registered upon spawn and removed in one of the following cases:
// 1. received rip signal
// 2. received concluded signal with rip=true;
// concluded signal only comes from a spawned worker and only once;
// rip signal is not sent after conclusion with rip=true;
// the worker should be registered;
// this can't be None;
// qed.
let worker_data = never_none!(queue.workers.get_mut(worker));
// worker_data.job is set only by `assign` and removed only here for a worker;
// concluded signal only comes for a worker that was previously assigned and only once;
// the worker should have the job;
// this can't be None;
// qed.
let job = never_none!(worker_data.job.take());
// job_data is inserted upon enqueue and removed only here;
// as was established above, this worker was previously `assign`ed to the job;
// that implies that the job was enqueued;
// conclude signal only comes once;
// we are just to remove the job for the first and the only time;
// this can't be None;
// qed.
let job_data = never_none!(queue.jobs.remove(job));
let artifact_id = job_data.pvf.as_artifact_id();
queue.artifact_id_to_job.remove(&artifact_id);
reply(&mut queue.from_queue_tx, FromQueue::Prepared(artifact_id))?;
// Figure out what to do with the worker.
if rip {
let worker_data = queue.workers.remove(worker);
// worker should exist, it's asserted above;
// qed.
always!(worker_data.is_some());
if !queue.unscheduled.is_empty() {
// That is unconditionally not critical just to not accidentally fill up
// the pool up to the hard cap.
spawn_extra_worker(queue, false).await?;
}
} else {
if queue
.limits
.should_cull(queue.workers.len() + queue.spawn_inflight)
{
// We no longer need services of this worker. Kill it.
queue.workers.remove(worker);
send_pool(&mut queue.to_pool_tx, pool::ToPool::Kill(worker)).await?;
} else {
// see if there are more work available and schedule it.
if let Some(job) = queue.unscheduled.next() {
assign(queue, worker, job).await?;
}
}
}
Ok(())
}
async fn handle_worker_rip(queue: &mut Queue, worker: Worker) -> Result<(), Fatal> {
let worker_data = queue.workers.remove(worker);
if let Some(WorkerData { job: Some(job), .. }) = worker_data {
// This is an edge case where the worker ripped after we sent assignment but before it
// was received by the pool.
let priority = queue
.jobs
.get(job)
.map(|data| data.priority)
.unwrap_or_else(|| {
// job is inserted upon enqueue and removed on concluded signal;
// this is enclosed in the if statement that narrows the situation to before
// conclusion;
// that means that the job still exists and is known;
// this path cannot be hit;
// qed.
never!("the job of the ripped worker must be known but it is not");
Priority::Normal
});
queue.unscheduled.readd(priority, job);
}
// If there are still jobs left, spawn another worker to replace the ripped one (but only if it
// was indeed removed). That is unconditionally not critical just to not accidentally fill up
// the pool up to the hard cap.
if worker_data.is_some() && !queue.unscheduled.is_empty() {
spawn_extra_worker(queue, false).await?;
}
Ok(())
}
/// Spawns an extra worker if possible.
async fn spawn_extra_worker(queue: &mut Queue, critical: bool) -> Result<(), Fatal> {
if queue
.limits
.can_afford_one_more(queue.workers.len() + queue.spawn_inflight, critical)
{
queue.spawn_inflight += 1;
send_pool(&mut queue.to_pool_tx, pool::ToPool::Spawn).await?;
}
Ok(())
}
/// Attaches the work to the given worker telling the poll about the job.
async fn assign(queue: &mut Queue, worker: Worker, job: Job) -> Result<(), Fatal> {
let job_data = &mut queue.jobs[job];
let artifact_id = job_data.pvf.as_artifact_id();
let artifact_path = artifact_id.path(&queue.cache_path);
job_data.worker = Some(worker);
queue.workers[worker].job = Some(job);
send_pool(
&mut queue.to_pool_tx,
pool::ToPool::StartWork {
worker,
code: job_data.pvf.code.clone(),
artifact_path,
background_priority: job_data.priority.is_background(),
},
)
.await?;
Ok(())
}
fn reply(from_queue_tx: &mut mpsc::UnboundedSender<FromQueue>, m: FromQueue) -> Result<(), Fatal> {
from_queue_tx.unbounded_send(m).map_err(|_| {
// The host has hung up and thus it's fatal and we should shutdown ourselves.
Fatal
})
}
async fn send_pool(
to_pool_tx: &mut mpsc::Sender<pool::ToPool>,
m: pool::ToPool,
) -> Result<(), Fatal> {
to_pool_tx.send(m).await.map_err(|_| {
// The pool has hung up and thus we are no longer are able to fulfill our duties. Shutdown.
Fatal
})
}
/// Spins up the queue and returns the future that should be polled to make the queue functional.
pub fn start(
soft_capacity: usize,
hard_capacity: usize,
cache_path: PathBuf,
to_pool_tx: mpsc::Sender<pool::ToPool>,
from_pool_rx: mpsc::UnboundedReceiver<pool::FromPool>,
) -> (
mpsc::Sender<ToQueue>,
mpsc::UnboundedReceiver<FromQueue>,
impl Future<Output = ()>,
) {
let (to_queue_tx, to_queue_rx) = mpsc::channel(150);
let (from_queue_tx, from_queue_rx) = mpsc::unbounded();
let run = Queue::new(
soft_capacity,
hard_capacity,
cache_path,
to_queue_rx,
from_queue_tx,
to_pool_tx,
from_pool_rx,
)
.run();
(to_queue_tx, from_queue_rx, run)
}
#[cfg(test)]
mod tests {
use slotmap::SlotMap;
use assert_matches::assert_matches;
use futures::{FutureExt, future::BoxFuture};
use std::task::Poll;
use super::*;
/// Creates a new pvf which artifact id can be uniquely identified by the given number.
fn pvf(descriminator: u32) -> Pvf {
Pvf::from_discriminator(descriminator)
}
async fn run_until<R>(
task: &mut (impl Future<Output = ()> + Unpin),
mut fut: (impl Future<Output = R> + Unpin),
) -> R {
let start = std::time::Instant::now();
let fut = &mut fut;
loop {
if start.elapsed() > std::time::Duration::from_secs(1) {
// We expect that this will take only a couple of iterations and thus to take way
// less than a second.
panic!("timeout");
}
if let Poll::Ready(r) = futures::poll!(&mut *fut) {
break r;
}
if futures::poll!(&mut *task).is_ready() {
panic!()
}
}
}
struct Test {
_tempdir: tempfile::TempDir,
run: BoxFuture<'static, ()>,
workers: SlotMap<Worker, ()>,
from_pool_tx: mpsc::UnboundedSender<pool::FromPool>,
to_pool_rx: mpsc::Receiver<pool::ToPool>,
to_queue_tx: mpsc::Sender<ToQueue>,
from_queue_rx: mpsc::UnboundedReceiver<FromQueue>,
}
impl Test {
fn new(soft_capacity: usize, hard_capacity: usize) -> Self {
let tempdir = tempfile::tempdir().unwrap();
let (to_pool_tx, to_pool_rx) = mpsc::channel(10);
let (from_pool_tx, from_pool_rx) = mpsc::unbounded();
let workers: SlotMap<Worker, ()> = SlotMap::with_key();
let (to_queue_tx, from_queue_rx, run) = start(
soft_capacity,
hard_capacity,
tempdir.path().to_owned().into(),
to_pool_tx,
from_pool_rx,
);
Self {
_tempdir: tempdir,
run: run.boxed(),
workers,
from_pool_tx,
to_pool_rx,
to_queue_tx,
from_queue_rx,
}
}
fn send_queue(&mut self, to_queue: ToQueue) {
self.to_queue_tx
.send(to_queue)
.now_or_never()
.unwrap()
.unwrap();
}
async fn poll_and_recv_from_queue(&mut self) -> FromQueue {
let from_queue_rx = &mut self.from_queue_rx;
run_until(
&mut self.run,
async { from_queue_rx.next().await.unwrap() }.boxed(),
)
.await
}
fn send_from_pool(&mut self, from_pool: pool::FromPool) {
self.from_pool_tx
.send(from_pool)
.now_or_never()
.unwrap()
.unwrap();
}
async fn poll_and_recv_to_pool(&mut self) -> pool::ToPool {
let to_pool_rx = &mut self.to_pool_rx;
run_until(
&mut self.run,
async { to_pool_rx.next().await.unwrap() }.boxed(),
)
.await
}
async fn poll_ensure_to_pool_is_empty(&mut self) {
use futures_timer::Delay;
use std::time::Duration;
let to_pool_rx = &mut self.to_pool_rx;
run_until(
&mut self.run,
async {
futures::select! {
_ = Delay::new(Duration::from_millis(500)).fuse() => (),
_ = to_pool_rx.next().fuse() => {
panic!("to pool supposed to be empty")
}
}
}
.boxed(),
)
.await
}
}
#[async_std::test]
async fn properly_concludes() {
let mut test = Test::new(2, 2);
test.send_queue(ToQueue::Enqueue {
priority: Priority::Background,
pvf: pvf(1),
});
assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
let w = test.workers.insert(());
test.send_from_pool(pool::FromPool::Spawned(w));
test.send_from_pool(pool::FromPool::Concluded(w, false));
assert_eq!(
test.poll_and_recv_from_queue().await,
FromQueue::Prepared(pvf(1).as_artifact_id())
);
}
#[async_std::test]
async fn dont_spawn_over_soft_limit_unless_critical() {
let mut test = Test::new(2, 3);
test.send_queue(ToQueue::Enqueue {
priority: Priority::Normal,
pvf: pvf(1),
});
test.send_queue(ToQueue::Enqueue {
priority: Priority::Normal,
pvf: pvf(2),
});
test.send_queue(ToQueue::Enqueue {
priority: Priority::Normal,
pvf: pvf(3),
});
// Receive only two spawns.
assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
let w1 = test.workers.insert(());
let w2 = test.workers.insert(());
test.send_from_pool(pool::FromPool::Spawned(w1));
test.send_from_pool(pool::FromPool::Spawned(w2));
// Get two start works.
assert_matches!(
test.poll_and_recv_to_pool().await,
pool::ToPool::StartWork { .. }
);
assert_matches!(
test.poll_and_recv_to_pool().await,
pool::ToPool::StartWork { .. }
);
test.send_from_pool(pool::FromPool::Concluded(w1, false));
assert_matches!(
test.poll_and_recv_to_pool().await,
pool::ToPool::StartWork { .. }
);
// Enqueue a critical job.
test.send_queue(ToQueue::Enqueue {
priority: Priority::Critical,
pvf: pvf(4),
});
// 2 out of 2 are working, but there is a critical job incoming. That means that spawning
// another worker is warranted.
assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
}
#[async_std::test]
async fn cull_unwanted() {
let mut test = Test::new(1, 2);
test.send_queue(ToQueue::Enqueue {
priority: Priority::Normal,
pvf: pvf(1),
});
assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
let w1 = test.workers.insert(());
test.send_from_pool(pool::FromPool::Spawned(w1));
assert_matches!(
test.poll_and_recv_to_pool().await,
pool::ToPool::StartWork { .. }
);
// Enqueue a critical job, which warrants spawning over the soft limit.
test.send_queue(ToQueue::Enqueue {
priority: Priority::Critical,
pvf: pvf(2),
});
assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
// However, before the new worker had a chance to spawn, the first worker finishes with its
// job. The old worker will be killed while the new worker will be let live, even though
// it's not instantiated.
//
// That's a bit silly in this context, but in production there will be an entire pool up
// to the `soft_capacity` of workers and it doesn't matter which one to cull. Either way,
// we just check that edge case of an edge case works.
test.send_from_pool(pool::FromPool::Concluded(w1, false));
assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Kill(w1));
}
#[async_std::test]
async fn bump_prio_on_urgency_change() {
let mut test = Test::new(2, 2);
test.send_queue(ToQueue::Enqueue {
priority: Priority::Background,
pvf: pvf(1),
});
assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
let w = test.workers.insert(());
test.send_from_pool(pool::FromPool::Spawned(w));
assert_matches!(
test.poll_and_recv_to_pool().await,
pool::ToPool::StartWork { .. }
);
test.send_queue(ToQueue::Amend {
priority: Priority::Normal,
artifact_id: pvf(1).as_artifact_id(),
});
assert_eq!(
test.poll_and_recv_to_pool().await,
pool::ToPool::BumpPriority(w)
);
}
#[async_std::test]
async fn worker_mass_die_out_doesnt_stall_queue() {
let mut test = Test::new(2, 2);
test.send_queue(ToQueue::Enqueue {
priority: Priority::Normal,
pvf: pvf(1),
});
test.send_queue(ToQueue::Enqueue {
priority: Priority::Normal,
pvf: pvf(2),
});
test.send_queue(ToQueue::Enqueue {
priority: Priority::Normal,
pvf: pvf(3),
});
assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
let w1 = test.workers.insert(());
let w2 = test.workers.insert(());
test.send_from_pool(pool::FromPool::Spawned(w1));
test.send_from_pool(pool::FromPool::Spawned(w2));
assert_matches!(
test.poll_and_recv_to_pool().await,
pool::ToPool::StartWork { .. }
);
assert_matches!(
test.poll_and_recv_to_pool().await,
pool::ToPool::StartWork { .. }
);
// Conclude worker 1 and rip it.
test.send_from_pool(pool::FromPool::Concluded(w1, true));
// Since there is still work, the queue requested one extra worker to spawn to handle the
// remaining enqueued work items.
assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
assert_eq!(
test.poll_and_recv_from_queue().await,
FromQueue::Prepared(pvf(1).as_artifact_id())
);
}
#[async_std::test]
async fn doesnt_resurrect_ripped_worker_if_no_work() {
let mut test = Test::new(2, 2);
test.send_queue(ToQueue::Enqueue {
priority: Priority::Normal,
pvf: pvf(1),
});
assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
let w1 = test.workers.insert(());
test.send_from_pool(pool::FromPool::Spawned(w1));
assert_matches!(
test.poll_and_recv_to_pool().await,
pool::ToPool::StartWork { .. }
);
test.send_from_pool(pool::FromPool::Concluded(w1, true));
test.poll_ensure_to_pool_is_empty().await;
}
#[async_std::test]
async fn rip_for_start_work() {
let mut test = Test::new(2, 2);
test.send_queue(ToQueue::Enqueue {
priority: Priority::Normal,
pvf: pvf(1),
});
assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
let w1 = test.workers.insert(());
test.send_from_pool(pool::FromPool::Spawned(w1));
// Now, to the interesting part. After the queue normally issues the start_work command to
// the pool, before receiving the command the queue may report that the worker ripped.
assert_matches!(
test.poll_and_recv_to_pool().await,
pool::ToPool::StartWork { .. }
);
test.send_from_pool(pool::FromPool::Rip(w1));
// In this case, the pool should spawn a new worker and request it to work on the item.
assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
let w2 = test.workers.insert(());
test.send_from_pool(pool::FromPool::Spawned(w2));
assert_matches!(
test.poll_and_recv_to_pool().await,
pool::ToPool::StartWork { .. }
);
}
}
@@ -0,0 +1,213 @@
// Copyright 2021 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use crate::{
LOG_TARGET,
artifacts::Artifact,
worker_common::{
IdleWorker, SpawnErr, WorkerHandle, bytes_to_path, framed_recv, framed_send, path_to_bytes,
spawn_with_program_path, tmpfile, worker_event_loop,
},
};
use async_std::{
io,
os::unix::net::UnixStream,
path::{PathBuf, Path},
};
use futures::FutureExt as _;
use futures_timer::Delay;
use std::{sync::Arc, time::Duration};
const NICENESS_BACKGROUND: i32 = 10;
const NICENESS_FOREGROUND: i32 = 0;
const COMPILATION_TIMEOUT: Duration = Duration::from_secs(10);
/// Spawns a new worker with the given program path that acts as the worker and the spawn timeout.
///
/// The program should be able to handle `<program-path> prepare-worker <socket-path>` invocation.
pub async fn spawn(
program_path: &Path,
spawn_timeout: Duration,
) -> Result<(IdleWorker, WorkerHandle), SpawnErr> {
spawn_with_program_path(
"prepare",
program_path,
&["prepare-worker"],
spawn_timeout,
)
.await
}
pub enum Outcome {
/// The worker has finished the work assigned to it.
Concluded(IdleWorker),
/// The execution was interrupted abruptly and the worker is not available anymore. For example,
/// this could've happen because the worker hadn't finished the work until the given deadline.
///
/// Note that in this case the artifact file is written (unless there was an error writing the
/// the artifact).
///
/// This doesn't return an idle worker instance, thus this worker is no longer usable.
DidntMakeIt,
}
/// Given the idle token of a worker and parameters of work, communicates with the worker and
/// returns the outcome.
pub async fn start_work(
worker: IdleWorker,
code: Arc<Vec<u8>>,
artifact_path: PathBuf,
background_priority: bool,
) -> Outcome {
let IdleWorker { mut stream, pid } = worker;
tracing::debug!(
target: LOG_TARGET,
worker_pid = %pid,
%background_priority,
"starting prepare for {}",
artifact_path.display(),
);
if background_priority {
renice(pid, NICENESS_BACKGROUND);
}
if let Err(err) = send_request(&mut stream, code).await {
tracing::warn!("failed to send a prepare request to pid={}: {:?}", pid, err);
return Outcome::DidntMakeIt;
}
// Wait for the result from the worker, keeping in mind that there may be a timeout, the
// worker may get killed, or something along these lines.
//
// In that case we should handle these gracefully by writing the artifact file by ourselves.
// We may potentially overwrite the artifact in rare cases where the worker didn't make
// it to report back the result.
enum Selected {
Done,
IoErr,
Deadline,
}
let selected = futures::select! {
artifact_path_bytes = framed_recv(&mut stream).fuse() => {
match artifact_path_bytes {
Ok(bytes) => {
if let Some(tmp_path) = bytes_to_path(&bytes) {
async_std::fs::rename(tmp_path, &artifact_path)
.await
.map(|_| Selected::Done)
.unwrap_or(Selected::IoErr)
} else {
Selected::IoErr
}
},
Err(_) => Selected::IoErr,
}
},
_ = Delay::new(COMPILATION_TIMEOUT).fuse() => Selected::Deadline,
};
match selected {
Selected::Done => {
renice(pid, NICENESS_FOREGROUND);
Outcome::Concluded(IdleWorker { stream, pid })
}
Selected::IoErr | Selected::Deadline => {
let bytes = Artifact::DidntMakeIt.serialize();
// best effort: there is nothing we can do here if the write fails.
let _ = async_std::fs::write(&artifact_path, &bytes).await;
Outcome::DidntMakeIt
}
}
}
async fn send_request(stream: &mut UnixStream, code: Arc<Vec<u8>>) -> io::Result<()> {
framed_send(stream, &*code).await
}
async fn recv_request(stream: &mut UnixStream) -> io::Result<Vec<u8>> {
framed_recv(stream).await
}
pub fn bump_priority(handle: &WorkerHandle) {
let pid = handle.id();
renice(pid, NICENESS_FOREGROUND);
}
fn renice(pid: u32, niceness: i32) {
tracing::debug!(
target: LOG_TARGET,
worker_pid = %pid,
"changing niceness to {}",
niceness,
);
// Consider upstreaming this to the `nix` crate.
unsafe {
if -1 == libc::setpriority(libc::PRIO_PROCESS, pid, niceness) {
let err = std::io::Error::last_os_error();
tracing::warn!(target: LOG_TARGET, "failed to set the priority: {:?}", err,);
}
}
}
/// The entrypoint that the spawned prepare worker should start with. The socket_path specifies
/// the path to the socket used to communicate with the host.
pub fn worker_entrypoint(socket_path: &str) {
worker_event_loop("prepare", socket_path, |mut stream| async move {
loop {
let code = recv_request(&mut stream).await?;
tracing::debug!(
target: LOG_TARGET,
worker_pid = %std::process::id(),
"worker: preparing artifact",
);
let artifact_bytes = prepare_artifact(&code).serialize();
// Write the serialized artifact into into a temp file.
let dest = tmpfile("prepare-artifact-").await?;
tracing::debug!(
target: LOG_TARGET,
worker_pid = %std::process::id(),
"worker: writing artifact to {}",
dest.display(),
);
async_std::fs::write(&dest, &artifact_bytes).await?;
// Communicate the results back to the host.
framed_send(&mut stream, &path_to_bytes(&dest)).await?;
}
});
}
fn prepare_artifact(code: &[u8]) -> Artifact {
let blob = match crate::executor_intf::prevalidate(code) {
Err(err) => {
return Artifact::PrevalidationErr(format!("{:?}", err));
}
Ok(b) => b,
};
match crate::executor_intf::prepare(blob) {
Ok(compiled_artifact) => Artifact::Compiled { compiled_artifact },
Err(err) => Artifact::PreparationErr(format!("{:?}", err)),
}
}
+46
View File
@@ -0,0 +1,46 @@
// Copyright 2021 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
/// A priority assigned to execution of a PVF.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum Priority {
/// Jobs in this priority will be executed in the background, meaning that they will be only
/// given spare CPU time.
///
/// This is mainly for cache warmings.
Background,
/// Normal priority for things that do not require immediate response, but still need to be
/// done pretty quick.
///
/// Approvals and disputes fall into this category.
Normal,
/// This priority is used for requests that are required to be processed as soon as possible.
///
/// For example, backing is on critical path and require execution as soon as possible.
Critical,
}
impl Priority {
/// Returns `true` if `self` is `Crticial`
pub fn is_critical(self) -> bool {
self == Priority::Critical
}
/// Returns `true` if `self` is `Background`
pub fn is_background(self) -> bool {
self == Priority::Background
}
}
+56
View File
@@ -0,0 +1,56 @@
// Copyright 2021 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use crate::artifacts::ArtifactId;
use polkadot_core_primitives::Hash;
use sp_core::blake2_256;
use std::{fmt, sync::Arc};
/// A struct that carries code of a parachain validation function and it's hash.
///
/// Should be cheap to clone.
#[derive(Clone)]
pub struct Pvf {
pub(crate) code: Arc<Vec<u8>>,
pub(crate) code_hash: Hash,
}
impl fmt::Debug for Pvf {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Pvf {{ code, code_hash: {:?} }}", self.code_hash)
}
}
impl Pvf {
/// Returns an instance of the PVF out of the given PVF code.
pub fn from_code(code: Vec<u8>) -> Self {
let code = Arc::new(code);
let code_hash = blake2_256(&code).into();
Self { code, code_hash }
}
/// Creates a new pvf which artifact id can be uniquely identified by the given number.
#[cfg(test)]
pub(crate) fn from_discriminator(num: u32) -> Self {
let descriminator_buf = num.to_le_bytes().to_vec();
Pvf::from_code(descriminator_buf)
}
/// Returns the artifact ID that corresponds to this PVF.
pub(crate) fn as_artifact_id(&self) -> ArtifactId {
ArtifactId::new(self.code_hash)
}
}
+70
View File
@@ -0,0 +1,70 @@
// Copyright 2021 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Various things for testing other crates.
//!
//! N.B. This is not guarded with some feature flag. Overexposing items here may affect the final
//! artifact even for production builds.
pub mod worker_common {
pub use crate::worker_common::{spawn_with_program_path, SpawnErr};
}
/// A function that emulates the stitches together behaviors of the preparation and the execution
/// worker in a single synchronous function.
pub fn validate_candidate(
code: &[u8],
params: &[u8],
) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
use crate::executor_intf::{prevalidate, prepare, execute, TaskExecutor};
let blob = prevalidate(code)?;
let artifact = prepare(blob)?;
let executor = TaskExecutor::new()?;
let result = execute(&artifact, params, executor)?;
Ok(result)
}
/// Use this macro to declare a `fn main() {}` that will check the arguments and dispatch them to
/// the appropriate worker, making the executable that can be used for spawning workers.
#[macro_export]
macro_rules! decl_puppet_worker_main {
() => {
fn main() {
let args = std::env::args().collect::<Vec<_>>();
if args.len() < 2 {
panic!("wrong number of arguments");
}
let subcommand = &args[1];
match subcommand.as_ref() {
"sleep" => {
std::thread::sleep(std::time::Duration::from_secs(5));
}
"prepare-worker" => {
let socket_path = &args[2];
$crate::prepare_worker_entrypoint(socket_path);
}
"execute-worker" => {
let socket_path = &args[2];
$crate::execute_worker_entrypoint(socket_path);
}
other => panic!("unknown subcommand: {}", other),
}
}
};
}
+294
View File
@@ -0,0 +1,294 @@
// Copyright 2021 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Common logic for implementation of worker processes.
use crate::LOG_TARGET;
use async_std::{
io,
os::unix::net::{UnixListener, UnixStream},
path::{PathBuf, Path},
};
use futures::{
AsyncRead, AsyncWrite, AsyncReadExt as _, AsyncWriteExt as _, FutureExt as _, never::Never,
};
use futures_timer::Delay;
use rand::Rng;
use std::{
fmt, mem,
pin::Pin,
task::{Context, Poll},
time::Duration,
};
use pin_project::pin_project;
/// This is publicly exposed only for integration tests.
#[doc(hidden)]
pub async fn spawn_with_program_path(
debug_id: &'static str,
program_path: impl Into<PathBuf>,
extra_args: &'static [&'static str],
spawn_timeout: Duration,
) -> Result<(IdleWorker, WorkerHandle), SpawnErr> {
let program_path = program_path.into();
with_transient_socket_path(debug_id, |socket_path| {
let socket_path = socket_path.to_owned();
async move {
let listener = UnixListener::bind(&socket_path)
.await
.map_err(|_| SpawnErr::Bind)?;
let handle = WorkerHandle::spawn(program_path, extra_args, socket_path)
.map_err(|_| SpawnErr::ProcessSpawn)?;
futures::select! {
accept_result = listener.accept().fuse() => {
let (stream, _) = accept_result.map_err(|_| SpawnErr::Accept)?;
Ok((IdleWorker { stream, pid: handle.id() }, handle))
}
_ = Delay::new(spawn_timeout).fuse() => {
Err(SpawnErr::AcceptTimeout)
}
}
}
})
.await
}
async fn with_transient_socket_path<T, F, Fut>(debug_id: &'static str, f: F) -> Result<T, SpawnErr>
where
F: FnOnce(&Path) -> Fut,
Fut: futures::Future<Output = Result<T, SpawnErr>> + 'static,
{
let socket_path = tmpfile(&format!("pvf-host-{}", debug_id))
.await
.map_err(|_| SpawnErr::TmpFile)?;
let result = f(&socket_path).await;
// Best effort to remove the socket file. Under normal circumstances the socket will be removed
// by the worker. We make sure that it is removed here, just in case a failed rendezvous.
let _ = async_std::fs::remove_file(socket_path).await;
result
}
/// Returns a path under the location for temporary files. The file name will start with the given
/// prefix.
///
/// There is only a certain number of retries. If exceeded this function will give up and return an
/// error.
pub async fn tmpfile(prefix: &str) -> io::Result<PathBuf> {
fn tmppath(prefix: &str) -> PathBuf {
use rand::distributions::Alphanumeric;
const DESCRIMINATOR_LEN: usize = 10;
let mut buf = Vec::with_capacity(prefix.len() + DESCRIMINATOR_LEN);
buf.extend(prefix.as_bytes());
buf.extend(
rand::thread_rng()
.sample_iter(&Alphanumeric)
.take(DESCRIMINATOR_LEN),
);
let s = std::str::from_utf8(&buf)
.expect("the string is collected from a valid utf-8 sequence; qed");
let mut temp_dir = PathBuf::from(std::env::temp_dir());
temp_dir.push(s);
temp_dir
}
const NUM_RETRIES: usize = 50;
for _ in 0..NUM_RETRIES {
let candidate_path = tmppath(prefix);
if !candidate_path.exists().await {
return Ok(candidate_path)
}
}
Err(
io::Error::new(io::ErrorKind::Other, "failed to create a temporary file")
)
}
pub fn worker_event_loop<F, Fut>(debug_id: &'static str, socket_path: &str, mut event_loop: F)
where
F: FnMut(UnixStream) -> Fut,
Fut: futures::Future<Output = io::Result<Never>>,
{
let err = async_std::task::block_on::<_, io::Result<Never>>(async move {
let stream = UnixStream::connect(socket_path).await?;
let _ = async_std::fs::remove_file(socket_path).await;
event_loop(stream).await
})
.unwrap_err(); // it's never `Ok` because it's `Ok(Never)`
tracing::debug!(
target: LOG_TARGET,
worker_pid = %std::process::id(),
"pvf worker ({}): {:?}",
debug_id,
err,
);
}
/// A struct that represents an idle worker.
///
/// This struct is supposed to be used as a token that is passed by move into a subroutine that
/// initiates a job. If the worker dies on the duty, then the token is not returned back.
#[derive(Debug)]
pub struct IdleWorker {
/// The stream to which the child process is connected.
pub stream: UnixStream,
/// The identifier of this process. Used to reset the niceness.
pub pid: u32,
}
/// An error happened during spawning a worker process.
#[derive(Clone, Debug)]
pub enum SpawnErr {
/// Cannot obtain a temporary file location.
TmpFile,
/// Cannot bind the socket to the given path.
Bind,
/// An error happened during accepting a connection to the socket.
Accept,
/// An error happened during spawning the process.
ProcessSpawn,
/// The deadline alloted for the worker spawning and connecting to the socket has elapsed.
AcceptTimeout,
}
/// This is a representation of a potentially running worker. Drop it and the process will be killed.
///
/// A worker's handle is also a future that resolves when it's detected that the worker's process
/// has been terminated. Since the worker is running in another process it is obviously not necessarily
/// to poll this future to make the worker run, it's only for termination detection.
///
/// This future relies on the fact that a child process's stdout fd is closed upon it's termination.
#[pin_project]
pub struct WorkerHandle {
child: async_process::Child,
#[pin]
stdout: async_process::ChildStdout,
drop_box: Box<[u8]>,
}
impl WorkerHandle {
fn spawn(
program: impl AsRef<Path>,
extra_args: &[&str],
socket_path: impl AsRef<Path>,
) -> io::Result<Self> {
let mut child = async_process::Command::new(program.as_ref())
.args(extra_args)
.arg(socket_path.as_ref().as_os_str())
.stdout(async_process::Stdio::piped())
.kill_on_drop(true)
.spawn()?;
let stdout = child
.stdout
.take()
.expect("the process spawned with piped stdout should have the stdout handle");
Ok(WorkerHandle {
child,
stdout,
// We don't expect the bytes to be ever read. But in case we do, we should not use a buffer
// of a small size, because otherwise if the child process does return any data we will end up
// issuing a syscall for each byte. We also prefer not to do allocate that on the stack, since
// each poll the buffer will be allocated and initialized (and that's due poll_read takes &mut [u8]
// and there are no guarantees that a `poll_read` won't ever read from there even though that's
// unlikely).
//
// OTOH, we also don't want to be super smart here and we could just afford to allocate a buffer
// for that here.
drop_box: vec![0; 8192].into_boxed_slice(),
})
}
/// Returns the process id of this worker.
pub fn id(&self) -> u32 {
self.child.id()
}
}
impl futures::Future for WorkerHandle {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let me = self.project();
match futures::ready!(AsyncRead::poll_read(me.stdout, cx, &mut *me.drop_box)) {
Ok(0) => {
// 0 means EOF means the child was terminated. Resolve.
Poll::Ready(())
}
Ok(_bytes_read) => {
// weird, we've read something. Pretend that never happened and reschedule ourselves.
cx.waker().wake_by_ref();
Poll::Pending
}
Err(_) => {
// The implementation is guaranteed to not to return WouldBlock and Interrupted. This
// leaves us with a legit errors which we suppose were due to termination.
Poll::Ready(())
}
}
}
}
impl fmt::Debug for WorkerHandle {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "WorkerHandle(pid={})", self.id())
}
}
/// Convert the given path into a byte buffer.
pub fn path_to_bytes(path: &Path) -> &[u8] {
// Ideally, we take the OsStr of the path, send that and reconstruct this on the other side.
// However, libstd doesn't provide us with such an option. There are crates out there that
// allow for extraction of a path, but TBH it doesn't seem to be a real issue.
//
// However, should be there reports we can incorporate such a crate here.
path.to_str().expect("non-UTF-8 path").as_bytes()
}
/// Interprets the given bytes as a path. Returns `None` if the given bytes do not constitute a
/// a proper utf-8 string.
pub fn bytes_to_path(bytes: &[u8]) -> Option<PathBuf> {
std::str::from_utf8(bytes).ok().map(PathBuf::from)
}
pub async fn framed_send(w: &mut (impl AsyncWrite + Unpin), buf: &[u8]) -> io::Result<()> {
let len_buf = buf.len().to_le_bytes();
w.write_all(&len_buf).await?;
w.write_all(buf).await?;
Ok(())
}
pub async fn framed_recv(r: &mut (impl AsyncRead + Unpin)) -> io::Result<Vec<u8>> {
let mut len_buf = [0u8; mem::size_of::<usize>()];
r.read_exact(&mut len_buf).await?;
let len = usize::from_le_bytes(len_buf);
let mut buf = vec![0; len];
r.read_exact(&mut buf).await?;
Ok(buf)
}