mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-05-30 17:31:03 +00:00
pvf: Log memory metrics from preparation (#6565)
* Add getrusage and memory tracker for precheck preparation * Log memory stats metrics after prechecking * Fix tests * Try to fix errors (linux-only so I'm relying on CI here) * Try to fix CI * Add module docs for `prepare/memory_stats.rs`; fix CI error * Report memory stats for all preparation jobs * Use `RUSAGE_SELF` instead of `RUSAGE_THREAD` Not sure why I did that -- was a brainfart on my end. * Revert last commit (RUSAGE_THREAD is correct) * Use exponential buckets * Use `RUSAGE_SELF` for `getrusage`; enable `max_rss` metric for MacOS * Increase poll interval * Revert "Use `RUSAGE_SELF` for `getrusage`; enable `max_rss` metric for MacOS" This reverts commit becf7a815409ab530fc61370abffcd1b97b9a777.
This commit is contained in:
Generated
+2
@@ -6981,6 +6981,7 @@ dependencies = [
|
|||||||
"futures",
|
"futures",
|
||||||
"futures-timer",
|
"futures-timer",
|
||||||
"hex-literal",
|
"hex-literal",
|
||||||
|
"libc",
|
||||||
"parity-scale-codec",
|
"parity-scale-codec",
|
||||||
"pin-project",
|
"pin-project",
|
||||||
"polkadot-core-primitives",
|
"polkadot-core-primitives",
|
||||||
@@ -7001,6 +7002,7 @@ dependencies = [
|
|||||||
"tempfile",
|
"tempfile",
|
||||||
"test-parachain-adder",
|
"test-parachain-adder",
|
||||||
"test-parachain-halt",
|
"test-parachain-halt",
|
||||||
|
"tikv-jemalloc-ctl",
|
||||||
"tokio",
|
"tokio",
|
||||||
"tracing-gum",
|
"tracing-gum",
|
||||||
]
|
]
|
||||||
|
|||||||
@@ -14,13 +14,14 @@ assert_matches = "1.4.0"
|
|||||||
cpu-time = "1.0.0"
|
cpu-time = "1.0.0"
|
||||||
futures = "0.3.21"
|
futures = "0.3.21"
|
||||||
futures-timer = "3.0.2"
|
futures-timer = "3.0.2"
|
||||||
slotmap = "1.0"
|
|
||||||
gum = { package = "tracing-gum", path = "../../gum" }
|
gum = { package = "tracing-gum", path = "../../gum" }
|
||||||
pin-project = "1.0.9"
|
pin-project = "1.0.9"
|
||||||
rand = "0.8.5"
|
rand = "0.8.5"
|
||||||
tempfile = "3.3.0"
|
|
||||||
tokio = { version = "1.24.2", features = ["fs", "process"] }
|
|
||||||
rayon = "1.5.1"
|
rayon = "1.5.1"
|
||||||
|
slotmap = "1.0"
|
||||||
|
tempfile = "3.3.0"
|
||||||
|
tikv-jemalloc-ctl = "0.5.0"
|
||||||
|
tokio = { version = "1.24.2", features = ["fs", "process"] }
|
||||||
|
|
||||||
parity-scale-codec = { version = "3.3.0", default-features = false, features = ["derive"] }
|
parity-scale-codec = { version = "3.3.0", default-features = false, features = ["derive"] }
|
||||||
|
|
||||||
@@ -38,8 +39,11 @@ sp-wasm-interface = { git = "https://github.com/paritytech/substrate", branch =
|
|||||||
sp-maybe-compressed-blob = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
sp-maybe-compressed-blob = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||||
sp-tracing = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
sp-tracing = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||||
|
|
||||||
|
[target.'cfg(target_os = "linux")'.dependencies]
|
||||||
|
libc = "0.2.139"
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
adder = { package = "test-parachain-adder", path = "../../../parachain/test-parachains/adder" }
|
adder = { package = "test-parachain-adder", path = "../../../parachain/test-parachains/adder" }
|
||||||
halt = { package = "test-parachain-halt", path = "../../../parachain/test-parachains/halt" }
|
halt = { package = "test-parachain-halt", path = "../../../parachain/test-parachains/halt" }
|
||||||
hex-literal = "0.3.4"
|
hex-literal = "0.3.4"
|
||||||
tempfile = "3.2.0"
|
tempfile = "3.3.0"
|
||||||
|
|||||||
@@ -845,7 +845,7 @@ mod tests {
|
|||||||
let pulse = pulse_every(Duration::from_millis(100));
|
let pulse = pulse_every(Duration::from_millis(100));
|
||||||
futures::pin_mut!(pulse);
|
futures::pin_mut!(pulse);
|
||||||
|
|
||||||
for _ in 0usize..5usize {
|
for _ in 0..5 {
|
||||||
let start = std::time::Instant::now();
|
let start = std::time::Instant::now();
|
||||||
let _ = pulse.next().await.unwrap();
|
let _ = pulse.next().await.unwrap();
|
||||||
|
|
||||||
|
|||||||
@@ -72,6 +72,27 @@ impl Metrics {
|
|||||||
pub(crate) fn time_execution(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
|
pub(crate) fn time_execution(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
|
||||||
self.0.as_ref().map(|metrics| metrics.execution_time.start_timer())
|
self.0.as_ref().map(|metrics| metrics.execution_time.start_timer())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Observe max_rss for preparation.
|
||||||
|
pub(crate) fn observe_preparation_max_rss(&self, max_rss: f64) {
|
||||||
|
if let Some(metrics) = &self.0 {
|
||||||
|
metrics.preparation_max_rss.observe(max_rss);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Observe max resident memory for preparation.
|
||||||
|
pub(crate) fn observe_preparation_max_resident(&self, max_resident_kb: f64) {
|
||||||
|
if let Some(metrics) = &self.0 {
|
||||||
|
metrics.preparation_max_resident.observe(max_resident_kb);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Observe max allocated memory for preparation.
|
||||||
|
pub(crate) fn observe_preparation_max_allocated(&self, max_allocated_kb: f64) {
|
||||||
|
if let Some(metrics) = &self.0 {
|
||||||
|
metrics.preparation_max_allocated.observe(max_allocated_kb);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
@@ -85,6 +106,9 @@ struct MetricsInner {
|
|||||||
execute_finished: prometheus::Counter<prometheus::U64>,
|
execute_finished: prometheus::Counter<prometheus::U64>,
|
||||||
preparation_time: prometheus::Histogram,
|
preparation_time: prometheus::Histogram,
|
||||||
execution_time: prometheus::Histogram,
|
execution_time: prometheus::Histogram,
|
||||||
|
preparation_max_rss: prometheus::Histogram,
|
||||||
|
preparation_max_allocated: prometheus::Histogram,
|
||||||
|
preparation_max_resident: prometheus::Histogram,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl metrics::Metrics for Metrics {
|
impl metrics::Metrics for Metrics {
|
||||||
@@ -202,6 +226,42 @@ impl metrics::Metrics for Metrics {
|
|||||||
)?,
|
)?,
|
||||||
registry,
|
registry,
|
||||||
)?,
|
)?,
|
||||||
|
preparation_max_rss: prometheus::register(
|
||||||
|
prometheus::Histogram::with_opts(
|
||||||
|
prometheus::HistogramOpts::new(
|
||||||
|
"polkadot_pvf_preparation_max_rss",
|
||||||
|
"ru_maxrss (maximum resident set size) observed for preparation (in kilobytes)",
|
||||||
|
).buckets(
|
||||||
|
prometheus::exponential_buckets(8192.0, 2.0, 10)
|
||||||
|
.expect("arguments are always valid; qed"),
|
||||||
|
),
|
||||||
|
)?,
|
||||||
|
registry,
|
||||||
|
)?,
|
||||||
|
preparation_max_resident: prometheus::register(
|
||||||
|
prometheus::Histogram::with_opts(
|
||||||
|
prometheus::HistogramOpts::new(
|
||||||
|
"polkadot_pvf_preparation_max_resident",
|
||||||
|
"max resident memory observed for preparation (in kilobytes)",
|
||||||
|
).buckets(
|
||||||
|
prometheus::exponential_buckets(8192.0, 2.0, 10)
|
||||||
|
.expect("arguments are always valid; qed"),
|
||||||
|
),
|
||||||
|
)?,
|
||||||
|
registry,
|
||||||
|
)?,
|
||||||
|
preparation_max_allocated: prometheus::register(
|
||||||
|
prometheus::Histogram::with_opts(
|
||||||
|
prometheus::HistogramOpts::new(
|
||||||
|
"polkadot_pvf_preparation_max_allocated",
|
||||||
|
"max allocated memory observed for preparation (in kilobytes)",
|
||||||
|
).buckets(
|
||||||
|
prometheus::exponential_buckets(8192.0, 2.0, 10)
|
||||||
|
.expect("arguments are always valid; qed"),
|
||||||
|
),
|
||||||
|
)?,
|
||||||
|
registry,
|
||||||
|
)?,
|
||||||
};
|
};
|
||||||
Ok(Metrics(Some(inner)))
|
Ok(Metrics(Some(inner)))
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,243 @@
|
|||||||
|
// Copyright 2023 Parity Technologies (UK) Ltd.
|
||||||
|
// This file is part of Polkadot.
|
||||||
|
|
||||||
|
// Polkadot is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
|
||||||
|
// Polkadot is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU General Public License for more details.
|
||||||
|
|
||||||
|
// You should have received a copy of the GNU General Public License
|
||||||
|
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
//! Memory stats for preparation.
|
||||||
|
//!
|
||||||
|
//! Right now we gather three measurements:
|
||||||
|
//!
|
||||||
|
//! - `ru_maxrss` (resident set size) from `getrusage`.
|
||||||
|
//! - `resident` memory stat provided by `tikv-malloc-ctl`.
|
||||||
|
//! - `allocated` memory stat also from `tikv-malloc-ctl`.
|
||||||
|
//!
|
||||||
|
//! Currently we are only logging these for the purposes of gathering data. In the future, we may
|
||||||
|
//! use these stats to reject PVFs during pre-checking. See
|
||||||
|
//! <https://github.com/paritytech/polkadot/issues/6472#issuecomment-1381941762> for more
|
||||||
|
//! background.
|
||||||
|
|
||||||
|
use crate::{metrics::Metrics, LOG_TARGET};
|
||||||
|
use parity_scale_codec::{Decode, Encode};
|
||||||
|
use std::{
|
||||||
|
io,
|
||||||
|
sync::mpsc::{Receiver, RecvTimeoutError, Sender},
|
||||||
|
time::Duration,
|
||||||
|
};
|
||||||
|
use tikv_jemalloc_ctl::{epoch, stats, Error};
|
||||||
|
use tokio::task::JoinHandle;
|
||||||
|
|
||||||
|
#[cfg(target_os = "linux")]
|
||||||
|
use libc::{getrusage, rusage, timeval, RUSAGE_THREAD};
|
||||||
|
|
||||||
|
/// Helper struct to contain all the memory stats, including [`MemoryAllocationStats`] and, if
|
||||||
|
/// supported by the OS, `ru_maxrss`.
|
||||||
|
#[derive(Encode, Decode)]
|
||||||
|
pub struct MemoryStats {
|
||||||
|
/// Memory stats from `tikv_jemalloc_ctl`.
|
||||||
|
pub memory_tracker_stats: Option<MemoryAllocationStats>,
|
||||||
|
/// `ru_maxrss` from `getrusage`. A string error since `io::Error` is not `Encode`able.
|
||||||
|
pub max_rss: Option<Result<i64, String>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Statistics of collected memory metrics.
|
||||||
|
#[non_exhaustive]
|
||||||
|
#[derive(Clone, Debug, Default, Encode, Decode)]
|
||||||
|
pub struct MemoryAllocationStats {
|
||||||
|
/// Total resident memory, in bytes.
|
||||||
|
pub resident: u64,
|
||||||
|
/// Total allocated memory, in bytes.
|
||||||
|
pub allocated: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
struct MemoryAllocationTracker {
|
||||||
|
epoch: tikv_jemalloc_ctl::epoch_mib,
|
||||||
|
allocated: stats::allocated_mib,
|
||||||
|
resident: stats::resident_mib,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl MemoryAllocationTracker {
|
||||||
|
pub fn new() -> Result<Self, Error> {
|
||||||
|
Ok(Self {
|
||||||
|
epoch: epoch::mib()?,
|
||||||
|
allocated: stats::allocated::mib()?,
|
||||||
|
resident: stats::resident::mib()?,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn snapshot(&self) -> Result<MemoryAllocationStats, Error> {
|
||||||
|
// update stats by advancing the allocation epoch
|
||||||
|
self.epoch.advance()?;
|
||||||
|
|
||||||
|
// Convert to `u64`, as `usize` is not `Encode`able.
|
||||||
|
let allocated = self.allocated.read()? as u64;
|
||||||
|
let resident = self.resident.read()? as u64;
|
||||||
|
Ok(MemoryAllocationStats { allocated, resident })
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get the rusage stats for the current thread.
|
||||||
|
#[cfg(target_os = "linux")]
|
||||||
|
fn getrusage_thread() -> io::Result<rusage> {
|
||||||
|
let mut result = rusage {
|
||||||
|
ru_utime: timeval { tv_sec: 0, tv_usec: 0 },
|
||||||
|
ru_stime: timeval { tv_sec: 0, tv_usec: 0 },
|
||||||
|
ru_maxrss: 0,
|
||||||
|
ru_ixrss: 0,
|
||||||
|
ru_idrss: 0,
|
||||||
|
ru_isrss: 0,
|
||||||
|
ru_minflt: 0,
|
||||||
|
ru_majflt: 0,
|
||||||
|
ru_nswap: 0,
|
||||||
|
ru_inblock: 0,
|
||||||
|
ru_oublock: 0,
|
||||||
|
ru_msgsnd: 0,
|
||||||
|
ru_msgrcv: 0,
|
||||||
|
ru_nsignals: 0,
|
||||||
|
ru_nvcsw: 0,
|
||||||
|
ru_nivcsw: 0,
|
||||||
|
};
|
||||||
|
if unsafe { getrusage(RUSAGE_THREAD, &mut result) } == -1 {
|
||||||
|
return Err(io::Error::last_os_error())
|
||||||
|
}
|
||||||
|
Ok(result)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Gets the `ru_maxrss` for the current thread if the OS supports `getrusage`. Otherwise, just
|
||||||
|
/// returns `None`.
|
||||||
|
pub fn get_max_rss_thread() -> Option<io::Result<i64>> {
|
||||||
|
// `c_long` is either `i32` or `i64` depending on architecture. `i64::from` always works.
|
||||||
|
#[cfg(target_os = "linux")]
|
||||||
|
let max_rss = Some(getrusage_thread().map(|rusage| i64::from(rusage.ru_maxrss)));
|
||||||
|
#[cfg(not(target_os = "linux"))]
|
||||||
|
let max_rss = None;
|
||||||
|
max_rss
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Runs a thread in the background that observes memory statistics. The goal is to try to get
|
||||||
|
/// accurate stats during preparation.
|
||||||
|
///
|
||||||
|
/// # Algorithm
|
||||||
|
///
|
||||||
|
/// 1. Create the memory tracker.
|
||||||
|
///
|
||||||
|
/// 2. Sleep for some short interval. Whenever we wake up, take a snapshot by updating the
|
||||||
|
/// allocation epoch.
|
||||||
|
///
|
||||||
|
/// 3. When we receive a signal that preparation has completed, take one last snapshot and return
|
||||||
|
/// the maximum observed values.
|
||||||
|
///
|
||||||
|
/// # Errors
|
||||||
|
///
|
||||||
|
/// For simplicity, any errors are returned as a string. As this is not a critical component, errors
|
||||||
|
/// are used for informational purposes (logging) only.
|
||||||
|
pub fn memory_tracker_loop(finished_rx: Receiver<()>) -> Result<MemoryAllocationStats, String> {
|
||||||
|
// This doesn't need to be too fine-grained since preparation currently takes 3-10s or more.
|
||||||
|
// Apart from that, there is not really a science to this number.
|
||||||
|
const POLL_INTERVAL: Duration = Duration::from_millis(100);
|
||||||
|
|
||||||
|
let tracker = MemoryAllocationTracker::new().map_err(|err| err.to_string())?;
|
||||||
|
let mut max_stats = MemoryAllocationStats::default();
|
||||||
|
|
||||||
|
let mut update_stats = || -> Result<(), String> {
|
||||||
|
let current_stats = tracker.snapshot().map_err(|err| err.to_string())?;
|
||||||
|
if current_stats.resident > max_stats.resident {
|
||||||
|
max_stats.resident = current_stats.resident;
|
||||||
|
}
|
||||||
|
if current_stats.allocated > max_stats.allocated {
|
||||||
|
max_stats.allocated = current_stats.allocated;
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
};
|
||||||
|
|
||||||
|
loop {
|
||||||
|
// Take a snapshot and update the max stats.
|
||||||
|
update_stats()?;
|
||||||
|
|
||||||
|
// Sleep.
|
||||||
|
match finished_rx.recv_timeout(POLL_INTERVAL) {
|
||||||
|
// Received finish signal.
|
||||||
|
Ok(()) => {
|
||||||
|
update_stats()?;
|
||||||
|
return Ok(max_stats)
|
||||||
|
},
|
||||||
|
// Timed out, restart loop.
|
||||||
|
Err(RecvTimeoutError::Timeout) => continue,
|
||||||
|
Err(RecvTimeoutError::Disconnected) =>
|
||||||
|
return Err("memory_tracker_loop: finished_rx disconnected".into()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Helper function to terminate the memory tracker thread and get the stats. Helps isolate all this
|
||||||
|
/// error handling.
|
||||||
|
pub async fn get_memory_tracker_loop_stats(
|
||||||
|
fut: JoinHandle<Result<MemoryAllocationStats, String>>,
|
||||||
|
tx: Sender<()>,
|
||||||
|
) -> Option<MemoryAllocationStats> {
|
||||||
|
// Signal to the memory tracker thread to terminate.
|
||||||
|
if let Err(err) = tx.send(()) {
|
||||||
|
gum::warn!(
|
||||||
|
target: LOG_TARGET,
|
||||||
|
worker_pid = %std::process::id(),
|
||||||
|
"worker: error sending signal to memory tracker_thread: {}", err
|
||||||
|
);
|
||||||
|
None
|
||||||
|
} else {
|
||||||
|
// Join on the thread handle.
|
||||||
|
match fut.await {
|
||||||
|
Ok(Ok(stats)) => Some(stats),
|
||||||
|
Ok(Err(err)) => {
|
||||||
|
gum::warn!(
|
||||||
|
target: LOG_TARGET,
|
||||||
|
worker_pid = %std::process::id(),
|
||||||
|
"worker: error occurred in the memory tracker thread: {}", err
|
||||||
|
);
|
||||||
|
None
|
||||||
|
},
|
||||||
|
Err(err) => {
|
||||||
|
gum::warn!(
|
||||||
|
target: LOG_TARGET,
|
||||||
|
worker_pid = %std::process::id(),
|
||||||
|
"worker: error joining on memory tracker thread: {}", err
|
||||||
|
);
|
||||||
|
None
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Helper function to send the memory metrics, if available, to prometheus.
|
||||||
|
pub fn observe_memory_metrics(metrics: &Metrics, memory_stats: MemoryStats, pid: u32) {
|
||||||
|
if let Some(max_rss) = memory_stats.max_rss {
|
||||||
|
match max_rss {
|
||||||
|
Ok(max_rss) => metrics.observe_preparation_max_rss(max_rss as f64),
|
||||||
|
Err(err) => gum::warn!(
|
||||||
|
target: LOG_TARGET,
|
||||||
|
worker_pid = %pid,
|
||||||
|
"error getting `ru_maxrss` in preparation thread: {}",
|
||||||
|
err
|
||||||
|
),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(tracker_stats) = memory_stats.memory_tracker_stats {
|
||||||
|
// We convert these stats from B to KB to match the unit of `ru_maxrss` from `getrusage`.
|
||||||
|
let resident_kb = (tracker_stats.resident / 1024) as f64;
|
||||||
|
let allocated_kb = (tracker_stats.allocated / 1024) as f64;
|
||||||
|
|
||||||
|
metrics.observe_preparation_max_resident(resident_kb);
|
||||||
|
metrics.observe_preparation_max_allocated(allocated_kb);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,4 +1,4 @@
|
|||||||
// Copyright 2021 Parity Technologies (UK) Ltd.
|
// Copyright 2021-2023 Parity Technologies (UK) Ltd.
|
||||||
// This file is part of Polkadot.
|
// This file is part of Polkadot.
|
||||||
|
|
||||||
// Polkadot is free software: you can redistribute it and/or modify
|
// Polkadot is free software: you can redistribute it and/or modify
|
||||||
@@ -22,6 +22,7 @@
|
|||||||
//! The pool will spawn workers in new processes and those should execute pass control to
|
//! The pool will spawn workers in new processes and those should execute pass control to
|
||||||
//! [`worker_entrypoint`].
|
//! [`worker_entrypoint`].
|
||||||
|
|
||||||
|
mod memory_stats;
|
||||||
mod pool;
|
mod pool;
|
||||||
mod queue;
|
mod queue;
|
||||||
mod worker;
|
mod worker;
|
||||||
|
|||||||
@@ -220,6 +220,7 @@ fn handle_to_pool(
|
|||||||
let preparation_timer = metrics.time_preparation();
|
let preparation_timer = metrics.time_preparation();
|
||||||
mux.push(
|
mux.push(
|
||||||
start_work_task(
|
start_work_task(
|
||||||
|
metrics.clone(),
|
||||||
worker,
|
worker,
|
||||||
idle,
|
idle,
|
||||||
code,
|
code,
|
||||||
@@ -268,6 +269,7 @@ async fn spawn_worker_task(program_path: PathBuf, spawn_timeout: Duration) -> Po
|
|||||||
}
|
}
|
||||||
|
|
||||||
async fn start_work_task<Timer>(
|
async fn start_work_task<Timer>(
|
||||||
|
metrics: Metrics,
|
||||||
worker: Worker,
|
worker: Worker,
|
||||||
idle: IdleWorker,
|
idle: IdleWorker,
|
||||||
code: Arc<Vec<u8>>,
|
code: Arc<Vec<u8>>,
|
||||||
@@ -277,7 +279,8 @@ async fn start_work_task<Timer>(
|
|||||||
_preparation_timer: Option<Timer>,
|
_preparation_timer: Option<Timer>,
|
||||||
) -> PoolEvent {
|
) -> PoolEvent {
|
||||||
let outcome =
|
let outcome =
|
||||||
worker::start_work(idle, code, &cache_path, artifact_path, preparation_timeout).await;
|
worker::start_work(&metrics, idle, code, &cache_path, artifact_path, preparation_timeout)
|
||||||
|
.await;
|
||||||
PoolEvent::StartWork(worker, outcome)
|
PoolEvent::StartWork(worker, outcome)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -492,7 +492,10 @@ pub fn start(
|
|||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
use crate::{error::PrepareError, host::PRECHECK_PREPARATION_TIMEOUT};
|
use crate::{
|
||||||
|
error::PrepareError,
|
||||||
|
host::{LENIENT_PREPARATION_TIMEOUT, PRECHECK_PREPARATION_TIMEOUT},
|
||||||
|
};
|
||||||
use assert_matches::assert_matches;
|
use assert_matches::assert_matches;
|
||||||
use futures::{future::BoxFuture, FutureExt};
|
use futures::{future::BoxFuture, FutureExt};
|
||||||
use slotmap::SlotMap;
|
use slotmap::SlotMap;
|
||||||
@@ -628,12 +631,17 @@ mod tests {
|
|||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn dont_spawn_over_soft_limit_unless_critical() {
|
async fn dont_spawn_over_soft_limit_unless_critical() {
|
||||||
let mut test = Test::new(2, 3);
|
let mut test = Test::new(2, 3);
|
||||||
let preparation_timeout = PRECHECK_PREPARATION_TIMEOUT;
|
|
||||||
|
|
||||||
let priority = Priority::Normal;
|
let priority = Priority::Normal;
|
||||||
|
let preparation_timeout = PRECHECK_PREPARATION_TIMEOUT;
|
||||||
test.send_queue(ToQueue::Enqueue { priority, pvf: pvf(1), preparation_timeout });
|
test.send_queue(ToQueue::Enqueue { priority, pvf: pvf(1), preparation_timeout });
|
||||||
test.send_queue(ToQueue::Enqueue { priority, pvf: pvf(2), preparation_timeout });
|
test.send_queue(ToQueue::Enqueue { priority, pvf: pvf(2), preparation_timeout });
|
||||||
test.send_queue(ToQueue::Enqueue { priority, pvf: pvf(3), preparation_timeout });
|
// Start a non-precheck preparation for this one.
|
||||||
|
test.send_queue(ToQueue::Enqueue {
|
||||||
|
priority,
|
||||||
|
pvf: pvf(3),
|
||||||
|
preparation_timeout: LENIENT_PREPARATION_TIMEOUT,
|
||||||
|
});
|
||||||
|
|
||||||
// Receive only two spawns.
|
// Receive only two spawns.
|
||||||
assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
|
assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
|
||||||
@@ -711,10 +719,16 @@ mod tests {
|
|||||||
async fn worker_mass_die_out_doesnt_stall_queue() {
|
async fn worker_mass_die_out_doesnt_stall_queue() {
|
||||||
let mut test = Test::new(2, 2);
|
let mut test = Test::new(2, 2);
|
||||||
|
|
||||||
let (priority, preparation_timeout) = (Priority::Normal, PRECHECK_PREPARATION_TIMEOUT);
|
let priority = Priority::Normal;
|
||||||
|
let preparation_timeout = PRECHECK_PREPARATION_TIMEOUT;
|
||||||
test.send_queue(ToQueue::Enqueue { priority, pvf: pvf(1), preparation_timeout });
|
test.send_queue(ToQueue::Enqueue { priority, pvf: pvf(1), preparation_timeout });
|
||||||
test.send_queue(ToQueue::Enqueue { priority, pvf: pvf(2), preparation_timeout });
|
test.send_queue(ToQueue::Enqueue { priority, pvf: pvf(2), preparation_timeout });
|
||||||
test.send_queue(ToQueue::Enqueue { priority, pvf: pvf(3), preparation_timeout });
|
// Start a non-precheck preparation for this one.
|
||||||
|
test.send_queue(ToQueue::Enqueue {
|
||||||
|
priority,
|
||||||
|
pvf: pvf(3),
|
||||||
|
preparation_timeout: LENIENT_PREPARATION_TIMEOUT,
|
||||||
|
});
|
||||||
|
|
||||||
assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
|
assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
|
||||||
assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
|
assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
|
||||||
|
|||||||
@@ -14,9 +14,14 @@
|
|||||||
// You should have received a copy of the GNU General Public License
|
// You should have received a copy of the GNU General Public License
|
||||||
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
|
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
use super::memory_stats::{
|
||||||
|
get_max_rss_thread, get_memory_tracker_loop_stats, memory_tracker_loop, observe_memory_metrics,
|
||||||
|
MemoryStats,
|
||||||
|
};
|
||||||
use crate::{
|
use crate::{
|
||||||
artifacts::CompiledArtifact,
|
artifacts::CompiledArtifact,
|
||||||
error::{PrepareError, PrepareResult},
|
error::{PrepareError, PrepareResult},
|
||||||
|
metrics::Metrics,
|
||||||
worker_common::{
|
worker_common::{
|
||||||
bytes_to_path, cpu_time_monitor_loop, framed_recv, framed_send, path_to_bytes,
|
bytes_to_path, cpu_time_monitor_loop, framed_recv, framed_send, path_to_bytes,
|
||||||
spawn_with_program_path, tmpfile_in, worker_event_loop, IdleWorker, SpawnErr, WorkerHandle,
|
spawn_with_program_path, tmpfile_in, worker_event_loop, IdleWorker, SpawnErr, WorkerHandle,
|
||||||
@@ -73,6 +78,7 @@ pub enum Outcome {
|
|||||||
/// NOTE: Returning the `TimedOut`, `IoErr` or `Unreachable` outcomes will trigger the child process
|
/// NOTE: Returning the `TimedOut`, `IoErr` or `Unreachable` outcomes will trigger the child process
|
||||||
/// being killed.
|
/// being killed.
|
||||||
pub async fn start_work(
|
pub async fn start_work(
|
||||||
|
metrics: &Metrics,
|
||||||
worker: IdleWorker,
|
worker: IdleWorker,
|
||||||
code: Arc<Vec<u8>>,
|
code: Arc<Vec<u8>>,
|
||||||
cache_path: &Path,
|
cache_path: &Path,
|
||||||
@@ -109,14 +115,16 @@ pub async fn start_work(
|
|||||||
// load, but the CPU resources of the child can only be measured from the parent after the
|
// load, but the CPU resources of the child can only be measured from the parent after the
|
||||||
// child process terminates.
|
// child process terminates.
|
||||||
let timeout = preparation_timeout * JOB_TIMEOUT_WALL_CLOCK_FACTOR;
|
let timeout = preparation_timeout * JOB_TIMEOUT_WALL_CLOCK_FACTOR;
|
||||||
let result = tokio::time::timeout(timeout, framed_recv(&mut stream)).await;
|
let result = tokio::time::timeout(timeout, recv_response(&mut stream, pid)).await;
|
||||||
|
|
||||||
match result {
|
match result {
|
||||||
// Received bytes from worker within the time limit.
|
// Received bytes from worker within the time limit.
|
||||||
Ok(Ok(response_bytes)) =>
|
Ok(Ok((prepare_result, memory_stats))) =>
|
||||||
handle_response_bytes(
|
handle_response(
|
||||||
|
metrics,
|
||||||
IdleWorker { stream, pid },
|
IdleWorker { stream, pid },
|
||||||
response_bytes,
|
prepare_result,
|
||||||
|
memory_stats,
|
||||||
pid,
|
pid,
|
||||||
tmp_file,
|
tmp_file,
|
||||||
artifact_path,
|
artifact_path,
|
||||||
@@ -151,29 +159,16 @@ pub async fn start_work(
|
|||||||
///
|
///
|
||||||
/// NOTE: Here we know the artifact exists, but is still located in a temporary file which will be
|
/// NOTE: Here we know the artifact exists, but is still located in a temporary file which will be
|
||||||
/// cleared by `with_tmp_file`.
|
/// cleared by `with_tmp_file`.
|
||||||
async fn handle_response_bytes(
|
async fn handle_response(
|
||||||
|
metrics: &Metrics,
|
||||||
worker: IdleWorker,
|
worker: IdleWorker,
|
||||||
response_bytes: Vec<u8>,
|
result: PrepareResult,
|
||||||
|
memory_stats: Option<MemoryStats>,
|
||||||
pid: u32,
|
pid: u32,
|
||||||
tmp_file: PathBuf,
|
tmp_file: PathBuf,
|
||||||
artifact_path: PathBuf,
|
artifact_path: PathBuf,
|
||||||
preparation_timeout: Duration,
|
preparation_timeout: Duration,
|
||||||
) -> Outcome {
|
) -> Outcome {
|
||||||
// By convention we expect encoded `PrepareResult`.
|
|
||||||
let result = match PrepareResult::decode(&mut response_bytes.as_slice()) {
|
|
||||||
Ok(result) => result,
|
|
||||||
Err(err) => {
|
|
||||||
// We received invalid bytes from the worker.
|
|
||||||
let bound_bytes = &response_bytes[..response_bytes.len().min(4)];
|
|
||||||
gum::warn!(
|
|
||||||
target: LOG_TARGET,
|
|
||||||
worker_pid = %pid,
|
|
||||||
"received unexpected response from the prepare worker: {}",
|
|
||||||
HexDisplay::from(&bound_bytes),
|
|
||||||
);
|
|
||||||
return Outcome::IoErr(err.to_string())
|
|
||||||
},
|
|
||||||
};
|
|
||||||
let cpu_time_elapsed = match result {
|
let cpu_time_elapsed = match result {
|
||||||
Ok(result) => result,
|
Ok(result) => result,
|
||||||
// Timed out on the child. This should already be logged by the child.
|
// Timed out on the child. This should already be logged by the child.
|
||||||
@@ -202,7 +197,7 @@ async fn handle_response_bytes(
|
|||||||
artifact_path.display(),
|
artifact_path.display(),
|
||||||
);
|
);
|
||||||
|
|
||||||
match tokio::fs::rename(&tmp_file, &artifact_path).await {
|
let outcome = match tokio::fs::rename(&tmp_file, &artifact_path).await {
|
||||||
Ok(()) => Outcome::Concluded { worker, result },
|
Ok(()) => Outcome::Concluded { worker, result },
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
gum::warn!(
|
gum::warn!(
|
||||||
@@ -215,7 +210,15 @@ async fn handle_response_bytes(
|
|||||||
);
|
);
|
||||||
Outcome::RenameTmpFileErr { worker, result, err: format!("{:?}", err) }
|
Outcome::RenameTmpFileErr { worker, result, err: format!("{:?}", err) }
|
||||||
},
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
// If there were no errors up until now, log the memory stats for a successful preparation, if
|
||||||
|
// available.
|
||||||
|
if let Some(memory_stats) = memory_stats {
|
||||||
|
observe_memory_metrics(metrics, memory_stats, pid);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
outcome
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Create a temporary file for an artifact at the given cache path and execute the given
|
/// Create a temporary file for an artifact at the given cache path and execute the given
|
||||||
@@ -288,17 +291,75 @@ async fn recv_request(stream: &mut UnixStream) -> io::Result<(Vec<u8>, PathBuf,
|
|||||||
)
|
)
|
||||||
})?;
|
})?;
|
||||||
let preparation_timeout = framed_recv(stream).await?;
|
let preparation_timeout = framed_recv(stream).await?;
|
||||||
let preparation_timeout = Duration::decode(&mut &preparation_timeout[..]).map_err(|_| {
|
let preparation_timeout = Duration::decode(&mut &preparation_timeout[..]).map_err(|e| {
|
||||||
io::Error::new(
|
io::Error::new(
|
||||||
io::ErrorKind::Other,
|
io::ErrorKind::Other,
|
||||||
"prepare pvf recv_request: failed to decode duration".to_string(),
|
format!("prepare pvf recv_request: failed to decode duration: {:?}", e),
|
||||||
)
|
)
|
||||||
})?;
|
})?;
|
||||||
Ok((code, tmp_file, preparation_timeout))
|
Ok((code, tmp_file, preparation_timeout))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn send_response(
|
||||||
|
stream: &mut UnixStream,
|
||||||
|
result: PrepareResult,
|
||||||
|
memory_stats: Option<MemoryStats>,
|
||||||
|
) -> io::Result<()> {
|
||||||
|
framed_send(stream, &result.encode()).await?;
|
||||||
|
framed_send(stream, &memory_stats.encode()).await
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn recv_response(
|
||||||
|
stream: &mut UnixStream,
|
||||||
|
pid: u32,
|
||||||
|
) -> io::Result<(PrepareResult, Option<MemoryStats>)> {
|
||||||
|
let result = framed_recv(stream).await?;
|
||||||
|
let result = PrepareResult::decode(&mut &result[..]).map_err(|e| {
|
||||||
|
// We received invalid bytes from the worker.
|
||||||
|
let bound_bytes = &result[..result.len().min(4)];
|
||||||
|
gum::warn!(
|
||||||
|
target: LOG_TARGET,
|
||||||
|
worker_pid = %pid,
|
||||||
|
"received unexpected response from the prepare worker: {}",
|
||||||
|
HexDisplay::from(&bound_bytes),
|
||||||
|
);
|
||||||
|
io::Error::new(
|
||||||
|
io::ErrorKind::Other,
|
||||||
|
format!("prepare pvf recv_response: failed to decode result: {:?}", e),
|
||||||
|
)
|
||||||
|
})?;
|
||||||
|
let memory_stats = framed_recv(stream).await?;
|
||||||
|
let memory_stats = Option::<MemoryStats>::decode(&mut &memory_stats[..]).map_err(|e| {
|
||||||
|
io::Error::new(
|
||||||
|
io::ErrorKind::Other,
|
||||||
|
format!("prepare pvf recv_response: failed to decode memory stats: {:?}", e),
|
||||||
|
)
|
||||||
|
})?;
|
||||||
|
Ok((result, memory_stats))
|
||||||
|
}
|
||||||
|
|
||||||
/// The entrypoint that the spawned prepare worker should start with. The `socket_path` specifies
|
/// The entrypoint that the spawned prepare worker should start with. The `socket_path` specifies
|
||||||
/// the path to the socket used to communicate with the host.
|
/// the path to the socket used to communicate with the host.
|
||||||
|
///
|
||||||
|
/// # Flow
|
||||||
|
///
|
||||||
|
/// This runs the following in a loop:
|
||||||
|
///
|
||||||
|
/// 1. Get the code and parameters for preparation from the host.
|
||||||
|
///
|
||||||
|
/// 2. Start a memory tracker in a separate thread.
|
||||||
|
///
|
||||||
|
/// 3. Start the CPU time monitor loop and the actual preparation in two separate threads.
|
||||||
|
///
|
||||||
|
/// 4. Select on the two threads created in step 3. If the CPU timeout was hit, the CPU time monitor
|
||||||
|
/// thread will trigger first.
|
||||||
|
///
|
||||||
|
/// 5. Stop the memory tracker and get the stats.
|
||||||
|
///
|
||||||
|
/// 6. If compilation succeeded, write the compiled artifact into a temporary file.
|
||||||
|
///
|
||||||
|
/// 7. Send the result of preparation back to the host. If any error occurred in the above steps, we
|
||||||
|
/// send that in the `PrepareResult`.
|
||||||
pub fn worker_entrypoint(socket_path: &str) {
|
pub fn worker_entrypoint(socket_path: &str) {
|
||||||
worker_event_loop("prepare", socket_path, |rt_handle, mut stream| async move {
|
worker_event_loop("prepare", socket_path, |rt_handle, mut stream| async move {
|
||||||
loop {
|
loop {
|
||||||
@@ -309,26 +370,40 @@ pub fn worker_entrypoint(socket_path: &str) {
|
|||||||
"worker: preparing artifact",
|
"worker: preparing artifact",
|
||||||
);
|
);
|
||||||
|
|
||||||
// Used to signal to the cpu time monitor thread that it can finish.
|
|
||||||
let (finished_tx, finished_rx) = channel::<()>();
|
|
||||||
let cpu_time_start = ProcessTime::now();
|
let cpu_time_start = ProcessTime::now();
|
||||||
|
|
||||||
|
// Run the memory tracker.
|
||||||
|
let (memory_tracker_tx, memory_tracker_rx) = channel::<()>();
|
||||||
|
let memory_tracker_fut =
|
||||||
|
rt_handle.spawn_blocking(move || memory_tracker_loop(memory_tracker_rx));
|
||||||
|
|
||||||
// Spawn a new thread that runs the CPU time monitor.
|
// Spawn a new thread that runs the CPU time monitor.
|
||||||
let thread_fut = rt_handle
|
let (cpu_time_monitor_tx, cpu_time_monitor_rx) = channel::<()>();
|
||||||
|
let cpu_time_monitor_fut = rt_handle
|
||||||
.spawn_blocking(move || {
|
.spawn_blocking(move || {
|
||||||
cpu_time_monitor_loop(cpu_time_start, preparation_timeout, finished_rx)
|
cpu_time_monitor_loop(cpu_time_start, preparation_timeout, cpu_time_monitor_rx)
|
||||||
})
|
})
|
||||||
.fuse();
|
.fuse();
|
||||||
let prepare_fut = rt_handle.spawn_blocking(move || prepare_artifact(&code)).fuse();
|
// Spawn another thread for preparation.
|
||||||
|
let prepare_fut = rt_handle
|
||||||
|
.spawn_blocking(move || {
|
||||||
|
let prepare_result = prepare_artifact(&code);
|
||||||
|
|
||||||
pin_mut!(thread_fut);
|
// Get the `ru_maxrss` stat. If supported, call getrusage for the thread.
|
||||||
|
let max_rss = get_max_rss_thread();
|
||||||
|
|
||||||
|
(prepare_result, max_rss)
|
||||||
|
})
|
||||||
|
.fuse();
|
||||||
|
|
||||||
|
pin_mut!(cpu_time_monitor_fut);
|
||||||
pin_mut!(prepare_fut);
|
pin_mut!(prepare_fut);
|
||||||
|
|
||||||
let result = select_biased! {
|
let (result, memory_stats) = select_biased! {
|
||||||
// If this future is not selected, the join handle is dropped and the thread will
|
// If this future is not selected, the join handle is dropped and the thread will
|
||||||
// finish in the background.
|
// finish in the background.
|
||||||
join_res = thread_fut => {
|
join_res = cpu_time_monitor_fut => {
|
||||||
match join_res {
|
let result = match join_res {
|
||||||
Ok(Some(cpu_time_elapsed)) => {
|
Ok(Some(cpu_time_elapsed)) => {
|
||||||
// Log if we exceed the timeout and the other thread hasn't finished.
|
// Log if we exceed the timeout and the other thread hasn't finished.
|
||||||
gum::warn!(
|
gum::warn!(
|
||||||
@@ -342,18 +417,27 @@ pub fn worker_entrypoint(socket_path: &str) {
|
|||||||
},
|
},
|
||||||
Ok(None) => Err(PrepareError::IoErr("error communicating over finished channel".into())),
|
Ok(None) => Err(PrepareError::IoErr("error communicating over finished channel".into())),
|
||||||
Err(err) => Err(PrepareError::IoErr(err.to_string())),
|
Err(err) => Err(PrepareError::IoErr(err.to_string())),
|
||||||
}
|
};
|
||||||
|
(result, None)
|
||||||
},
|
},
|
||||||
compilation_res = prepare_fut => {
|
compilation_res = prepare_fut => {
|
||||||
let cpu_time_elapsed = cpu_time_start.elapsed();
|
let cpu_time_elapsed = cpu_time_start.elapsed();
|
||||||
let _ = finished_tx.send(());
|
let _ = cpu_time_monitor_tx.send(());
|
||||||
|
|
||||||
match compilation_res.unwrap_or_else(|err| Err(PrepareError::IoErr(err.to_string()))) {
|
match compilation_res.unwrap_or_else(|err| (Err(PrepareError::IoErr(err.to_string())), None)) {
|
||||||
Err(err) => {
|
(Err(err), _) => {
|
||||||
// Serialized error will be written into the socket.
|
// Serialized error will be written into the socket.
|
||||||
Err(err)
|
(Err(err), None)
|
||||||
},
|
},
|
||||||
Ok(compiled_artifact) => {
|
(Ok(compiled_artifact), max_rss) => {
|
||||||
|
// Stop the memory stats worker and get its observed memory stats.
|
||||||
|
let memory_tracker_stats =
|
||||||
|
get_memory_tracker_loop_stats(memory_tracker_fut, memory_tracker_tx).await;
|
||||||
|
let memory_stats = MemoryStats {
|
||||||
|
memory_tracker_stats,
|
||||||
|
max_rss: max_rss.map(|inner| inner.map_err(|e| e.to_string())),
|
||||||
|
};
|
||||||
|
|
||||||
// Write the serialized artifact into a temp file.
|
// Write the serialized artifact into a temp file.
|
||||||
//
|
//
|
||||||
// PVF host only keeps artifacts statuses in its memory, successfully
|
// PVF host only keeps artifacts statuses in its memory, successfully
|
||||||
@@ -369,13 +453,13 @@ pub fn worker_entrypoint(socket_path: &str) {
|
|||||||
);
|
);
|
||||||
tokio::fs::write(&dest, &compiled_artifact).await?;
|
tokio::fs::write(&dest, &compiled_artifact).await?;
|
||||||
|
|
||||||
Ok(cpu_time_elapsed)
|
(Ok(cpu_time_elapsed), Some(memory_stats))
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
framed_send(&mut stream, result.encode().as_slice()).await?;
|
send_response(&mut stream, result, memory_stats).await?;
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -36,8 +36,8 @@ impl MemoryAllocationTracker {
|
|||||||
// update stats by advancing the allocation epoch
|
// update stats by advancing the allocation epoch
|
||||||
self.epoch.advance()?;
|
self.epoch.advance()?;
|
||||||
|
|
||||||
let allocated: u64 = self.allocated.read()? as _;
|
let allocated = self.allocated.read()?;
|
||||||
let resident: u64 = self.resident.read()? as _;
|
let resident = self.resident.read()?;
|
||||||
Ok(MemoryAllocationSnapshot { allocated, resident })
|
Ok(MemoryAllocationSnapshot { allocated, resident })
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -47,7 +47,7 @@ impl MemoryAllocationTracker {
|
|||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct MemoryAllocationSnapshot {
|
pub struct MemoryAllocationSnapshot {
|
||||||
/// Total resident memory, in bytes.
|
/// Total resident memory, in bytes.
|
||||||
pub resident: u64,
|
pub resident: usize,
|
||||||
/// Total allocated memory, in bytes.
|
/// Total allocated memory, in bytes.
|
||||||
pub allocated: u64,
|
pub allocated: usize,
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -69,8 +69,8 @@ impl Metrics {
|
|||||||
|
|
||||||
pub(crate) fn memory_stats_snapshot(&self, memory_stats: MemoryAllocationSnapshot) {
|
pub(crate) fn memory_stats_snapshot(&self, memory_stats: MemoryAllocationSnapshot) {
|
||||||
if let Some(metrics) = &self.0 {
|
if let Some(metrics) = &self.0 {
|
||||||
metrics.memory_stats_allocated.set(memory_stats.allocated);
|
metrics.memory_stats_allocated.set(memory_stats.allocated as u64);
|
||||||
metrics.memory_stats_resident.set(memory_stats.resident);
|
metrics.memory_stats_resident.set(memory_stats.resident as u64);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user