Executor Environment parameterization (#6161)

* Re-apply changes without Diener, rebase to the latest master

* Cache pruning

* Bit-pack InstantiationStrategy

* Move ExecutorParams version inside the structure itself

* Rework runtime API and executor parameters storage

* Pass executor parameters through backing subsystem

* Update Cargo.lock

* Introduce `ExecutorParams` to approval voting subsys

* Introduce `ExecutorParams` to dispute coordinator

* `cargo fmt`

* Simplify requests from backing subsys

* Fix tests

* Replace manual config cloning with `.clone()`

* Move constants to module

* Parametrize executor performing PVF pre-check

* Fix Malus

* Fix test runtime

* Introduce session executor params as a constant defined by session info
pallet

* Use Parity SCALE codec instead of hand-crafted binary encoding

* Get rid of constants; Add docs

* Get rid of constants

* Minor typo

* Fix Malus after rebase

* `cargo fmt`

* Use transparent SCALE encoding instead of explicit

* Clean up

* Get rid of relay parent to session index mapping

* Join environment type and version in a single enum element

* Use default execution parameters if running an old runtime

* `unwrap()` -> `expect()`

* Correct API version

* Constants are back in town

* Use constants for execution environment types

* Artifact separation, first try

* Get rid of explicit version

* PVF execution queue worker separation

* Worker handshake

* Global renaming

* Minor fixes resolving discussions

* Two-stage requesting of executor params to make use of runtime API cache

* Proper error handling in pvf-checker

* Executor params storage bootstrapping

* Propagate migration to v3 network runtimes

* Fix storage versioning

* Ensure `ExecutorParams` serialization determinism; Add comments

* Rename constants to make things a bit more deterministic
Get rid of stale code

* Tidy up a structure of active PVFs

* Minor formatting

* Fix comment

* Add try-runtime hooks

* Add storage version write on upgrade

Co-authored-by: Andronik <write@reusable.software>

* Add pre- and post-upgrade assertions

* Require to specify environment type; Remove redundant `impl`s

* Add `ExecutorParamHash` creation from `H256`

* Fix candidate validation subsys tests

* Return splittable error from executor params request fn

* Revert "Return splittable error from executor params request fn"

This reverts commit a0b274177d8bb2f6e13c066741892ecd2e72a456.

* Decompose approval voting metrics

* Use more relevant errors

* Minor formatting fix

* Assert a valid environment type instead of checking

* Fix `try-runtime` hooks

* After-merge fixes

* Add migration logs

* Remove dead code

* Fix tests

* Fix tests

* Back to the strongly typed implementation

* Promote strong types to executor interface

* Remove stale comment

* Move executor params to `SessionInfo`: primitives and runtime

* Move executor params to `SessionInfo`: node

* Try to bump primitives and API version

* Get rid of `MallocSizeOf`

* Bump target API version to v4

* Make use of session index already in place

* Back to v3

* Fix all the tests

* Add migrations to all the runtimes

* Make use of existing `SessionInfo` in approval voting subsys

* Rename `TARGET` -> `LOG_TARGET`

* Bump all the primitives to v3

* Fix Rococo ParachainHost API version

* Use `RollingSessionWindow` to acquire `ExecutorParams` in disputes

* Fix nits from discussions; add comments

* Re-evaluate queue logic

* Rework job assignment in execution queue

* Add documentation

* Use `RuntimeInfo` to obtain `SessionInfo` (with blackjack and caching)

* Couple `Pvf` with `ExecutorParams` wherever possible

* Put members of `PvfWithExecutorParams` under `Arc` for cheap cloning

* Fix comment

* Fix CI tests

* Fix clippy warnings

* Address nits from discussions

* Add a placeholder for raw data

* Fix non exhaustive match

* Remove redundant reexports and fix imports

* Keep only necessary semantic features, as discussed

* Rework `RuntimeInfo` to support mock implementation for tests

* Remove unneeded bound

* `cargo fmt`

* Revert "Remove unneeded bound"

This reverts commit 932463f26b00ce290e1e61848eb9328632ef8a61.

* Fix PVF host tests

* Fix PVF checker tests

* Fix overseer declarations

* Simplify tests

* `MAX_KEEP_WAITING` timeout based on `BACKING_EXECUTION_TIMEOUT`

* Add a unit test for varying executor parameters

* Minor fixes from discussions

* Add prechecking max. memory parameter (see paritytech/srlabs_findings#110)

* Fix and improve a test

* Remove `ExecutionEnvironment` and `RawData`

* New primitives versioning in parachain host API

* `disputes()` implementation for Kusama and Polkadot

* Move `ExecutorParams` from `vstaging` to stable primitives

* Move disputes from `vstaging` to stable implementation

* Fix `try-runtime`

* Fixes after merge

* Move `ExecutorParams` to the bottom of `SessionInfo`

* Revert "Move executor params to `SessionInfo`: primitives and runtime"

This reverts commit dfcfb85fefd1c5be6c8a8f72dc09fd1809cfa9ce.

* Always use fresh activated live hash in pvf precheck
(re-apply 34b09a4c20de17e7926ed942cd0d657d18f743fa)

* Fixing tests (broken commit)

* Fix candidate validation tests

* Fix PVF host test

* Minor fixes

* Address discussions

* Restore migration

* Fix `use` to only include what is needed instead of `*`

* Add comment to never touch `DEFAULT_CONFIG`

* Update migration to set default `ExecutorParams` for `dispute_period`
sessions back

* Use `earliest_stored_session` instead of calculations

* Nit

* Add logs

* Treat any runtime error as `NotSupported` again

* Always return default executor params if not available

* Revert "Always return default executor params if not available"

This reverts commit b58ac4482ef444c67a9852d5776550d08e312f30.

* Add paritytech/substrate#9997 workaround

* `cargo fmt`

* Remove migration (again!)

* Bump executor params to API v4 (backport from #6698)

---------

Co-authored-by: Andronik <write@reusable.software>
s0me0ne-unkn0wn
2023-02-15 12:26:09 +01:00
committed by GitHub
parent 7f6b8e6df9
commit dd0a556665
40 changed files with 1243 additions and 330 deletions
+149 -38
@@ -30,8 +30,23 @@ use futures::{
stream::{FuturesUnordered, StreamExt as _},
Future, FutureExt,
};
use polkadot_node_primitives::BACKING_EXECUTION_TIMEOUT;
use polkadot_primitives::vstaging::{ExecutorParams, ExecutorParamsHash};
use slotmap::HopSlotMap;
use std::{collections::VecDeque, fmt, path::PathBuf, time::Duration};
use std::{
collections::VecDeque,
fmt,
path::PathBuf,
time::{Duration, Instant},
};
/// The amount of time a job for which the queue does not have a compatible worker may wait in the
/// queue. After that time passes, the queue will kill the first worker which becomes idle to
/// re-spawn a new worker to execute the job immediately.
/// To make any sense and not to break things, the value should be greater than minimal execution
/// timeout in use, and less than the block time.
const MAX_KEEP_WAITING: Duration =
Duration::from_millis(BACKING_EXECUTION_TIMEOUT.as_millis() as u64 * 2);
slotmap::new_key_type! { struct Worker; }
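The doc comment on `MAX_KEEP_WAITING` states two bounds: the value should exceed the minimal execution timeout in use and stay below the block time. A minimal sketch of checking those bounds follows. The concrete durations and the `BLOCK_TIME` constant are assumptions chosen for illustration and are not taken from this diff; `bounds_hold` is a hypothetical helper.

```rust
use std::time::Duration;

// Assumed value for illustration only; the real constant lives in `polkadot_node_primitives`.
const BACKING_EXECUTION_TIMEOUT: Duration = Duration::from_secs(2);
// Hypothetical constant: an assumed block time, not defined anywhere in this diff.
const BLOCK_TIME: Duration = Duration::from_secs(6);

// Same formula as in the diff: twice the backing execution timeout.
const MAX_KEEP_WAITING: Duration =
    Duration::from_millis(BACKING_EXECUTION_TIMEOUT.as_millis() as u64 * 2);

/// Returns `true` if `MAX_KEEP_WAITING` lies strictly between the backing execution
/// timeout and the block time, as the doc comment asks.
fn bounds_hold() -> bool {
    MAX_KEEP_WAITING > BACKING_EXECUTION_TIMEOUT && MAX_KEEP_WAITING < BLOCK_TIME
}
```

Under these assumed values the waiting limit works out to four seconds, which satisfies both bounds.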
@@ -41,6 +56,7 @@ pub enum ToQueue {
artifact: ArtifactPathId,
execution_timeout: Duration,
params: Vec<u8>,
executor_params: ExecutorParams,
result_tx: ResultSender,
},
}
@@ -49,12 +65,15 @@ struct ExecuteJob {
artifact: ArtifactPathId,
execution_timeout: Duration,
params: Vec<u8>,
executor_params: ExecutorParams,
result_tx: ResultSender,
waiting_since: Instant,
}
struct WorkerData {
idle: Option<IdleWorker>,
handle: WorkerHandle,
executor_params_hash: ExecutorParamsHash,
}
impl fmt::Debug for WorkerData {
@@ -79,7 +98,17 @@ impl Workers {
self.spawn_inflight + self.running.len() < self.capacity
}
fn find_available(&self) -> Option<Worker> {
fn find_available(&self, executor_params_hash: ExecutorParamsHash) -> Option<Worker> {
self.running.iter().find_map(|d| {
if d.1.idle.is_some() && d.1.executor_params_hash == executor_params_hash {
Some(d.0)
} else {
None
}
})
}
fn find_idle(&self) -> Option<Worker> {
self.running
.iter()
.find_map(|d| if d.1.idle.is_some() { Some(d.0) } else { None })
@@ -94,7 +123,7 @@ impl Workers {
}
enum QueueEvent {
Spawn(IdleWorker, WorkerHandle),
Spawn(IdleWorker, WorkerHandle, ExecuteJob),
StartWork(Worker, Outcome, ArtifactId, ResultSender),
}
@@ -154,6 +183,66 @@ impl Queue {
purge_dead(&self.metrics, &mut self.workers).await;
}
}
/// Tries to assign a job in the queue to a worker. If an idle worker is provided, it does its
/// best to find a job with a compatible execution environment unless there are jobs in the
/// queue waiting too long. In that case, it kills an existing idle worker and spawns a new
/// one. It may spawn an additional worker if that is affordable.
/// If all the workers are busy or the queue is empty, it does nothing.
/// Should be called every time a new job arrives to the queue or a job finishes.
fn try_assign_next_job(&mut self, finished_worker: Option<Worker>) {
// New jobs are always pushed to the tail of the queue; the one at its head is always
// the eldest one.
let eldest = if let Some(eldest) = self.queue.get(0) { eldest } else { return };
// By default, we're going to execute the eldest job on any worker slot available, even if
// we have to kill and re-spawn a worker
let mut worker = None;
let mut job_index = 0;
// But if we're not pressed for time, we can try to find a better job-worker pair not
// requiring the expensive kill-spawn operation
if eldest.waiting_since.elapsed() < MAX_KEEP_WAITING {
if let Some(finished_worker) = finished_worker {
if let Some(worker_data) = self.workers.running.get(finished_worker) {
for (i, job) in self.queue.iter().enumerate() {
if worker_data.executor_params_hash == job.executor_params.hash() {
(worker, job_index) = (Some(finished_worker), i);
break
}
}
}
}
}
if worker.is_none() {
// Try to obtain a worker for the job
worker = self.workers.find_available(self.queue[job_index].executor_params.hash());
}
if worker.is_none() {
if let Some(idle) = self.workers.find_idle() {
// No available workers of required type but there are some idle ones of other
// types, have to kill one and re-spawn with the correct type
if self.workers.running.remove(idle).is_some() {
self.metrics.execute_worker().on_retired();
}
}
}
if worker.is_none() && !self.workers.can_afford_one_more() {
// Bad luck, no worker slot can be used to execute the job
return
}
let job = self.queue.remove(job_index).expect("Job is just checked to be in queue; qed");
if let Some(worker) = worker {
assign(self, worker, job);
} else {
spawn_extra_worker(self, job);
}
}
}
async fn purge_dead(metrics: &Metrics, workers: &mut Workers) {
@@ -172,29 +261,30 @@ async fn purge_dead(metrics: &Metrics, workers: &mut Workers) {
}
fn handle_to_queue(queue: &mut Queue, to_queue: ToQueue) {
let ToQueue::Enqueue { artifact, execution_timeout, params, result_tx } = to_queue;
let ToQueue::Enqueue { artifact, execution_timeout, params, executor_params, result_tx } =
to_queue;
gum::debug!(
target: LOG_TARGET,
validation_code_hash = ?artifact.id.code_hash,
"enqueueing an artifact for execution",
);
queue.metrics.execute_enqueued();
let job = ExecuteJob { artifact, execution_timeout, params, result_tx };
if let Some(available) = queue.workers.find_available() {
assign(queue, available, job);
} else {
if queue.workers.can_afford_one_more() {
spawn_extra_worker(queue);
}
queue.queue.push_back(job);
}
let job = ExecuteJob {
artifact,
execution_timeout,
params,
executor_params,
result_tx,
waiting_since: Instant::now(),
};
queue.queue.push_back(job);
queue.try_assign_next_job(None);
}
async fn handle_mux(queue: &mut Queue, event: QueueEvent) {
match event {
QueueEvent::Spawn(idle, handle) => {
handle_worker_spawned(queue, idle, handle);
QueueEvent::Spawn(idle, handle, job) => {
handle_worker_spawned(queue, idle, handle, job);
},
QueueEvent::StartWork(worker, outcome, artifact_id, result_tx) => {
handle_job_finish(queue, worker, outcome, artifact_id, result_tx);
@@ -202,16 +292,23 @@ async fn handle_mux(queue: &mut Queue, event: QueueEvent) {
}
}
fn handle_worker_spawned(queue: &mut Queue, idle: IdleWorker, handle: WorkerHandle) {
fn handle_worker_spawned(
queue: &mut Queue,
idle: IdleWorker,
handle: WorkerHandle,
job: ExecuteJob,
) {
queue.metrics.execute_worker().on_spawned();
queue.workers.spawn_inflight -= 1;
let worker = queue.workers.running.insert(WorkerData { idle: Some(idle), handle });
let worker = queue.workers.running.insert(WorkerData {
idle: Some(idle),
handle,
executor_params_hash: job.executor_params.hash(),
});
gum::debug!(target: LOG_TARGET, ?worker, "execute worker spawned");
if let Some(job) = queue.queue.pop_front() {
assign(queue, worker, job);
}
assign(queue, worker, job);
}
/// If there are pending jobs in the queue, schedules the next of them onto the just freed up
@@ -280,42 +377,45 @@ fn handle_job_finish(
if let Some(idle_worker) = idle_worker {
if let Some(data) = queue.workers.running.get_mut(worker) {
data.idle = Some(idle_worker);
if let Some(job) = queue.queue.pop_front() {
assign(queue, worker, job);
}
return queue.try_assign_next_job(Some(worker))
}
} else {
// Note it's possible that the worker was purged already by `purge_dead`
if queue.workers.running.remove(worker).is_some() {
queue.metrics.execute_worker().on_retired();
}
if !queue.queue.is_empty() {
// The worker has died and we still have work we have to do. Request an extra worker.
//
// That can potentially overshoot, but that should be OK.
spawn_extra_worker(queue);
}
}
queue.try_assign_next_job(None);
}
fn spawn_extra_worker(queue: &mut Queue) {
fn spawn_extra_worker(queue: &mut Queue, job: ExecuteJob) {
queue.metrics.execute_worker().on_begin_spawn();
gum::debug!(target: LOG_TARGET, "spawning an extra worker");
queue
.mux
.push(spawn_worker_task(queue.program_path.clone(), queue.spawn_timeout).boxed());
.push(spawn_worker_task(queue.program_path.clone(), job, queue.spawn_timeout).boxed());
queue.workers.spawn_inflight += 1;
}
async fn spawn_worker_task(program_path: PathBuf, spawn_timeout: Duration) -> QueueEvent {
/// Spawns a new worker to execute a pre-assigned job.
/// A worker is never spawned as idle; a job to be executed by the worker has to be determined
/// beforehand. In such a way, a race condition is avoided: during the worker being spawned,
/// another job in the queue, with an incompatible execution environment, may become stale, and
/// the queue would have to kill a newly started worker and spawn another one.
/// Nevertheless, if the worker finishes executing the job, it becomes idle and may be used to execute other jobs with a compatible execution environment.
async fn spawn_worker_task(
program_path: PathBuf,
job: ExecuteJob,
spawn_timeout: Duration,
) -> QueueEvent {
use futures_timer::Delay;
loop {
match super::worker::spawn(&program_path, spawn_timeout).await {
Ok((idle, handle)) => break QueueEvent::Spawn(idle, handle),
match super::worker::spawn(&program_path, job.executor_params.clone(), spawn_timeout).await
{
Ok((idle, handle)) => break QueueEvent::Spawn(idle, handle, job),
Err(err) => {
gum::warn!(target: LOG_TARGET, "failed to spawn an execute worker: {:?}", err);
@@ -328,7 +428,8 @@ async fn spawn_worker_task(program_path: PathBuf, spawn_timeout: Duration) -> Qu
/// Ask the given worker to perform the given job.
///
/// The worker must be running and idle.
/// The worker must be running and idle. The job and the worker must share the same execution
/// environment parameter set.
fn assign(queue: &mut Queue, worker: Worker, job: ExecuteJob) {
gum::debug!(
target: LOG_TARGET,
@@ -337,6 +438,16 @@ fn assign(queue: &mut Queue, worker: Worker, job: ExecuteJob) {
"assigning the execute worker",
);
debug_assert_eq!(
queue
.workers
.running
.get(worker)
.expect("caller must provide existing worker; qed")
.executor_params_hash,
job.executor_params.hash()
);
let idle = queue.workers.claim_idle(worker).expect(
"this caller must supply a worker which is idle and running;
thus claim_idle cannot return None;
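The selection logic of `try_assign_next_job` can be hard to follow from the hunks alone, so here is a simplified, self-contained model of it. This is a sketch under stated assumptions, not the code from this commit: `ParamsHash`, `Job`, `WorkerState`, `Decision`, and `decide` are hypothetical names, workers are identified by slice index instead of slotmap keys, in-flight spawns are ignored, and the function returns a decision instead of acting on the queue.

```rust
use std::collections::VecDeque;
use std::time::{Duration, Instant};

/// Stand-in for `ExecutorParamsHash` (assumption: a plain integer instead of a real hash).
type ParamsHash = u64;

/// A queued job, reduced to the two fields the scheduling decision depends on.
struct Job {
    params_hash: ParamsHash,
    waiting_since: Instant,
}

/// A running worker, reduced to its environment hash and an idle flag.
struct WorkerState {
    params_hash: ParamsHash,
    idle: bool,
}

/// Hypothetical result type; the code in the diff acts directly instead of returning a value.
#[derive(Debug, PartialEq)]
enum Decision {
    /// The queue is empty or no worker slot can be used.
    Nothing,
    /// Run the job at `job_index` on the existing worker `worker`.
    Assign { job_index: usize, worker: usize },
    /// Spawn a new worker for the job, optionally retiring an idle incompatible one first.
    Spawn { job_index: usize, retire: Option<usize> },
}

fn decide(
    queue: &VecDeque<Job>,
    workers: &[WorkerState],
    capacity: usize,
    finished: Option<usize>,
    max_keep_waiting: Duration,
    now: Instant,
) -> Decision {
    // The head of the queue is the eldest job; an empty queue means there is nothing to do.
    let eldest = match queue.front() {
        Some(job) => job,
        None => return Decision::Nothing,
    };
    let mut worker = None;
    let mut job_index = 0;

    // While the eldest job has not waited too long, prefer the first job that is
    // compatible with the worker that has just finished.
    if now.duration_since(eldest.waiting_since) < max_keep_waiting {
        if let Some(state) = finished.and_then(|w| workers.get(w)) {
            if let Some(i) = queue.iter().position(|j| j.params_hash == state.params_hash) {
                worker = finished;
                job_index = i;
            }
        }
    }

    // Otherwise look for any idle worker whose environment matches the chosen job.
    if worker.is_none() {
        let wanted = queue[job_index].params_hash;
        worker = workers.iter().position(|w| w.idle && w.params_hash == wanted);
    }
    if let Some(worker) = worker {
        return Decision::Assign { job_index, worker };
    }

    // No compatible worker: an idle worker of another type may be retired to free a slot.
    let retire = workers.iter().position(|w| w.idle);
    let running = workers.len() - usize::from(retire.is_some());
    if running < capacity {
        Decision::Spawn { job_index, retire }
    } else {
        Decision::Nothing
    }
}
```

With one idle worker of type 1 and a queue holding a type 2 job followed by a type 1 job, the model picks the type 1 job while the head of the queue is fresh. Once the head has waited longer than the limit, it retires the worker and spawns a new one for the type 2 job.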
+42 -2
@@ -28,7 +28,9 @@ use cpu_time::ProcessTime;
use futures::{pin_mut, select_biased, FutureExt};
use futures_timer::Delay;
use parity_scale_codec::{Decode, Encode};
use polkadot_parachain::primitives::ValidationResult;
use polkadot_primitives::vstaging::ExecutorParams;
use std::{
path::{Path, PathBuf},
sync::{mpsc::channel, Arc},
@@ -37,13 +39,29 @@ use std::{
use tokio::{io, net::UnixStream};
/// Spawns a new worker with the given program path that acts as the worker and the spawn timeout.
/// Sends a handshake message to the worker as soon as it is spawned.
///
/// The program should be able to handle `<program-path> execute-worker <socket-path>` invocation.
pub async fn spawn(
program_path: &Path,
executor_params: ExecutorParams,
spawn_timeout: Duration,
) -> Result<(IdleWorker, WorkerHandle), SpawnErr> {
spawn_with_program_path("execute", program_path, &["execute-worker"], spawn_timeout).await
let (mut idle_worker, worker_handle) =
spawn_with_program_path("execute", program_path, &["execute-worker"], spawn_timeout)
.await?;
send_handshake(&mut idle_worker.stream, Handshake { executor_params })
.await
.map_err(|error| {
gum::warn!(
target: LOG_TARGET,
worker_pid = %idle_worker.pid,
?error,
"failed to send a handshake to the spawned worker",
);
SpawnErr::Handshake
})?;
Ok((idle_worker, worker_handle))
}
/// Outcome of PVF execution.
@@ -159,6 +177,21 @@ pub async fn start_work(
}
}
async fn send_handshake(stream: &mut UnixStream, handshake: Handshake) -> io::Result<()> {
framed_send(stream, &handshake.encode()).await
}
async fn recv_handshake(stream: &mut UnixStream) -> io::Result<Handshake> {
let handshake_enc = framed_recv(stream).await?;
let handshake = Handshake::decode(&mut &handshake_enc[..]).map_err(|_| {
io::Error::new(
io::ErrorKind::Other,
"execute pvf recv_handshake: failed to decode Handshake".to_owned(),
)
})?;
Ok(handshake)
}
async fn send_request(
stream: &mut UnixStream,
artifact_path: &Path,
@@ -203,6 +236,11 @@ async fn recv_response(stream: &mut UnixStream) -> io::Result<Response> {
})
}
#[derive(Encode, Decode)]
struct Handshake {
executor_params: ExecutorParams,
}
#[derive(Encode, Decode)]
pub enum Response {
Ok { result_descriptor: ValidationResult, duration: Duration },
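The handshake added here is a single framed message carrying the executor parameters, sent by the host right after the worker is spawned and read by the worker before it creates its executor. The sketch below illustrates that exchange in a synchronous form. It rests on several assumptions: the frame layout (an 8-byte little-endian length prefix) is a guess at what `framed_send` and `framed_recv` do, the parameters are treated as opaque bytes instead of a SCALE-encoded `ExecutorParams`, and blocking `std::io` traits replace the async `UnixStream`.

```rust
use std::io::{self, Read, Write};

/// Writes one frame: an 8-byte little-endian length prefix, then the payload.
/// The frame layout is an assumption made for this sketch.
fn framed_send<W: Write>(w: &mut W, payload: &[u8]) -> io::Result<()> {
    w.write_all(&(payload.len() as u64).to_le_bytes())?;
    w.write_all(payload)
}

/// Reads one frame written by `framed_send`.
fn framed_recv<R: Read>(r: &mut R) -> io::Result<Vec<u8>> {
    let mut len_buf = [0u8; 8];
    r.read_exact(&mut len_buf)?;
    let len = u64::from_le_bytes(len_buf) as usize;
    let mut payload = vec![0u8; len];
    r.read_exact(&mut payload)?;
    Ok(payload)
}

/// Simplified handshake: the executor parameters are kept as opaque, already encoded bytes.
#[derive(Debug, PartialEq)]
struct Handshake {
    executor_params: Vec<u8>,
}

/// Host side: sent once, immediately after the worker process is spawned.
fn send_handshake<W: Write>(w: &mut W, handshake: &Handshake) -> io::Result<()> {
    framed_send(w, &handshake.executor_params)
}

/// Worker side: read once, before the executor is created.
fn recv_handshake<R: Read>(r: &mut R) -> io::Result<Handshake> {
    Ok(Handshake { executor_params: framed_recv(r)? })
}
```

Because the functions are generic over `Read` and `Write`, the exchange can be exercised in memory by writing into a `Vec<u8>` and reading back from a byte slice. A truncated frame surfaces as an I/O error on the receiving side.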
@@ -225,7 +263,9 @@ impl Response {
/// the path to the socket used to communicate with the host.
pub fn worker_entrypoint(socket_path: &str) {
worker_event_loop("execute", socket_path, |rt_handle, mut stream| async move {
let executor = Arc::new(Executor::new().map_err(|e| {
let handshake = recv_handshake(&mut stream).await?;
let executor = Arc::new(Executor::new(handshake.executor_params).map_err(|e| {
io::Error::new(io::ErrorKind::Other, format!("cannot create executor: {}", e))
})?);