mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-05-31 11:01:01 +00:00
PVF timeouts follow-up (#6151)
* Rename timeout consts and timeout parameter; bump leniency * Update implementor's guide with info about PVFs * Make glossary a bit easier to read * Add a note to LENIENT_PREPARATION_TIMEOUT * Remove PVF-specific section from glossary * Fix some typos
This commit is contained in:
@@ -65,7 +65,7 @@ pub enum ToPool {
|
||||
worker: Worker,
|
||||
code: Arc<Vec<u8>>,
|
||||
artifact_path: PathBuf,
|
||||
compilation_timeout: Duration,
|
||||
preparation_timeout: Duration,
|
||||
},
|
||||
}
|
||||
|
||||
@@ -210,7 +210,7 @@ fn handle_to_pool(
|
||||
metrics.prepare_worker().on_begin_spawn();
|
||||
mux.push(spawn_worker_task(program_path.to_owned(), spawn_timeout).boxed());
|
||||
},
|
||||
ToPool::StartWork { worker, code, artifact_path, compilation_timeout } => {
|
||||
ToPool::StartWork { worker, code, artifact_path, preparation_timeout } => {
|
||||
if let Some(data) = spawned.get_mut(worker) {
|
||||
if let Some(idle) = data.idle.take() {
|
||||
let preparation_timer = metrics.time_preparation();
|
||||
@@ -221,7 +221,7 @@ fn handle_to_pool(
|
||||
code,
|
||||
cache_path.to_owned(),
|
||||
artifact_path,
|
||||
compilation_timeout,
|
||||
preparation_timeout,
|
||||
preparation_timer,
|
||||
)
|
||||
.boxed(),
|
||||
@@ -269,11 +269,11 @@ async fn start_work_task<Timer>(
|
||||
code: Arc<Vec<u8>>,
|
||||
cache_path: PathBuf,
|
||||
artifact_path: PathBuf,
|
||||
compilation_timeout: Duration,
|
||||
preparation_timeout: Duration,
|
||||
_preparation_timer: Option<Timer>,
|
||||
) -> PoolEvent {
|
||||
let outcome =
|
||||
worker::start_work(idle, code, &cache_path, artifact_path, compilation_timeout).await;
|
||||
worker::start_work(idle, code, &cache_path, artifact_path, preparation_timeout).await;
|
||||
PoolEvent::StartWork(worker, outcome)
|
||||
}
|
||||
|
||||
|
||||
@@ -33,7 +33,7 @@ pub enum ToQueue {
|
||||
///
|
||||
/// Note that it is incorrect to enqueue the same PVF again without first receiving the
|
||||
/// [`FromQueue`] response.
|
||||
Enqueue { priority: Priority, pvf: Pvf, compilation_timeout: Duration },
|
||||
Enqueue { priority: Priority, pvf: Pvf, preparation_timeout: Duration },
|
||||
}
|
||||
|
||||
/// A response from queue.
|
||||
@@ -80,7 +80,7 @@ struct JobData {
|
||||
priority: Priority,
|
||||
pvf: Pvf,
|
||||
/// The timeout for the preparation job.
|
||||
compilation_timeout: Duration,
|
||||
preparation_timeout: Duration,
|
||||
worker: Option<Worker>,
|
||||
}
|
||||
|
||||
@@ -208,8 +208,8 @@ impl Queue {
|
||||
|
||||
async fn handle_to_queue(queue: &mut Queue, to_queue: ToQueue) -> Result<(), Fatal> {
|
||||
match to_queue {
|
||||
ToQueue::Enqueue { priority, pvf, compilation_timeout } => {
|
||||
handle_enqueue(queue, priority, pvf, compilation_timeout).await?;
|
||||
ToQueue::Enqueue { priority, pvf, preparation_timeout } => {
|
||||
handle_enqueue(queue, priority, pvf, preparation_timeout).await?;
|
||||
},
|
||||
}
|
||||
Ok(())
|
||||
@@ -219,13 +219,13 @@ async fn handle_enqueue(
|
||||
queue: &mut Queue,
|
||||
priority: Priority,
|
||||
pvf: Pvf,
|
||||
compilation_timeout: Duration,
|
||||
preparation_timeout: Duration,
|
||||
) -> Result<(), Fatal> {
|
||||
gum::debug!(
|
||||
target: LOG_TARGET,
|
||||
validation_code_hash = ?pvf.code_hash,
|
||||
?priority,
|
||||
?compilation_timeout,
|
||||
?preparation_timeout,
|
||||
"PVF is enqueued for preparation.",
|
||||
);
|
||||
queue.metrics.prepare_enqueued();
|
||||
@@ -247,7 +247,7 @@ async fn handle_enqueue(
|
||||
return Ok(())
|
||||
}
|
||||
|
||||
let job = queue.jobs.insert(JobData { priority, pvf, compilation_timeout, worker: None });
|
||||
let job = queue.jobs.insert(JobData { priority, pvf, preparation_timeout, worker: None });
|
||||
queue.artifact_id_to_job.insert(artifact_id, job);
|
||||
|
||||
if let Some(available) = find_idle_worker(queue) {
|
||||
@@ -439,7 +439,7 @@ async fn assign(queue: &mut Queue, worker: Worker, job: Job) -> Result<(), Fatal
|
||||
worker,
|
||||
code: job_data.pvf.code.clone(),
|
||||
artifact_path,
|
||||
compilation_timeout: job_data.compilation_timeout,
|
||||
preparation_timeout: job_data.preparation_timeout,
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
@@ -494,7 +494,7 @@ pub fn start(
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::{error::PrepareError, host::PRECHECK_COMPILATION_TIMEOUT};
|
||||
use crate::{error::PrepareError, host::PRECHECK_PREPARATION_TIMEOUT};
|
||||
use assert_matches::assert_matches;
|
||||
use futures::{future::BoxFuture, FutureExt};
|
||||
use slotmap::SlotMap;
|
||||
@@ -612,7 +612,7 @@ mod tests {
|
||||
test.send_queue(ToQueue::Enqueue {
|
||||
priority: Priority::Normal,
|
||||
pvf: pvf(1),
|
||||
compilation_timeout: PRECHECK_COMPILATION_TIMEOUT,
|
||||
preparation_timeout: PRECHECK_PREPARATION_TIMEOUT,
|
||||
});
|
||||
assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
|
||||
|
||||
@@ -626,12 +626,12 @@ mod tests {
|
||||
#[async_std::test]
|
||||
async fn dont_spawn_over_soft_limit_unless_critical() {
|
||||
let mut test = Test::new(2, 3);
|
||||
let compilation_timeout = PRECHECK_COMPILATION_TIMEOUT;
|
||||
let preparation_timeout = PRECHECK_PREPARATION_TIMEOUT;
|
||||
|
||||
let priority = Priority::Normal;
|
||||
test.send_queue(ToQueue::Enqueue { priority, pvf: pvf(1), compilation_timeout });
|
||||
test.send_queue(ToQueue::Enqueue { priority, pvf: pvf(2), compilation_timeout });
|
||||
test.send_queue(ToQueue::Enqueue { priority, pvf: pvf(3), compilation_timeout });
|
||||
test.send_queue(ToQueue::Enqueue { priority, pvf: pvf(1), preparation_timeout });
|
||||
test.send_queue(ToQueue::Enqueue { priority, pvf: pvf(2), preparation_timeout });
|
||||
test.send_queue(ToQueue::Enqueue { priority, pvf: pvf(3), preparation_timeout });
|
||||
|
||||
// Receive only two spawns.
|
||||
assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
|
||||
@@ -655,7 +655,7 @@ mod tests {
|
||||
test.send_queue(ToQueue::Enqueue {
|
||||
priority: Priority::Critical,
|
||||
pvf: pvf(4),
|
||||
compilation_timeout,
|
||||
preparation_timeout,
|
||||
});
|
||||
|
||||
// 2 out of 2 are working, but there is a critical job incoming. That means that spawning
|
||||
@@ -666,12 +666,12 @@ mod tests {
|
||||
#[async_std::test]
|
||||
async fn cull_unwanted() {
|
||||
let mut test = Test::new(1, 2);
|
||||
let compilation_timeout = PRECHECK_COMPILATION_TIMEOUT;
|
||||
let preparation_timeout = PRECHECK_PREPARATION_TIMEOUT;
|
||||
|
||||
test.send_queue(ToQueue::Enqueue {
|
||||
priority: Priority::Normal,
|
||||
pvf: pvf(1),
|
||||
compilation_timeout,
|
||||
preparation_timeout,
|
||||
});
|
||||
assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
|
||||
let w1 = test.workers.insert(());
|
||||
@@ -682,7 +682,7 @@ mod tests {
|
||||
test.send_queue(ToQueue::Enqueue {
|
||||
priority: Priority::Critical,
|
||||
pvf: pvf(2),
|
||||
compilation_timeout,
|
||||
preparation_timeout,
|
||||
});
|
||||
assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
|
||||
|
||||
@@ -701,10 +701,10 @@ mod tests {
|
||||
async fn worker_mass_die_out_doesnt_stall_queue() {
|
||||
let mut test = Test::new(2, 2);
|
||||
|
||||
let (priority, compilation_timeout) = (Priority::Normal, PRECHECK_COMPILATION_TIMEOUT);
|
||||
test.send_queue(ToQueue::Enqueue { priority, pvf: pvf(1), compilation_timeout });
|
||||
test.send_queue(ToQueue::Enqueue { priority, pvf: pvf(2), compilation_timeout });
|
||||
test.send_queue(ToQueue::Enqueue { priority, pvf: pvf(3), compilation_timeout });
|
||||
let (priority, preparation_timeout) = (Priority::Normal, PRECHECK_PREPARATION_TIMEOUT);
|
||||
test.send_queue(ToQueue::Enqueue { priority, pvf: pvf(1), preparation_timeout });
|
||||
test.send_queue(ToQueue::Enqueue { priority, pvf: pvf(2), preparation_timeout });
|
||||
test.send_queue(ToQueue::Enqueue { priority, pvf: pvf(3), preparation_timeout });
|
||||
|
||||
assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
|
||||
assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
|
||||
@@ -734,7 +734,7 @@ mod tests {
|
||||
test.send_queue(ToQueue::Enqueue {
|
||||
priority: Priority::Normal,
|
||||
pvf: pvf(1),
|
||||
compilation_timeout: PRECHECK_COMPILATION_TIMEOUT,
|
||||
preparation_timeout: PRECHECK_PREPARATION_TIMEOUT,
|
||||
});
|
||||
|
||||
assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
|
||||
@@ -759,7 +759,7 @@ mod tests {
|
||||
test.send_queue(ToQueue::Enqueue {
|
||||
priority: Priority::Normal,
|
||||
pvf: pvf(1),
|
||||
compilation_timeout: PRECHECK_COMPILATION_TIMEOUT,
|
||||
preparation_timeout: PRECHECK_PREPARATION_TIMEOUT,
|
||||
});
|
||||
|
||||
assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
|
||||
|
||||
@@ -65,7 +65,7 @@ pub async fn start_work(
|
||||
code: Arc<Vec<u8>>,
|
||||
cache_path: &Path,
|
||||
artifact_path: PathBuf,
|
||||
compilation_timeout: Duration,
|
||||
preparation_timeout: Duration,
|
||||
) -> Outcome {
|
||||
let IdleWorker { mut stream, pid } = worker;
|
||||
|
||||
@@ -100,7 +100,7 @@ pub async fn start_work(
|
||||
}
|
||||
|
||||
let selected =
|
||||
match async_std::future::timeout(compilation_timeout, framed_recv(&mut stream)).await {
|
||||
match async_std::future::timeout(preparation_timeout, framed_recv(&mut stream)).await {
|
||||
Ok(Ok(response_bytes)) => {
|
||||
// Received bytes from worker within the time limit.
|
||||
// By convention we expect encoded `PrepareResult`.
|
||||
|
||||
Reference in New Issue
Block a user