PVF validation host: do not alter niceness (#4525)

We wanted to change niceness to accomodate the fact that some of the
preparation tasks are low priority. For example, when a node sees that
there is a new para was onboarded the node may start preparing right
away. Since all other activities are more important, such as network I/O
or validation of the backed candidates and preparation of the
immediatelly needed PVFs.

However, it turned out that this approach does not work: generally
non-root processes can only decrease niceness and they cannot increase
it to the previous value, as was assumed by the code.

Apart from that, https://github.com/paritytech/polkadot/pull/4123
assumes all PVFs are prepared in the same way. Specifically, that if a
PVF preparation failed before, then PVF pre-checking will also report
that it was failed, even though it could happen that preparation failed
due to being low-priority. In order to avoid such cases, we decided to
simplify the whole preparation model. Preparation under low priority
does not work well with that.

Closes https://github.com/paritytech/polkadot/issues/4520
This commit is contained in:
Sergei Shulepov
2021-12-14 17:17:45 +01:00
committed by GitHub
parent 2ccbf38b85
commit 1493fed1ed
7 changed files with 13 additions and 197 deletions
-1
View File
@@ -6329,7 +6329,6 @@ dependencies = [
"futures 0.3.18",
"futures-timer 3.0.2",
"hex-literal",
"libc",
"parity-scale-codec",
"pin-project 1.0.8",
"polkadot-core-primitives",
-1
View File
@@ -15,7 +15,6 @@ async-process = "1.3.0"
assert_matches = "1.4.0"
futures = "0.3.17"
futures-timer = "3.0.2"
libc = "0.2.109"
slotmap = "1.0"
tracing = "0.1.29"
pin-project = "1.0.8"
+3 -56
View File
@@ -486,12 +486,6 @@ async fn handle_execute_pvf(
.await?;
},
ArtifactState::Preparing { waiting_for_response: _ } => {
send_prepare(
prepare_queue,
prepare::ToQueue::Amend { priority, artifact_id: artifact_id.clone() },
)
.await?;
awaiting_prepare.add(artifact_id, execution_timeout, params, result_tx);
},
ArtifactState::FailedToProcess(error) => {
@@ -525,18 +519,17 @@ async fn handle_heads_up(
*last_time_needed = now;
},
ArtifactState::Preparing { waiting_for_response: _ } => {
// Already preparing. We don't need to send a priority amend either because
// it can't get any lower than the background.
// The artifact is already being prepared, so we don't need to do anything.
},
ArtifactState::FailedToProcess(_) => {},
}
} else {
// The artifact is unknown: register it and put a background job into the prepare queue.
// It's not in the artifacts, so we need to enqueue a job to prepare it.
artifacts.insert_preparing(artifact_id.clone(), Vec::new());
send_prepare(
prepare_queue,
prepare::ToQueue::Enqueue { priority: Priority::Background, pvf: active_pvf },
prepare::ToQueue::Enqueue { priority: Priority::Normal, pvf: active_pvf },
)
.await?;
}
@@ -923,48 +916,6 @@ mod tests {
test.poll_ensure_to_sweeper_is_empty().await;
}
#[async_std::test]
async fn amending_priority() {
let mut test = Builder::default().build();
let mut host = test.host_handle();
host.heads_up(vec![Pvf::from_discriminator(1)]).await.unwrap();
// Run until we receive a prepare request.
let prepare_q_rx = &mut test.to_prepare_queue_rx;
run_until(
&mut test.run,
async {
assert_matches!(
prepare_q_rx.next().await.unwrap(),
prepare::ToQueue::Enqueue { .. }
);
}
.boxed(),
)
.await;
let (result_tx, _result_rx) = oneshot::channel();
host.execute_pvf(
Pvf::from_discriminator(1),
TEST_EXECUTION_TIMEOUT,
vec![],
Priority::Critical,
result_tx,
)
.await
.unwrap();
run_until(
&mut test.run,
async {
assert_matches!(prepare_q_rx.next().await.unwrap(), prepare::ToQueue::Amend { .. });
}
.boxed(),
)
.await;
}
#[async_std::test]
async fn execute_pvf_requests() {
let mut test = Builder::default().build();
@@ -1007,10 +958,6 @@ mod tests {
test.poll_and_recv_to_prepare_queue().await,
prepare::ToQueue::Enqueue { .. }
);
assert_matches!(
test.poll_and_recv_to_prepare_queue().await,
prepare::ToQueue::Amend { .. }
);
assert_matches!(
test.poll_and_recv_to_prepare_queue().await,
prepare::ToQueue::Enqueue { .. }
+3 -19
View File
@@ -53,10 +53,6 @@ pub enum ToPool {
/// this message is processed.
Kill(Worker),
/// If the given worker was started with the background priority, then it will be raised up to
/// normal priority. Otherwise, it's no-op.
BumpPriority(Worker),
/// Request the given worker to start working on the given code.
///
/// Once the job either succeeded or failed, a [`FromPool::Concluded`] message will be sent back.
@@ -65,12 +61,7 @@ pub enum ToPool {
///
/// In either case, the worker is considered busy and no further `StartWork` messages should be
/// sent until either `Concluded` or `Rip` message is received.
StartWork {
worker: Worker,
code: Arc<Vec<u8>>,
artifact_path: PathBuf,
background_priority: bool,
},
StartWork { worker: Worker, code: Arc<Vec<u8>>, artifact_path: PathBuf },
}
/// A message sent from pool to its client.
@@ -214,7 +205,7 @@ fn handle_to_pool(
metrics.prepare_worker().on_begin_spawn();
mux.push(spawn_worker_task(program_path.to_owned(), spawn_timeout).boxed());
},
ToPool::StartWork { worker, code, artifact_path, background_priority } => {
ToPool::StartWork { worker, code, artifact_path } => {
if let Some(data) = spawned.get_mut(worker) {
if let Some(idle) = data.idle.take() {
let preparation_timer = metrics.time_preparation();
@@ -225,7 +216,6 @@ fn handle_to_pool(
code,
cache_path.to_owned(),
artifact_path,
background_priority,
preparation_timer,
)
.boxed(),
@@ -248,10 +238,6 @@ fn handle_to_pool(
// It may be absent if it were previously already removed by `purge_dead`.
let _ = attempt_retire(metrics, spawned, worker);
},
ToPool::BumpPriority(worker) =>
if let Some(data) = spawned.get(worker) {
worker::bump_priority(&data.handle);
},
}
}
@@ -277,11 +263,9 @@ async fn start_work_task<Timer>(
code: Arc<Vec<u8>>,
cache_path: PathBuf,
artifact_path: PathBuf,
background_priority: bool,
_preparation_timer: Option<Timer>,
) -> PoolEvent {
let outcome =
worker::start_work(idle, code, &cache_path, artifact_path, background_priority).await;
let outcome = worker::start_work(idle, code, &cache_path, artifact_path).await;
PoolEvent::StartWork(worker, outcome)
}
+5 -75
View File
@@ -29,11 +29,8 @@ pub enum ToQueue {
/// This schedules preparation of the given PVF.
///
/// Note that it is incorrect to enqueue the same PVF again without first receiving the
/// [`FromQueue`] response. In case there is a need to bump the priority, use
/// [`ToQueue::Amend`].
/// [`FromQueue`] response.
Enqueue { priority: Priority, pvf: Pvf },
/// Amends the priority for the given [`ArtifactId`] if it is running. If it's not, then it's noop.
Amend { priority: Priority, artifact_id: ArtifactId },
}
/// A response from queue.
@@ -97,7 +94,6 @@ impl WorkerData {
/// there is going to be a limited number of critical jobs and we don't really care if background starve.
#[derive(Default)]
struct Unscheduled {
background: VecDeque<Job>,
normal: VecDeque<Job>,
critical: VecDeque<Job>,
}
@@ -105,7 +101,6 @@ struct Unscheduled {
impl Unscheduled {
fn queue_mut(&mut self, prio: Priority) -> &mut VecDeque<Job> {
match prio {
Priority::Background => &mut self.background,
Priority::Normal => &mut self.normal,
Priority::Critical => &mut self.critical,
}
@@ -120,14 +115,12 @@ impl Unscheduled {
}
fn is_empty(&self) -> bool {
self.background.is_empty() && self.normal.is_empty() && self.critical.is_empty()
self.normal.is_empty() && self.critical.is_empty()
}
fn next(&mut self) -> Option<Job> {
let mut check = |prio: Priority| self.queue_mut(prio).pop_front();
check(Priority::Critical)
.or_else(|| check(Priority::Normal))
.or_else(|| check(Priority::Background))
check(Priority::Critical).or_else(|| check(Priority::Normal))
}
}
@@ -213,9 +206,6 @@ async fn handle_to_queue(queue: &mut Queue, to_queue: ToQueue) -> Result<(), Fat
ToQueue::Enqueue { priority, pvf } => {
handle_enqueue(queue, priority, pvf).await?;
},
ToQueue::Amend { priority, artifact_id } => {
handle_amend(queue, priority, artifact_id).await?;
},
}
Ok(())
}
@@ -265,41 +255,6 @@ fn find_idle_worker(queue: &mut Queue) -> Option<Worker> {
queue.workers.iter().filter(|(_, data)| data.is_idle()).map(|(k, _)| k).next()
}
async fn handle_amend(
queue: &mut Queue,
priority: Priority,
artifact_id: ArtifactId,
) -> Result<(), Fatal> {
if let Some(&job) = queue.artifact_id_to_job.get(&artifact_id) {
tracing::debug!(
target: LOG_TARGET,
validation_code_hash = ?artifact_id.code_hash,
?priority,
"amending preparation priority.",
);
let mut job_data: &mut JobData = &mut queue.jobs[job];
if job_data.priority < priority {
// The new priority is higher. We should do two things:
// - if the worker was already spawned with the background prio and the new one is not
// (it's already the case, if we are in this branch but we still do the check for
// clarity), then we should tell the pool to bump the priority for the worker.
//
// - save the new priority in the job.
if let Some(worker) = job_data.worker {
if job_data.priority.is_background() && !priority.is_background() {
send_pool(&mut queue.to_pool_tx, pool::ToPool::BumpPriority(worker)).await?;
}
}
job_data.priority = priority;
}
}
Ok(())
}
async fn handle_from_pool(queue: &mut Queue, from_pool: pool::FromPool) -> Result<(), Fatal> {
use pool::FromPool::*;
match from_pool {
@@ -469,12 +424,7 @@ async fn assign(queue: &mut Queue, worker: Worker, job: Job) -> Result<(), Fatal
send_pool(
&mut queue.to_pool_tx,
pool::ToPool::StartWork {
worker,
code: job_data.pvf.code.clone(),
artifact_path,
background_priority: job_data.priority.is_background(),
},
pool::ToPool::StartWork { worker, code: job_data.pvf.code.clone(), artifact_path },
)
.await?;
@@ -644,7 +594,7 @@ mod tests {
async fn properly_concludes() {
let mut test = Test::new(2, 2);
test.send_queue(ToQueue::Enqueue { priority: Priority::Background, pvf: pvf(1) });
test.send_queue(ToQueue::Enqueue { priority: Priority::Normal, pvf: pvf(1) });
assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
let w = test.workers.insert(());
@@ -713,26 +663,6 @@ mod tests {
assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Kill(w1));
}
#[async_std::test]
async fn bump_prio_on_urgency_change() {
let mut test = Test::new(2, 2);
test.send_queue(ToQueue::Enqueue { priority: Priority::Background, pvf: pvf(1) });
assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
let w = test.workers.insert(());
test.send_from_pool(pool::FromPool::Spawned(w));
assert_matches!(test.poll_and_recv_to_pool().await, pool::ToPool::StartWork { .. });
test.send_queue(ToQueue::Amend {
priority: Priority::Normal,
artifact_id: pvf(1).as_artifact_id(),
});
assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::BumpPriority(w));
}
#[async_std::test]
async fn worker_mass_die_out_doesnt_stall_queue() {
let mut test = Test::new(2, 2);
+2 -35
View File
@@ -32,9 +32,6 @@ use parity_scale_codec::{Decode, Encode};
use sp_core::hexdisplay::HexDisplay;
use std::{any::Any, panic, sync::Arc, time::Duration};
const NICENESS_BACKGROUND: i32 = 10;
const NICENESS_FOREGROUND: i32 = 0;
/// The time period after which the preparation worker is considered unresponsive and will be killed.
// NOTE: If you change this make sure to fix the buckets of `pvf_preparation_time` metric.
const COMPILATION_TIMEOUT: Duration = Duration::from_secs(60);
@@ -72,22 +69,16 @@ pub async fn start_work(
code: Arc<Vec<u8>>,
cache_path: &Path,
artifact_path: PathBuf,
background_priority: bool,
) -> Outcome {
let IdleWorker { mut stream, pid } = worker;
tracing::debug!(
target: LOG_TARGET,
worker_pid = %pid,
%background_priority,
"starting prepare for {}",
artifact_path.display(),
);
if background_priority {
renice(pid, NICENESS_BACKGROUND);
}
with_tmp_file(pid, cache_path, |tmp_file| async move {
if let Err(err) = send_request(&mut stream, code, &tmp_file).await {
tracing::warn!(
@@ -172,10 +163,8 @@ pub async fn start_work(
};
match selected {
Selected::Done(result) => {
renice(pid, NICENESS_FOREGROUND);
Outcome::Concluded { worker: IdleWorker { stream, pid }, result }
},
Selected::Done(result) =>
Outcome::Concluded { worker: IdleWorker { stream, pid }, result },
Selected::Deadline => Outcome::TimedOut,
Selected::IoErr => Outcome::DidNotMakeIt,
}
@@ -250,28 +239,6 @@ async fn recv_request(stream: &mut UnixStream) -> io::Result<(Vec<u8>, PathBuf)>
Ok((code, tmp_file))
}
pub fn bump_priority(handle: &WorkerHandle) {
let pid = handle.id();
renice(pid, NICENESS_FOREGROUND);
}
fn renice(pid: u32, niceness: i32) {
tracing::debug!(
target: LOG_TARGET,
worker_pid = %pid,
"changing niceness to {}",
niceness,
);
// Consider upstreaming this to the `nix` crate.
unsafe {
if -1 == libc::setpriority(libc::PRIO_PROCESS, pid, niceness) {
let err = std::io::Error::last_os_error();
tracing::warn!(target: LOG_TARGET, "failed to set the priority: {:?}", err);
}
}
}
/// The entrypoint that the spawned prepare worker should start with. The `socket_path` specifies
/// the path to the socket used to communicate with the host.
pub fn worker_entrypoint(socket_path: &str) {
-10
View File
@@ -17,11 +17,6 @@
/// A priority assigned to execution of a PVF.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum Priority {
/// Jobs in this priority will be executed in the background, meaning that they will be only
/// given spare CPU time.
///
/// This is mainly for cache warmings.
Background,
/// Normal priority for things that do not require immediate response, but still need to be
/// done pretty quick.
///
@@ -38,9 +33,4 @@ impl Priority {
pub fn is_critical(self) -> bool {
self == Priority::Critical
}
/// Returns `true` if `self` is `Background`
pub fn is_background(self) -> bool {
self == Priority::Background
}
}