Introduce metrics into PVF validation host (#3603)

This commit is contained in:
Sergei Shulepov
2021-08-20 11:50:47 +02:00
committed by GitHub
parent 8792d0e407
commit ad0e42537d
13 changed files with 325 additions and 24 deletions
+21 -5
View File
@@ -20,6 +20,7 @@ use super::worker::Outcome;
use crate::{
artifacts::{ArtifactId, ArtifactPathId},
host::ResultSender,
metrics::Metrics,
worker_common::{IdleWorker, WorkerHandle},
InvalidCandidate, ValidationError, LOG_TARGET,
};
@@ -95,6 +96,8 @@ enum QueueEvent {
type Mux = FuturesUnordered<BoxFuture<'static, QueueEvent>>;
struct Queue {
metrics: Metrics,
/// The receiver that receives messages to the pool.
to_queue_rx: mpsc::Receiver<ToQueue>,
@@ -109,12 +112,14 @@ struct Queue {
impl Queue {
fn new(
metrics: Metrics,
program_path: PathBuf,
worker_capacity: usize,
spawn_timeout: Duration,
to_queue_rx: mpsc::Receiver<ToQueue>,
) -> Self {
Self {
metrics,
program_path,
spawn_timeout,
to_queue_rx,
@@ -141,12 +146,12 @@ impl Queue {
ev = self.mux.select_next_some() => handle_mux(&mut self, ev).await,
}
purge_dead(&mut self.workers).await;
purge_dead(&self.metrics, &mut self.workers).await;
}
}
}
async fn purge_dead(workers: &mut Workers) {
async fn purge_dead(metrics: &Metrics, workers: &mut Workers) {
let mut to_remove = vec![];
for (worker, data) in workers.running.iter_mut() {
if futures::poll!(&mut data.handle).is_ready() {
@@ -155,7 +160,9 @@ async fn purge_dead(workers: &mut Workers) {
}
}
for w in to_remove {
let _ = workers.running.remove(w);
if workers.running.remove(w).is_some() {
metrics.execute_worker().on_retired();
}
}
}
@@ -166,6 +173,7 @@ fn handle_to_queue(queue: &mut Queue, to_queue: ToQueue) {
validation_code_hash = ?artifact.id.code_hash,
"enqueueing an artifact for execution",
);
queue.metrics.execute_enqueued();
let job = ExecuteJob { artifact, params, result_tx };
if let Some(available) = queue.workers.find_available() {
@@ -190,6 +198,7 @@ async fn handle_mux(queue: &mut Queue, event: QueueEvent) {
}
fn handle_worker_spawned(queue: &mut Queue, idle: IdleWorker, handle: WorkerHandle) {
queue.metrics.execute_worker().on_spawned();
queue.workers.spawn_inflight -= 1;
let worker = queue.workers.running.insert(WorkerData { idle: Some(idle), handle });
@@ -228,6 +237,7 @@ fn handle_job_finish(
(None, Err(ValidationError::InvalidCandidate(InvalidCandidate::AmbigiousWorkerDeath))),
};
queue.metrics.execute_finished();
tracing::debug!(
target: LOG_TARGET,
validation_code_hash = ?artifact_id.code_hash,
@@ -257,7 +267,9 @@ fn handle_job_finish(
}
} else {
// Note it's possible that the worker was purged already by `purge_dead`
queue.workers.running.remove(worker);
if queue.workers.running.remove(worker).is_some() {
queue.metrics.execute_worker().on_retired();
}
if !queue.queue.is_empty() {
// The worker has died and we still have work we have to do. Request an extra worker.
@@ -269,6 +281,7 @@ fn handle_job_finish(
}
fn spawn_extra_worker(queue: &mut Queue) {
queue.metrics.execute_worker().on_begin_spawn();
tracing::debug!(target: LOG_TARGET, "spawning an extra worker");
queue
@@ -309,8 +322,10 @@ fn assign(queue: &mut Queue, worker: Worker, job: ExecuteJob) {
thus claim_idle cannot return None;
qed.",
);
let execution_timer = queue.metrics.time_execution();
queue.mux.push(
async move {
let _timer = execution_timer;
let outcome = super::worker::start_work(idle, job.artifact.clone(), job.params).await;
QueueEvent::StartWork(worker, outcome, job.artifact.id, job.result_tx)
}
@@ -319,11 +334,12 @@ fn assign(queue: &mut Queue, worker: Worker, job: ExecuteJob) {
}
pub fn start(
metrics: Metrics,
program_path: PathBuf,
worker_capacity: usize,
spawn_timeout: Duration,
) -> (mpsc::Sender<ToQueue>, impl Future<Output = ()>) {
let (to_queue_tx, to_queue_rx) = mpsc::channel(20);
let run = Queue::new(program_path, worker_capacity, spawn_timeout, to_queue_rx).run();
let run = Queue::new(metrics, program_path, worker_capacity, spawn_timeout, to_queue_rx).run();
(to_queue_tx, run)
}