Add per-chain aggregate software/hardware telemetry (#464)

* Add per-chain aggregate software/hardware telemetry

* Fix tests' compilation

* Add extra comments for the `Counter` struct

* Replace the boolean argument with an enum

* Rename `replace_hwbench` to `update_hwbench`

* Move `Counter` into a separate file

* Move `ChainStatsCollator` to `chain_stats.rs`

* Fix incorrect key on the unknown table

* Improve types for the stats component; get rid of `any`
This commit is contained in:
Koute
2022-04-27 18:44:34 +09:00
committed by GitHub
parent 978c070bdd
commit 45878f9876
22 changed files with 1034 additions and 18 deletions
@@ -514,6 +514,7 @@ impl InnerLoop {
new_chain.finalized_block().height,
new_chain.finalized_block().hash,
));
feed_serializer.push(feed_message::ChainStatsUpdate(new_chain.stats()));
if let Some(bytes) = feed_serializer.into_finalized() {
let _ = feed_channel.send(ToFeedWebsocket::Bytes(bytes));
}
@@ -122,6 +122,7 @@ actions! {
// We maintain existing IDs for backward compatibility.
20: StaleNode,
21: NodeIOUpdate<'_>,
22: ChainStatsUpdate<'_>,
}
#[derive(Serialize)]
@@ -202,3 +203,30 @@ impl FeedMessageWrite for AddedNode<'_> {
));
}
}
#[derive(Serialize)]
pub struct ChainStatsUpdate<'a>(pub &'a ChainStats);
#[derive(Serialize, PartialEq, Eq, Default)]
pub struct Ranking<K> {
pub list: Vec<(K, u64)>,
pub other: u64,
pub unknown: u64,
}
#[derive(Serialize, PartialEq, Eq, Default)]
pub struct ChainStats {
pub version: Ranking<String>,
pub target_os: Ranking<String>,
pub target_arch: Ranking<String>,
pub cpu: Ranking<String>,
pub memory: Ranking<(u32, Option<u32>)>,
pub core_count: Ranking<u32>,
pub linux_kernel: Ranking<String>,
pub linux_distro: Ranking<String>,
pub is_virtual_machine: Ranking<bool>,
pub cpu_hashrate_score: Ranking<(u32, Option<u32>)>,
pub memory_memcpy_score: Ranking<(u32, Option<u32>)>,
pub disk_sequential_write_score: Ranking<(u32, Option<u32>)>,
pub disk_random_write_score: Ranking<(u32, Option<u32>)>,
}
+55 -2
View File
@@ -21,10 +21,13 @@ use common::{id_type, time, DenseMap, MostSeen, NumStats};
use once_cell::sync::Lazy;
use std::collections::HashSet;
use std::str::FromStr;
use std::time::{Duration, Instant};
use crate::feed_message::{self, FeedMessageSerializer};
use crate::feed_message::{self, ChainStats, FeedMessageSerializer};
use crate::find_location;
use super::chain_stats::ChainStatsCollator;
use super::counter::CounterValue;
use super::node::Node;
id_type! {
@@ -35,6 +38,7 @@ id_type! {
pub type Label = Box<str>;
const STALE_TIMEOUT: u64 = 2 * 60 * 1000; // 2 minutes
const STATS_UPDATE_INTERVAL: Duration = Duration::from_secs(5);
pub struct Chain {
/// Labels that nodes use for this chain. We keep track of
@@ -56,6 +60,12 @@ pub struct Chain {
genesis_hash: BlockHash,
/// Maximum number of nodes allowed to connect from this chain
max_nodes: usize,
/// Collator for the stats.
stats_collator: ChainStatsCollator,
/// Stats for this chain.
stats: ChainStats,
/// Timestamp of when the stats were last regenerated.
stats_last_regenerated: Instant,
}
pub enum AddNodeResult {
@@ -105,6 +115,9 @@ impl Chain {
timestamp: None,
genesis_hash,
max_nodes,
stats_collator: Default::default(),
stats: Default::default(),
stats_last_regenerated: Instant::now(),
}
}
@@ -119,7 +132,11 @@ impl Chain {
return AddNodeResult::Overquota;
}
let node_chain_label = &node.details().chain;
let details = node.details();
self.stats_collator
.add_or_remove_node(details, None, CounterValue::Increment);
let node_chain_label = &details.chain;
let label_result = self.labels.insert(node_chain_label);
let node_id = self.nodes.add(node);
@@ -140,6 +157,10 @@ impl Chain {
}
};
let details = node.details();
self.stats_collator
.add_or_remove_node(details, node.hwbench(), CounterValue::Decrement);
let node_chain_label = &node.details().chain;
let label_result = self.labels.remove(node_chain_label);
@@ -181,6 +202,19 @@ impl Chain {
}
return;
}
Payload::HwBench(ref hwbench) => {
let new_hwbench = common::node_types::NodeHwBench {
cpu_hashrate_score: hwbench.cpu_hashrate_score,
memory_memcpy_score: hwbench.memory_memcpy_score,
disk_sequential_write_score: hwbench.disk_sequential_write_score,
disk_random_write_score: hwbench.disk_random_write_score,
};
let old_hwbench = node.update_hwbench(new_hwbench);
self.stats_collator
.update_hwbench(old_hwbench.as_ref(), CounterValue::Decrement);
self.stats_collator
.update_hwbench(node.hwbench(), CounterValue::Increment);
}
_ => {}
}
@@ -210,6 +244,7 @@ impl Chain {
let nodes_len = self.nodes.len();
self.update_stale_nodes(now, feed);
self.regenerate_stats_if_necessary(feed);
let node = match self.nodes.get_mut(nid) {
Some(node) => node,
@@ -300,6 +335,21 @@ impl Chain {
}
}
fn regenerate_stats_if_necessary(&mut self, feed: &mut FeedMessageSerializer) {
let now = Instant::now();
let elapsed = now - self.stats_last_regenerated;
if elapsed < STATS_UPDATE_INTERVAL {
return;
}
self.stats_last_regenerated = now;
let new_stats = self.stats_collator.generate();
if new_stats != self.stats {
self.stats = new_stats;
feed.push(feed_message::ChainStatsUpdate(&self.stats));
}
}
pub fn update_node_location(
&mut self,
node_id: ChainNodeId,
@@ -340,4 +390,7 @@ impl Chain {
pub fn genesis_hash(&self) -> BlockHash {
self.genesis_hash
}
pub fn stats(&self) -> &ChainStats {
&self.stats
}
}
@@ -0,0 +1,225 @@
// Source code for the Substrate Telemetry Server.
// Copyright (C) 2022 Parity Technologies (UK) Ltd.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use super::counter::{Counter, CounterValue};
use crate::feed_message::ChainStats;
// These are the benchmark scores generated on our reference hardware.
const REFERENCE_CPU_SCORE: u64 = 1028;
const REFERENCE_MEMORY_SCORE: u64 = 14899;
const REFERENCE_DISK_SEQUENTIAL_WRITE_SCORE: u64 = 485;
const REFERENCE_DISK_RANDOM_WRITE_SCORE: u64 = 222;
macro_rules! buckets {
(@try $value:expr, $bucket_min:expr, $bucket_max:expr,) => {
if $value < $bucket_max {
return ($bucket_min, Some($bucket_max));
}
};
($value:expr, $bucket_min:expr, $bucket_max:expr, $($remaining:expr,)*) => {
buckets! { @try $value, $bucket_min, $bucket_max, }
buckets! { $value, $bucket_max, $($remaining,)* }
};
($value:expr, $bucket_last:expr,) => {
($bucket_last, None)
}
}
/// Translates a given raw benchmark score into a relative measure
/// of how the score compares to the reference score.
///
/// The value returned is the range (in percent) within which the given score
/// falls into. For example, a value of `(90, Some(110))` means that the score
/// is between 90% and 110% of the reference score, with the lower bound being
/// inclusive and the upper bound being exclusive.
fn bucket_score(score: u64, reference_score: u64) -> (u32, Option<u32>) {
let relative_score = ((score as f64 / reference_score as f64) * 100.0) as u32;
buckets! {
relative_score,
0,
10,
30,
50,
70,
90,
110,
130,
150,
200,
300,
400,
500,
}
}
#[test]
fn test_bucket_score() {
assert_eq!(bucket_score(0, 100), (0, Some(10)));
assert_eq!(bucket_score(9, 100), (0, Some(10)));
assert_eq!(bucket_score(10, 100), (10, Some(30)));
assert_eq!(bucket_score(29, 100), (10, Some(30)));
assert_eq!(bucket_score(30, 100), (30, Some(50)));
assert_eq!(bucket_score(100, 100), (90, Some(110)));
assert_eq!(bucket_score(500, 100), (500, None));
}
fn bucket_memory(memory: u64) -> (u32, Option<u32>) {
let memory = memory / (1024 * 1024) / 1000;
buckets! {
memory,
1,
2,
4,
6,
8,
10,
16,
24,
32,
48,
56,
64,
}
}
#[derive(Default)]
pub struct ChainStatsCollator {
version: Counter<String>,
target_os: Counter<String>,
target_arch: Counter<String>,
cpu: Counter<String>,
memory: Counter<(u32, Option<u32>)>,
core_count: Counter<u32>,
linux_kernel: Counter<String>,
linux_distro: Counter<String>,
is_virtual_machine: Counter<bool>,
cpu_hashrate_score: Counter<(u32, Option<u32>)>,
memory_memcpy_score: Counter<(u32, Option<u32>)>,
disk_sequential_write_score: Counter<(u32, Option<u32>)>,
disk_random_write_score: Counter<(u32, Option<u32>)>,
}
impl ChainStatsCollator {
pub fn add_or_remove_node(
&mut self,
details: &common::node_types::NodeDetails,
hwbench: Option<&common::node_types::NodeHwBench>,
op: CounterValue,
) {
self.version.modify(Some(&*details.version), op);
self.target_os
.modify(details.target_os.as_ref().map(|value| &**value), op);
self.target_arch
.modify(details.target_arch.as_ref().map(|value| &**value), op);
let sysinfo = details.sysinfo.as_ref();
self.cpu.modify(
sysinfo
.and_then(|sysinfo| sysinfo.cpu.as_ref())
.map(|value| &**value),
op,
);
let memory = sysinfo.and_then(|sysinfo| sysinfo.memory.map(bucket_memory));
self.memory.modify(memory.as_ref(), op);
self.core_count
.modify(sysinfo.and_then(|sysinfo| sysinfo.core_count.as_ref()), op);
self.linux_kernel.modify(
sysinfo
.and_then(|sysinfo| sysinfo.linux_kernel.as_ref())
.map(|value| &**value),
op,
);
self.linux_distro.modify(
sysinfo
.and_then(|sysinfo| sysinfo.linux_distro.as_ref())
.map(|value| &**value),
op,
);
self.is_virtual_machine.modify(
sysinfo.and_then(|sysinfo| sysinfo.is_virtual_machine.as_ref()),
op,
);
self.update_hwbench(hwbench, op);
}
pub fn update_hwbench(
&mut self,
hwbench: Option<&common::node_types::NodeHwBench>,
op: CounterValue,
) {
self.cpu_hashrate_score.modify(
hwbench
.map(|hwbench| bucket_score(hwbench.cpu_hashrate_score, REFERENCE_CPU_SCORE))
.as_ref(),
op,
);
self.memory_memcpy_score.modify(
hwbench
.map(|hwbench| bucket_score(hwbench.memory_memcpy_score, REFERENCE_MEMORY_SCORE))
.as_ref(),
op,
);
self.disk_sequential_write_score.modify(
hwbench
.and_then(|hwbench| hwbench.disk_sequential_write_score)
.map(|score| bucket_score(score, REFERENCE_DISK_SEQUENTIAL_WRITE_SCORE))
.as_ref(),
op,
);
self.disk_random_write_score.modify(
hwbench
.and_then(|hwbench| hwbench.disk_random_write_score)
.map(|score| bucket_score(score, REFERENCE_DISK_RANDOM_WRITE_SCORE))
.as_ref(),
op,
);
}
pub fn generate(&self) -> ChainStats {
ChainStats {
version: self.version.generate_ranking_top(10),
target_os: self.target_os.generate_ranking_top(10),
target_arch: self.target_arch.generate_ranking_top(10),
cpu: self.cpu.generate_ranking_top(10),
memory: self.memory.generate_ranking_ordered(),
core_count: self.core_count.generate_ranking_top(10),
linux_kernel: self.linux_kernel.generate_ranking_top(10),
linux_distro: self.linux_distro.generate_ranking_top(10),
is_virtual_machine: self.is_virtual_machine.generate_ranking_ordered(),
cpu_hashrate_score: self.cpu_hashrate_score.generate_ranking_top(10),
memory_memcpy_score: self.memory_memcpy_score.generate_ranking_ordered(),
disk_sequential_write_score: self
.disk_sequential_write_score
.generate_ranking_ordered(),
disk_random_write_score: self.disk_random_write_score.generate_ranking_ordered(),
}
}
}
+119
View File
@@ -0,0 +1,119 @@
// Source code for the Substrate Telemetry Server.
// Copyright (C) 2022 Parity Technologies (UK) Ltd.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use crate::feed_message::Ranking;
use std::collections::HashMap;
/// A data structure which counts how many occurences of a given key we've seen.
#[derive(Default)]
pub struct Counter<K> {
/// A map containing the number of occurences of a given key.
///
/// If there are none then the entry is removed.
map: HashMap<K, u64>,
/// The number of occurences where the key is `None`.
empty: u64,
}
#[derive(Copy, Clone, PartialEq, Eq, Debug)]
pub enum CounterValue {
Increment,
Decrement,
}
impl<K> Counter<K>
where
K: Sized + std::hash::Hash + Eq,
{
/// Either adds or removes a single occurence of a given `key`.
pub fn modify<'a, Q>(&mut self, key: Option<&'a Q>, op: CounterValue)
where
Q: ?Sized + std::hash::Hash + Eq,
K: std::borrow::Borrow<Q>,
Q: std::borrow::ToOwned<Owned = K>,
{
if let Some(key) = key {
if let Some(entry) = self.map.get_mut(key) {
match op {
CounterValue::Increment => {
*entry += 1;
}
CounterValue::Decrement => {
*entry -= 1;
if *entry == 0 {
// Don't keep entries for which there are no hits.
self.map.remove(key);
}
}
}
} else {
assert_eq!(op, CounterValue::Increment);
self.map.insert(key.to_owned(), 1);
}
} else {
match op {
CounterValue::Increment => {
self.empty += 1;
}
CounterValue::Decrement => {
self.empty -= 1;
}
}
}
}
/// Generates a top-N table of the most common keys.
pub fn generate_ranking_top(&self, max_count: usize) -> Ranking<K>
where
K: Clone,
{
let mut all: Vec<(&K, u64)> = self.map.iter().map(|(key, count)| (key, *count)).collect();
all.sort_unstable_by_key(|&(_, count)| !count);
let list = all
.iter()
.take(max_count)
.map(|&(key, count)| (key.clone(), count))
.collect();
let other = all
.iter()
.skip(max_count)
.fold(0, |sum, (_, count)| sum + *count);
Ranking {
list,
other,
unknown: self.empty,
}
}
/// Generates a sorted table of all of the keys.
pub fn generate_ranking_ordered(&self) -> Ranking<K>
where
K: Copy + Clone + Ord,
{
let mut list: Vec<(K, u64)> = self.map.iter().map(|(key, count)| (*key, *count)).collect();
list.sort_unstable_by_key(|&(key, count)| (key, !count));
Ranking {
list,
other: 0,
unknown: self.empty,
}
}
}
+2
View File
@@ -15,6 +15,8 @@
// along with this program. If not, see <https://www.gnu.org/licenses/>.
mod chain;
mod chain_stats;
mod counter;
mod node;
mod state;
+13 -1
View File
@@ -17,7 +17,8 @@
use crate::find_location;
use common::node_message::SystemInterval;
use common::node_types::{
Block, BlockDetails, NodeDetails, NodeHardware, NodeIO, NodeLocation, NodeStats, Timestamp,
Block, BlockDetails, NodeDetails, NodeHardware, NodeHwBench, NodeIO, NodeLocation, NodeStats,
Timestamp,
};
use common::time;
@@ -47,6 +48,8 @@ pub struct Node {
stale: bool,
/// Unix timestamp for when node started up (falls back to connection time)
startup_time: Option<Timestamp>,
/// Hardware benchmark results for the node
hwbench: Option<NodeHwBench>,
}
impl Node {
@@ -67,6 +70,7 @@ impl Node {
location: None,
stale: false,
startup_time,
hwbench: None,
}
}
@@ -110,6 +114,14 @@ impl Node {
&self.best
}
pub fn hwbench(&self) -> Option<&NodeHwBench> {
self.hwbench.as_ref()
}
pub fn update_hwbench(&mut self, hwbench: NodeHwBench) -> Option<NodeHwBench> {
self.hwbench.replace(hwbench)
}
pub fn update_block(&mut self, block: Block) -> bool {
if block.height > self.best.block.height {
self.stale = false;
+8 -1
View File
@@ -15,7 +15,7 @@
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use super::node::Node;
use crate::feed_message::FeedMessageSerializer;
use crate::feed_message::{ChainStats, FeedMessageSerializer};
use crate::find_location;
use common::node_message::Payload;
use common::node_types::{Block, BlockHash, NodeDetails, Timestamp};
@@ -277,6 +277,9 @@ impl<'a> StateChain<'a> {
pub fn nodes_slice(&self) -> &[Option<Node>] {
self.chain.nodes_slice()
}
pub fn stats(&self) -> &ChainStats {
self.chain.stats()
}
}
#[cfg(test)]
@@ -289,10 +292,14 @@ mod test {
chain: chain.into(),
name: name.into(),
implementation: "Bar".into(),
target_arch: Some("x86_64".into()),
target_os: Some("linux".into()),
target_env: Some("env".into()),
version: "0.1".into(),
validator: None,
network_id: NetworkId::new(),
startup_time: None,
sysinfo: None,
}
}