diff --git a/backend/common/src/node_types.rs b/backend/common/src/node_types.rs index 1fdc5d1..5cafd1b 100644 --- a/backend/common/src/node_types.rs +++ b/backend/common/src/node_types.rs @@ -46,7 +46,7 @@ pub struct NodeDetails { } /// Hardware and software information for the node. -#[derive(Serialize, Deserialize, Debug, Clone)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] pub struct NodeSysInfo { /// The exact CPU model. pub cpu: Option>, @@ -63,7 +63,7 @@ pub struct NodeSysInfo { } /// Hardware benchmark results for the node. -#[derive(Serialize, Deserialize, Debug, Clone)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] pub struct NodeHwBench { /// The CPU speed, as measured in how many MB/s it can hash using the BLAKE2b-256 hash. pub cpu_hashrate_score: u64, diff --git a/backend/telemetry_core/src/aggregator/aggregator.rs b/backend/telemetry_core/src/aggregator/aggregator.rs index c4c7941..1442d78 100644 --- a/backend/telemetry_core/src/aggregator/aggregator.rs +++ b/backend/telemetry_core/src/aggregator/aggregator.rs @@ -44,8 +44,9 @@ pub struct AggregatorOpts { /// How many nodes from third party chains are allowed to connect /// before we prevent connections from them. pub max_third_party_nodes: usize, - /// Flag to expose the IP addresses of all connected nodes to the feed subscribers. - pub expose_node_ips: bool, + /// Flag to expose the node's details (IP address, SysInfo, HwBench) of all connected + /// nodes to the feed subscribers. + pub expose_node_details: bool, } struct AggregatorInternal { diff --git a/backend/telemetry_core/src/aggregator/inner_loop.rs b/backend/telemetry_core/src/aggregator/inner_loop.rs index b72af01..1c8559d 100644 --- a/backend/telemetry_core/src/aggregator/inner_loop.rs +++ b/backend/telemetry_core/src/aggregator/inner_loop.rs @@ -174,8 +174,9 @@ pub struct InnerLoop { /// are prioritised and dropped to try and get back on track. max_queue_len: usize, - /// Flag to expose the IP addresses of all connected nodes to the feed subscribers. - expose_node_ips: bool, + /// Flag to expose the node's details (IP address, SysInfo, HwBench) of all connected + /// nodes to the feed subscribers. + expose_node_details: bool, } impl InnerLoop { @@ -189,7 +190,7 @@ impl InnerLoop { chain_to_feed_conn_ids: MultiMapUnique::new(), tx_to_locator, max_queue_len: opts.max_queue_len, - expose_node_ips: opts.expose_node_ips, + expose_node_details: opts.expose_node_details, } } @@ -326,7 +327,7 @@ impl InnerLoop { genesis_hash, } => { // Conditionally modify the node's details to include the IP address. - node.ip = self.expose_node_ips.then_some(ip.to_string().into()); + node.ip = self.expose_node_details.then_some(ip.to_string().into()); match self.node_state.add_node(genesis_hash, node) { state::AddNodeResult::ChainOnDenyList => { if let Some(shard_conn) = self.shard_channels.get_mut(&shard_conn_id) { @@ -360,6 +361,7 @@ impl InnerLoop { feed_messages_for_chain.push(feed_message::AddedNode( node_id.get_chain_node_id().into(), &details.node, + self.expose_node_details, )); self.finalize_and_broadcast_to_chain_feeds( &genesis_hash, @@ -409,8 +411,12 @@ impl InnerLoop { }; let mut feed_message_serializer = FeedMessageSerializer::new(); - self.node_state - .update_node(node_id, payload, &mut feed_message_serializer); + self.node_state.update_node( + node_id, + payload, + &mut feed_message_serializer, + self.expose_node_details, + ); if let Some(chain) = self.node_state.get_chain_by_node_id(node_id) { let genesis_hash = chain.genesis_hash(); @@ -531,7 +537,11 @@ impl InnerLoop { .iter() .filter_map(|&(idx, n)| n.as_ref().map(|n| (idx, n))) { - feed_serializer.push(feed_message::AddedNode(node_id, node)); + feed_serializer.push(feed_message::AddedNode( + node_id, + node, + self.expose_node_details, + )); feed_serializer.push(feed_message::FinalizedBlock( node_id, node.finalized().height, diff --git a/backend/telemetry_core/src/feed_message.rs b/backend/telemetry_core/src/feed_message.rs index dc5fe4d..daaf231 100644 --- a/backend/telemetry_core/src/feed_message.rs +++ b/backend/telemetry_core/src/feed_message.rs @@ -134,7 +134,7 @@ pub struct BestBlock(pub BlockNumber, pub Timestamp, pub Option); #[derive(Serialize)] pub struct BestFinalized(pub BlockNumber, pub BlockHash); -pub struct AddedNode<'a>(pub FeedNodeId, pub &'a Node); +pub struct AddedNode<'a>(pub FeedNodeId, pub &'a Node, pub bool); #[derive(Serialize)] pub struct RemovedNode(pub FeedNodeId); @@ -180,16 +180,26 @@ pub struct StaleNode(pub FeedNodeId); impl FeedMessageWrite for AddedNode<'_> { fn write_to_feed(&self, ser: &mut FeedMessageSerializer) { - let AddedNode(nid, node) = self; + let AddedNode(nid, node, expose_node_details) = self; let details = node.details(); + // Hide the ip, sysinfo and hwbench if the `expose_node_details` flag was not specified. + let node_hwbench = node.hwbench(); + let (ip, sys_info, hwbench) = if *expose_node_details { + (&details.ip, &details.sysinfo, &node_hwbench) + } else { + (&None, &None, &None) + }; + let details = ( &details.name, &details.implementation, &details.version, &details.validator, &details.network_id, - &details.ip, + &ip, + &sys_info, + &hwbench, ); ser.write(&( diff --git a/backend/telemetry_core/src/main.rs b/backend/telemetry_core/src/main.rs index 79c1a71..33717f0 100644 --- a/backend/telemetry_core/src/main.rs +++ b/backend/telemetry_core/src/main.rs @@ -82,9 +82,10 @@ struct Opts { /// How many nodes from third party chains are allowed to connect before we prevent connections from them. #[structopt(long, default_value = "1000")] max_third_party_nodes: usize, - /// Flag to expose the IP addresses of all connected nodes to the feed subscribers. + /// Flag to expose the node's details (IP address, SysInfo, HwBench) of all connected + /// nodes to the feed subscribers. #[structopt(long)] - pub expose_node_ips: bool, + pub expose_node_details: bool, } fn main() { @@ -135,7 +136,7 @@ async fn start_server(num_aggregators: usize, opts: Opts) -> anyhow::Result<()> max_queue_len: aggregator_queue_len, denylist: opts.denylist, max_third_party_nodes: opts.max_third_party_nodes, - expose_node_ips: opts.expose_node_ips, + expose_node_details: opts.expose_node_details, }, ) .await?; diff --git a/backend/telemetry_core/src/state/chain.rs b/backend/telemetry_core/src/state/chain.rs index 1e1ca37..8e265a8 100644 --- a/backend/telemetry_core/src/state/chain.rs +++ b/backend/telemetry_core/src/state/chain.rs @@ -175,6 +175,7 @@ impl Chain { nid: ChainNodeId, payload: Payload, feed: &mut FeedMessageSerializer, + expose_node_details: bool, ) { if let Some(block) = payload.best_block() { self.handle_block(block, nid, feed); @@ -198,7 +199,11 @@ impl Chain { // If our node validator address (and thus details) change, send an // updated "add node" feed message: if node.set_validator_address(authority.authority_id.clone()) { - feed.push(feed_message::AddedNode(nid.into(), &node)); + feed.push(feed_message::AddedNode( + nid.into(), + &node, + expose_node_details, + )); } return; } @@ -210,6 +215,17 @@ impl Chain { disk_random_write_score: hwbench.disk_random_write_score, }; let old_hwbench = node.update_hwbench(new_hwbench); + // The `hwbench` for this node has changed, send an updated "add node". + // Note: There is no need to send this message if the details + // will not be serialized over the wire. + if expose_node_details { + feed.push(feed_message::AddedNode( + nid.into(), + &node, + expose_node_details, + )); + } + self.stats_collator .update_hwbench(old_hwbench.as_ref(), CounterValue::Decrement); self.stats_collator diff --git a/backend/telemetry_core/src/state/state.rs b/backend/telemetry_core/src/state/state.rs index 5385e24..f34da06 100644 --- a/backend/telemetry_core/src/state/state.rs +++ b/backend/telemetry_core/src/state/state.rs @@ -218,6 +218,7 @@ impl State { NodeId(chain_id, chain_node_id): NodeId, payload: Payload, feed: &mut FeedMessageSerializer, + expose_node_details: bool, ) { let chain = match self.chains.get_mut(chain_id) { Some(chain) => chain, @@ -227,7 +228,7 @@ impl State { } }; - chain.update_node(chain_node_id, payload, feed) + chain.update_node(chain_node_id, payload, feed, expose_node_details) } /// Update the location for a node. Return `false` if the node was not found. diff --git a/backend/test_utils/src/feed_message_de.rs b/backend/test_utils/src/feed_message_de.rs index b46e614..aa0d724 100644 --- a/backend/test_utils/src/feed_message_de.rs +++ b/backend/test_utils/src/feed_message_de.rs @@ -16,7 +16,8 @@ use anyhow::Context; use common::node_types::{ - BlockDetails, BlockHash, BlockNumber, NodeLocation, NodeStats, Timestamp, + BlockDetails, BlockHash, BlockNumber, NodeHwBench, NodeLocation, NodeStats, NodeSysInfo, + Timestamp, }; use serde_json::value::RawValue; @@ -41,6 +42,7 @@ pub enum FeedMessage { block_details: BlockDetails, location: Option, startup_time: Option, + hwbench: Option, }, RemovedNode { node_id: usize, @@ -135,6 +137,7 @@ pub struct NodeDetails { pub validator: Option, pub network_id: Option, pub ip: Option, + pub sysinfo: Option, } impl FeedMessage { @@ -186,7 +189,7 @@ impl FeedMessage { 3 => { let ( node_id, - (name, implementation, version, validator, network_id, ip), + (name, implementation, version, validator, network_id, ip, sysinfo, hwbench), stats, io, hardware, @@ -207,11 +210,13 @@ impl FeedMessage { validator, network_id, ip, + sysinfo, }, stats, block_details, location, startup_time, + hwbench, } } // RemoveNode