mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-14 05:11:09 +00:00
98.6% OF DEVELOPERS CANNOT REVIEW THIS PR! [read more...] (#7337)
* [WIP] PVF: Split out worker binaries * Address compilation problems and re-design a bit * Reorganize once more, fix tests * Reformat with new nightly to make `cargo fmt` test happy * Address `clippy` warnings * Add temporary trace to debug zombienet tests * Fix zombienet node upgrade test * Fix malus and its CI * Fix building worker binaries with malus * More fixes for malus * Remove unneeded cli subcommands * Support placing auxiliary binaries to `/usr/libexec` * Fix spelling * Spelling Co-authored-by: Marcin S. <marcin@realemail.net> * Implement review comments (mostly nits) * Fix worker node version flag * Rework getting the worker paths * Address a couple of review comments * Minor restructuring * Fix CI error * Add tests for worker binaries detection * Improve tests; try to fix CI * Move workers module into separate file * Try to fix failing test and workers not printing latest version - Tests were not finding the worker binaries - Workers were not being rebuilt when the version changed - Made some errors easier to read * Make a bunch of fixes * Rebuild nodes on version change * Fix more issues * Fix tests * Pass node version from node into dependencies to avoid recompiles - [X] get version in CLI - [X] pass it in to service - [X] pass version along to PVF - [X] remove rerun from service - [X] add rerun to CLI - [X] don’t rerun pvf/worker’s (these should be built by nodes which have rerun enabled) * Some more improvements for smoother tests - [X] Fix tests - [X] Make puppet workers pass None for version and remove rerun - [X] Make test collators self-contained * Add back rerun to PVF workers * Move worker binaries into files in cli crate As a final optimization I've separated out each worker binary from its own crate into the CLI crate. Before, the worker bin shared a crate with the worker lib, so when the binaries got recompiled so did the libs and everything transitively depending on the libs. This commit fixes this regression that was causing recompiles after every commit. * Fix bug (was passing worker version for node version) * Move workers out of cli into root src/bin/ dir - [X] Pass in node version from top-level (polkadot) - [X] Add build.rs with rerun-git-head to root dir * Add some sanity checks for workers to dockerfiles * Update malus + [X] Make it self-contained + [X] Undo multiple binary changes * Try to fix clippy errors * Address `cargo run` issue - [X] Add default-run for polkadot - [X] Add note about installation to error * Update readme (installation instructions) * Allow disabling external workers for local/testing setups + [X] cli flag to enable single-binary mode + [X] Add message to error * Revert unnecessary Cargo.lock changes * Remove unnecessary build scripts from collators * Add back missing malus commands (should fix failing ZN job) * Some minor fixes * Update Cargo.lock * Fix some build errors * Undo self-contained binaries; cli flag to disable version check + [X] Remove --dont-run-external-workers + [X] Add --disable-worker-version-check + [X] Remove PVF subcommands + [X] Redo malus changes * Try to fix failing job and add some docs for local tests --------- Co-authored-by: Dmitry Sinyavin <dmitry.sinyavin@parity.io> Co-authored-by: s0me0ne-unkn0wn <48632512+s0me0ne-unkn0wn@users.noreply.github.com> Co-authored-by: parity-processbot <>
This commit is contained in:
@@ -19,6 +19,7 @@ polkadot-node-primitives = { path = "../../primitives" }
|
||||
polkadot-node-subsystem = { path = "../../subsystem" }
|
||||
polkadot-node-subsystem-util = { path = "../../subsystem-util" }
|
||||
polkadot-node-metrics = { path = "../../metrics" }
|
||||
polkadot-overseer = { path = "../../overseer" }
|
||||
|
||||
[target.'cfg(not(any(target_os = "android", target_os = "unknown")))'.dependencies]
|
||||
polkadot-node-core-pvf = { path = "../pvf" }
|
||||
|
||||
@@ -93,9 +93,12 @@ const DEFAULT_APPROVAL_EXECUTION_TIMEOUT: Duration = Duration::from_secs(12);
|
||||
pub struct Config {
|
||||
/// The path where candidate validation can store compiled artifacts for PVFs.
|
||||
pub artifacts_cache_path: PathBuf,
|
||||
/// The path to the executable which can be used for spawning PVF compilation & validation
|
||||
/// workers.
|
||||
pub program_path: PathBuf,
|
||||
/// The version of the node. `None` can be passed to skip the version check (only for tests).
|
||||
pub node_version: Option<String>,
|
||||
/// Path to the preparation worker binary
|
||||
pub prep_worker_path: PathBuf,
|
||||
/// Path to the execution worker binary
|
||||
pub exec_worker_path: PathBuf,
|
||||
}
|
||||
|
||||
/// The candidate validation subsystem.
|
||||
@@ -104,7 +107,7 @@ pub struct CandidateValidationSubsystem {
|
||||
pub metrics: Metrics,
|
||||
#[allow(missing_docs)]
|
||||
pub pvf_metrics: polkadot_node_core_pvf::Metrics,
|
||||
config: Config,
|
||||
config: Option<Config>,
|
||||
}
|
||||
|
||||
impl CandidateValidationSubsystem {
|
||||
@@ -113,7 +116,7 @@ impl CandidateValidationSubsystem {
|
||||
///
|
||||
/// Check out [`IsolationStrategy`] to get more details.
|
||||
pub fn with_config(
|
||||
config: Config,
|
||||
config: Option<Config>,
|
||||
metrics: Metrics,
|
||||
pvf_metrics: polkadot_node_core_pvf::Metrics,
|
||||
) -> Self {
|
||||
@@ -124,16 +127,14 @@ impl CandidateValidationSubsystem {
|
||||
#[overseer::subsystem(CandidateValidation, error=SubsystemError, prefix=self::overseer)]
|
||||
impl<Context> CandidateValidationSubsystem {
|
||||
fn start(self, ctx: Context) -> SpawnedSubsystem {
|
||||
let future = run(
|
||||
ctx,
|
||||
self.metrics,
|
||||
self.pvf_metrics,
|
||||
self.config.artifacts_cache_path,
|
||||
self.config.program_path,
|
||||
)
|
||||
.map_err(|e| SubsystemError::with_origin("candidate-validation", e))
|
||||
.boxed();
|
||||
SpawnedSubsystem { name: "candidate-validation-subsystem", future }
|
||||
if let Some(config) = self.config {
|
||||
let future = run(ctx, self.metrics, self.pvf_metrics, config)
|
||||
.map_err(|e| SubsystemError::with_origin("candidate-validation", e))
|
||||
.boxed();
|
||||
SpawnedSubsystem { name: "candidate-validation-subsystem", future }
|
||||
} else {
|
||||
polkadot_overseer::DummySubsystem.start(ctx)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -142,11 +143,15 @@ async fn run<Context>(
|
||||
mut ctx: Context,
|
||||
metrics: Metrics,
|
||||
pvf_metrics: polkadot_node_core_pvf::Metrics,
|
||||
cache_path: PathBuf,
|
||||
program_path: PathBuf,
|
||||
Config { artifacts_cache_path, node_version, prep_worker_path, exec_worker_path }: Config,
|
||||
) -> SubsystemResult<()> {
|
||||
let (validation_host, task) = polkadot_node_core_pvf::start(
|
||||
polkadot_node_core_pvf::Config::new(cache_path, program_path),
|
||||
polkadot_node_core_pvf::Config::new(
|
||||
artifacts_cache_path,
|
||||
node_version,
|
||||
prep_worker_path,
|
||||
exec_worker_path,
|
||||
),
|
||||
pvf_metrics,
|
||||
);
|
||||
ctx.spawn_blocking("pvf-validation-host", task.boxed())?;
|
||||
|
||||
@@ -1,19 +0,0 @@
|
||||
// Copyright (C) Parity Technologies (UK) Ltd.
|
||||
// This file is part of Polkadot.
|
||||
|
||||
// Polkadot is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// Polkadot is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
fn main() {
|
||||
substrate_build_script_utils::generate_cargo_keys();
|
||||
}
|
||||
@@ -31,6 +31,3 @@ landlock = "0.2.0"
|
||||
[dev-dependencies]
|
||||
assert_matches = "1.4.0"
|
||||
tempfile = "3.3.0"
|
||||
|
||||
[build-dependencies]
|
||||
substrate-build-script-utils = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||
|
||||
@@ -1,19 +0,0 @@
|
||||
// Copyright (C) Parity Technologies (UK) Ltd.
|
||||
// This file is part of Polkadot.
|
||||
|
||||
// Polkadot is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// Polkadot is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
fn main() {
|
||||
substrate_build_script_utils::generate_cargo_keys();
|
||||
}
|
||||
@@ -25,6 +25,10 @@ pub mod worker;
|
||||
|
||||
pub use cpu_time::ProcessTime;
|
||||
|
||||
// Used by `decl_worker_main!`.
|
||||
#[doc(hidden)]
|
||||
pub use sp_tracing;
|
||||
|
||||
const LOG_TARGET: &str = "parachain::pvf-common";
|
||||
|
||||
use std::mem;
|
||||
|
||||
@@ -33,34 +33,54 @@ use tokio::{io, net::UnixStream, runtime::Runtime};
|
||||
/// spawning the desired worker.
|
||||
#[macro_export]
|
||||
macro_rules! decl_worker_main {
|
||||
($expected_command:expr, $entrypoint:expr) => {
|
||||
($expected_command:expr, $entrypoint:expr, $worker_version:expr) => {
|
||||
fn print_help(expected_command: &str) {
|
||||
println!("{} {}", expected_command, $worker_version);
|
||||
println!();
|
||||
println!("PVF worker that is called by polkadot.");
|
||||
}
|
||||
|
||||
fn main() {
|
||||
::sp_tracing::try_init_simple();
|
||||
$crate::sp_tracing::try_init_simple();
|
||||
|
||||
let args = std::env::args().collect::<Vec<_>>();
|
||||
if args.len() < 3 {
|
||||
panic!("wrong number of arguments");
|
||||
if args.len() == 1 {
|
||||
print_help($expected_command);
|
||||
return
|
||||
}
|
||||
|
||||
let mut version = None;
|
||||
match args[1].as_ref() {
|
||||
"--help" | "-h" => {
|
||||
print_help($expected_command);
|
||||
return
|
||||
},
|
||||
"--version" | "-v" => {
|
||||
println!("{}", $worker_version);
|
||||
return
|
||||
},
|
||||
subcommand => {
|
||||
// Must be passed for compatibility with the single-binary test workers.
|
||||
if subcommand != $expected_command {
|
||||
panic!(
|
||||
"trying to run {} binary with the {} subcommand",
|
||||
$expected_command, subcommand
|
||||
)
|
||||
}
|
||||
},
|
||||
}
|
||||
|
||||
let mut node_version = None;
|
||||
let mut socket_path: &str = "";
|
||||
|
||||
for i in 2..args.len() {
|
||||
for i in (2..args.len()).step_by(2) {
|
||||
match args[i].as_ref() {
|
||||
"--socket-path" => socket_path = args[i + 1].as_str(),
|
||||
"--node-version" => version = Some(args[i + 1].as_str()),
|
||||
_ => (),
|
||||
"--node-impl-version" => node_version = Some(args[i + 1].as_str()),
|
||||
arg => panic!("Unexpected argument found: {}", arg),
|
||||
}
|
||||
}
|
||||
|
||||
let subcommand = &args[1];
|
||||
if subcommand != $expected_command {
|
||||
panic!(
|
||||
"trying to run {} binary with the {} subcommand",
|
||||
$expected_command, subcommand
|
||||
)
|
||||
}
|
||||
$entrypoint(&socket_path, version);
|
||||
$entrypoint(&socket_path, node_version, Some($worker_version));
|
||||
}
|
||||
};
|
||||
}
|
||||
@@ -75,10 +95,13 @@ pub fn bytes_to_path(bytes: &[u8]) -> Option<PathBuf> {
|
||||
std::str::from_utf8(bytes).ok().map(PathBuf::from)
|
||||
}
|
||||
|
||||
// The worker version must be passed in so that we accurately get the version of the worker, and not
|
||||
// the version that this crate was compiled with.
|
||||
pub fn worker_event_loop<F, Fut>(
|
||||
debug_id: &'static str,
|
||||
socket_path: &str,
|
||||
node_version: Option<&str>,
|
||||
worker_version: Option<&str>,
|
||||
mut event_loop: F,
|
||||
) where
|
||||
F: FnMut(UnixStream) -> Fut,
|
||||
@@ -88,11 +111,13 @@ pub fn worker_event_loop<F, Fut>(
|
||||
gum::debug!(target: LOG_TARGET, %worker_pid, "starting pvf worker ({})", debug_id);
|
||||
|
||||
// Check for a mismatch between the node and worker versions.
|
||||
if let Some(version) = node_version {
|
||||
if version != env!("SUBSTRATE_CLI_IMPL_VERSION") {
|
||||
if let (Some(node_version), Some(worker_version)) = (node_version, worker_version) {
|
||||
if node_version != worker_version {
|
||||
gum::error!(
|
||||
target: LOG_TARGET,
|
||||
%worker_pid,
|
||||
%node_version,
|
||||
%worker_version,
|
||||
"Node and worker version mismatch, node needs restarting, forcing shutdown",
|
||||
);
|
||||
kill_parent_node_in_emergency();
|
||||
|
||||
@@ -121,136 +121,153 @@ async fn send_response(stream: &mut UnixStream, response: Response) -> io::Resul
|
||||
/// `node_version`, if `Some`, is checked against the worker version. A mismatch results in
|
||||
/// immediate worker termination. `None` is used for tests and in other situations when version
|
||||
/// check is not necessary.
|
||||
pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) {
|
||||
worker_event_loop("execute", socket_path, node_version, |mut stream| async move {
|
||||
let worker_pid = std::process::id();
|
||||
pub fn worker_entrypoint(
|
||||
socket_path: &str,
|
||||
node_version: Option<&str>,
|
||||
worker_version: Option<&str>,
|
||||
) {
|
||||
worker_event_loop(
|
||||
"execute",
|
||||
socket_path,
|
||||
node_version,
|
||||
worker_version,
|
||||
|mut stream| async move {
|
||||
let worker_pid = std::process::id();
|
||||
|
||||
let handshake = recv_handshake(&mut stream).await?;
|
||||
let executor = Executor::new(handshake.executor_params).map_err(|e| {
|
||||
io::Error::new(io::ErrorKind::Other, format!("cannot create executor: {}", e))
|
||||
})?;
|
||||
let handshake = recv_handshake(&mut stream).await?;
|
||||
let executor = Executor::new(handshake.executor_params).map_err(|e| {
|
||||
io::Error::new(io::ErrorKind::Other, format!("cannot create executor: {}", e))
|
||||
})?;
|
||||
|
||||
loop {
|
||||
let (artifact_path, params, execution_timeout) = recv_request(&mut stream).await?;
|
||||
gum::debug!(
|
||||
target: LOG_TARGET,
|
||||
%worker_pid,
|
||||
"worker: validating artifact {}",
|
||||
artifact_path.display(),
|
||||
);
|
||||
loop {
|
||||
let (artifact_path, params, execution_timeout) = recv_request(&mut stream).await?;
|
||||
gum::debug!(
|
||||
target: LOG_TARGET,
|
||||
%worker_pid,
|
||||
"worker: validating artifact {}",
|
||||
artifact_path.display(),
|
||||
);
|
||||
|
||||
// Get the artifact bytes.
|
||||
//
|
||||
// We do this outside the thread so that we can lock down filesystem access there.
|
||||
let compiled_artifact_blob = match std::fs::read(artifact_path) {
|
||||
Ok(bytes) => bytes,
|
||||
Err(err) => {
|
||||
let response = Response::InternalError(
|
||||
InternalValidationError::CouldNotOpenFile(err.to_string()),
|
||||
);
|
||||
send_response(&mut stream, response).await?;
|
||||
continue
|
||||
},
|
||||
};
|
||||
// Get the artifact bytes.
|
||||
//
|
||||
// We do this outside the thread so that we can lock down filesystem access there.
|
||||
let compiled_artifact_blob = match std::fs::read(artifact_path) {
|
||||
Ok(bytes) => bytes,
|
||||
Err(err) => {
|
||||
let response = Response::InternalError(
|
||||
InternalValidationError::CouldNotOpenFile(err.to_string()),
|
||||
);
|
||||
send_response(&mut stream, response).await?;
|
||||
continue
|
||||
},
|
||||
};
|
||||
|
||||
// Conditional variable to notify us when a thread is done.
|
||||
let condvar = thread::get_condvar();
|
||||
// Conditional variable to notify us when a thread is done.
|
||||
let condvar = thread::get_condvar();
|
||||
|
||||
let cpu_time_start = ProcessTime::now();
|
||||
let cpu_time_start = ProcessTime::now();
|
||||
|
||||
// Spawn a new thread that runs the CPU time monitor.
|
||||
let (cpu_time_monitor_tx, cpu_time_monitor_rx) = channel::<()>();
|
||||
let cpu_time_monitor_thread = thread::spawn_worker_thread(
|
||||
"cpu time monitor thread",
|
||||
move || {
|
||||
cpu_time_monitor_loop(cpu_time_start, execution_timeout, cpu_time_monitor_rx)
|
||||
},
|
||||
Arc::clone(&condvar),
|
||||
WaitOutcome::TimedOut,
|
||||
)?;
|
||||
let executor_2 = executor.clone();
|
||||
let execute_thread = thread::spawn_worker_thread_with_stack_size(
|
||||
"execute thread",
|
||||
move || {
|
||||
// Try to enable landlock.
|
||||
#[cfg(target_os = "linux")]
|
||||
// Spawn a new thread that runs the CPU time monitor.
|
||||
let (cpu_time_monitor_tx, cpu_time_monitor_rx) = channel::<()>();
|
||||
let cpu_time_monitor_thread = thread::spawn_worker_thread(
|
||||
"cpu time monitor thread",
|
||||
move || {
|
||||
cpu_time_monitor_loop(
|
||||
cpu_time_start,
|
||||
execution_timeout,
|
||||
cpu_time_monitor_rx,
|
||||
)
|
||||
},
|
||||
Arc::clone(&condvar),
|
||||
WaitOutcome::TimedOut,
|
||||
)?;
|
||||
let executor_2 = executor.clone();
|
||||
let execute_thread = thread::spawn_worker_thread_with_stack_size(
|
||||
"execute thread",
|
||||
move || {
|
||||
// Try to enable landlock.
|
||||
#[cfg(target_os = "linux")]
|
||||
let landlock_status = polkadot_node_core_pvf_common::worker::security::landlock::try_restrict_thread()
|
||||
.map(LandlockStatus::from_ruleset_status)
|
||||
.map_err(|e| e.to_string());
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
let landlock_status: Result<LandlockStatus, String> = Ok(LandlockStatus::NotEnforced);
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
let landlock_status: Result<LandlockStatus, String> = Ok(LandlockStatus::NotEnforced);
|
||||
|
||||
(
|
||||
validate_using_artifact(
|
||||
&compiled_artifact_blob,
|
||||
¶ms,
|
||||
executor_2,
|
||||
cpu_time_start,
|
||||
),
|
||||
landlock_status,
|
||||
)
|
||||
},
|
||||
Arc::clone(&condvar),
|
||||
WaitOutcome::Finished,
|
||||
EXECUTE_THREAD_STACK_SIZE,
|
||||
)?;
|
||||
|
||||
let outcome = thread::wait_for_threads(condvar);
|
||||
|
||||
let response = match outcome {
|
||||
WaitOutcome::Finished => {
|
||||
let _ = cpu_time_monitor_tx.send(());
|
||||
let (result, landlock_status) = execute_thread.join().unwrap_or_else(|e| {
|
||||
(
|
||||
Response::Panic(stringify_panic_payload(e)),
|
||||
Ok(LandlockStatus::Unavailable),
|
||||
validate_using_artifact(
|
||||
&compiled_artifact_blob,
|
||||
¶ms,
|
||||
executor_2,
|
||||
cpu_time_start,
|
||||
),
|
||||
landlock_status,
|
||||
)
|
||||
});
|
||||
},
|
||||
Arc::clone(&condvar),
|
||||
WaitOutcome::Finished,
|
||||
EXECUTE_THREAD_STACK_SIZE,
|
||||
)?;
|
||||
|
||||
// Log if landlock threw an error.
|
||||
if let Err(err) = landlock_status {
|
||||
gum::warn!(
|
||||
target: LOG_TARGET,
|
||||
%worker_pid,
|
||||
"error enabling landlock: {}",
|
||||
err
|
||||
);
|
||||
}
|
||||
let outcome = thread::wait_for_threads(condvar);
|
||||
|
||||
result
|
||||
},
|
||||
// If the CPU thread is not selected, we signal it to end, the join handle is
|
||||
// dropped and the thread will finish in the background.
|
||||
WaitOutcome::TimedOut => {
|
||||
match cpu_time_monitor_thread.join() {
|
||||
Ok(Some(cpu_time_elapsed)) => {
|
||||
// Log if we exceed the timeout and the other thread hasn't finished.
|
||||
let response = match outcome {
|
||||
WaitOutcome::Finished => {
|
||||
let _ = cpu_time_monitor_tx.send(());
|
||||
let (result, landlock_status) = execute_thread.join().unwrap_or_else(|e| {
|
||||
(
|
||||
Response::Panic(stringify_panic_payload(e)),
|
||||
Ok(LandlockStatus::Unavailable),
|
||||
)
|
||||
});
|
||||
|
||||
// Log if landlock threw an error.
|
||||
if let Err(err) = landlock_status {
|
||||
gum::warn!(
|
||||
target: LOG_TARGET,
|
||||
%worker_pid,
|
||||
"execute job took {}ms cpu time, exceeded execute timeout {}ms",
|
||||
cpu_time_elapsed.as_millis(),
|
||||
execution_timeout.as_millis(),
|
||||
"error enabling landlock: {}",
|
||||
err
|
||||
);
|
||||
Response::TimedOut
|
||||
},
|
||||
Ok(None) =>
|
||||
Response::InternalError(InternalValidationError::CpuTimeMonitorThread(
|
||||
"error communicating over finished channel".into(),
|
||||
)),
|
||||
Err(e) =>
|
||||
Response::InternalError(InternalValidationError::CpuTimeMonitorThread(
|
||||
stringify_panic_payload(e),
|
||||
)),
|
||||
}
|
||||
},
|
||||
WaitOutcome::Pending =>
|
||||
unreachable!("we run wait_while until the outcome is no longer pending; qed"),
|
||||
};
|
||||
}
|
||||
|
||||
send_response(&mut stream, response).await?;
|
||||
}
|
||||
});
|
||||
result
|
||||
},
|
||||
// If the CPU thread is not selected, we signal it to end, the join handle is
|
||||
// dropped and the thread will finish in the background.
|
||||
WaitOutcome::TimedOut => {
|
||||
match cpu_time_monitor_thread.join() {
|
||||
Ok(Some(cpu_time_elapsed)) => {
|
||||
// Log if we exceed the timeout and the other thread hasn't finished.
|
||||
gum::warn!(
|
||||
target: LOG_TARGET,
|
||||
%worker_pid,
|
||||
"execute job took {}ms cpu time, exceeded execute timeout {}ms",
|
||||
cpu_time_elapsed.as_millis(),
|
||||
execution_timeout.as_millis(),
|
||||
);
|
||||
Response::TimedOut
|
||||
},
|
||||
Ok(None) => Response::InternalError(
|
||||
InternalValidationError::CpuTimeMonitorThread(
|
||||
"error communicating over finished channel".into(),
|
||||
),
|
||||
),
|
||||
Err(e) => Response::InternalError(
|
||||
InternalValidationError::CpuTimeMonitorThread(
|
||||
stringify_panic_payload(e),
|
||||
),
|
||||
),
|
||||
}
|
||||
},
|
||||
WaitOutcome::Pending => unreachable!(
|
||||
"we run wait_while until the outcome is no longer pending; qed"
|
||||
),
|
||||
};
|
||||
|
||||
send_response(&mut stream, response).await?;
|
||||
}
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
fn validate_using_artifact(
|
||||
|
||||
@@ -116,169 +116,189 @@ async fn send_response(stream: &mut UnixStream, result: PrepareResult) -> io::Re
|
||||
///
|
||||
/// 7. Send the result of preparation back to the host. If any error occurred in the above steps, we
|
||||
/// send that in the `PrepareResult`.
|
||||
pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) {
|
||||
worker_event_loop("prepare", socket_path, node_version, |mut stream| async move {
|
||||
let worker_pid = std::process::id();
|
||||
pub fn worker_entrypoint(
|
||||
socket_path: &str,
|
||||
node_version: Option<&str>,
|
||||
worker_version: Option<&str>,
|
||||
) {
|
||||
worker_event_loop(
|
||||
"prepare",
|
||||
socket_path,
|
||||
node_version,
|
||||
worker_version,
|
||||
|mut stream| async move {
|
||||
let worker_pid = std::process::id();
|
||||
|
||||
loop {
|
||||
let (pvf, temp_artifact_dest) = recv_request(&mut stream).await?;
|
||||
gum::debug!(
|
||||
target: LOG_TARGET,
|
||||
%worker_pid,
|
||||
"worker: preparing artifact",
|
||||
);
|
||||
loop {
|
||||
let (pvf, temp_artifact_dest) = recv_request(&mut stream).await?;
|
||||
gum::debug!(
|
||||
target: LOG_TARGET,
|
||||
%worker_pid,
|
||||
"worker: preparing artifact",
|
||||
);
|
||||
|
||||
let preparation_timeout = pvf.prep_timeout();
|
||||
let prepare_job_kind = pvf.prep_kind();
|
||||
let executor_params = (*pvf.executor_params()).clone();
|
||||
let preparation_timeout = pvf.prep_timeout();
|
||||
let prepare_job_kind = pvf.prep_kind();
|
||||
let executor_params = (*pvf.executor_params()).clone();
|
||||
|
||||
// Conditional variable to notify us when a thread is done.
|
||||
let condvar = thread::get_condvar();
|
||||
// Conditional variable to notify us when a thread is done.
|
||||
let condvar = thread::get_condvar();
|
||||
|
||||
// Run the memory tracker in a regular, non-worker thread.
|
||||
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
|
||||
let condvar_memory = Arc::clone(&condvar);
|
||||
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
|
||||
let memory_tracker_thread = std::thread::spawn(|| memory_tracker_loop(condvar_memory));
|
||||
// Run the memory tracker in a regular, non-worker thread.
|
||||
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
|
||||
let condvar_memory = Arc::clone(&condvar);
|
||||
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
|
||||
let memory_tracker_thread = std::thread::spawn(|| memory_tracker_loop(condvar_memory));
|
||||
|
||||
let cpu_time_start = ProcessTime::now();
|
||||
let cpu_time_start = ProcessTime::now();
|
||||
|
||||
// Spawn a new thread that runs the CPU time monitor.
|
||||
let (cpu_time_monitor_tx, cpu_time_monitor_rx) = channel::<()>();
|
||||
let cpu_time_monitor_thread = thread::spawn_worker_thread(
|
||||
"cpu time monitor thread",
|
||||
move || {
|
||||
cpu_time_monitor_loop(cpu_time_start, preparation_timeout, cpu_time_monitor_rx)
|
||||
},
|
||||
Arc::clone(&condvar),
|
||||
WaitOutcome::TimedOut,
|
||||
)?;
|
||||
// Spawn another thread for preparation.
|
||||
let prepare_thread = thread::spawn_worker_thread(
|
||||
"prepare thread",
|
||||
move || {
|
||||
// Try to enable landlock.
|
||||
#[cfg(target_os = "linux")]
|
||||
// Spawn a new thread that runs the CPU time monitor.
|
||||
let (cpu_time_monitor_tx, cpu_time_monitor_rx) = channel::<()>();
|
||||
let cpu_time_monitor_thread = thread::spawn_worker_thread(
|
||||
"cpu time monitor thread",
|
||||
move || {
|
||||
cpu_time_monitor_loop(
|
||||
cpu_time_start,
|
||||
preparation_timeout,
|
||||
cpu_time_monitor_rx,
|
||||
)
|
||||
},
|
||||
Arc::clone(&condvar),
|
||||
WaitOutcome::TimedOut,
|
||||
)?;
|
||||
// Spawn another thread for preparation.
|
||||
let prepare_thread = thread::spawn_worker_thread(
|
||||
"prepare thread",
|
||||
move || {
|
||||
// Try to enable landlock.
|
||||
#[cfg(target_os = "linux")]
|
||||
let landlock_status = polkadot_node_core_pvf_common::worker::security::landlock::try_restrict_thread()
|
||||
.map(LandlockStatus::from_ruleset_status)
|
||||
.map_err(|e| e.to_string());
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
let landlock_status: Result<LandlockStatus, String> = Ok(LandlockStatus::NotEnforced);
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
let landlock_status: Result<LandlockStatus, String> = Ok(LandlockStatus::NotEnforced);
|
||||
|
||||
#[allow(unused_mut)]
|
||||
let mut result = prepare_artifact(pvf, cpu_time_start);
|
||||
#[allow(unused_mut)]
|
||||
let mut result = prepare_artifact(pvf, cpu_time_start);
|
||||
|
||||
// Get the `ru_maxrss` stat. If supported, call getrusage for the thread.
|
||||
#[cfg(target_os = "linux")]
|
||||
let mut result = result.map(|(artifact, elapsed)| (artifact, elapsed, get_max_rss_thread()));
|
||||
// Get the `ru_maxrss` stat. If supported, call getrusage for the thread.
|
||||
#[cfg(target_os = "linux")]
|
||||
let mut result = result
|
||||
.map(|(artifact, elapsed)| (artifact, elapsed, get_max_rss_thread()));
|
||||
|
||||
// If we are pre-checking, check for runtime construction errors.
|
||||
//
|
||||
// As pre-checking is more strict than just preparation in terms of memory and
|
||||
// time, it is okay to do extra checks here. This takes negligible time anyway.
|
||||
if let PrepareJobKind::Prechecking = prepare_job_kind {
|
||||
result = result.and_then(|output| {
|
||||
runtime_construction_check(output.0.as_ref(), executor_params)?;
|
||||
Ok(output)
|
||||
});
|
||||
}
|
||||
// If we are pre-checking, check for runtime construction errors.
|
||||
//
|
||||
// As pre-checking is more strict than just preparation in terms of memory and
|
||||
// time, it is okay to do extra checks here. This takes negligible time anyway.
|
||||
if let PrepareJobKind::Prechecking = prepare_job_kind {
|
||||
result = result.and_then(|output| {
|
||||
runtime_construction_check(output.0.as_ref(), executor_params)?;
|
||||
Ok(output)
|
||||
});
|
||||
}
|
||||
|
||||
(result, landlock_status)
|
||||
},
|
||||
Arc::clone(&condvar),
|
||||
WaitOutcome::Finished,
|
||||
)?;
|
||||
(result, landlock_status)
|
||||
},
|
||||
Arc::clone(&condvar),
|
||||
WaitOutcome::Finished,
|
||||
)?;
|
||||
|
||||
let outcome = thread::wait_for_threads(condvar);
|
||||
let outcome = thread::wait_for_threads(condvar);
|
||||
|
||||
let result = match outcome {
|
||||
WaitOutcome::Finished => {
|
||||
let _ = cpu_time_monitor_tx.send(());
|
||||
let result = match outcome {
|
||||
WaitOutcome::Finished => {
|
||||
let _ = cpu_time_monitor_tx.send(());
|
||||
|
||||
match prepare_thread.join().unwrap_or_else(|err| {
|
||||
(
|
||||
Err(PrepareError::Panic(stringify_panic_payload(err))),
|
||||
Ok(LandlockStatus::Unavailable),
|
||||
)
|
||||
}) {
|
||||
(Err(err), _) => {
|
||||
// Serialized error will be written into the socket.
|
||||
Err(err)
|
||||
},
|
||||
(Ok(ok), landlock_status) => {
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
let (artifact, cpu_time_elapsed) = ok;
|
||||
#[cfg(target_os = "linux")]
|
||||
let (artifact, cpu_time_elapsed, max_rss) = ok;
|
||||
|
||||
// Stop the memory stats worker and get its observed memory stats.
|
||||
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
|
||||
let memory_tracker_stats = get_memory_tracker_loop_stats(memory_tracker_thread, worker_pid).await;
|
||||
let memory_stats = MemoryStats {
|
||||
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
|
||||
memory_tracker_stats,
|
||||
match prepare_thread.join().unwrap_or_else(|err| {
|
||||
(
|
||||
Err(PrepareError::Panic(stringify_panic_payload(err))),
|
||||
Ok(LandlockStatus::Unavailable),
|
||||
)
|
||||
}) {
|
||||
(Err(err), _) => {
|
||||
// Serialized error will be written into the socket.
|
||||
Err(err)
|
||||
},
|
||||
(Ok(ok), landlock_status) => {
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
let (artifact, cpu_time_elapsed) = ok;
|
||||
#[cfg(target_os = "linux")]
|
||||
max_rss: extract_max_rss_stat(max_rss, worker_pid),
|
||||
};
|
||||
let (artifact, cpu_time_elapsed, max_rss) = ok;
|
||||
|
||||
// Log if landlock threw an error.
|
||||
if let Err(err) = landlock_status {
|
||||
// Stop the memory stats worker and get its observed memory stats.
|
||||
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
|
||||
let memory_tracker_stats = get_memory_tracker_loop_stats(memory_tracker_thread, worker_pid)
|
||||
.await;
|
||||
let memory_stats = MemoryStats {
|
||||
#[cfg(any(
|
||||
target_os = "linux",
|
||||
feature = "jemalloc-allocator"
|
||||
))]
|
||||
memory_tracker_stats,
|
||||
#[cfg(target_os = "linux")]
|
||||
max_rss: extract_max_rss_stat(max_rss, worker_pid),
|
||||
};
|
||||
|
||||
// Log if landlock threw an error.
|
||||
if let Err(err) = landlock_status {
|
||||
gum::warn!(
|
||||
target: LOG_TARGET,
|
||||
%worker_pid,
|
||||
"error enabling landlock: {}",
|
||||
err
|
||||
);
|
||||
}
|
||||
|
||||
// Write the serialized artifact into a temp file.
|
||||
//
|
||||
// PVF host only keeps artifacts statuses in its memory, successfully
|
||||
// compiled code gets stored on the disk (and consequently deserialized
|
||||
// by execute-workers). The prepare worker is only required to send `Ok`
|
||||
// to the pool to indicate the success.
|
||||
|
||||
gum::debug!(
|
||||
target: LOG_TARGET,
|
||||
%worker_pid,
|
||||
"worker: writing artifact to {}",
|
||||
temp_artifact_dest.display(),
|
||||
);
|
||||
tokio::fs::write(&temp_artifact_dest, &artifact).await?;
|
||||
|
||||
Ok(PrepareStats { cpu_time_elapsed, memory_stats })
|
||||
},
|
||||
}
|
||||
},
|
||||
// If the CPU thread is not selected, we signal it to end, the join handle is
|
||||
// dropped and the thread will finish in the background.
|
||||
WaitOutcome::TimedOut => {
|
||||
match cpu_time_monitor_thread.join() {
|
||||
Ok(Some(cpu_time_elapsed)) => {
|
||||
// Log if we exceed the timeout and the other thread hasn't finished.
|
||||
gum::warn!(
|
||||
target: LOG_TARGET,
|
||||
%worker_pid,
|
||||
"error enabling landlock: {}",
|
||||
err
|
||||
"prepare job took {}ms cpu time, exceeded prepare timeout {}ms",
|
||||
cpu_time_elapsed.as_millis(),
|
||||
preparation_timeout.as_millis(),
|
||||
);
|
||||
}
|
||||
Err(PrepareError::TimedOut)
|
||||
},
|
||||
Ok(None) => Err(PrepareError::IoErr(
|
||||
"error communicating over closed channel".into(),
|
||||
)),
|
||||
// Errors in this thread are independent of the PVF.
|
||||
Err(err) => Err(PrepareError::IoErr(stringify_panic_payload(err))),
|
||||
}
|
||||
},
|
||||
WaitOutcome::Pending => unreachable!(
|
||||
"we run wait_while until the outcome is no longer pending; qed"
|
||||
),
|
||||
};
|
||||
|
||||
// Write the serialized artifact into a temp file.
|
||||
//
|
||||
// PVF host only keeps artifacts statuses in its memory, successfully
|
||||
// compiled code gets stored on the disk (and consequently deserialized
|
||||
// by execute-workers). The prepare worker is only required to send `Ok`
|
||||
// to the pool to indicate the success.
|
||||
|
||||
gum::debug!(
|
||||
target: LOG_TARGET,
|
||||
%worker_pid,
|
||||
"worker: writing artifact to {}",
|
||||
temp_artifact_dest.display(),
|
||||
);
|
||||
tokio::fs::write(&temp_artifact_dest, &artifact).await?;
|
||||
|
||||
Ok(PrepareStats { cpu_time_elapsed, memory_stats })
|
||||
},
|
||||
}
|
||||
},
|
||||
// If the CPU thread is not selected, we signal it to end, the join handle is
|
||||
// dropped and the thread will finish in the background.
|
||||
WaitOutcome::TimedOut => {
|
||||
match cpu_time_monitor_thread.join() {
|
||||
Ok(Some(cpu_time_elapsed)) => {
|
||||
// Log if we exceed the timeout and the other thread hasn't finished.
|
||||
gum::warn!(
|
||||
target: LOG_TARGET,
|
||||
%worker_pid,
|
||||
"prepare job took {}ms cpu time, exceeded prepare timeout {}ms",
|
||||
cpu_time_elapsed.as_millis(),
|
||||
preparation_timeout.as_millis(),
|
||||
);
|
||||
Err(PrepareError::TimedOut)
|
||||
},
|
||||
Ok(None) => Err(PrepareError::IoErr(
|
||||
"error communicating over closed channel".into(),
|
||||
)),
|
||||
// Errors in this thread are independent of the PVF.
|
||||
Err(err) => Err(PrepareError::IoErr(stringify_panic_payload(err))),
|
||||
}
|
||||
},
|
||||
WaitOutcome::Pending =>
|
||||
unreachable!("we run wait_while until the outcome is no longer pending; qed"),
|
||||
};
|
||||
|
||||
send_response(&mut stream, result).await?;
|
||||
}
|
||||
});
|
||||
send_response(&mut stream, result).await?;
|
||||
}
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
fn prepare_artifact(
|
||||
|
||||
@@ -137,8 +137,10 @@ struct Queue {
|
||||
/// The receiver that receives messages to the pool.
|
||||
to_queue_rx: mpsc::Receiver<ToQueue>,
|
||||
|
||||
// Some variables related to the current session.
|
||||
program_path: PathBuf,
|
||||
spawn_timeout: Duration,
|
||||
node_version: Option<String>,
|
||||
|
||||
/// The queue of jobs that are waiting for a worker to pick up.
|
||||
queue: VecDeque<ExecuteJob>,
|
||||
@@ -152,12 +154,14 @@ impl Queue {
|
||||
program_path: PathBuf,
|
||||
worker_capacity: usize,
|
||||
spawn_timeout: Duration,
|
||||
node_version: Option<String>,
|
||||
to_queue_rx: mpsc::Receiver<ToQueue>,
|
||||
) -> Self {
|
||||
Self {
|
||||
metrics,
|
||||
program_path,
|
||||
spawn_timeout,
|
||||
node_version,
|
||||
to_queue_rx,
|
||||
queue: VecDeque::new(),
|
||||
mux: Mux::new(),
|
||||
@@ -398,9 +402,15 @@ fn spawn_extra_worker(queue: &mut Queue, job: ExecuteJob) {
|
||||
queue.metrics.execute_worker().on_begin_spawn();
|
||||
gum::debug!(target: LOG_TARGET, "spawning an extra worker");
|
||||
|
||||
queue
|
||||
.mux
|
||||
.push(spawn_worker_task(queue.program_path.clone(), job, queue.spawn_timeout).boxed());
|
||||
queue.mux.push(
|
||||
spawn_worker_task(
|
||||
queue.program_path.clone(),
|
||||
job,
|
||||
queue.spawn_timeout,
|
||||
queue.node_version.clone(),
|
||||
)
|
||||
.boxed(),
|
||||
);
|
||||
queue.workers.spawn_inflight += 1;
|
||||
}
|
||||
|
||||
@@ -414,12 +424,18 @@ async fn spawn_worker_task(
|
||||
program_path: PathBuf,
|
||||
job: ExecuteJob,
|
||||
spawn_timeout: Duration,
|
||||
node_version: Option<String>,
|
||||
) -> QueueEvent {
|
||||
use futures_timer::Delay;
|
||||
|
||||
loop {
|
||||
match super::worker_intf::spawn(&program_path, job.executor_params.clone(), spawn_timeout)
|
||||
.await
|
||||
match super::worker_intf::spawn(
|
||||
&program_path,
|
||||
job.executor_params.clone(),
|
||||
spawn_timeout,
|
||||
node_version.as_deref(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok((idle, handle)) => break QueueEvent::Spawn(idle, handle, job),
|
||||
Err(err) => {
|
||||
@@ -481,8 +497,17 @@ pub fn start(
|
||||
program_path: PathBuf,
|
||||
worker_capacity: usize,
|
||||
spawn_timeout: Duration,
|
||||
node_version: Option<String>,
|
||||
) -> (mpsc::Sender<ToQueue>, impl Future<Output = ()>) {
|
||||
let (to_queue_tx, to_queue_rx) = mpsc::channel(20);
|
||||
let run = Queue::new(metrics, program_path, worker_capacity, spawn_timeout, to_queue_rx).run();
|
||||
let run = Queue::new(
|
||||
metrics,
|
||||
program_path,
|
||||
worker_capacity,
|
||||
spawn_timeout,
|
||||
node_version,
|
||||
to_queue_rx,
|
||||
)
|
||||
.run();
|
||||
(to_queue_tx, run)
|
||||
}
|
||||
|
||||
@@ -45,14 +45,14 @@ pub async fn spawn(
|
||||
program_path: &Path,
|
||||
executor_params: ExecutorParams,
|
||||
spawn_timeout: Duration,
|
||||
node_version: Option<&str>,
|
||||
) -> Result<(IdleWorker, WorkerHandle), SpawnErr> {
|
||||
let (mut idle_worker, worker_handle) = spawn_with_program_path(
|
||||
"execute",
|
||||
program_path,
|
||||
&["execute-worker", "--node-impl-version", env!("SUBSTRATE_CLI_IMPL_VERSION")],
|
||||
spawn_timeout,
|
||||
)
|
||||
.await?;
|
||||
let mut extra_args = vec!["execute-worker"];
|
||||
if let Some(node_version) = node_version {
|
||||
extra_args.extend_from_slice(&["--node-impl-version", node_version]);
|
||||
}
|
||||
let (mut idle_worker, worker_handle) =
|
||||
spawn_with_program_path("execute", program_path, &extra_args, spawn_timeout).await?;
|
||||
send_handshake(&mut idle_worker.stream, Handshake { executor_params })
|
||||
.await
|
||||
.map_err(|error| {
|
||||
|
||||
@@ -52,6 +52,12 @@ pub const PREPARE_FAILURE_COOLDOWN: Duration = Duration::from_millis(200);
|
||||
/// The amount of times we will retry failed prepare jobs.
|
||||
pub const NUM_PREPARE_RETRIES: u32 = 5;
|
||||
|
||||
/// The name of binary spawned to prepare a PVF artifact
|
||||
pub const PREPARE_BINARY_NAME: &str = "polkadot-prepare-worker";
|
||||
|
||||
/// The name of binary spawned to execute a PVF
|
||||
pub const EXECUTE_BINARY_NAME: &str = "polkadot-execute-worker";
|
||||
|
||||
/// An alias to not spell the type for the oneshot sender for the PVF execution result.
|
||||
pub(crate) type ResultSender = oneshot::Sender<Result<ValidationResult, ValidationError>>;
|
||||
|
||||
@@ -144,6 +150,8 @@ struct ExecutePvfInputs {
|
||||
pub struct Config {
|
||||
/// The root directory where the prepared artifacts can be stored.
|
||||
pub cache_path: PathBuf,
|
||||
/// The version of the node. `None` can be passed to skip the version check (only for tests).
|
||||
pub node_version: Option<String>,
|
||||
/// The path to the program that can be used to spawn the prepare workers.
|
||||
pub prepare_worker_program_path: PathBuf,
|
||||
/// The time allotted for a prepare worker to spawn and report to the host.
|
||||
@@ -163,18 +171,20 @@ pub struct Config {
|
||||
|
||||
impl Config {
|
||||
/// Create a new instance of the configuration.
|
||||
pub fn new(cache_path: std::path::PathBuf, program_path: std::path::PathBuf) -> Self {
|
||||
// Do not contaminate the other parts of the codebase with the types from `tokio`.
|
||||
let cache_path = PathBuf::from(cache_path);
|
||||
let program_path = PathBuf::from(program_path);
|
||||
|
||||
pub fn new(
|
||||
cache_path: PathBuf,
|
||||
node_version: Option<String>,
|
||||
prepare_worker_program_path: PathBuf,
|
||||
execute_worker_program_path: PathBuf,
|
||||
) -> Self {
|
||||
Self {
|
||||
cache_path,
|
||||
prepare_worker_program_path: program_path.clone(),
|
||||
node_version,
|
||||
prepare_worker_program_path,
|
||||
prepare_worker_spawn_timeout: Duration::from_secs(3),
|
||||
prepare_workers_soft_max_num: 1,
|
||||
prepare_workers_hard_max_num: 1,
|
||||
execute_worker_program_path: program_path,
|
||||
execute_worker_program_path,
|
||||
execute_worker_spawn_timeout: Duration::from_secs(3),
|
||||
execute_workers_max_num: 2,
|
||||
}
|
||||
@@ -204,6 +214,7 @@ pub fn start(config: Config, metrics: Metrics) -> (ValidationHost, impl Future<O
|
||||
config.prepare_worker_program_path.clone(),
|
||||
config.cache_path.clone(),
|
||||
config.prepare_worker_spawn_timeout,
|
||||
config.node_version.clone(),
|
||||
);
|
||||
|
||||
let (to_prepare_queue_tx, from_prepare_queue_rx, run_prepare_queue) = prepare::start_queue(
|
||||
@@ -220,6 +231,7 @@ pub fn start(config: Config, metrics: Metrics) -> (ValidationHost, impl Future<O
|
||||
config.execute_worker_program_path.to_owned(),
|
||||
config.execute_workers_max_num,
|
||||
config.execute_worker_spawn_timeout,
|
||||
config.node_version,
|
||||
);
|
||||
|
||||
let (to_sweeper_tx, to_sweeper_rx) = mpsc::channel(100);
|
||||
|
||||
@@ -105,7 +105,7 @@ pub mod testing;
|
||||
pub use sp_tracing;
|
||||
|
||||
pub use error::{InvalidCandidate, ValidationError};
|
||||
pub use host::{start, Config, ValidationHost};
|
||||
pub use host::{start, Config, ValidationHost, EXECUTE_BINARY_NAME, PREPARE_BINARY_NAME};
|
||||
pub use metrics::Metrics;
|
||||
pub use priority::Priority;
|
||||
pub use worker_intf::{framed_recv, framed_send, JOB_TIMEOUT_WALL_CLOCK_FACTOR};
|
||||
|
||||
@@ -113,10 +113,13 @@ struct Pool {
|
||||
program_path: PathBuf,
|
||||
cache_path: PathBuf,
|
||||
spawn_timeout: Duration,
|
||||
node_version: Option<String>,
|
||||
|
||||
to_pool: mpsc::Receiver<ToPool>,
|
||||
from_pool: mpsc::UnboundedSender<FromPool>,
|
||||
spawned: HopSlotMap<Worker, WorkerData>,
|
||||
mux: Mux,
|
||||
|
||||
metrics: Metrics,
|
||||
}
|
||||
|
||||
@@ -128,6 +131,7 @@ async fn run(
|
||||
program_path,
|
||||
cache_path,
|
||||
spawn_timeout,
|
||||
node_version,
|
||||
to_pool,
|
||||
mut from_pool,
|
||||
mut spawned,
|
||||
@@ -155,6 +159,7 @@ async fn run(
|
||||
&program_path,
|
||||
&cache_path,
|
||||
spawn_timeout,
|
||||
node_version.clone(),
|
||||
&mut spawned,
|
||||
&mut mux,
|
||||
to_pool,
|
||||
@@ -201,6 +206,7 @@ fn handle_to_pool(
|
||||
program_path: &Path,
|
||||
cache_path: &Path,
|
||||
spawn_timeout: Duration,
|
||||
node_version: Option<String>,
|
||||
spawned: &mut HopSlotMap<Worker, WorkerData>,
|
||||
mux: &mut Mux,
|
||||
to_pool: ToPool,
|
||||
@@ -209,7 +215,9 @@ fn handle_to_pool(
|
||||
ToPool::Spawn => {
|
||||
gum::debug!(target: LOG_TARGET, "spawning a new prepare worker");
|
||||
metrics.prepare_worker().on_begin_spawn();
|
||||
mux.push(spawn_worker_task(program_path.to_owned(), spawn_timeout).boxed());
|
||||
mux.push(
|
||||
spawn_worker_task(program_path.to_owned(), spawn_timeout, node_version).boxed(),
|
||||
);
|
||||
},
|
||||
ToPool::StartWork { worker, pvf, artifact_path } => {
|
||||
if let Some(data) = spawned.get_mut(worker) {
|
||||
@@ -248,11 +256,15 @@ fn handle_to_pool(
|
||||
}
|
||||
}
|
||||
|
||||
async fn spawn_worker_task(program_path: PathBuf, spawn_timeout: Duration) -> PoolEvent {
|
||||
async fn spawn_worker_task(
|
||||
program_path: PathBuf,
|
||||
spawn_timeout: Duration,
|
||||
node_version: Option<String>,
|
||||
) -> PoolEvent {
|
||||
use futures_timer::Delay;
|
||||
|
||||
loop {
|
||||
match worker_intf::spawn(&program_path, spawn_timeout).await {
|
||||
match worker_intf::spawn(&program_path, spawn_timeout, node_version.as_deref()).await {
|
||||
Ok((idle, handle)) => break PoolEvent::Spawn(idle, handle),
|
||||
Err(err) => {
|
||||
gum::warn!(target: LOG_TARGET, "failed to spawn a prepare worker: {:?}", err);
|
||||
@@ -419,6 +431,7 @@ pub fn start(
|
||||
program_path: PathBuf,
|
||||
cache_path: PathBuf,
|
||||
spawn_timeout: Duration,
|
||||
node_version: Option<String>,
|
||||
) -> (mpsc::Sender<ToPool>, mpsc::UnboundedReceiver<FromPool>, impl Future<Output = ()>) {
|
||||
let (to_pool_tx, to_pool_rx) = mpsc::channel(10);
|
||||
let (from_pool_tx, from_pool_rx) = mpsc::unbounded();
|
||||
@@ -428,6 +441,7 @@ pub fn start(
|
||||
program_path,
|
||||
cache_path,
|
||||
spawn_timeout,
|
||||
node_version,
|
||||
to_pool: to_pool_rx,
|
||||
from_pool: from_pool_tx,
|
||||
spawned: HopSlotMap::with_capacity_and_key(20),
|
||||
|
||||
@@ -45,14 +45,13 @@ use tokio::{io, net::UnixStream};
|
||||
pub async fn spawn(
|
||||
program_path: &Path,
|
||||
spawn_timeout: Duration,
|
||||
node_version: Option<&str>,
|
||||
) -> Result<(IdleWorker, WorkerHandle), SpawnErr> {
|
||||
spawn_with_program_path(
|
||||
"prepare",
|
||||
program_path,
|
||||
&["prepare-worker", "--node-impl-version", env!("SUBSTRATE_CLI_IMPL_VERSION")],
|
||||
spawn_timeout,
|
||||
)
|
||||
.await
|
||||
let mut extra_args = vec!["prepare-worker"];
|
||||
if let Some(node_version) = node_version {
|
||||
extra_args.extend_from_slice(&["--node-impl-version", node_version]);
|
||||
}
|
||||
spawn_with_program_path("prepare", program_path, &extra_args, spawn_timeout).await
|
||||
}
|
||||
|
||||
pub enum Outcome {
|
||||
|
||||
@@ -58,37 +58,35 @@ macro_rules! decl_puppet_worker_main {
|
||||
$crate::sp_tracing::try_init_simple();
|
||||
|
||||
let args = std::env::args().collect::<Vec<_>>();
|
||||
if args.len() < 3 {
|
||||
if args.len() == 1 {
|
||||
panic!("wrong number of arguments");
|
||||
}
|
||||
|
||||
let mut version = None;
|
||||
let mut socket_path: &str = "";
|
||||
|
||||
for i in 2..args.len() {
|
||||
match args[i].as_ref() {
|
||||
"--socket-path" => socket_path = args[i + 1].as_str(),
|
||||
"--node-version" => version = Some(args[i + 1].as_str()),
|
||||
_ => (),
|
||||
}
|
||||
}
|
||||
|
||||
let subcommand = &args[1];
|
||||
match subcommand.as_ref() {
|
||||
let entrypoint = match args[1].as_ref() {
|
||||
"exit" => {
|
||||
std::process::exit(1);
|
||||
},
|
||||
"sleep" => {
|
||||
std::thread::sleep(std::time::Duration::from_secs(5));
|
||||
return
|
||||
},
|
||||
"prepare-worker" => {
|
||||
$crate::prepare_worker_entrypoint(&socket_path, version);
|
||||
},
|
||||
"execute-worker" => {
|
||||
$crate::execute_worker_entrypoint(&socket_path, version);
|
||||
},
|
||||
"prepare-worker" => $crate::prepare_worker_entrypoint,
|
||||
"execute-worker" => $crate::execute_worker_entrypoint,
|
||||
other => panic!("unknown subcommand: {}", other),
|
||||
};
|
||||
|
||||
let mut node_version = None;
|
||||
let mut socket_path: &str = "";
|
||||
|
||||
for i in (2..args.len()).step_by(2) {
|
||||
match args[i].as_ref() {
|
||||
"--socket-path" => socket_path = args[i + 1].as_str(),
|
||||
"--node-impl-version" => node_version = Some(args[i + 1].as_str()),
|
||||
arg => panic!("Unexpected argument found: {}", arg),
|
||||
}
|
||||
}
|
||||
|
||||
entrypoint(&socket_path, node_version, None);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@@ -43,12 +43,14 @@ pub const JOB_TIMEOUT_WALL_CLOCK_FACTOR: u32 = 4;
|
||||
pub async fn spawn_with_program_path(
|
||||
debug_id: &'static str,
|
||||
program_path: impl Into<PathBuf>,
|
||||
extra_args: &'static [&'static str],
|
||||
extra_args: &[&str],
|
||||
spawn_timeout: Duration,
|
||||
) -> Result<(IdleWorker, WorkerHandle), SpawnErr> {
|
||||
let program_path = program_path.into();
|
||||
with_transient_socket_path(debug_id, |socket_path| {
|
||||
let socket_path = socket_path.to_owned();
|
||||
let extra_args: Vec<String> = extra_args.iter().map(|arg| arg.to_string()).collect();
|
||||
|
||||
async move {
|
||||
let listener = UnixListener::bind(&socket_path).map_err(|err| {
|
||||
gum::warn!(
|
||||
@@ -63,7 +65,7 @@ pub async fn spawn_with_program_path(
|
||||
})?;
|
||||
|
||||
let handle =
|
||||
WorkerHandle::spawn(&program_path, extra_args, socket_path).map_err(|err| {
|
||||
WorkerHandle::spawn(&program_path, &extra_args, socket_path).map_err(|err| {
|
||||
gum::warn!(
|
||||
target: LOG_TARGET,
|
||||
%debug_id,
|
||||
@@ -214,7 +216,7 @@ pub struct WorkerHandle {
|
||||
impl WorkerHandle {
|
||||
fn spawn(
|
||||
program: impl AsRef<Path>,
|
||||
extra_args: &[&str],
|
||||
extra_args: &[String],
|
||||
socket_path: impl AsRef<Path>,
|
||||
) -> io::Result<Self> {
|
||||
let mut child = process::Command::new(program.as_ref())
|
||||
|
||||
@@ -53,7 +53,8 @@ impl TestHost {
|
||||
{
|
||||
let cache_dir = tempfile::tempdir().unwrap();
|
||||
let program_path = std::path::PathBuf::from(PUPPET_EXE);
|
||||
let mut config = Config::new(cache_dir.path().to_owned(), program_path);
|
||||
let mut config =
|
||||
Config::new(cache_dir.path().to_owned(), None, program_path.clone(), program_path);
|
||||
f(&mut config);
|
||||
let (host, task) = start(config, Metrics::default());
|
||||
let _ = tokio::task::spawn(task);
|
||||
|
||||
Reference in New Issue
Block a user