Tracking/limiting memory allocator (#1192)

This commit is contained in:
s0me0ne-unkn0wn
2023-11-03 16:48:41 +01:00
committed by GitHub
parent 8cfbee706d
commit cd2d5d2579
19 changed files with 569 additions and 58 deletions
Generated
+9
View File
@@ -12421,15 +12421,20 @@ name = "polkadot-node-core-pvf-prepare-worker"
version = "1.0.0"
dependencies = [
"cfg-if",
"criterion 0.4.0",
"libc",
"parity-scale-codec",
"polkadot-node-core-pvf-common",
"polkadot-primitives",
"rayon",
"rococo-runtime",
"sc-executor-common",
"sc-executor-wasmtime",
"sp-maybe-compressed-blob",
"tikv-jemalloc-ctl",
"tikv-jemallocator",
"tracing-gum",
"tracking-allocator",
]
[[package]]
@@ -19442,6 +19447,10 @@ dependencies = [
"tracing-serde",
]
[[package]]
name = "tracking-allocator"
version = "1.0.0"
[[package]]
name = "trie-bench"
version = "0.38.0"
+26 -1
View File
@@ -23,27 +23,36 @@ use std::fmt;
pub type PrepareResult = Result<PrepareStats, PrepareError>;
/// An error that occurred during the prepare part of the PVF pipeline.
// Codec indexes are intended to stabilize pre-encoded payloads (see `OOM_PAYLOAD` below)
#[derive(Debug, Clone, Encode, Decode)]
pub enum PrepareError {
/// During the prevalidation stage of preparation an issue was found with the PVF.
#[codec(index = 0)]
Prevalidation(String),
/// Compilation failed for the given PVF.
#[codec(index = 1)]
Preparation(String),
/// Instantiation of the WASM module instance failed.
#[codec(index = 2)]
RuntimeConstruction(String),
/// An unexpected panic has occurred in the preparation worker.
#[codec(index = 3)]
Panic(String),
/// Failed to prepare the PVF due to the time limit.
#[codec(index = 4)]
TimedOut,
/// An IO error occurred. This state is reported by either the validation host or by the
/// worker.
#[codec(index = 5)]
IoErr(String),
/// The temporary file for the artifact could not be created at the given cache path. This
/// state is reported by the validation host (not by the worker).
#[codec(index = 6)]
CreateTmpFileErr(String),
/// The response from the worker is received, but the file cannot be renamed (moved) to the
/// final destination location. This state is reported by the validation host (not by the
/// worker).
#[codec(index = 7)]
RenameTmpFileErr {
err: String,
// Unfortunately `PathBuf` doesn't implement `Encode`/`Decode`, so we do a fallible
@@ -51,12 +60,19 @@ pub enum PrepareError {
src: Option<String>,
dest: Option<String>,
},
/// Memory limit reached
#[codec(index = 8)]
OutOfMemory,
/// The response from the worker is received, but the worker cache could not be cleared. The
/// worker has to be killed to avoid jobs having access to data from other jobs. This state is
/// reported by the validation host (not by the worker).
#[codec(index = 9)]
ClearWorkerDir(String),
}
/// Pre-encoded length-prefixed `PrepareResult::Err(PrepareError::OutOfMemory)`
pub const OOM_PAYLOAD: &[u8] = b"\x02\x00\x00\x00\x00\x00\x00\x00\x01\x08";
impl PrepareError {
/// Returns whether this is a deterministic error, i.e. one that should trigger reliably. Those
/// errors depend on the PVF itself and the sc-executor/wasmtime logic.
@@ -67,7 +83,7 @@ impl PrepareError {
pub fn is_deterministic(&self) -> bool {
use PrepareError::*;
match self {
Prevalidation(_) | Preparation(_) | Panic(_) => true,
Prevalidation(_) | Preparation(_) | Panic(_) | OutOfMemory => true,
TimedOut |
IoErr(_) |
CreateTmpFileErr(_) |
@@ -92,6 +108,7 @@ impl fmt::Display for PrepareError {
CreateTmpFileErr(err) => write!(f, "prepare: error creating tmp file: {}", err),
RenameTmpFileErr { err, src, dest } =>
write!(f, "prepare: error renaming tmp file ({:?} -> {:?}): {}", src, dest, err),
OutOfMemory => write!(f, "prepare: out of memory"),
ClearWorkerDir(err) => write!(f, "prepare: error clearing worker cache: {}", err),
}
}
@@ -147,3 +164,11 @@ impl fmt::Display for InternalValidationError {
}
}
}
#[test]
fn pre_encoded_payloads() {
let oom_enc = PrepareResult::Err(PrepareError::OutOfMemory).encode();
let mut oom_payload = oom_enc.len().to_le_bytes().to_vec();
oom_payload.extend(oom_enc);
assert_eq!(oom_payload, OOM_PAYLOAD);
}
@@ -166,15 +166,36 @@ pub fn params_to_wasmtime_semantics(par: &ExecutorParams) -> Result<Semantics, S
ExecutorParam::StackLogicalMax(slm) => stack_limit.logical_max = *slm,
ExecutorParam::StackNativeMax(snm) => stack_limit.native_stack_max = *snm,
ExecutorParam::WasmExtBulkMemory => sem.wasm_bulk_memory = true,
// TODO: Not implemented yet; <https://github.com/paritytech/polkadot/issues/6472>.
ExecutorParam::PrecheckingMaxMemory(_) => (),
ExecutorParam::PvfPrepTimeout(_, _) | ExecutorParam::PvfExecTimeout(_, _) => (), /* Not used here */
ExecutorParam::PrecheckingMaxMemory(_) |
ExecutorParam::PvfPrepTimeout(_, _) |
ExecutorParam::PvfExecTimeout(_, _) => (), /* Not used here */
}
}
sem.deterministic_stack_limit = Some(stack_limit);
Ok(sem)
}
/// Runs the prevalidation on the given code. Returns a [`RuntimeBlob`] if it succeeds.
pub fn prevalidate(code: &[u8]) -> Result<RuntimeBlob, sc_executor_common::error::WasmError> {
let blob = RuntimeBlob::new(code)?;
// It's assumed this function will take care of any prevalidation logic
// that needs to be done.
//
// Do nothing for now.
Ok(blob)
}
/// Runs preparation on the given runtime blob. If successful, it returns a serialized compiled
/// artifact which can then be used to pass into `Executor::execute` after writing it to the disk.
pub fn prepare(
blob: RuntimeBlob,
executor_params: &ExecutorParams,
) -> Result<Vec<u8>, sc_executor_common::error::WasmError> {
let semantics = params_to_wasmtime_semantics(executor_params)
.map_err(|e| sc_executor_common::error::WasmError::Other(e))?;
sc_executor_wasmtime::prepare_runtime_artifact(blob, &semantics)
}
/// Available host functions. We leave out:
///
/// 1. storage related stuff (PVF doesn't have a notion of a persistent storage/trie)
+3 -1
View File
@@ -29,12 +29,14 @@ pub struct PrepareStats {
/// supported by the OS, `ru_maxrss`.
#[derive(Clone, Debug, Default, Encode, Decode)]
pub struct MemoryStats {
/// Memory stats from `tikv_jemalloc_ctl`.
/// Memory stats from `tikv_jemalloc_ctl`, polling-based and not very precise.
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
pub memory_tracker_stats: Option<MemoryAllocationStats>,
/// `ru_maxrss` from `getrusage`. `None` if an error occurred.
#[cfg(target_os = "linux")]
pub max_rss: Option<i64>,
/// Peak allocation in bytes measured by tracking allocator
pub peak_tracked_alloc: u64,
}
/// Statistics of collected memory metrics.
@@ -11,7 +11,9 @@ cfg-if = "1.0"
gum = { package = "tracing-gum", path = "../../../gum" }
libc = "0.2.139"
rayon = "1.5.1"
tracking-allocator = { path = "../../../tracking-allocator" }
tikv-jemalloc-ctl = { version = "0.5.0", optional = true }
tikv-jemallocator = { version = "0.5.0", optional = true }
parity-scale-codec = { version = "3.6.1", default-features = false, features = ["derive"] }
@@ -22,11 +24,22 @@ sc-executor-common = { path = "../../../../../substrate/client/executor/common"
sc-executor-wasmtime = { path = "../../../../../substrate/client/executor/wasmtime" }
[target.'cfg(target_os = "linux")'.dependencies]
tikv-jemallocator = "0.5.0"
tikv-jemalloc-ctl = "0.5.0"
[features]
builder = []
jemalloc-allocator = [
"dep:tikv-jemalloc-ctl",
"dep:tikv-jemallocator",
"polkadot-node-core-pvf-common/jemalloc-allocator",
]
[dev-dependencies]
criterion = { version = "0.4.0", default-features = false, features = ["cargo_bench_support"] }
rococo-runtime = { path = "../../../../runtime/rococo" }
sp-maybe-compressed-blob = { path = "../../../../../substrate/primitives/maybe-compressed-blob" }
[[bench]]
name = "prepare_rococo_runtime"
harness = false
@@ -0,0 +1,65 @@
// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use criterion::{criterion_group, criterion_main, Criterion, SamplingMode};
use polkadot_node_core_pvf_common::{
executor_intf::{prepare, prevalidate},
prepare::PrepareJobKind,
pvf::PvfPrepData,
};
use polkadot_primitives::ExecutorParams;
use std::time::Duration;
fn do_prepare_runtime(pvf: PvfPrepData) {
let blob = match prevalidate(&pvf.code()) {
Err(err) => panic!("{:?}", err),
Ok(b) => b,
};
match prepare(blob, &pvf.executor_params()) {
Ok(_) => (),
Err(err) => panic!("{:?}", err),
}
}
fn prepare_rococo_runtime(c: &mut Criterion) {
let blob = rococo_runtime::WASM_BINARY.unwrap();
let pvf = match sp_maybe_compressed_blob::decompress(&blob, 64 * 1024 * 1024) {
Ok(code) => PvfPrepData::from_code(
code.into_owned(),
ExecutorParams::default(),
Duration::from_secs(360),
PrepareJobKind::Compilation,
),
Err(e) => {
panic!("Cannot decompress blob: {:?}", e);
},
};
let mut group = c.benchmark_group("rococo");
group.sampling_mode(SamplingMode::Flat);
group.sample_size(20);
group.measurement_time(Duration::from_secs(240));
group.bench_function("prepare Rococo runtime", |b| {
// `PvfPrepData` is designed to be cheap to clone, so cloning shouldn't affect the
// benchmark accuracy
b.iter(|| do_prepare_runtime(pvf.clone()))
});
group.finish();
}
criterion_group!(preparation, prepare_rococo_runtime);
criterion_main!(preparation);
@@ -1,42 +0,0 @@
// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Interface to the Substrate Executor
use polkadot_node_core_pvf_common::executor_intf::params_to_wasmtime_semantics;
use polkadot_primitives::ExecutorParams;
use sc_executor_common::runtime_blob::RuntimeBlob;
/// Runs the prevalidation on the given code. Returns a [`RuntimeBlob`] if it succeeds.
pub fn prevalidate(code: &[u8]) -> Result<RuntimeBlob, sc_executor_common::error::WasmError> {
let blob = RuntimeBlob::new(code)?;
// It's assumed this function will take care of any prevalidation logic
// that needs to be done.
//
// Do nothing for now.
Ok(blob)
}
/// Runs preparation on the given runtime blob. If successful, it returns a serialized compiled
/// artifact which can then be used to pass into `Executor::execute` after writing it to the disk.
pub fn prepare(
blob: RuntimeBlob,
executor_params: &ExecutorParams,
) -> Result<Vec<u8>, sc_executor_common::error::WasmError> {
let semantics = params_to_wasmtime_semantics(executor_params)
.map_err(|e| sc_executor_common::error::WasmError::Other(e))?;
sc_executor_wasmtime::prepare_runtime_artifact(blob, &semantics)
}
@@ -16,10 +16,9 @@
//! Contains the logic for preparing PVFs. Used by the polkadot-prepare-worker binary.
mod executor_intf;
mod memory_stats;
pub use executor_intf::{prepare, prevalidate};
use polkadot_node_core_pvf_common::executor_intf::{prepare, prevalidate};
// NOTE: Initializing logging in e.g. tests will not have an effect in the workers, as they are
// separate spawned processes. Run with e.g. `RUST_LOG=parachain::pvf-prepare-worker=trace`.
@@ -31,7 +30,7 @@ use crate::memory_stats::max_rss_stat::{extract_max_rss_stat, get_max_rss_thread
use crate::memory_stats::memory_tracker::{get_memory_tracker_loop_stats, memory_tracker_loop};
use parity_scale_codec::{Decode, Encode};
use polkadot_node_core_pvf_common::{
error::{PrepareError, PrepareResult},
error::{PrepareError, PrepareResult, OOM_PAYLOAD},
executor_intf::create_runtime_from_artifact_bytes,
framed_recv_blocking, framed_send_blocking,
prepare::{MemoryStats, PrepareJobKind, PrepareStats},
@@ -46,11 +45,24 @@ use polkadot_node_core_pvf_common::{
use polkadot_primitives::ExecutorParams;
use std::{
fs, io,
os::unix::net::UnixStream,
os::{
fd::{AsRawFd, RawFd},
unix::net::UnixStream,
},
path::PathBuf,
sync::{mpsc::channel, Arc},
time::Duration,
};
use tracking_allocator::TrackingAllocator;
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
#[global_allocator]
static ALLOC: TrackingAllocator<tikv_jemallocator::Jemalloc> =
TrackingAllocator(tikv_jemallocator::Jemalloc);
#[cfg(not(any(target_os = "linux", feature = "jemalloc-allocator")))]
#[global_allocator]
static ALLOC: TrackingAllocator<std::alloc::System> = TrackingAllocator(std::alloc::System);
/// Contains the bytes for a successfully compiled artifact.
pub struct CompiledArtifact(Vec<u8>);
@@ -83,6 +95,44 @@ fn send_response(stream: &mut UnixStream, result: PrepareResult) -> io::Result<(
framed_send_blocking(stream, &result.encode())
}
fn start_memory_tracking(fd: RawFd, limit: Option<isize>) {
unsafe {
// SAFETY: Inside the failure handler, the allocator is locked and no allocations or
// deallocations are possible. For Linux, that always holds for the code below, so it's
// safe. For MacOS, that technically holds at the time of writing, but there are no future
// guarantees.
// The arguments of unsafe `libc` calls are valid, the payload validity is covered with
// a test.
ALLOC.start_tracking(
limit,
Some(Box::new(move || {
#[cfg(target_os = "linux")]
{
// Syscalls never allocate or deallocate, so this is safe.
libc::syscall(libc::SYS_write, fd, OOM_PAYLOAD.as_ptr(), OOM_PAYLOAD.len());
libc::syscall(libc::SYS_close, fd);
libc::syscall(libc::SYS_exit, 1);
}
#[cfg(not(target_os = "linux"))]
{
// Syscalls are not available on MacOS, so we have to use `libc` wrappers.
// Technicaly, there may be allocations inside, although they shouldn't be
// there. In that case, we'll see deadlocks on MacOS after the OOM condition
// triggered. As we consider running a validator on MacOS unsafe, and this
// code is only run by a validator, it's a lesser evil.
libc::write(fd, OOM_PAYLOAD.as_ptr().cast(), OOM_PAYLOAD.len());
libc::close(fd);
std::process::exit(1);
}
})),
);
}
}
fn end_memory_tracking() -> isize {
ALLOC.end_tracking()
}
/// The entrypoint that the spawned prepare worker should start with.
///
/// # Parameters
@@ -172,6 +222,22 @@ pub fn worker_entrypoint(
Arc::clone(&condvar),
WaitOutcome::TimedOut,
)?;
start_memory_tracking(
stream.as_raw_fd(),
executor_params.prechecking_max_memory().map(|v| {
v.try_into().unwrap_or_else(|_| {
gum::warn!(
LOG_TARGET,
%worker_pid,
"Illegal pre-checking max memory value {} discarded",
v,
);
0
})
}),
);
// Spawn another thread for preparation.
let prepare_thread = thread::spawn_worker_thread(
"prepare thread",
@@ -207,6 +273,17 @@ pub fn worker_entrypoint(
let outcome = thread::wait_for_threads(condvar);
let peak_alloc = {
let peak = end_memory_tracking();
gum::debug!(
target: LOG_TARGET,
%worker_pid,
"prepare job peak allocation is {} bytes",
peak,
);
peak
};
let result = match outcome {
WaitOutcome::Finished => {
let _ = cpu_time_monitor_tx.send(());
@@ -238,6 +315,14 @@ pub fn worker_entrypoint(
memory_tracker_stats,
#[cfg(target_os = "linux")]
max_rss: extract_max_rss_stat(max_rss, worker_pid),
// Negative peak allocation values are legit; they are narrow
// corner cases and shouldn't affect overall statistics
// significantly
peak_tracked_alloc: if peak_alloc > 0 {
peak_alloc as u64
} else {
0u64
},
};
// Write the serialized artifact into a temp file.
+20
View File
@@ -93,6 +93,10 @@ impl Metrics {
metrics.preparation_max_resident.observe(max_resident_kb);
metrics.preparation_max_allocated.observe(max_allocated_kb);
}
metrics
.preparation_peak_tracked_allocation
.observe((memory_stats.peak_tracked_alloc / 1024) as f64);
}
}
}
@@ -110,10 +114,14 @@ struct MetricsInner {
execution_time: prometheus::Histogram,
#[cfg(target_os = "linux")]
preparation_max_rss: prometheus::Histogram,
// Max. allocated memory, tracked by Jemallocator, polling-based
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
preparation_max_allocated: prometheus::Histogram,
// Max. resident memory, tracked by Jemallocator, polling-based
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
preparation_max_resident: prometheus::Histogram,
// Peak allocation value, tracked by tracking-allocator
preparation_peak_tracked_allocation: prometheus::Histogram,
}
impl metrics::Metrics for Metrics {
@@ -271,6 +279,18 @@ impl metrics::Metrics for Metrics {
)?,
registry,
)?,
preparation_peak_tracked_allocation: prometheus::register(
prometheus::Histogram::with_opts(
prometheus::HistogramOpts::new(
"polkadot_pvf_preparation_peak_tracked_allocation",
"peak allocation observed for preparation (in kilobytes)",
).buckets(
prometheus::exponential_buckets(8192.0, 2.0, 10)
.expect("arguments are always valid; qed"),
),
)?,
registry,
)?,
};
Ok(Metrics(Some(inner)))
}
@@ -399,6 +399,20 @@ fn handle_mux(
)?;
}
Ok(())
},
Outcome::OutOfMemory => {
if attempt_retire(metrics, spawned, worker) {
reply(
from_pool,
FromPool::Concluded {
worker,
rip: true,
result: Err(PrepareError::OutOfMemory),
},
)?;
}
Ok(())
},
}
@@ -98,6 +98,8 @@ pub enum Outcome {
///
/// This doesn't return an idle worker instance, thus this worker is no longer usable.
IoErr(String),
/// The worker ran out of memory and is aborting. The worker should be ripped.
OutOfMemory,
}
/// Given the idle token of a worker and parameters of work, communicates with the worker and
@@ -234,6 +236,7 @@ async fn handle_response(
Ok(result) => result,
// Timed out on the child. This should already be logged by the child.
Err(PrepareError::TimedOut) => return Outcome::TimedOut,
Err(PrepareError::OutOfMemory) => return Outcome::OutOfMemory,
Err(_) => return Outcome::Concluded { worker, result },
};
+1 -1
View File
@@ -36,8 +36,8 @@ pub fn validate_candidate(
code: &[u8],
params: &[u8],
) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
use polkadot_node_core_pvf_common::executor_intf::{prepare, prevalidate};
use polkadot_node_core_pvf_execute_worker::execute_artifact;
use polkadot_node_core_pvf_prepare_worker::{prepare, prevalidate};
let code = sp_maybe_compressed_blob::decompress(code, 10 * 1024 * 1024)
.expect("Decompressing code failed");
+2
View File
@@ -14,6 +14,8 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! PVF host integration tests checking the chain production pipeline.
use super::TestHost;
use adder::{hash_state, BlockData, HeadData};
use parity_scale_codec::{Decode, Encode};
+40 -4
View File
@@ -14,6 +14,8 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! General PVF host integration tests checking the functionality of the PVF host itself.
use assert_matches::assert_matches;
use parity_scale_codec::Encode as _;
use polkadot_node_core_pvf::{
@@ -22,13 +24,10 @@ use polkadot_node_core_pvf::{
JOB_TIMEOUT_WALL_CLOCK_FACTOR,
};
use polkadot_parachain_primitives::primitives::{BlockData, ValidationParams, ValidationResult};
use polkadot_primitives::ExecutorParams;
use polkadot_primitives::{ExecutorParam, ExecutorParams};
#[cfg(target_os = "linux")]
use rusty_fork::rusty_fork_test;
#[cfg(feature = "ci-only-tests")]
use polkadot_primitives::ExecutorParam;
use std::time::Duration;
use tokio::sync::Mutex;
@@ -434,6 +433,43 @@ async fn deleting_prepared_artifact_does_not_dispute() {
}
}
// This test checks if the adder parachain runtime can be prepared with 10Mb preparation memory
// limit enforced. At the moment of writing, the limit if far enough to prepare the PVF. If it
// starts failing, either Wasmtime version has changed, or the PVF code itself has changed, and
// more memory is required now. Multi-threaded preparation, if ever enabled, may also affect
// memory consumption.
#[tokio::test]
async fn prechecking_within_memory_limits() {
let host = TestHost::new().await;
let result = host
.precheck_pvf(
::adder::wasm_binary_unwrap(),
ExecutorParams::from(&[ExecutorParam::PrecheckingMaxMemory(10 * 1024 * 1024)][..]),
)
.await;
assert_matches!(result, Ok(_));
}
// This test checks if the adder parachain runtime can be prepared with 512Kb preparation memory
// limit enforced. At the moment of writing, the limit if not enough to prepare the PVF, and the
// preparation is supposed to generate an error. If the test starts failing, either Wasmtime
// version has changed, or the PVF code itself has changed, and less memory is required now.
#[tokio::test]
async fn prechecking_out_of_memory() {
use polkadot_node_core_pvf::PrepareError;
let host = TestHost::new().await;
let result = host
.precheck_pvf(
::adder::wasm_binary_unwrap(),
ExecutorParams::from(&[ExecutorParam::PrecheckingMaxMemory(512 * 1024)][..]),
)
.await;
assert_matches!(result, Err(PrepareError::OutOfMemory));
}
// With one worker, run multiple preparation jobs serially. They should not conflict.
#[tokio::test]
async fn prepare_can_run_serially() {
+1 -1
View File
@@ -48,7 +48,7 @@ erasure = { package = "polkadot-erasure-coding", path = "../../erasure-coding" }
rand = "0.8.5"
# Required for worker binaries to build.
polkadot-node-core-pvf-common = { path = "../core/pvf/common", features = ["test-utils"] }
polkadot-node-core-pvf-common = { path = "../core/pvf/common" }
polkadot-node-core-pvf-execute-worker = { path = "../core/pvf/execute-worker" }
polkadot-node-core-pvf-prepare-worker = { path = "../core/pvf/prepare-worker" }
@@ -0,0 +1,6 @@
[package]
name = "tracking-allocator"
description = "Tracking allocator to control the amount of memory consumed by the process"
version = "1.0.0"
authors.workspace = true
edition.workspace = true
+242
View File
@@ -0,0 +1,242 @@
// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Tracking/limiting global allocator. Calculates the peak allocation between two checkpoints for
//! the whole process. Accepts an optional limit and a failure handler which is called if the limit
//! is overflown.
use core::{
alloc::{GlobalAlloc, Layout},
ops::{Deref, DerefMut},
};
use std::{
cell::UnsafeCell,
ptr::null_mut,
sync::atomic::{AtomicBool, Ordering},
};
struct Spinlock<T> {
lock: AtomicBool,
data: UnsafeCell<T>,
}
struct SpinlockGuard<'a, T: 'a> {
lock: &'a Spinlock<T>,
}
// SAFETY: We require that the data inside of the `SpinLock` is `Send`, so it can be sent
// and accessed by any thread as long as it's accessed by only one thread at a time.
// The `SpinLock` provides an exclusive lock over it, so it guarantees that multiple
// threads cannot access it at the same time, hence it implements `Sync` (that is, it can be
// accessed concurrently from multiple threads, even though the `T` itself might not
// necessarily be `Sync` too).
unsafe impl<T: Send> Sync for Spinlock<T> {}
impl<T> Spinlock<T> {
pub const fn new(t: T) -> Spinlock<T> {
Spinlock { lock: AtomicBool::new(false), data: UnsafeCell::new(t) }
}
#[inline]
pub fn lock(&self) -> SpinlockGuard<T> {
loop {
// Try to acquire the lock.
if self
.lock
.compare_exchange_weak(false, true, Ordering::Acquire, Ordering::Relaxed)
.is_ok()
{
return SpinlockGuard { lock: self }
}
// We failed to acquire the lock; wait until it's unlocked.
//
// In theory this should result in less coherency traffic as unlike `compare_exchange`
// it is a read-only operation, so multiple cores can execute it simultaneously
// without taking an exclusive lock over the cache line.
while self.lock.load(Ordering::Relaxed) {
std::hint::spin_loop();
}
}
}
#[inline]
fn unlock(&self) {
self.lock.store(false, Ordering::Release);
}
}
impl<T> Deref for SpinlockGuard<'_, T> {
type Target = T;
fn deref(&self) -> &T {
// SAFETY: It is safe to dereference a guard to the `UnsafeCell` underlying data as the
// presence of the guard means the data is already locked.
unsafe { &*self.lock.data.get() }
}
}
impl<T> DerefMut for SpinlockGuard<'_, T> {
fn deref_mut(&mut self) -> &mut T {
// SAFETY: Same as for `Deref::deref`.
unsafe { &mut *self.lock.data.get() }
}
}
impl<T> Drop for SpinlockGuard<'_, T> {
fn drop(&mut self) {
self.lock.unlock();
}
}
struct TrackingAllocatorData {
current: isize,
peak: isize,
limit: isize,
failure_handler: Option<Box<dyn Fn() + Send>>,
}
impl TrackingAllocatorData {
fn start_tracking(
mut guard: SpinlockGuard<Self>,
limit: isize,
failure_handler: Option<Box<dyn Fn() + Send>>,
) {
guard.current = 0;
guard.peak = 0;
guard.limit = limit;
// Cannot drop it yet, as it would trigger a deallocation
let old_handler = guard.failure_handler.take();
guard.failure_handler = failure_handler;
drop(guard);
drop(old_handler);
}
fn end_tracking(mut guard: SpinlockGuard<Self>) -> isize {
let peak = guard.peak;
guard.limit = 0;
// Cannot drop it yet, as it would trigger a deallocation
let old_handler = guard.failure_handler.take();
drop(guard);
drop(old_handler);
peak
}
#[inline]
fn track_and_check_limits(
mut guard: SpinlockGuard<Self>,
alloc: isize,
) -> Option<SpinlockGuard<Self>> {
guard.current += alloc;
if guard.current > guard.peak {
guard.peak = guard.current;
}
if guard.limit == 0 || guard.peak <= guard.limit {
None
} else {
Some(guard)
}
}
}
static ALLOCATOR_DATA: Spinlock<TrackingAllocatorData> =
Spinlock::new(TrackingAllocatorData { current: 0, peak: 0, limit: 0, failure_handler: None });
pub struct TrackingAllocator<A: GlobalAlloc>(pub A);
impl<A: GlobalAlloc> TrackingAllocator<A> {
/// Start tracking memory allocations and deallocations.
///
/// # Safety
///
/// Failure handler is called with the allocator being in the locked state. Thus, no
/// allocations or deallocations are allowed inside the failure handler; otherwise, a
/// deadlock will occur.
pub unsafe fn start_tracking(
&self,
limit: Option<isize>,
failure_handler: Option<Box<dyn Fn() + Send>>,
) {
TrackingAllocatorData::start_tracking(
ALLOCATOR_DATA.lock(),
limit.unwrap_or(0),
failure_handler,
);
}
/// End tracking and return the peak allocation value in bytes (as `isize`). Peak allocation
/// value is not guaranteed to be neither non-zero nor positive.
pub fn end_tracking(&self) -> isize {
TrackingAllocatorData::end_tracking(ALLOCATOR_DATA.lock())
}
}
#[cold]
#[inline(never)]
unsafe fn fail_allocation(guard: SpinlockGuard<TrackingAllocatorData>) -> *mut u8 {
if let Some(failure_handler) = &guard.failure_handler {
failure_handler()
}
null_mut()
}
unsafe impl<A: GlobalAlloc> GlobalAlloc for TrackingAllocator<A> {
// SAFETY:
// * The wrapped methods are as safe as the underlying allocator implementation is
#[inline]
unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
let guard = ALLOCATOR_DATA.lock();
if let Some(guard) =
TrackingAllocatorData::track_and_check_limits(guard, layout.size() as isize)
{
fail_allocation(guard)
} else {
self.0.alloc(layout)
}
}
#[inline]
unsafe fn alloc_zeroed(&self, layout: Layout) -> *mut u8 {
let guard = ALLOCATOR_DATA.lock();
if let Some(guard) =
TrackingAllocatorData::track_and_check_limits(guard, layout.size() as isize)
{
fail_allocation(guard)
} else {
self.0.alloc_zeroed(layout)
}
}
#[inline]
unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) -> () {
let guard = ALLOCATOR_DATA.lock();
TrackingAllocatorData::track_and_check_limits(guard, -(layout.size() as isize));
self.0.dealloc(ptr, layout)
}
#[inline]
unsafe fn realloc(&self, ptr: *mut u8, layout: Layout, new_size: usize) -> *mut u8 {
let guard = ALLOCATOR_DATA.lock();
if let Some(guard) = TrackingAllocatorData::track_and_check_limits(
guard,
(new_size as isize) - (layout.size() as isize),
) {
fail_allocation(guard)
} else {
self.0.realloc(ptr, layout, new_size)
}
}
}
@@ -197,6 +197,16 @@ impl ExecutorParams {
None
}
/// Returns pre-checking memory limit, if any
pub fn prechecking_max_memory(&self) -> Option<u64> {
for param in &self.0 {
if let ExecutorParam::PrecheckingMaxMemory(limit) = param {
return Some(*limit)
}
}
None
}
/// Check params coherence.
pub fn check_consistency(&self) -> Result<(), ExecutorParamError> {
use ExecutorParam::*;
+1 -1
View File
@@ -24,7 +24,7 @@ use color_eyre::eyre;
/// `memory_stats::MemoryAllocationTracker`.
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
#[global_allocator]
pub static ALLOC: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
static ALLOC: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
fn main() -> eyre::Result<()> {
color_eyre::install()?;