Backend: add parallel cpu benchmarks processing (#603)

* Add parallel cpu benchmarks processing

* Expose Parallel Hashrate Score in UI (as 'Parallel CPU speed')

* Remove unused variable and fix cfg(debug)

* Fix tests after removing old_chain_name

* Update Dockerfile

* bullseye for both images

---------

Co-authored-by: Volodymyr Brazhnyk <volbr@pm.me>
This commit is contained in:
James Wilson
2025-08-28 14:52:08 +01:00
committed by GitHub
parent 635776328d
commit 4a5cd54cd8
11 changed files with 39 additions and 16 deletions
+3 -3
View File
@@ -1,14 +1,14 @@
FROM docker.io/paritytech/ci-linux:production as builder
FROM docker.io/paritytech/ci-unified:bullseye-1.88.0-2025-06-27-v202507221446 AS builder
ARG PROFILE=release
WORKDIR /app
COPY . .
RUN cargo build --${PROFILE} --bins
RUN cargo build --${PROFILE} --bins -j 2
# MAIN IMAGE FOR PEOPLE TO PULL --- small one#
FROM docker.io/debian:buster-slim
FROM docker.io/debian:bullseye-slim
LABEL maintainer="Parity Technologies"
LABEL description="Substrate Telemetry Backend shard/core binaries, static build"
+3
View File
@@ -98,6 +98,9 @@ pub struct NodeHwBench {
pub memory_memcpy_score: u64,
pub disk_sequential_write_score: Option<u64>,
pub disk_random_write_score: Option<u64>,
pub parallel_cpu_hashrate_score: Option<u64>,
//// Dev note: this exists but isn't needed yet:
// pub parallel_cpu_cores: Option<usize>,
}
impl Payload {
+5
View File
@@ -73,6 +73,11 @@ pub struct NodeHwBench {
pub disk_sequential_write_score: Option<u64>,
/// Random disk write speed in MB/s.
pub disk_random_write_score: Option<u64>,
/// The parallel CPU speed, as measured in how many MB/s it can hash using the BLAKE2b-256 hash.
pub parallel_cpu_hashrate_score: Option<u64>,
//// Dev note: this exists but isn't needed yet:
// /// The number of cores used for the parallel CPU benchmark.
// pub parallel_cpu_cores: Option<usize>,
}
/// A couple of node statistics.
@@ -246,6 +246,7 @@ pub struct ChainStats {
pub linux_distro: Ranking<String>,
pub is_virtual_machine: Ranking<bool>,
pub cpu_hashrate_score: Ranking<(u32, Option<u32>)>,
pub parallel_cpu_hashrate_score: Ranking<(u32, Option<u32>)>,
pub memory_memcpy_score: Ranking<(u32, Option<u32>)>,
pub disk_sequential_write_score: Ranking<(u32, Option<u32>)>,
pub disk_random_write_score: Ranking<(u32, Option<u32>)>,
@@ -213,6 +213,7 @@ impl Chain {
memory_memcpy_score: hwbench.memory_memcpy_score,
disk_sequential_write_score: hwbench.disk_sequential_write_score,
disk_random_write_score: hwbench.disk_random_write_score,
parallel_cpu_hashrate_score: hwbench.parallel_cpu_hashrate_score,
};
let old_hwbench = node.update_hwbench(new_hwbench);
// The `hwbench` for this node has changed, send an updated "add node".
@@ -149,6 +149,7 @@ pub struct ChainStatsCollator {
disk_sequential_write_score: Counter<(u32, Option<u32>)>,
disk_random_write_score: Counter<(u32, Option<u32>)>,
cpu_vendor: Counter<String>,
parallel_cpu_hashrate_score: Counter<(u32, Option<u32>)>,
}
impl ChainStatsCollator {
@@ -241,6 +242,14 @@ impl ChainStatsCollator {
.as_ref(),
op,
);
self.parallel_cpu_hashrate_score.modify(
hwbench
.and_then(|hwbench| hwbench.parallel_cpu_hashrate_score)
.map(|score| bucket_score(score, REFERENCE_CPU_SCORE))
.as_ref(),
op,
);
}
pub fn generate(&self) -> ChainStats {
@@ -261,6 +270,7 @@ impl ChainStatsCollator {
.generate_ranking_ordered(),
disk_random_write_score: self.disk_random_write_score.generate_ranking_ordered(),
cpu_vendor: self.cpu_vendor.generate_ranking_top(10),
parallel_cpu_hashrate_score: self.parallel_cpu_hashrate_score.generate_ranking_top(10),
}
}
}
+2 -11
View File
@@ -79,8 +79,6 @@ impl<'a> AddNodeResult<'a> {
pub struct NodeAddedToChain<'a> {
/// The ID assigned to this node.
pub id: NodeId,
/// The old label of the chain.
pub old_chain_label: Box<str>,
/// The new label of the chain.
pub new_chain_label: &'a str,
/// The node that was added.
@@ -97,8 +95,6 @@ pub struct RemovedNode {
pub chain_node_count: usize,
/// Has the chain label been updated?
pub has_chain_label_changed: bool,
/// The old label of the chain.
pub old_chain_label: Box<str>,
/// Genesis hash of the chain to be updated.
pub chain_genesis_hash: BlockHash,
/// The new label of the chain.
@@ -164,7 +160,6 @@ impl State {
);
let node = Node::new(node_details);
let old_chain_label = chain.label().into();
match chain.add_node(node) {
chain::AddNodeResult::Overquota => AddNodeResult::ChainOverQuota,
@@ -174,7 +169,6 @@ impl State {
AddNodeResult::NodeAddedToChain(NodeAddedToChain {
id: NodeId(chain_id, id),
node: chain.get_node(id).expect("node added above"),
old_chain_label,
new_chain_label: chain.label(),
chain_node_count: chain.node_count(),
has_chain_label_changed: chain_renamed,
@@ -186,7 +180,6 @@ impl State {
/// Remove a node
pub fn remove_node(&mut self, NodeId(chain_id, chain_node_id): NodeId) -> Option<RemovedNode> {
let chain = self.chains.get_mut(chain_id)?;
let old_chain_label = chain.label().into();
// Actually remove the node
let remove_result = chain.remove_node(chain_node_id);
@@ -204,7 +197,6 @@ impl State {
}
Some(RemovedNode {
old_chain_label,
new_chain_label,
chain_node_count,
chain_genesis_hash,
@@ -320,12 +312,11 @@ mod test {
};
assert_eq!(add_node_result.id, NodeId(0.into(), 0.into()));
assert_eq!(&*add_node_result.old_chain_label, "");
assert_eq!(&*add_node_result.new_chain_label, "Chain One");
assert_eq!(add_node_result.chain_node_count, 1);
assert_eq!(add_node_result.has_chain_label_changed, true);
let add_result = state.add_node(chain1_genesis, node("A", "Chain One"));
let add_result = state.add_node(chain1_genesis, node("A", "Chain Two"));
let add_node_result = match add_result {
AddNodeResult::ChainOnDenyList => panic!("Chain not on deny list"),
@@ -334,7 +325,7 @@ mod test {
};
assert_eq!(add_node_result.id, NodeId(0.into(), 1.into()));
assert_eq!(&*add_node_result.old_chain_label, "Chain One");
// Chain One and Chain Two as common, so Chain One is kept.
assert_eq!(&*add_node_result.new_chain_label, "Chain One");
assert_eq!(add_node_result.chain_node_count, 2);
assert_eq!(add_node_result.has_chain_label_changed, false);
@@ -211,6 +211,9 @@ pub struct NodeHwBench {
pub memory_memcpy_score: u64,
pub disk_sequential_write_score: Option<u64>,
pub disk_random_write_score: Option<u64>,
pub parallel_cpu_hashrate_score: Option<u64>,
//// This exists but isn't needed at the moment:
// pub parallel_cpu_cores: Option<usize>,
}
impl From<NodeHwBench> for node_types::NodeHwBench {
@@ -220,6 +223,7 @@ impl From<NodeHwBench> for node_types::NodeHwBench {
memory_memcpy_score: hwbench.memory_memcpy_score,
disk_sequential_write_score: hwbench.disk_sequential_write_score,
disk_random_write_score: hwbench.disk_random_write_score,
parallel_cpu_hashrate_score: hwbench.parallel_cpu_hashrate_score,
}
}
}
@@ -231,6 +235,7 @@ impl From<NodeHwBench> for internal::NodeHwBench {
memory_memcpy_score: msg.memory_memcpy_score,
disk_sequential_write_score: msg.disk_sequential_write_score,
disk_random_write_score: msg.disk_random_write_score,
parallel_cpu_hashrate_score: msg.parallel_cpu_hashrate_score,
}
}
}
+2 -2
View File
@@ -323,14 +323,14 @@ where
// Deserialize from JSON, warning in debug mode if deserialization fails:
let node_message: json_message::NodeMessage = match serde_json::from_slice(&bytes) {
Ok(node_message) => node_message,
#[cfg(debug)]
#[cfg(debug_assertions)]
Err(e) => {
let bytes: &[u8] = bytes.get(..512).unwrap_or_else(|| &bytes);
let msg_start = std::str::from_utf8(bytes).unwrap_or_else(|_| "INVALID UTF8");
log::warn!("Failed to parse node message ({msg_start}): {e}");
continue;
},
#[cfg(not(debug))]
#[cfg(not(debug_assertions))]
Err(_) => {
continue;
}
+1
View File
@@ -131,6 +131,7 @@ export type ChainStats = {
linux_distro: Maybe<Ranking<string>>;
linux_kernel: Maybe<Ranking<string>>;
cpu_hashrate_score: Maybe<Ranking<Range>>;
parallel_cpu_hashrate_score: Maybe<Ranking<Range>>;
memory_memcpy_score: Maybe<Ranking<Range>>;
disk_sequential_write_score: Maybe<Ranking<Range>>;
disk_random_write_score: Maybe<Ranking<Range>>;
+6
View File
@@ -174,6 +174,12 @@ export class Stats extends React.Component<StatsProps> {
formatScore,
stats.cpu_hashrate_score
);
add(
'parallel_cpi_hashrate_score',
'Parallel CPU Speed',
formatScore,
stats.parallel_cpu_hashrate_score
);
add(
'memory_memcpy_score',
'Memory Speed',