Add new hardware and software metrics (#11062)

* Add new hardware and software metrics

* Move sysinfo tests into `mod tests`

* Correct a typo in a comment

* Remove unnecessary `nix` dependency

* Fix the version tests

* Add a `--disable-hardware-benchmarks` CLI argument

* Disable hardware benchmarks in the integration tests

* Remove unused import

* Fix benchmarks compilation

* Move code to a new `sc-sysinfo` crate

* Correct `impl_version` comment

* Move `--disable-hardware-benchmarks` to the chain-specific bin crate

* Move printing out of hardware bench results to `sc-sysinfo`

* Move hardware benchmarks to separate messages; trigger them manually

* Rename some of the fields in the `HwBench` struct

* Revert changes to the telemetry crate; manually send hwbench messages

* Move sysinfo logs into the sysinfo crate

* Move the `TARGET_OS_*` constants into the sysinfo crate

* Minor cleanups

* Move the `HwBench` struct to the sysinfo crate

* Derive `Clone` for `HwBench`

* Fix broken telemetry connection notification stream

* Prevent the telemetry connection notifiers from leaking if they're disconnected

* Turn the telemetry notification failure log into a debug log

* Rename `--disable-hardware-benchmarks` to `--no-hardware-benchmarks`
This commit is contained in:
Koute
2022-04-11 18:46:53 +09:00
committed by GitHub
parent f517e57f67
commit 8351ada6a3
29 changed files with 808 additions and 63 deletions
+111
View File
@@ -0,0 +1,111 @@
// This file is part of Substrate.
// Copyright (C) 2022 Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! This crate contains the code necessary to gather basic hardware
//! and software telemetry information about the node on which we're running.
use futures::prelude::*;
mod sysinfo;
#[cfg(target_os = "linux")]
mod sysinfo_linux;
pub use sysinfo::{gather_hwbench, gather_sysinfo};
/// The operating system part of the current target triplet.
///
/// Embedded at compile time from the `target_os.txt` file found in `OUT_DIR`
/// (presumably written by this crate's build script — TODO confirm).
pub const TARGET_OS: &str = include_str!(concat!(env!("OUT_DIR"), "/target_os.txt"));
/// The CPU architecture (ISA) part of the current target triplet.
///
/// Embedded at compile time from the `target_arch.txt` file found in `OUT_DIR`.
pub const TARGET_ARCH: &str = include_str!(concat!(env!("OUT_DIR"), "/target_arch.txt"));
/// The environment part of the current target triplet.
///
/// Embedded at compile time from the `target_env.txt` file found in `OUT_DIR`.
/// This string can be empty; `print_sysinfo` skips it in that case.
pub const TARGET_ENV: &str = include_str!(concat!(env!("OUT_DIR"), "/target_env.txt"));
/// Hardware benchmark results for the node.
///
/// Every score is a throughput figure produced by the `benchmark` helper in the
/// `sysinfo` module, which divides bytes per second by `1024 * 1024`; the "MB"
/// in the field descriptions below therefore means 1024 * 1024 bytes.
///
/// The struct is serialized to JSON when the results are sent over telemetry
/// (see `initialize_hwbench_telemetry`).
#[derive(Clone, Debug, serde::Serialize)]
pub struct HwBench {
/// The CPU speed, as measured in how many MB/s it can hash using the BLAKE2b-256 hash.
pub cpu_hashrate_score: u64,
/// Memory bandwidth in MB/s, calculated by measuring the throughput of `memcpy`.
pub memory_memcpy_score: u64,
/// Sequential disk write speed in MB/s.
///
/// `None` when no scratch directory was supplied to `gather_hwbench`
/// or when the benchmark failed.
pub disk_sequential_write_score: Option<u64>,
/// Random disk write speed in MB/s.
///
/// `None` when no scratch directory was supplied to `gather_hwbench`
/// or when the benchmark failed.
pub disk_random_write_score: Option<u64>,
}
/// Logs the system software/hardware information at the `info` level.
///
/// The target OS and CPU architecture are always logged. The target environment
/// is logged only when it is non-empty, and every optional field of `sysinfo`
/// is logged only when it is `Some`.
pub fn print_sysinfo(sysinfo: &sc_telemetry::SysInfo) {
    // Compile-time target information.
    log::info!("💻 Operating system: {}", TARGET_OS);
    log::info!("💻 CPU architecture: {}", TARGET_ARCH);
    if !TARGET_ENV.is_empty() {
        log::info!("💻 Target environment: {}", TARGET_ENV);
    }

    // Information gathered at runtime; any of it may be missing.
    if let Some(cpu) = sysinfo.cpu.as_ref() {
        log::info!("💻 CPU: {}", cpu);
    }
    if let Some(cores) = sysinfo.core_count {
        log::info!("💻 CPU cores: {}", cores);
    }
    if let Some(bytes) = sysinfo.memory {
        // The value is divided by 1024 * 1024 before being printed with an "MB" suffix.
        let megabytes = bytes / (1024 * 1024);
        log::info!("💻 Memory: {}MB", megabytes);
    }
    if let Some(kernel) = sysinfo.linux_kernel.as_ref() {
        log::info!("💻 Kernel: {}", kernel);
    }
    if let Some(distro) = sysinfo.linux_distro.as_ref() {
        log::info!("💻 Linux distribution: {}", distro);
    }
    if let Some(is_vm) = sysinfo.is_virtual_machine {
        let answer = if is_vm { "yes" } else { "no" };
        log::info!("💻 Virtual machine: {}", answer);
    }
}
/// Logs the hardware benchmark results at the `info` level.
///
/// The CPU and memory scores are always logged; each disk score is logged
/// only when it is present.
pub fn print_hwbench(hwbench: &HwBench) {
    log::info!("🏁 CPU score: {}MB/s", hwbench.cpu_hashrate_score);
    log::info!("🏁 Memory score: {}MB/s", hwbench.memory_memcpy_score);

    if let Some(sequential) = hwbench.disk_sequential_write_score {
        log::info!("🏁 Disk score (seq. writes): {}MB/s", sequential);
    }
    if let Some(random) = hwbench.disk_random_write_score {
        log::info!("🏁 Disk score (rand. writes): {}MB/s", random);
    }
}
/// Initializes the hardware benchmarks telemetry.
///
/// Returns a future which, once polled, serializes `hwbench` into a JSON object,
/// tags it with `"msg": "sysinfo.hwbench"` and then sends that payload through
/// `telemetry_handle` every time the handle's connect stream yields an item.
/// The future completes when that stream ends.
///
/// The connect stream is obtained immediately when this function is called,
/// not lazily when the returned future is first polled.
pub fn initialize_hwbench_telemetry(
    telemetry_handle: sc_telemetry::TelemetryHandle,
    hwbench: HwBench,
) -> impl std::future::Future<Output = ()> {
    let mut connect_stream = telemetry_handle.on_connect_stream();

    async move {
        // Build the message once; it is cloned for every notification.
        let mut payload = match serde_json::to_value(&hwbench)
            .expect("the `HwBench` can always be serialized into a JSON object; qed")
        {
            serde_json::Value::Object(fields) => fields,
            _ => unreachable!("the `HwBench` always serializes into a JSON object; qed"),
        };
        payload.insert("msg".into(), "sysinfo.hwbench".into());

        loop {
            if connect_stream.next().await.is_none() {
                // The notification stream has ended; nothing more to send.
                break
            }
            telemetry_handle.send_telemetry(sc_telemetry::SUBSTRATE_INFO, payload.clone());
        }
    }
}
+393
View File
@@ -0,0 +1,393 @@
// This file is part of Substrate.
// Copyright (C) 2022 Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use crate::HwBench;
use rand::{seq::SliceRandom, Rng};
use sc_telemetry::SysInfo;
use std::{
fs::File,
io::{Seek, SeekFrom, Write},
ops::{Deref, DerefMut},
path::{Path, PathBuf},
time::{Duration, Instant},
};
/// Repeatedly executes `run` and returns the measured throughput.
///
/// * `name` - label used only in the trace log line.
/// * `size` - number of bytes processed by a single call to `run`.
/// * `max_iterations` - upper bound on the number of timed calls to `run`.
/// * `max_duration` - once the timed calls have taken at least this long no
///   further iterations are started.
/// * `run` - the workload; it is called once untimed as a warmup and then
///   up to `max_iterations` times while being timed.
///
/// Returns the throughput in units of 1024 * 1024 bytes per second, truncated
/// to an integer.
///
/// # Errors
///
/// The first error returned by `run` (including during the warmup call)
/// aborts the benchmark and is returned as-is.
#[inline(always)]
pub(crate) fn benchmark<E>(
    name: &str,
    size: usize,
    max_iterations: usize,
    max_duration: Duration,
    mut run: impl FnMut() -> Result<(), E>,
) -> Result<u64, E> {
    // Run the benchmark once as a warmup to get the code into the L1 cache.
    run()?;

    // Then run it multiple times and average the result.
    let timestamp = Instant::now();
    let mut elapsed = Duration::default();
    let mut count: usize = 0;
    for _ in 0..max_iterations {
        run()?;

        count += 1;
        elapsed = timestamp.elapsed();

        if elapsed >= max_duration {
            break
        }
    }

    // Multiply in floating point: as integers `size * count` could overflow
    // `usize` (most notably on 32-bit targets), which would panic in debug
    // builds and silently wrap to a bogus score in release builds.
    let total_bytes = size as f64 * count as f64;
    let score = ((total_bytes / elapsed.as_secs_f64()) / (1024.0 * 1024.0)) as u64;
    log::trace!(
        "Calculated {} of {}MB/s in {} iterations in {}ms",
        name,
        score,
        count,
        elapsed.as_millis()
    );
    Ok(score)
}
/// Gathers information about the node's hardware and software.
///
/// Every field starts out as `None`. On Linux targets the fields are then
/// filled in by `gather_linux_sysinfo`; on every other target the empty
/// structure is returned unchanged.
pub fn gather_sysinfo() -> SysInfo {
    // The binding is only mutated on Linux, hence the `allow`.
    #[allow(unused_mut)]
    let mut info = SysInfo {
        cpu: None,
        core_count: None,
        memory: None,
        is_virtual_machine: None,
        linux_kernel: None,
        linux_distro: None,
    };

    #[cfg(target_os = "linux")]
    crate::sysinfo_linux::gather_linux_sysinfo(&mut info);

    info
}
/// Makes the compiler treat `slice` as both observed and modified.
///
/// The first byte is read and written back through volatile accesses, which
/// the compiler may neither elide nor reorder. This discourages it from
/// optimizing away benchmark code whose results are otherwise unused. It is
/// not bulletproof in theory, but should work in practice.
///
/// The contents of the slice are left unchanged.
///
/// # Panics
///
/// Panics if `slice` is empty.
#[inline(never)]
fn clobber(slice: &mut [u8]) {
    assert!(!slice.is_empty());

    let first: *mut u8 = slice.as_mut_ptr();

    // SAFETY: The slice was just checked to be non-empty, so `first` points to
    // a valid, initialized byte to which we hold exclusive access.
    unsafe {
        let byte = first.read_volatile();
        first.write_volatile(byte);
    }
}
/// Benchmarks the CPU by hashing a fixed buffer with BLAKE2b-256.
///
/// Returns the hashing throughput as computed by `benchmark`.
fn benchmark_cpu() -> u64 {
    // The result is somewhat sensitive to how much data is hashed per call:
    // small inputs yield a lower throughput and bigger inputs a higher one,
    // up to the point where the hasher reaches roughly 100% of what it can do.
    // Plotted with the input size on the X axis and the speed on the Y axis,
    // the curve would grow essentially logarithmically.
    //
    // Real workloads do not always have enough data to reach the hasher's peak
    // speed, so the size below is picked to still measure how fast the hasher
    // is without hitting its theoretical maximum.
    const SIZE: usize = 32 * 1024;
    const MAX_ITERATIONS: usize = 4 * 1024;
    const MAX_DURATION: Duration = Duration::from_millis(100);

    let mut input = Vec::with_capacity(SIZE);
    input.resize(SIZE, 0x66);
    let mut digest = Default::default();

    let hash_once = || -> Result<(), ()> {
        // Clobber both the input and the output so that the hashing
        // can't be optimized out.
        clobber(&mut input);
        digest = sp_core::hashing::blake2_256(&input);
        clobber(&mut digest);

        Ok(())
    };

    benchmark("CPU score", SIZE, MAX_ITERATIONS, MAX_DURATION, hash_once)
        .expect("benchmark cannot fail; qed")
}
/// Benchmarks the effective `memcpy` memory bandwidth.
///
/// This does not technically measure the absolute maximum bandwidth available,
/// which is fine: most real code is not optimized to take advantage of the
/// full memory bandwidth either.
///
/// Returns the copy throughput as computed by `benchmark`.
fn benchmark_memory() -> u64 {
    // Ideally the buffers should be at least as big as the CPU's L3 cache,
    // and big enough for a single `memcpy` to take a measurable amount of time.
    //
    // Once that holds, growing them further won't change the results.
    const SIZE: usize = 64 * 1024 * 1024;
    const MAX_ITERATIONS: usize = 32;
    const MAX_DURATION: Duration = Duration::from_millis(100);

    let mut src = Vec::with_capacity(SIZE);
    let mut dst = Vec::with_capacity(SIZE);

    // Prefault the pages; we want to measure the memory bandwidth,
    // not how fast the kernel can supply us with fresh memory pages.
    src.resize(SIZE, 0x66);
    dst.resize(SIZE, 0x77);

    let copy_once = || -> Result<(), ()> {
        clobber(&mut src);
        clobber(&mut dst);

        // `memcpy` is called directly since `copy_from_slice` isn't actually
        // guaranteed to be turned into a `memcpy`.
        //
        // SAFETY: Both vectors are of the same type and of the same size,
        // so copying data between them is safe.
        unsafe {
            libc::memcpy(dst.as_mut_ptr().cast(), src.as_ptr().cast(), SIZE);
        }

        clobber(&mut dst);
        clobber(&mut src);

        Ok(())
    };

    benchmark("memory score", SIZE, MAX_ITERATIONS, MAX_DURATION, copy_once)
        .expect("benchmark cannot fail; qed")
}
/// A file which is deleted from the filesystem when this wrapper is dropped.
///
/// Dereferences to the wrapped [`File`].
struct TemporaryFile {
    /// The open file handle; it is only ever `None` while `drop` is running.
    fp: Option<File>,
    /// The location of the file, used to remove it on drop.
    path: PathBuf,
}

impl Drop for TemporaryFile {
    fn drop(&mut self) {
        // Close the handle before touching the path.
        drop(self.fp.take());

        // Remove the file.
        //
        // This has to be done *after* the benchmark,
        // otherwise it changes the results as the data
        // doesn't actually get properly flushed to the disk,
        // since the file's not there anymore.
        let removal = std::fs::remove_file(&self.path);
        if let Err(error) = removal {
            log::warn!("Failed to remove the file used for the disk benchmark: {}", error);
        }
    }
}

impl Deref for TemporaryFile {
    type Target = File;

    fn deref(&self) -> &Self::Target {
        let file = self.fp.as_ref();
        file.expect("`fp` is None only during `drop`")
    }
}

impl DerefMut for TemporaryFile {
    fn deref_mut(&mut self) -> &mut Self::Target {
        let file = self.fp.as_mut();
        file.expect("`fp` is None only during `drop`")
    }
}
/// Creates a PCG64 random number generator from fixed constants.
///
/// As both arguments are hard-coded, every call yields a generator
/// constructed from exactly the same inputs.
fn rng() -> rand_pcg::Pcg64 {
    const STATE: u128 = 0xcafef00dd15ea5e5;
    const STREAM: u128 = 0xa02bdbf7bb3c0a7ac28fa16a64abf96;

    rand_pcg::Pcg64::new(STATE, STREAM)
}
/// Returns a buffer of `size` bytes filled by a fresh generator from `rng()`.
fn random_data(size: usize) -> Vec<u8> {
    let mut bytes = vec![0; size];
    rng().fill(bytes.as_mut_slice());
    bytes
}
/// Benchmarks sequential disk writes inside `directory`.
///
/// A temporary file named `.disk_bench_seq_wr.tmp` is created in `directory`.
/// Each iteration writes a 64 * 1024 * 1024 byte buffer of random data to it in
/// one go, fsyncs the file and seeks back to the start. The file is removed
/// again when the benchmark finishes.
///
/// Returns the write throughput as computed by `benchmark`.
///
/// # Errors
///
/// Returns a description of the failure if the file cannot be created,
/// written to, fsynced or seeked.
pub fn benchmark_disk_sequential_writes(directory: &Path) -> Result<u64, String> {
    const SIZE: usize = 64 * 1024 * 1024;
    const MAX_ITERATIONS: usize = 32;
    const MAX_DURATION: Duration = Duration::from_millis(300);

    let data = random_data(SIZE);
    let path = directory.join(".disk_bench_seq_wr.tmp");

    let handle =
        File::create(&path).map_err(|error| format!("failed to create a test file: {}", error))?;
    let mut file = TemporaryFile { fp: Some(handle), path };

    file.sync_all()
        .map_err(|error| format!("failed to fsync the test file: {}", error))?;

    let write_pass = || -> Result<(), String> {
        // Just dump everything to the disk in one go.
        file.write_all(&data)
            .map_err(|error| format!("failed to write to the test file: {}", error))?;

        // And then make sure it was actually written to disk.
        file.sync_all()
            .map_err(|error| format!("failed to fsync the test file: {}", error))?;

        // Rewind to the beginning for the next iteration of the benchmark.
        file.seek(SeekFrom::Start(0))
            .map_err(|error| format!("failed to seek to the start of the test file: {}", error))?;

        Ok(())
    };

    benchmark("disk sequential write score", SIZE, MAX_ITERATIONS, MAX_DURATION, write_pass)
}
/// Benchmarks random disk writes inside `directory`.
///
/// A temporary file named `.disk_bench_rand_wr.tmp` is created in `directory`
/// and populated with 64 * 1024 * 1024 bytes of random data. Each iteration
/// then visits every 4096-byte-aligned offset of the file in a shuffled order,
/// writes 2048 bytes there and finally fsyncs the file. The file is removed
/// again when the benchmark finishes.
///
/// Returns the write throughput as computed by `benchmark`.
///
/// # Errors
///
/// Returns a description of the failure if the file cannot be created,
/// written to, fsynced or seeked.
pub fn benchmark_disk_random_writes(directory: &Path) -> Result<u64, String> {
    const SIZE: usize = 64 * 1024 * 1024;
    const MAX_ITERATIONS: usize = 32;
    const MAX_DURATION: Duration = Duration::from_millis(300);

    let data = random_data(SIZE);
    let path = directory.join(".disk_bench_rand_wr.tmp");

    let handle =
        File::create(&path).map_err(|error| format!("failed to create a test file: {}", error))?;
    let mut file = TemporaryFile { fp: Some(handle), path };

    // Since we want to test random writes we need an existing file
    // through which we can seek, so here we just populate it with some data.
    file.write_all(&data)
        .map_err(|error| format!("failed to write to the test file: {}", error))?;

    file.sync_all()
        .map_err(|error| format!("failed to fsync the test file: {}", error))?;

    // Generate a list of random positions at which we'll issue writes:
    // every multiple of 4096 below `SIZE`, in a shuffled order.
    let mut offsets: Vec<usize> = (0..SIZE).step_by(4096).collect();
    offsets.shuffle(&mut rng());

    let write_pass = || -> Result<(), String> {
        for &offset in offsets.iter() {
            file.seek(SeekFrom::Start(offset as u64))
                .map_err(|error| format!("failed to seek in the test file: {}", error))?;

            // Here we deliberately only write half of the chunk since we don't
            // want the OS' disk scheduler to coalesce our writes into one single
            // sequential write.
            //
            // Also the chunk's size is deliberately exactly half of a modern disk's
            // sector size to trigger an RMW cycle.
            let chunk: &[u8] = &data[offset..offset + 2048];
            file.write_all(chunk)
                .map_err(|error| format!("failed to write to the test file: {}", error))?;
        }

        file.sync_all()
            .map_err(|error| format!("failed to fsync the test file: {}", error))?;

        Ok(())
    };

    // We only wrote half of the bytes hence `SIZE / 2`.
    benchmark("disk random write score", SIZE / 2, MAX_ITERATIONS, MAX_DURATION, write_pass)
}
/// Benchmarks the hardware and returns the results of those benchmarks.
///
/// Optionally accepts a path to a `scratch_directory` to use to benchmark the disk.
pub fn gather_hwbench(scratch_directory: Option<&Path>) -> HwBench {
#[allow(unused_mut)]
let mut hwbench = HwBench {
cpu_hashrate_score: benchmark_cpu(),
memory_memcpy_score: benchmark_memory(),
disk_sequential_write_score: None,
disk_random_write_score: None,
};
if let Some(scratch_directory) = scratch_directory {
hwbench.disk_sequential_write_score =
match benchmark_disk_sequential_writes(scratch_directory) {
Ok(score) => Some(score),
Err(error) => {
log::warn!("Failed to run the sequential write disk benchmark: {}", error);
None
},
};
hwbench.disk_random_write_score = match benchmark_disk_random_writes(scratch_directory) {
Ok(score) => Some(score),
Err(error) => {
log::warn!("Failed to run the random write disk benchmark: {}", error);
None
},
};
}
hwbench
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The directory in which the disk benchmark tests create their files.
    fn scratch_directory() -> &'static std::path::Path {
        "./".as_ref()
    }

    // On Linux every field is expected to be populated.
    #[cfg(target_os = "linux")]
    #[test]
    fn test_gather_sysinfo_linux() {
        let sysinfo = gather_sysinfo();
        let cpu = sysinfo.cpu.unwrap();
        let core_count = sysinfo.core_count.unwrap();
        let memory = sysinfo.memory.unwrap();

        assert!(cpu.len() > 0);
        assert!(core_count > 0);
        assert!(memory > 0);

        assert_ne!(sysinfo.is_virtual_machine, None);
        assert_ne!(sysinfo.linux_kernel, None);
        assert_ne!(sysinfo.linux_distro, None);
    }

    #[test]
    fn test_benchmark_cpu() {
        let score = benchmark_cpu();
        assert_ne!(score, 0);
    }

    #[test]
    fn test_benchmark_memory() {
        let score = benchmark_memory();
        assert_ne!(score, 0);
    }

    #[test]
    fn test_benchmark_disk_sequential_writes() {
        let score = benchmark_disk_sequential_writes(scratch_directory()).unwrap();
        assert!(score > 0);
    }

    #[test]
    fn test_benchmark_disk_random_writes() {
        let score = benchmark_disk_random_writes(scratch_directory()).unwrap();
        assert!(score > 0);
    }
}
@@ -0,0 +1,101 @@
// This file is part of Substrate.
// Copyright (C) 2022 Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use regex::Regex;
use sc_telemetry::SysInfo;
use std::collections::HashSet;
fn read_file(path: &str) -> Option<String> {
match std::fs::read_to_string(path) {
Ok(data) => Some(data),
Err(error) => {
log::warn!("Failed to read '{}': {}", path, error);
None
},
}
}
/// Parses the first capture group of the first match of `regex` in `data`.
///
/// Returns `None` if the pattern doesn't match, if the first capture group
/// did not participate in the match, or if the captured text can't be parsed
/// into a `T`.
///
/// # Panics
///
/// Panics if `regex` is not a valid regular expression.
fn extract<T>(data: &str, regex: &str) -> Option<T>
where
    T: std::str::FromStr,
{
    let pattern = Regex::new(regex).expect("regex is correct; qed");
    let captures = pattern.captures(data)?;
    let group = captures.get(1)?;
    group.as_str().parse().ok()
}
// Patterns applied by `gather_linux_sysinfo` to the contents of
// `/proc/cpuinfo`, `/proc/meminfo` and `/etc/os-release`.
//
// All of them use the multi-line flag `(?m)` so that `^`/`$` match at line
// boundaries. Except for the hypervisor pattern, which is only tested for
// a match, the value of interest is the first capture group.

// Captures the rest of the `model name` line in `/proc/cpuinfo`.
const LINUX_REGEX_CPU: &str = r#"(?m)^model name\s*:\s*([^\n]+)"#;
// Captures the numeric `physical id` of a `/proc/cpuinfo` entry.
const LINUX_REGEX_PHYSICAL_ID: &str = r#"(?m)^physical id\s*:\s*(\d+)"#;
// Captures the numeric `core id` of a `/proc/cpuinfo` entry.
const LINUX_REGEX_CORE_ID: &str = r#"(?m)^core id\s*:\s*(\d+)"#;
// Matches a `flags` line in `/proc/cpuinfo` which contains the word `hypervisor`.
const LINUX_REGEX_HYPERVISOR: &str = r#"(?m)^flags\s*:.+?\bhypervisor\b"#;
// Captures the `MemTotal` value from `/proc/meminfo`; the line carries a `kB` suffix.
const LINUX_REGEX_MEMORY: &str = r#"(?m)^MemTotal:\s*(\d+) kB"#;
// Captures the `PRETTY_NAME` value from `/etc/os-release`, without the optional quotes.
const LINUX_REGEX_DISTRO: &str = r#"(?m)^PRETTY_NAME\s*=\s*"?(.+?)"?$"#;
pub fn gather_linux_sysinfo(sysinfo: &mut SysInfo) {
if let Some(data) = read_file("/proc/cpuinfo") {
sysinfo.cpu = extract(&data, LINUX_REGEX_CPU);
sysinfo.is_virtual_machine =
Some(Regex::new(LINUX_REGEX_HYPERVISOR).unwrap().is_match(&data));
// The /proc/cpuinfo returns a list of all of the hardware threads.
//
// Here we extract all of the unique {CPU ID, core ID} pairs to get
// the total number of cores.
let mut set: HashSet<(u32, u32)> = HashSet::new();
for chunk in data.split("\n\n") {
let pid = extract(chunk, LINUX_REGEX_PHYSICAL_ID);
let cid = extract(chunk, LINUX_REGEX_CORE_ID);
if let (Some(pid), Some(cid)) = (pid, cid) {
set.insert((pid, cid));
}
}
if !set.is_empty() {
sysinfo.core_count = Some(set.len() as u32);
}
}
if let Some(data) = read_file("/proc/meminfo") {
sysinfo.memory = extract(&data, LINUX_REGEX_MEMORY).map(|memory: u64| memory * 1024);
}
if let Some(data) = read_file("/etc/os-release") {
sysinfo.linux_distro = extract(&data, LINUX_REGEX_DISTRO);
}
// NOTE: We don't use the `nix` crate to call this since it doesn't
// currently check for errors.
unsafe {
// SAFETY: The `utsname` is full of byte arrays, so this is safe.
let mut uname: libc::utsname = std::mem::zeroed();
if libc::uname(&mut uname) < 0 {
log::warn!("uname failed: {}", std::io::Error::last_os_error());
} else {
let length =
uname.release.iter().position(|&byte| byte == 0).unwrap_or(uname.release.len());
let release = std::slice::from_raw_parts(uname.release.as_ptr().cast(), length);
if let Ok(release) = std::str::from_utf8(release) {
sysinfo.linux_kernel = Some(release.into());
}
}
}
}