Refactor PVF preparation memory stats (#6693)

* Refactor PVF preparation memory stats

The original purpose of this change was to gate metrics that are unsupported by
some systems behind conditional compilation directives (#[cfg]); see
https://github.com/paritytech/polkadot/pull/6675#discussion_r1099996209.

Then I started doing some random cleanups and simplifications and got a bit
carried away. 🙈 The code should be overall tidier than before.

Changes:
- Don't register unsupported metrics (e.g. `max_rss` on non-Linux systems)
- Introduce `PrepareStats` struct as an abstraction over the `Ok` values of
  `PrepareResult`. It is cleaner, and can be easily modified in the future.
- Other small changes

* Minor fixes to comments

* Fix compile errors

* Try to fix some Linux errors

* Mep

* Fix candidate-validation tests

* Update docstring
This commit is contained in:
Marcin S
2023-02-14 16:48:57 +01:00
committed by GitHub
parent 9c35763017
commit fd70d01274
11 changed files with 158 additions and 147 deletions
@@ -24,7 +24,8 @@
#![warn(missing_docs)] #![warn(missing_docs)]
use polkadot_node_core_pvf::{ use polkadot_node_core_pvf::{
InvalidCandidate as WasmInvalidCandidate, PrepareError, Pvf, ValidationError, ValidationHost, InvalidCandidate as WasmInvalidCandidate, PrepareError, PrepareStats, Pvf, ValidationError,
ValidationHost,
}; };
use polkadot_node_primitives::{ use polkadot_node_primitives::{
BlockData, InvalidCandidate, PoV, ValidationResult, POV_BOMB_LIMIT, VALIDATION_CODE_BOMB_LIMIT, BlockData, InvalidCandidate, PoV, ValidationResult, POV_BOMB_LIMIT, VALIDATION_CODE_BOMB_LIMIT,
@@ -654,7 +655,7 @@ trait ValidationBackend {
validation_result validation_result
} }
async fn precheck_pvf(&mut self, pvf: Pvf) -> Result<Duration, PrepareError>; async fn precheck_pvf(&mut self, pvf: Pvf) -> Result<PrepareStats, PrepareError>;
} }
#[async_trait] #[async_trait]
@@ -680,7 +681,7 @@ impl ValidationBackend for ValidationHost {
.map_err(|_| ValidationError::InternalError("validation was cancelled".into()))? .map_err(|_| ValidationError::InternalError("validation was cancelled".into()))?
} }
async fn precheck_pvf(&mut self, pvf: Pvf) -> Result<Duration, PrepareError> { async fn precheck_pvf(&mut self, pvf: Pvf) -> Result<PrepareStats, PrepareError> {
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
if let Err(err) = self.precheck_pvf(pvf, tx).await { if let Err(err) = self.precheck_pvf(pvf, tx).await {
// Return an IO error if there was an error communicating with the host. // Return an IO error if there was an error communicating with the host.
@@ -377,7 +377,7 @@ impl ValidationBackend for MockValidateCandidateBackend {
result result
} }
async fn precheck_pvf(&mut self, _pvf: Pvf) -> Result<Duration, PrepareError> { async fn precheck_pvf(&mut self, _pvf: Pvf) -> Result<PrepareStats, PrepareError> {
unreachable!() unreachable!()
} }
} }
@@ -894,11 +894,11 @@ fn pov_decompression_failure_is_invalid() {
} }
struct MockPreCheckBackend { struct MockPreCheckBackend {
result: Result<Duration, PrepareError>, result: Result<PrepareStats, PrepareError>,
} }
impl MockPreCheckBackend { impl MockPreCheckBackend {
fn with_hardcoded_result(result: Result<Duration, PrepareError>) -> Self { fn with_hardcoded_result(result: Result<PrepareStats, PrepareError>) -> Self {
Self { result } Self { result }
} }
} }
@@ -914,7 +914,7 @@ impl ValidationBackend for MockPreCheckBackend {
unreachable!() unreachable!()
} }
async fn precheck_pvf(&mut self, _pvf: Pvf) -> Result<Duration, PrepareError> { async fn precheck_pvf(&mut self, _pvf: Pvf) -> Result<PrepareStats, PrepareError> {
self.result.clone() self.result.clone()
} }
} }
@@ -931,7 +931,7 @@ fn precheck_works() {
let (check_fut, check_result) = precheck_pvf( let (check_fut, check_result) = precheck_pvf(
ctx.sender(), ctx.sender(),
MockPreCheckBackend::with_hardcoded_result(Ok(Duration::default())), MockPreCheckBackend::with_hardcoded_result(Ok(PrepareStats::default())),
relay_parent, relay_parent,
validation_code_hash, validation_code_hash,
) )
@@ -977,7 +977,7 @@ fn precheck_invalid_pvf_blob_compression() {
let (check_fut, check_result) = precheck_pvf( let (check_fut, check_result) = precheck_pvf(
ctx.sender(), ctx.sender(),
MockPreCheckBackend::with_hardcoded_result(Ok(Duration::default())), MockPreCheckBackend::with_hardcoded_result(Ok(PrepareStats::default())),
relay_parent, relay_parent,
validation_code_hash, validation_code_hash,
) )
+5 -5
View File
@@ -14,7 +14,7 @@
// You should have received a copy of the GNU General Public License // You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>. // along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use crate::{error::PrepareError, host::PrepareResultSender}; use crate::{error::PrepareError, host::PrepareResultSender, prepare::PrepareStats};
use always_assert::always; use always_assert::always;
use polkadot_parachain::primitives::ValidationCodeHash; use polkadot_parachain::primitives::ValidationCodeHash;
use std::{ use std::{
@@ -101,8 +101,8 @@ pub enum ArtifactState {
/// This is updated when we get the heads up for this artifact or when we just discover /// This is updated when we get the heads up for this artifact or when we just discover
/// this file. /// this file.
last_time_needed: SystemTime, last_time_needed: SystemTime,
/// The CPU time that was taken preparing this artifact. /// Stats produced by successful preparation.
cpu_time_elapsed: Duration, prepare_stats: PrepareStats,
}, },
/// A task to prepare this artifact is scheduled. /// A task to prepare this artifact is scheduled.
Preparing { Preparing {
@@ -177,12 +177,12 @@ impl Artifacts {
&mut self, &mut self,
artifact_id: ArtifactId, artifact_id: ArtifactId,
last_time_needed: SystemTime, last_time_needed: SystemTime,
cpu_time_elapsed: Duration, prepare_stats: PrepareStats,
) { ) {
// See the precondition. // See the precondition.
always!(self always!(self
.artifacts .artifacts
.insert(artifact_id, ArtifactState::Prepared { last_time_needed, cpu_time_elapsed }) .insert(artifact_id, ArtifactState::Prepared { last_time_needed, prepare_stats })
.is_none()); .is_none());
} }
+4 -3
View File
@@ -14,12 +14,13 @@
// You should have received a copy of the GNU General Public License // You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>. // along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use crate::prepare::PrepareStats;
use parity_scale_codec::{Decode, Encode}; use parity_scale_codec::{Decode, Encode};
use std::{any::Any, fmt, time::Duration}; use std::{any::Any, fmt};
/// Result of PVF preparation performed by the validation host. Contains the elapsed CPU time if /// Result of PVF preparation performed by the validation host. Contains stats about the preparation if
/// successful /// successful
pub type PrepareResult = Result<Duration, PrepareError>; pub type PrepareResult = Result<PrepareStats, PrepareError>;
/// An error that occurred during the prepare part of the PVF pipeline. /// An error that occurred during the prepare part of the PVF pipeline.
#[derive(Debug, Clone, Encode, Decode)] #[derive(Debug, Clone, Encode, Decode)]
+17 -13
View File
@@ -456,9 +456,9 @@ async fn handle_precheck_pvf(
if let Some(state) = artifacts.artifact_state_mut(&artifact_id) { if let Some(state) = artifacts.artifact_state_mut(&artifact_id) {
match state { match state {
ArtifactState::Prepared { last_time_needed, cpu_time_elapsed } => { ArtifactState::Prepared { last_time_needed, prepare_stats } => {
*last_time_needed = SystemTime::now(); *last_time_needed = SystemTime::now();
let _ = result_sender.send(Ok(*cpu_time_elapsed)); let _ = result_sender.send(Ok(prepare_stats.clone()));
}, },
ArtifactState::Preparing { waiting_for_response, num_failures: _ } => ArtifactState::Preparing { waiting_for_response, num_failures: _ } =>
waiting_for_response.push(result_sender), waiting_for_response.push(result_sender),
@@ -725,8 +725,8 @@ async fn handle_prepare_done(
} }
*state = match result { *state = match result {
Ok(cpu_time_elapsed) => Ok(prepare_stats) =>
ArtifactState::Prepared { last_time_needed: SystemTime::now(), cpu_time_elapsed }, ArtifactState::Prepared { last_time_needed: SystemTime::now(), prepare_stats },
Err(error) => { Err(error) => {
let last_time_failed = SystemTime::now(); let last_time_failed = SystemTime::now();
let num_failures = *num_failures + 1; let num_failures = *num_failures + 1;
@@ -834,7 +834,7 @@ fn pulse_every(interval: std::time::Duration) -> impl futures::Stream<Item = ()>
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use crate::{InvalidCandidate, PrepareError}; use crate::{prepare::PrepareStats, InvalidCandidate, PrepareError};
use assert_matches::assert_matches; use assert_matches::assert_matches;
use futures::future::BoxFuture; use futures::future::BoxFuture;
@@ -1056,8 +1056,12 @@ mod tests {
let mut builder = Builder::default(); let mut builder = Builder::default();
builder.cleanup_pulse_interval = Duration::from_millis(100); builder.cleanup_pulse_interval = Duration::from_millis(100);
builder.artifact_ttl = Duration::from_millis(500); builder.artifact_ttl = Duration::from_millis(500);
builder.artifacts.insert_prepared(artifact_id(1), mock_now, Duration::default()); builder
builder.artifacts.insert_prepared(artifact_id(2), mock_now, Duration::default()); .artifacts
.insert_prepared(artifact_id(1), mock_now, PrepareStats::default());
builder
.artifacts
.insert_prepared(artifact_id(2), mock_now, PrepareStats::default());
let mut test = builder.build(); let mut test = builder.build();
let mut host = test.host_handle(); let mut host = test.host_handle();
@@ -1129,7 +1133,7 @@ mod tests {
test.from_prepare_queue_tx test.from_prepare_queue_tx
.send(prepare::FromQueue { .send(prepare::FromQueue {
artifact_id: artifact_id(1), artifact_id: artifact_id(1),
result: Ok(Duration::default()), result: Ok(PrepareStats::default()),
}) })
.await .await
.unwrap(); .unwrap();
@@ -1145,7 +1149,7 @@ mod tests {
test.from_prepare_queue_tx test.from_prepare_queue_tx
.send(prepare::FromQueue { .send(prepare::FromQueue {
artifact_id: artifact_id(2), artifact_id: artifact_id(2),
result: Ok(Duration::default()), result: Ok(PrepareStats::default()),
}) })
.await .await
.unwrap(); .unwrap();
@@ -1197,7 +1201,7 @@ mod tests {
test.from_prepare_queue_tx test.from_prepare_queue_tx
.send(prepare::FromQueue { .send(prepare::FromQueue {
artifact_id: artifact_id(1), artifact_id: artifact_id(1),
result: Ok(Duration::default()), result: Ok(PrepareStats::default()),
}) })
.await .await
.unwrap(); .unwrap();
@@ -1304,7 +1308,7 @@ mod tests {
test.from_prepare_queue_tx test.from_prepare_queue_tx
.send(prepare::FromQueue { .send(prepare::FromQueue {
artifact_id: artifact_id(2), artifact_id: artifact_id(2),
result: Ok(Duration::default()), result: Ok(PrepareStats::default()),
}) })
.await .await
.unwrap(); .unwrap();
@@ -1454,7 +1458,7 @@ mod tests {
test.from_prepare_queue_tx test.from_prepare_queue_tx
.send(prepare::FromQueue { .send(prepare::FromQueue {
artifact_id: artifact_id(1), artifact_id: artifact_id(1),
result: Ok(Duration::default()), result: Ok(PrepareStats::default()),
}) })
.await .await
.unwrap(); .unwrap();
@@ -1630,7 +1634,7 @@ mod tests {
test.from_prepare_queue_tx test.from_prepare_queue_tx
.send(prepare::FromQueue { .send(prepare::FromQueue {
artifact_id: artifact_id(1), artifact_id: artifact_id(1),
result: Ok(Duration::default()), result: Ok(PrepareStats::default()),
}) })
.await .await
.unwrap(); .unwrap();
+1
View File
@@ -108,6 +108,7 @@ pub mod testing;
pub use sp_tracing; pub use sp_tracing;
pub use error::{InvalidCandidate, PrepareError, PrepareResult, ValidationError}; pub use error::{InvalidCandidate, PrepareError, PrepareResult, ValidationError};
pub use prepare::PrepareStats;
pub use priority::Priority; pub use priority::Priority;
pub use pvf::Pvf; pub use pvf::Pvf;
+22 -15
View File
@@ -16,6 +16,7 @@
//! Prometheus metrics related to the validation host. //! Prometheus metrics related to the validation host.
use crate::prepare::MemoryStats;
use polkadot_node_metrics::metrics::{self, prometheus}; use polkadot_node_metrics::metrics::{self, prometheus};
/// Validation host metrics. /// Validation host metrics.
@@ -73,24 +74,24 @@ impl Metrics {
self.0.as_ref().map(|metrics| metrics.execution_time.start_timer()) self.0.as_ref().map(|metrics| metrics.execution_time.start_timer())
} }
/// Observe max_rss for preparation. /// Observe memory stats for preparation.
pub(crate) fn observe_preparation_max_rss(&self, max_rss: f64) { #[allow(unused_variables)]
pub(crate) fn observe_preparation_memory_metrics(&self, memory_stats: MemoryStats) {
if let Some(metrics) = &self.0 { if let Some(metrics) = &self.0 {
metrics.preparation_max_rss.observe(max_rss); #[cfg(target_os = "linux")]
} if let Some(max_rss) = memory_stats.max_rss {
} metrics.preparation_max_rss.observe(max_rss as f64);
}
/// Observe max resident memory for preparation. #[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
pub(crate) fn observe_preparation_max_resident(&self, max_resident_kb: f64) { if let Some(tracker_stats) = memory_stats.memory_tracker_stats {
if let Some(metrics) = &self.0 { // We convert these stats from B to KB to match the unit of `ru_maxrss` from `getrusage`.
metrics.preparation_max_resident.observe(max_resident_kb); let max_resident_kb = (tracker_stats.resident / 1024) as f64;
} let max_allocated_kb = (tracker_stats.allocated / 1024) as f64;
}
/// Observe max allocated memory for preparation. metrics.preparation_max_resident.observe(max_resident_kb);
pub(crate) fn observe_preparation_max_allocated(&self, max_allocated_kb: f64) { metrics.preparation_max_allocated.observe(max_allocated_kb);
if let Some(metrics) = &self.0 { }
metrics.preparation_max_allocated.observe(max_allocated_kb);
} }
} }
} }
@@ -106,8 +107,11 @@ struct MetricsInner {
execute_finished: prometheus::Counter<prometheus::U64>, execute_finished: prometheus::Counter<prometheus::U64>,
preparation_time: prometheus::Histogram, preparation_time: prometheus::Histogram,
execution_time: prometheus::Histogram, execution_time: prometheus::Histogram,
#[cfg(target_os = "linux")]
preparation_max_rss: prometheus::Histogram, preparation_max_rss: prometheus::Histogram,
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
preparation_max_allocated: prometheus::Histogram, preparation_max_allocated: prometheus::Histogram,
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
preparation_max_resident: prometheus::Histogram, preparation_max_resident: prometheus::Histogram,
} }
@@ -226,6 +230,7 @@ impl metrics::Metrics for Metrics {
)?, )?,
registry, registry,
)?, )?,
#[cfg(target_os = "linux")]
preparation_max_rss: prometheus::register( preparation_max_rss: prometheus::register(
prometheus::Histogram::with_opts( prometheus::Histogram::with_opts(
prometheus::HistogramOpts::new( prometheus::HistogramOpts::new(
@@ -238,6 +243,7 @@ impl metrics::Metrics for Metrics {
)?, )?,
registry, registry,
)?, )?,
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
preparation_max_resident: prometheus::register( preparation_max_resident: prometheus::register(
prometheus::Histogram::with_opts( prometheus::Histogram::with_opts(
prometheus::HistogramOpts::new( prometheus::HistogramOpts::new(
@@ -250,6 +256,7 @@ impl metrics::Metrics for Metrics {
)?, )?,
registry, registry,
)?, )?,
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
preparation_max_allocated: prometheus::register( preparation_max_allocated: prometheus::register(
prometheus::Histogram::with_opts( prometheus::Histogram::with_opts(
prometheus::HistogramOpts::new( prometheus::HistogramOpts::new(
@@ -27,18 +27,18 @@
//! <https://github.com/paritytech/polkadot/issues/6472#issuecomment-1381941762> for more //! <https://github.com/paritytech/polkadot/issues/6472#issuecomment-1381941762> for more
//! background. //! background.
use crate::{metrics::Metrics, LOG_TARGET};
use parity_scale_codec::{Decode, Encode}; use parity_scale_codec::{Decode, Encode};
use std::io;
/// Helper struct to contain all the memory stats, including [`MemoryAllocationStats`] and, if /// Helper struct to contain all the memory stats, including [`MemoryAllocationStats`] and, if
/// supported by the OS, `ru_maxrss`. /// supported by the OS, `ru_maxrss`.
#[derive(Encode, Decode)] #[derive(Clone, Debug, Default, Encode, Decode)]
pub struct MemoryStats { pub struct MemoryStats {
/// Memory stats from `tikv_jemalloc_ctl`. /// Memory stats from `tikv_jemalloc_ctl`.
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
pub memory_tracker_stats: Option<MemoryAllocationStats>, pub memory_tracker_stats: Option<MemoryAllocationStats>,
/// `ru_maxrss` from `getrusage`. A string error since `io::Error` is not `Encode`able. /// `ru_maxrss` from `getrusage`. A string error since `io::Error` is not `Encode`able.
pub max_rss: Option<Result<i64, String>>, #[cfg(target_os = "linux")]
pub max_rss: Option<i64>,
} }
/// Statistics of collected memory metrics. /// Statistics of collected memory metrics.
@@ -51,44 +51,14 @@ pub struct MemoryAllocationStats {
pub allocated: u64, pub allocated: u64,
} }
/// Gets the `ru_maxrss` for the current thread if the OS supports `getrusage`. Otherwise, just /// Module for the memory tracker. The memory tracker runs in its own thread, where it polls memory
/// returns `None`. /// usage at an interval.
pub fn get_max_rss_thread() -> Option<io::Result<i64>> { ///
// `c_long` is either `i32` or `i64` depending on architecture. `i64::from` always works. /// NOTE: Requires jemalloc enabled.
#[cfg(target_os = "linux")]
let max_rss = Some(getrusage::getrusage_thread().map(|rusage| i64::from(rusage.ru_maxrss)));
#[cfg(not(target_os = "linux"))]
let max_rss = None;
max_rss
}
/// Helper function to send the memory metrics, if available, to prometheus.
pub fn observe_memory_metrics(metrics: &Metrics, memory_stats: MemoryStats, pid: u32) {
if let Some(max_rss) = memory_stats.max_rss {
match max_rss {
Ok(max_rss) => metrics.observe_preparation_max_rss(max_rss as f64),
Err(err) => gum::warn!(
target: LOG_TARGET,
worker_pid = %pid,
"error getting `ru_maxrss` in preparation thread: {}",
err
),
}
}
if let Some(tracker_stats) = memory_stats.memory_tracker_stats {
// We convert these stats from B to KB to match the unit of `ru_maxrss` from `getrusage`.
let resident_kb = (tracker_stats.resident / 1024) as f64;
let allocated_kb = (tracker_stats.allocated / 1024) as f64;
metrics.observe_preparation_max_resident(resident_kb);
metrics.observe_preparation_max_allocated(allocated_kb);
}
}
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))] #[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
pub mod memory_tracker { pub mod memory_tracker {
use super::*; use super::*;
use crate::LOG_TARGET;
use std::{ use std::{
sync::mpsc::{Receiver, RecvTimeoutError, Sender}, sync::mpsc::{Receiver, RecvTimeoutError, Sender},
time::Duration, time::Duration,
@@ -183,13 +153,15 @@ pub mod memory_tracker {
pub async fn get_memory_tracker_loop_stats( pub async fn get_memory_tracker_loop_stats(
fut: JoinHandle<Result<MemoryAllocationStats, String>>, fut: JoinHandle<Result<MemoryAllocationStats, String>>,
tx: Sender<()>, tx: Sender<()>,
worker_pid: u32,
) -> Option<MemoryAllocationStats> { ) -> Option<MemoryAllocationStats> {
// Signal to the memory tracker thread to terminate. // Signal to the memory tracker thread to terminate.
if let Err(err) = tx.send(()) { if let Err(err) = tx.send(()) {
gum::warn!( gum::warn!(
target: LOG_TARGET, target: LOG_TARGET,
worker_pid = %std::process::id(), %worker_pid,
"worker: error sending signal to memory tracker_thread: {}", err "worker: error sending signal to memory tracker_thread: {}",
err
); );
None None
} else { } else {
@@ -199,7 +171,7 @@ pub mod memory_tracker {
Ok(Err(err)) => { Ok(Err(err)) => {
gum::warn!( gum::warn!(
target: LOG_TARGET, target: LOG_TARGET,
worker_pid = %std::process::id(), %worker_pid,
"worker: error occurred in the memory tracker thread: {}", err "worker: error occurred in the memory tracker thread: {}", err
); );
None None
@@ -207,7 +179,7 @@ pub mod memory_tracker {
Err(err) => { Err(err) => {
gum::warn!( gum::warn!(
target: LOG_TARGET, target: LOG_TARGET,
worker_pid = %std::process::id(), %worker_pid,
"worker: error joining on memory tracker thread: {}", err "worker: error joining on memory tracker thread: {}", err
); );
None None
@@ -217,13 +189,19 @@ pub mod memory_tracker {
} }
} }
/// Module for dealing with the `ru_maxrss` (peak resident memory) stat from `getrusage`.
///
/// NOTE: `getrusage` with the `RUSAGE_THREAD` parameter is only supported on Linux. `RUSAGE_SELF`
/// works on macOS, but we need to get the max rss only for the preparation thread. Getting it for
/// the current process would conflate the stats of previous jobs run by the process.
#[cfg(target_os = "linux")] #[cfg(target_os = "linux")]
mod getrusage { pub mod max_rss_stat {
use crate::LOG_TARGET;
use libc::{getrusage, rusage, timeval, RUSAGE_THREAD}; use libc::{getrusage, rusage, timeval, RUSAGE_THREAD};
use std::io; use std::io;
/// Get the rusage stats for the current thread. /// Get the rusage stats for the current thread.
pub fn getrusage_thread() -> io::Result<rusage> { fn getrusage_thread() -> io::Result<rusage> {
let mut result = rusage { let mut result = rusage {
ru_utime: timeval { tv_sec: 0, tv_usec: 0 }, ru_utime: timeval { tv_sec: 0, tv_usec: 0 },
ru_stime: timeval { tv_sec: 0, tv_usec: 0 }, ru_stime: timeval { tv_sec: 0, tv_usec: 0 },
@@ -247,4 +225,25 @@ mod getrusage {
} }
Ok(result) Ok(result)
} }
/// Returns `ru_maxrss` (peak resident memory) as reported by `getrusage` for the current thread.
///
/// Any error produced by `getrusage_thread` is passed through to the caller unchanged.
pub fn get_max_rss_thread() -> io::Result<i64> {
    let usage = getrusage_thread()?;
    // `ru_maxrss` is a `c_long`, which is `i32` or `i64` depending on the architecture, so the
    // widening `i64::from` conversion is always available.
    Ok(i64::from(usage.ru_maxrss))
}
/// Turns the outcome of a max-rss query into an optional value, logging on failure.
///
/// Returns `Some(value)` when `max_rss` is `Ok`. When it is `Err`, a warning tagged with
/// `worker_pid` is emitted and `None` is returned, so the error itself is not propagated.
pub fn extract_max_rss_stat(max_rss: io::Result<i64>, worker_pid: u32) -> Option<i64> {
    match max_rss {
        Ok(value) => Some(value),
        Err(err) => {
            gum::warn!(
                target: LOG_TARGET,
                %worker_pid,
                "error getting `ru_maxrss` in preparation thread: {}",
                err
            );
            None
        },
    }
}
} }
+10
View File
@@ -27,6 +27,16 @@ mod pool;
mod queue; mod queue;
mod worker; mod worker;
pub use memory_stats::MemoryStats;
pub use pool::start as start_pool; pub use pool::start as start_pool;
pub use queue::{start as start_queue, FromQueue, ToQueue}; pub use queue::{start as start_queue, FromQueue, ToQueue};
pub use worker::worker_entrypoint; pub use worker::worker_entrypoint;
use parity_scale_codec::{Decode, Encode};
/// Preparation statistics, including the CPU time and memory taken.
///
/// This is the `Ok` payload of `PrepareResult`, i.e. the stats produced by a successful
/// preparation. It derives `Encode`/`Decode`, and `Default` yields an all-default value.
#[derive(Debug, Clone, Default, Encode, Decode)]
pub struct PrepareStats {
// The CPU time that was taken preparing the artifact.
cpu_time_elapsed: std::time::Duration,
// Memory stats gathered for the preparation; which fields exist depends on the target OS and
// enabled features (see `MemoryStats`).
memory_stats: MemoryStats,
}
+5 -4
View File
@@ -495,6 +495,7 @@ mod tests {
use crate::{ use crate::{
error::PrepareError, error::PrepareError,
host::{LENIENT_PREPARATION_TIMEOUT, PRECHECK_PREPARATION_TIMEOUT}, host::{LENIENT_PREPARATION_TIMEOUT, PRECHECK_PREPARATION_TIMEOUT},
prepare::PrepareStats,
}; };
use assert_matches::assert_matches; use assert_matches::assert_matches;
use futures::{future::BoxFuture, FutureExt}; use futures::{future::BoxFuture, FutureExt};
@@ -622,7 +623,7 @@ mod tests {
test.send_from_pool(pool::FromPool::Concluded { test.send_from_pool(pool::FromPool::Concluded {
worker: w, worker: w,
rip: false, rip: false,
result: Ok(Duration::default()), result: Ok(PrepareStats::default()),
}); });
assert_eq!(test.poll_and_recv_from_queue().await.artifact_id, pvf(1).as_artifact_id()); assert_eq!(test.poll_and_recv_from_queue().await.artifact_id, pvf(1).as_artifact_id());
@@ -660,7 +661,7 @@ mod tests {
test.send_from_pool(pool::FromPool::Concluded { test.send_from_pool(pool::FromPool::Concluded {
worker: w1, worker: w1,
rip: false, rip: false,
result: Ok(Duration::default()), result: Ok(PrepareStats::default()),
}); });
assert_matches!(test.poll_and_recv_to_pool().await, pool::ToPool::StartWork { .. }); assert_matches!(test.poll_and_recv_to_pool().await, pool::ToPool::StartWork { .. });
@@ -710,7 +711,7 @@ mod tests {
test.send_from_pool(pool::FromPool::Concluded { test.send_from_pool(pool::FromPool::Concluded {
worker: w1, worker: w1,
rip: false, rip: false,
result: Ok(Duration::default()), result: Ok(PrepareStats::default()),
}); });
assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Kill(w1)); assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Kill(w1));
} }
@@ -746,7 +747,7 @@ mod tests {
test.send_from_pool(pool::FromPool::Concluded { test.send_from_pool(pool::FromPool::Concluded {
worker: w1, worker: w1,
rip: true, rip: true,
result: Ok(Duration::default()), result: Ok(PrepareStats::default()),
}); });
// Since there is still work, the queue requested one extra worker to spawn to handle the // Since there is still work, the queue requested one extra worker to spawn to handle the
+40 -53
View File
@@ -14,13 +14,16 @@
// You should have received a copy of the GNU General Public License // You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>. // along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
#[cfg(target_os = "linux")]
use super::memory_stats::max_rss_stat::{extract_max_rss_stat, get_max_rss_thread};
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))] #[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
use super::memory_stats::memory_tracker::{get_memory_tracker_loop_stats, memory_tracker_loop}; use super::memory_stats::memory_tracker::{get_memory_tracker_loop_stats, memory_tracker_loop};
use super::memory_stats::{get_max_rss_thread, observe_memory_metrics, MemoryStats}; use super::memory_stats::MemoryStats;
use crate::{ use crate::{
artifacts::CompiledArtifact, artifacts::CompiledArtifact,
error::{PrepareError, PrepareResult}, error::{PrepareError, PrepareResult},
metrics::Metrics, metrics::Metrics,
prepare::PrepareStats,
worker_common::{ worker_common::{
bytes_to_path, cpu_time_monitor_loop, framed_recv, framed_send, path_to_bytes, bytes_to_path, cpu_time_monitor_loop, framed_recv, framed_send, path_to_bytes,
spawn_with_program_path, tmpfile_in, worker_event_loop, IdleWorker, SpawnErr, WorkerHandle, spawn_with_program_path, tmpfile_in, worker_event_loop, IdleWorker, SpawnErr, WorkerHandle,
@@ -118,12 +121,11 @@ pub async fn start_work(
match result { match result {
// Received bytes from worker within the time limit. // Received bytes from worker within the time limit.
Ok(Ok((prepare_result, memory_stats))) => Ok(Ok(prepare_result)) =>
handle_response( handle_response(
metrics, metrics,
IdleWorker { stream, pid }, IdleWorker { stream, pid },
prepare_result, prepare_result,
memory_stats,
pid, pid,
tmp_file, tmp_file,
artifact_path, artifact_path,
@@ -162,13 +164,12 @@ async fn handle_response(
metrics: &Metrics, metrics: &Metrics,
worker: IdleWorker, worker: IdleWorker,
result: PrepareResult, result: PrepareResult,
memory_stats: Option<MemoryStats>, worker_pid: u32,
pid: u32,
tmp_file: PathBuf, tmp_file: PathBuf,
artifact_path: PathBuf, artifact_path: PathBuf,
preparation_timeout: Duration, preparation_timeout: Duration,
) -> Outcome { ) -> Outcome {
let cpu_time_elapsed = match result { let PrepareStats { cpu_time_elapsed, memory_stats } = match result.clone() {
Ok(result) => result, Ok(result) => result,
// Timed out on the child. This should already be logged by the child. // Timed out on the child. This should already be logged by the child.
Err(PrepareError::TimedOut) => return Outcome::TimedOut, Err(PrepareError::TimedOut) => return Outcome::TimedOut,
@@ -179,7 +180,7 @@ async fn handle_response(
// The job didn't complete within the timeout. // The job didn't complete within the timeout.
gum::warn!( gum::warn!(
target: LOG_TARGET, target: LOG_TARGET,
worker_pid = %pid, %worker_pid,
"prepare job took {}ms cpu time, exceeded preparation timeout {}ms. Clearing WIP artifact {}", "prepare job took {}ms cpu time, exceeded preparation timeout {}ms. Clearing WIP artifact {}",
cpu_time_elapsed.as_millis(), cpu_time_elapsed.as_millis(),
preparation_timeout.as_millis(), preparation_timeout.as_millis(),
@@ -190,7 +191,7 @@ async fn handle_response(
gum::debug!( gum::debug!(
target: LOG_TARGET, target: LOG_TARGET,
worker_pid = %pid, %worker_pid,
"promoting WIP artifact {} to {}", "promoting WIP artifact {} to {}",
tmp_file.display(), tmp_file.display(),
artifact_path.display(), artifact_path.display(),
@@ -201,7 +202,7 @@ async fn handle_response(
Err(err) => { Err(err) => {
gum::warn!( gum::warn!(
target: LOG_TARGET, target: LOG_TARGET,
worker_pid = %pid, %worker_pid,
"failed to rename the artifact from {} to {}: {:?}", "failed to rename the artifact from {} to {}: {:?}",
tmp_file.display(), tmp_file.display(),
artifact_path.display(), artifact_path.display(),
@@ -213,9 +214,7 @@ async fn handle_response(
// If there were no errors up until now, log the memory stats for a successful preparation, if // If there were no errors up until now, log the memory stats for a successful preparation, if
// available. // available.
if let Some(memory_stats) = memory_stats { metrics.observe_preparation_memory_metrics(memory_stats);
observe_memory_metrics(metrics, memory_stats, pid);
}
outcome outcome
} }
@@ -299,19 +298,11 @@ async fn recv_request(stream: &mut UnixStream) -> io::Result<(Vec<u8>, PathBuf,
Ok((code, tmp_file, preparation_timeout)) Ok((code, tmp_file, preparation_timeout))
} }
async fn send_response( async fn send_response(stream: &mut UnixStream, result: PrepareResult) -> io::Result<()> {
stream: &mut UnixStream, framed_send(stream, &result.encode()).await
result: PrepareResult,
memory_stats: Option<MemoryStats>,
) -> io::Result<()> {
framed_send(stream, &result.encode()).await?;
framed_send(stream, &memory_stats.encode()).await
} }
async fn recv_response( async fn recv_response(stream: &mut UnixStream, pid: u32) -> io::Result<PrepareResult> {
stream: &mut UnixStream,
pid: u32,
) -> io::Result<(PrepareResult, Option<MemoryStats>)> {
let result = framed_recv(stream).await?; let result = framed_recv(stream).await?;
let result = PrepareResult::decode(&mut &result[..]).map_err(|e| { let result = PrepareResult::decode(&mut &result[..]).map_err(|e| {
// We received invalid bytes from the worker. // We received invalid bytes from the worker.
@@ -327,14 +318,7 @@ async fn recv_response(
format!("prepare pvf recv_response: failed to decode result: {:?}", e), format!("prepare pvf recv_response: failed to decode result: {:?}", e),
) )
})?; })?;
let memory_stats = framed_recv(stream).await?; Ok(result)
let memory_stats = Option::<MemoryStats>::decode(&mut &memory_stats[..]).map_err(|e| {
io::Error::new(
io::ErrorKind::Other,
format!("prepare pvf recv_response: failed to decode memory stats: {:?}", e),
)
})?;
Ok((result, memory_stats))
} }
/// The entrypoint that the spawned prepare worker should start with. The `socket_path` specifies /// The entrypoint that the spawned prepare worker should start with. The `socket_path` specifies
@@ -362,10 +346,11 @@ async fn recv_response(
pub fn worker_entrypoint(socket_path: &str) { pub fn worker_entrypoint(socket_path: &str) {
worker_event_loop("prepare", socket_path, |rt_handle, mut stream| async move { worker_event_loop("prepare", socket_path, |rt_handle, mut stream| async move {
loop { loop {
let worker_pid = std::process::id();
let (code, dest, preparation_timeout) = recv_request(&mut stream).await?; let (code, dest, preparation_timeout) = recv_request(&mut stream).await?;
gum::debug!( gum::debug!(
target: LOG_TARGET, target: LOG_TARGET,
worker_pid = %std::process::id(), %worker_pid,
"worker: preparing artifact", "worker: preparing artifact",
); );
@@ -387,28 +372,29 @@ pub fn worker_entrypoint(socket_path: &str) {
// Spawn another thread for preparation. // Spawn another thread for preparation.
let prepare_fut = rt_handle let prepare_fut = rt_handle
.spawn_blocking(move || { .spawn_blocking(move || {
let prepare_result = prepare_artifact(&code); let result = prepare_artifact(&code);
// Get the `ru_maxrss` stat. If supported, call getrusage for the thread. // Get the `ru_maxrss` stat. If supported, call getrusage for the thread.
let max_rss = get_max_rss_thread(); #[cfg(target_os = "linux")]
let result = result.map(|artifact| (artifact, get_max_rss_thread()));
(prepare_result, max_rss) result
}) })
.fuse(); .fuse();
pin_mut!(cpu_time_monitor_fut); pin_mut!(cpu_time_monitor_fut);
pin_mut!(prepare_fut); pin_mut!(prepare_fut);
let (result, memory_stats) = select_biased! { let result = select_biased! {
// If this future is not selected, the join handle is dropped and the thread will // If this future is not selected, the join handle is dropped and the thread will
// finish in the background. // finish in the background.
join_res = cpu_time_monitor_fut => { join_res = cpu_time_monitor_fut => {
let result = match join_res { match join_res {
Ok(Some(cpu_time_elapsed)) => { Ok(Some(cpu_time_elapsed)) => {
// Log if we exceed the timeout and the other thread hasn't finished. // Log if we exceed the timeout and the other thread hasn't finished.
gum::warn!( gum::warn!(
target: LOG_TARGET, target: LOG_TARGET,
worker_pid = %std::process::id(), %worker_pid,
"prepare job took {}ms cpu time, exceeded prepare timeout {}ms", "prepare job took {}ms cpu time, exceeded prepare timeout {}ms",
cpu_time_elapsed.as_millis(), cpu_time_elapsed.as_millis(),
preparation_timeout.as_millis(), preparation_timeout.as_millis(),
@@ -417,28 +403,29 @@ pub fn worker_entrypoint(socket_path: &str) {
}, },
Ok(None) => Err(PrepareError::IoErr("error communicating over finished channel".into())), Ok(None) => Err(PrepareError::IoErr("error communicating over finished channel".into())),
Err(err) => Err(PrepareError::IoErr(err.to_string())), Err(err) => Err(PrepareError::IoErr(err.to_string())),
}; }
(result, None)
}, },
compilation_res = prepare_fut => { prepare_res = prepare_fut => {
let cpu_time_elapsed = cpu_time_start.elapsed(); let cpu_time_elapsed = cpu_time_start.elapsed();
let _ = cpu_time_monitor_tx.send(()); let _ = cpu_time_monitor_tx.send(());
match compilation_res.unwrap_or_else(|err| (Err(PrepareError::IoErr(err.to_string())), None)) { match prepare_res.unwrap_or_else(|err| Err(PrepareError::IoErr(err.to_string()))) {
(Err(err), _) => { Err(err) => {
// Serialized error will be written into the socket. // Serialized error will be written into the socket.
(Err(err), None) Err(err)
}, },
(Ok(compiled_artifact), max_rss) => { Ok(ok) => {
// Stop the memory stats worker and get its observed memory stats. // Stop the memory stats worker and get its observed memory stats.
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))] #[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
let memory_tracker_stats = let memory_tracker_stats =
get_memory_tracker_loop_stats(memory_tracker_fut, memory_tracker_tx).await; get_memory_tracker_loop_stats(memory_tracker_fut, memory_tracker_tx, worker_pid).await;
#[cfg(not(any(target_os = "linux", feature = "jemalloc-allocator")))] #[cfg(target_os = "linux")]
let memory_tracker_stats = None; let (ok, max_rss) = ok;
let memory_stats = MemoryStats { let memory_stats = MemoryStats {
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
memory_tracker_stats, memory_tracker_stats,
max_rss: max_rss.map(|inner| inner.map_err(|e| e.to_string())), #[cfg(target_os = "linux")]
max_rss: extract_max_rss_stat(max_rss, worker_pid),
}; };
// Write the serialized artifact into a temp file. // Write the serialized artifact into a temp file.
@@ -450,19 +437,19 @@ pub fn worker_entrypoint(socket_path: &str) {
gum::debug!( gum::debug!(
target: LOG_TARGET, target: LOG_TARGET,
worker_pid = %std::process::id(), %worker_pid,
"worker: writing artifact to {}", "worker: writing artifact to {}",
dest.display(), dest.display(),
); );
tokio::fs::write(&dest, &compiled_artifact).await?; tokio::fs::write(&dest, &ok).await?;
(Ok(cpu_time_elapsed), Some(memory_stats)) Ok(PrepareStats{cpu_time_elapsed, memory_stats})
}, },
} }
}, },
}; };
send_response(&mut stream, result, memory_stats).await?; send_response(&mut stream, result).await?;
} }
}); });
} }