Introduce jemalloc-allocator feature flag (#6675)

* Introduce jemalloc-stats feature flag

* remove unneeded space

* Update node/overseer/src/lib.rs

Co-authored-by: Marcin S. <marcin@bytedude.com>

* Update Cargo.toml

Co-authored-by: Marcin S. <marcin@bytedude.com>

* revert making tikv-jemallocator depend on jemalloc-stats

* conditionally import memory_stats instead of using dead_code

* fix test via expllicit import

* Add jemalloc-stats feature to crates, propagate it from root

* Apply `jemalloc-stats` feature to prepare mem stats; small refactor

* effect changes recommended on PR

* Update node/overseer/src/metrics.rs

Co-authored-by: Marcin S. <marcin@bytedude.com>

* fix compile error on in pipeline for linux. missing import

* Update node/overseer/src/lib.rs

Co-authored-by: Bastian Köcher <git@kchr.de>

* revert to defining collect_memory_stats inline

---------

Co-authored-by: Marcin S. <marcin@bytedude.com>
Co-authored-by: Marcin S <marcin@realemail.net>
Co-authored-by: Bastian Köcher <git@kchr.de>
This commit is contained in:
Anthony Alaribe
2023-02-09 10:09:10 +01:00
committed by GitHub
parent 7619fea80f
commit b8eaf25040
10 changed files with 212 additions and 178 deletions
+165 -158
View File
@@ -29,16 +29,7 @@
use crate::{metrics::Metrics, LOG_TARGET};
use parity_scale_codec::{Decode, Encode};
use std::{
io,
sync::mpsc::{Receiver, RecvTimeoutError, Sender},
time::Duration,
};
use tikv_jemalloc_ctl::{epoch, stats, Error};
use tokio::task::JoinHandle;
#[cfg(target_os = "linux")]
use libc::{getrusage, rusage, timeval, RUSAGE_THREAD};
use std::io;
/// Helper struct to contain all the memory stats, including [`MemoryAllocationStats`] and, if
/// supported by the OS, `ru_maxrss`.
@@ -60,164 +51,17 @@ pub struct MemoryAllocationStats {
pub allocated: u64,
}
#[derive(Clone)]
struct MemoryAllocationTracker {
epoch: tikv_jemalloc_ctl::epoch_mib,
allocated: stats::allocated_mib,
resident: stats::resident_mib,
}
impl MemoryAllocationTracker {
pub fn new() -> Result<Self, Error> {
Ok(Self {
epoch: epoch::mib()?,
allocated: stats::allocated::mib()?,
resident: stats::resident::mib()?,
})
}
pub fn snapshot(&self) -> Result<MemoryAllocationStats, Error> {
// update stats by advancing the allocation epoch
self.epoch.advance()?;
// Convert to `u64`, as `usize` is not `Encode`able.
let allocated = self.allocated.read()? as u64;
let resident = self.resident.read()? as u64;
Ok(MemoryAllocationStats { allocated, resident })
}
}
/// Get the rusage stats for the current thread.
#[cfg(target_os = "linux")]
fn getrusage_thread() -> io::Result<rusage> {
let mut result = rusage {
ru_utime: timeval { tv_sec: 0, tv_usec: 0 },
ru_stime: timeval { tv_sec: 0, tv_usec: 0 },
ru_maxrss: 0,
ru_ixrss: 0,
ru_idrss: 0,
ru_isrss: 0,
ru_minflt: 0,
ru_majflt: 0,
ru_nswap: 0,
ru_inblock: 0,
ru_oublock: 0,
ru_msgsnd: 0,
ru_msgrcv: 0,
ru_nsignals: 0,
ru_nvcsw: 0,
ru_nivcsw: 0,
};
if unsafe { getrusage(RUSAGE_THREAD, &mut result) } == -1 {
return Err(io::Error::last_os_error())
}
Ok(result)
}
/// Gets the `ru_maxrss` for the current thread if the OS supports `getrusage`. Otherwise, just
/// returns `None`.
pub fn get_max_rss_thread() -> Option<io::Result<i64>> {
// `c_long` is either `i32` or `i64` depending on architecture. `i64::from` always works.
#[cfg(target_os = "linux")]
let max_rss = Some(getrusage_thread().map(|rusage| i64::from(rusage.ru_maxrss)));
let max_rss = Some(getrusage::getrusage_thread().map(|rusage| i64::from(rusage.ru_maxrss)));
#[cfg(not(target_os = "linux"))]
let max_rss = None;
max_rss
}
/// Runs a thread in the background that observes memory statistics. The goal is to try to get
/// accurate stats during preparation.
///
/// # Algorithm
///
/// 1. Create the memory tracker.
///
/// 2. Sleep for some short interval. Whenever we wake up, take a snapshot by updating the
/// allocation epoch.
///
/// 3. When we receive a signal that preparation has completed, take one last snapshot and return
/// the maximum observed values.
///
/// # Errors
///
/// For simplicity, any errors are returned as a string. As this is not a critical component, errors
/// are used for informational purposes (logging) only.
pub fn memory_tracker_loop(finished_rx: Receiver<()>) -> Result<MemoryAllocationStats, String> {
// This doesn't need to be too fine-grained since preparation currently takes 3-10s or more.
// Apart from that, there is not really a science to this number.
const POLL_INTERVAL: Duration = Duration::from_millis(100);
let tracker = MemoryAllocationTracker::new().map_err(|err| err.to_string())?;
let mut max_stats = MemoryAllocationStats::default();
let mut update_stats = || -> Result<(), String> {
let current_stats = tracker.snapshot().map_err(|err| err.to_string())?;
if current_stats.resident > max_stats.resident {
max_stats.resident = current_stats.resident;
}
if current_stats.allocated > max_stats.allocated {
max_stats.allocated = current_stats.allocated;
}
Ok(())
};
loop {
// Take a snapshot and update the max stats.
update_stats()?;
// Sleep.
match finished_rx.recv_timeout(POLL_INTERVAL) {
// Received finish signal.
Ok(()) => {
update_stats()?;
return Ok(max_stats)
},
// Timed out, restart loop.
Err(RecvTimeoutError::Timeout) => continue,
Err(RecvTimeoutError::Disconnected) =>
return Err("memory_tracker_loop: finished_rx disconnected".into()),
}
}
}
/// Helper function to terminate the memory tracker thread and get the stats. Helps isolate all this
/// error handling.
pub async fn get_memory_tracker_loop_stats(
fut: JoinHandle<Result<MemoryAllocationStats, String>>,
tx: Sender<()>,
) -> Option<MemoryAllocationStats> {
// Signal to the memory tracker thread to terminate.
if let Err(err) = tx.send(()) {
gum::warn!(
target: LOG_TARGET,
worker_pid = %std::process::id(),
"worker: error sending signal to memory tracker_thread: {}", err
);
None
} else {
// Join on the thread handle.
match fut.await {
Ok(Ok(stats)) => Some(stats),
Ok(Err(err)) => {
gum::warn!(
target: LOG_TARGET,
worker_pid = %std::process::id(),
"worker: error occurred in the memory tracker thread: {}", err
);
None
},
Err(err) => {
gum::warn!(
target: LOG_TARGET,
worker_pid = %std::process::id(),
"worker: error joining on memory tracker thread: {}", err
);
None
},
}
}
}
/// Helper function to send the memory metrics, if available, to prometheus.
pub fn observe_memory_metrics(metrics: &Metrics, memory_stats: MemoryStats, pid: u32) {
if let Some(max_rss) = memory_stats.max_rss {
@@ -241,3 +85,166 @@ pub fn observe_memory_metrics(metrics: &Metrics, memory_stats: MemoryStats, pid:
metrics.observe_preparation_max_allocated(allocated_kb);
}
}
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
pub mod memory_tracker {
use super::*;
use std::{
sync::mpsc::{Receiver, RecvTimeoutError, Sender},
time::Duration,
};
use tikv_jemalloc_ctl::{epoch, stats, Error};
use tokio::task::JoinHandle;
#[derive(Clone)]
struct MemoryAllocationTracker {
epoch: tikv_jemalloc_ctl::epoch_mib,
allocated: stats::allocated_mib,
resident: stats::resident_mib,
}
impl MemoryAllocationTracker {
pub fn new() -> Result<Self, Error> {
Ok(Self {
epoch: epoch::mib()?,
allocated: stats::allocated::mib()?,
resident: stats::resident::mib()?,
})
}
pub fn snapshot(&self) -> Result<MemoryAllocationStats, Error> {
// update stats by advancing the allocation epoch
self.epoch.advance()?;
// Convert to `u64`, as `usize` is not `Encode`able.
let allocated = self.allocated.read()? as u64;
let resident = self.resident.read()? as u64;
Ok(MemoryAllocationStats { allocated, resident })
}
}
/// Runs a thread in the background that observes memory statistics. The goal is to try to get
/// accurate stats during preparation.
///
/// # Algorithm
///
/// 1. Create the memory tracker.
///
/// 2. Sleep for some short interval. Whenever we wake up, take a snapshot by updating the
/// allocation epoch.
///
/// 3. When we receive a signal that preparation has completed, take one last snapshot and return
/// the maximum observed values.
///
/// # Errors
///
/// For simplicity, any errors are returned as a string. As this is not a critical component, errors
/// are used for informational purposes (logging) only.
pub fn memory_tracker_loop(finished_rx: Receiver<()>) -> Result<MemoryAllocationStats, String> {
// This doesn't need to be too fine-grained since preparation currently takes 3-10s or more.
// Apart from that, there is not really a science to this number.
const POLL_INTERVAL: Duration = Duration::from_millis(100);
let tracker = MemoryAllocationTracker::new().map_err(|err| err.to_string())?;
let mut max_stats = MemoryAllocationStats::default();
let mut update_stats = || -> Result<(), String> {
let current_stats = tracker.snapshot().map_err(|err| err.to_string())?;
if current_stats.resident > max_stats.resident {
max_stats.resident = current_stats.resident;
}
if current_stats.allocated > max_stats.allocated {
max_stats.allocated = current_stats.allocated;
}
Ok(())
};
loop {
// Take a snapshot and update the max stats.
update_stats()?;
// Sleep.
match finished_rx.recv_timeout(POLL_INTERVAL) {
// Received finish signal.
Ok(()) => {
update_stats()?;
return Ok(max_stats)
},
// Timed out, restart loop.
Err(RecvTimeoutError::Timeout) => continue,
Err(RecvTimeoutError::Disconnected) =>
return Err("memory_tracker_loop: finished_rx disconnected".into()),
}
}
}
/// Helper function to terminate the memory tracker thread and get the stats. Helps isolate all this
/// error handling.
pub async fn get_memory_tracker_loop_stats(
fut: JoinHandle<Result<MemoryAllocationStats, String>>,
tx: Sender<()>,
) -> Option<MemoryAllocationStats> {
// Signal to the memory tracker thread to terminate.
if let Err(err) = tx.send(()) {
gum::warn!(
target: LOG_TARGET,
worker_pid = %std::process::id(),
"worker: error sending signal to memory tracker_thread: {}", err
);
None
} else {
// Join on the thread handle.
match fut.await {
Ok(Ok(stats)) => Some(stats),
Ok(Err(err)) => {
gum::warn!(
target: LOG_TARGET,
worker_pid = %std::process::id(),
"worker: error occurred in the memory tracker thread: {}", err
);
None
},
Err(err) => {
gum::warn!(
target: LOG_TARGET,
worker_pid = %std::process::id(),
"worker: error joining on memory tracker thread: {}", err
);
None
},
}
}
}
}
#[cfg(target_os = "linux")]
mod getrusage {
use libc::{getrusage, rusage, timeval, RUSAGE_THREAD};
use std::io;
/// Get the rusage stats for the current thread.
pub fn getrusage_thread() -> io::Result<rusage> {
let mut result = rusage {
ru_utime: timeval { tv_sec: 0, tv_usec: 0 },
ru_stime: timeval { tv_sec: 0, tv_usec: 0 },
ru_maxrss: 0,
ru_ixrss: 0,
ru_idrss: 0,
ru_isrss: 0,
ru_minflt: 0,
ru_majflt: 0,
ru_nswap: 0,
ru_inblock: 0,
ru_oublock: 0,
ru_msgsnd: 0,
ru_msgrcv: 0,
ru_nsignals: 0,
ru_nvcsw: 0,
ru_nivcsw: 0,
};
if unsafe { getrusage(RUSAGE_THREAD, &mut result) } == -1 {
return Err(io::Error::last_os_error())
}
Ok(result)
}
}
+9 -6
View File
@@ -14,10 +14,9 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use super::memory_stats::{
get_max_rss_thread, get_memory_tracker_loop_stats, memory_tracker_loop, observe_memory_metrics,
MemoryStats,
};
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
use super::memory_stats::memory_tracker::{get_memory_tracker_loop_stats, memory_tracker_loop};
use super::memory_stats::{get_max_rss_thread, observe_memory_metrics, MemoryStats};
use crate::{
artifacts::CompiledArtifact,
error::{PrepareError, PrepareResult},
@@ -373,9 +372,10 @@ pub fn worker_entrypoint(socket_path: &str) {
let cpu_time_start = ProcessTime::now();
// Run the memory tracker.
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
let (memory_tracker_tx, memory_tracker_rx) = channel::<()>();
let memory_tracker_fut =
rt_handle.spawn_blocking(move || memory_tracker_loop(memory_tracker_rx));
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
let memory_tracker_fut = rt_handle.spawn_blocking(move || memory_tracker_loop(memory_tracker_rx));
// Spawn a new thread that runs the CPU time monitor.
let (cpu_time_monitor_tx, cpu_time_monitor_rx) = channel::<()>();
@@ -431,8 +431,11 @@ pub fn worker_entrypoint(socket_path: &str) {
},
(Ok(compiled_artifact), max_rss) => {
// Stop the memory stats worker and get its observed memory stats.
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
let memory_tracker_stats =
get_memory_tracker_loop_stats(memory_tracker_fut, memory_tracker_tx).await;
#[cfg(not(any(target_os = "linux", feature = "jemalloc-allocator")))]
let memory_tracker_stats = None;
let memory_stats = MemoryStats {
memory_tracker_stats,
max_rss: max_rss.map(|inner| inner.map_err(|e| e.to_string())),