From 06bd660599abaed354e01b0459567ed0fdbe6974 Mon Sep 17 00:00:00 2001 From: James Wilson Date: Wed, 30 Jun 2021 14:20:58 +0100 Subject: [PATCH] Passing a callback isn't worth the extra code; just pass a feed thing --- .../telemetry/src/aggregator/inner_loop.rs | 50 +------- backend/telemetry/src/state/chain.rs | 117 +++++------------- backend/telemetry/src/state/state.rs | 15 +-- 3 files changed, 42 insertions(+), 140 deletions(-) diff --git a/backend/telemetry/src/aggregator/inner_loop.rs b/backend/telemetry/src/aggregator/inner_loop.rs index 65fb478..2d72acf 100644 --- a/backend/telemetry/src/aggregator/inner_loop.rs +++ b/backend/telemetry/src/aggregator/inner_loop.rs @@ -13,7 +13,7 @@ use std::{net::{IpAddr, Ipv4Addr}, str::FromStr}; use futures::channel::{ mpsc }; use futures::{ SinkExt, StreamExt }; use std::collections::{ HashMap, HashSet }; -use crate::state::{ self, State, NodeId, OnUpdateNode }; +use crate::state::{ self, State, NodeId }; use crate::feed_message::{ self, FeedMessageSerializer }; use crate::find_location; @@ -304,54 +304,8 @@ impl InnerLoop { async fn handle_from_shard_update(&mut self, node_id: NodeId, payload: node::Payload) { let mut feed_message_serializer = FeedMessageSerializer::new(); - let mut broadcast_finality = false; - self.node_state.update_node(node_id, payload, |msg| { - match msg { - OnUpdateNode::StaleNode(to_feed) => { - feed_message_serializer.push(to_feed); - } - OnUpdateNode::BestBlock(to_feed) => { - feed_message_serializer.push(to_feed); - } - OnUpdateNode::BestFinalized(to_feed) => { - feed_message_serializer.push(to_feed); - } - OnUpdateNode::ImportedBlock(to_feed) => { - feed_message_serializer.push(to_feed); - } - OnUpdateNode::Hardware(to_feed) => { - feed_message_serializer.push(to_feed); - } - OnUpdateNode::NodeStatsUpdate(to_feed) => { - feed_message_serializer.push(to_feed); - } - OnUpdateNode::NodeIOUpdate(to_feed) => { - feed_message_serializer.push(to_feed); - } - OnUpdateNode::AfgFinalized(to_feed) => { - feed_message_serializer.push(to_feed); - // All messages sent in an update leading to this message are only - // broadcast to feeds subscribed to chain finality info - broadcast_finality = true; - } - OnUpdateNode::AfgReceivedPrecommit(to_feed) => { - feed_message_serializer.push(to_feed); - // All messages sent in an update leading to this message are only - // broadcast to feeds subscribed to chain finality info - broadcast_finality = true; - } - OnUpdateNode::AfgReceivedPrevote(to_feed) => { - feed_message_serializer.push(to_feed); - // All messages sent in an update leading to this message are only - // broadcast to feeds subscribed to chain finality info - broadcast_finality = true; - } - OnUpdateNode::FinalizedBlock(to_feed) => { - feed_message_serializer.push(to_feed); - } - } - }); + let broadcast_finality = self.node_state.update_node(node_id, payload, &mut feed_message_serializer); if let Some(chain) = self.node_state.get_chain_by_node_id(node_id) { let genesis_hash = *chain.genesis_hash(); diff --git a/backend/telemetry/src/state/chain.rs b/backend/telemetry/src/state/chain.rs index 5046a32..8bdd8b9 100644 --- a/backend/telemetry/src/state/chain.rs +++ b/backend/telemetry/src/state/chain.rs @@ -6,7 +6,7 @@ use common::most_seen::MostSeen; use common::node::Payload; use once_cell::sync::Lazy; -use crate::feed_message; +use crate::feed_message::{self, FeedMessageSerializer}; use super::node::Node; use super::NodeId; @@ -46,54 +46,6 @@ pub struct RemoveNodeResult { pub chain_renamed: bool } -/// When we update a node, we subscribe and receive various messages -/// about the update that take this form. The expectation is that we'll -/// push and broadcast these messages to feeds. This allows the caller to -/// retain control over exactly how/when that will happen. -pub enum OnUpdateNode<'a> { - StaleNode(feed_message::StaleNode), - BestBlock(feed_message::BestBlock), - BestFinalized(feed_message::BestFinalized), - ImportedBlock(feed_message::ImportedBlock<'a>), - Hardware(feed_message::Hardware<'a>), - NodeStatsUpdate(feed_message::NodeStatsUpdate<'a>), - NodeIOUpdate(feed_message::NodeIOUpdate<'a>), - AfgFinalized(feed_message::AfgFinalized), - AfgReceivedPrecommit(feed_message::AfgReceivedPrecommit), - AfgReceivedPrevote(feed_message::AfgReceivedPrevote), - FinalizedBlock(feed_message::FinalizedBlock) -} - -macro_rules! into_on_update { - ($name:ident) => { - impl <'a> From for OnUpdateNode<'a> { - fn from(val: feed_message::$name) -> Self { - OnUpdateNode::$name(val) - } - } - } -} -macro_rules! into_on_update_lt { - ($name:ident) => { - impl <'a> From> for OnUpdateNode<'a> { - fn from(val: feed_message::$name<'a>) -> Self { - OnUpdateNode::$name(val) - } - } - } -} -into_on_update!(StaleNode); -into_on_update!(BestBlock); -into_on_update!(BestFinalized); -into_on_update_lt!(ImportedBlock); -into_on_update_lt!(Hardware); -into_on_update_lt!(NodeStatsUpdate); -into_on_update_lt!(NodeIOUpdate); -into_on_update!(AfgFinalized); -into_on_update!(AfgReceivedPrecommit); -into_on_update!(AfgReceivedPrevote); -into_on_update!(FinalizedBlock); - /// Labels of chains we consider "first party". These chains allow any /// number of nodes to connect. static FIRST_PARTY_NETWORKS: Lazy> = Lazy::new(|| { @@ -157,44 +109,44 @@ impl Chain { } /// Attempt to update the best block seen in this chain. - pub fn update_node(&mut self, all_nodes: &mut DenseMap, nid: NodeId, payload: Payload, mut on_update: OnUpdate) - where OnUpdate: FnMut(OnUpdateNode) - { + /// Returns a boolean which denotes whether the output is for finalization feeds (true) or not (false). + pub fn update_node(&mut self, all_nodes: &mut DenseMap, nid: NodeId, payload: Payload, feed: &mut FeedMessageSerializer) -> bool { + if let Some(block) = payload.best_block() { - self.handle_block(all_nodes, block, nid, &mut on_update); + self.handle_block(all_nodes, block, nid, feed); } if let Some(node) = all_nodes.get_mut(nid) { match payload { Payload::SystemInterval(ref interval) => { if node.update_hardware(interval) { - on_update(feed_message::Hardware(nid, node.hardware()).into()); + feed.push(feed_message::Hardware(nid, node.hardware())); } if let Some(stats) = node.update_stats(interval) { - on_update(feed_message::NodeStatsUpdate(nid, stats).into()); + feed.push(feed_message::NodeStatsUpdate(nid, stats)); } if let Some(io) = node.update_io(interval) { - on_update(feed_message::NodeIOUpdate(nid, io).into()); + feed.push(feed_message::NodeIOUpdate(nid, io)); } } Payload::AfgAuthoritySet(authority) => { node.set_validator_address(authority.authority_id.clone()); - return; + return false; } Payload::AfgFinalized(finalized) => { if let Ok(finalized_number) = finalized.finalized_number.parse::() { if let Some(addr) = node.details().validator.clone() { - on_update(feed_message::AfgFinalized( + feed.push(feed_message::AfgFinalized( addr, finalized_number, finalized.finalized_hash, - ).into()); + )); } } - return; + return true; } Payload::AfgReceivedPrecommit(precommit) => { if let Ok(finalized_number) = @@ -202,15 +154,15 @@ impl Chain { { if let Some(addr) = node.details().validator.clone() { let voter = precommit.voter.clone(); - on_update(feed_message::AfgReceivedPrecommit( + feed.push(feed_message::AfgReceivedPrecommit( addr, finalized_number, precommit.target_hash, voter, - ).into()); + )); } } - return; + return true; } Payload::AfgReceivedPrevote(prevote) => { if let Ok(finalized_number) = @@ -218,15 +170,15 @@ impl Chain { { if let Some(addr) = node.details().validator.clone() { let voter = prevote.voter.clone(); - on_update(feed_message::AfgReceivedPrevote( + feed.push(feed_message::AfgReceivedPrevote( addr, finalized_number, prevote.target_hash, voter, - ).into()); + )); } } - return; + return true; } Payload::AfgReceivedCommit(_) => {} _ => (), @@ -234,29 +186,29 @@ impl Chain { if let Some(block) = payload.finalized_block() { if let Some(finalized) = node.update_finalized(block) { - on_update(feed_message::FinalizedBlock( + feed.push(feed_message::FinalizedBlock( nid, finalized.height, finalized.hash, - ).into()); + )); if finalized.height > self.finalized.height { self.finalized = *finalized; - on_update(feed_message::BestFinalized(finalized.height, finalized.hash).into()); + feed.push(feed_message::BestFinalized(finalized.height, finalized.hash)); } } } } + + false } - fn handle_block(&mut self, all_nodes: &mut DenseMap, block: &Block, nid: NodeId, mut on_update: OnUpdate) - where OnUpdate: FnMut(OnUpdateNode) - { + fn handle_block(&mut self, all_nodes: &mut DenseMap, block: &Block, nid: NodeId, feed: &mut FeedMessageSerializer) { let mut propagation_time = None; let now = now(); let nodes_len = self.node_ids.len(); - self.update_stale_nodes(all_nodes, now, &mut on_update); + self.update_stale_nodes(all_nodes, now, feed); let node = match all_nodes.get_mut(nid) { Some(node) => node, @@ -278,11 +230,11 @@ impl Chain { self.average_block_time = Some(self.block_times.average()); } self.timestamp = Some(now); - on_update(feed_message::BestBlock( + feed.push(feed_message::BestBlock( self.best.height, now, self.average_block_time, - ).into()); + )); propagation_time = Some(0); } else if block.height == self.best.height { if let Some(timestamp) = self.timestamp { @@ -291,16 +243,15 @@ impl Chain { } if let Some(details) = node.update_details(now, propagation_time) { - on_update(feed_message::ImportedBlock(nid, details).into()); + feed.push(feed_message::ImportedBlock(nid, details)); } } } /// Check if the chain is stale (has not received a new best block in a while). /// If so, find a new best block, ignoring any stale nodes and marking them as such. - fn update_stale_nodes(&mut self, all_nodes: &mut DenseMap, now: u64, mut on_update: OnUpdate) - where OnUpdate: FnMut(OnUpdateNode) - { + fn update_stale_nodes(&mut self, all_nodes: &mut DenseMap, now: u64, feed: &mut FeedMessageSerializer) { + let threshold = now - STALE_TIMEOUT; let timestamp = match self.timestamp { Some(ts) => ts, @@ -331,7 +282,7 @@ impl Chain { finalized = *node.finalized(); } } else { - on_update(feed_message::StaleNode(nid).into()); + feed.push(feed_message::StaleNode(nid)); } } @@ -341,12 +292,12 @@ impl Chain { self.block_times.reset(); self.timestamp = timestamp; - on_update(feed_message::BestBlock( + feed.push(feed_message::BestBlock( self.best.height, timestamp.unwrap_or(now), None, - ).into()); - on_update(feed_message::BestFinalized(finalized.height, finalized.hash).into()); + )); + feed.push(feed_message::BestFinalized(finalized.height, finalized.hash)); } } diff --git a/backend/telemetry/src/state/state.rs b/backend/telemetry/src/state/state.rs index 1fcfcae..96fd2ae 100644 --- a/backend/telemetry/src/state/state.rs +++ b/backend/telemetry/src/state/state.rs @@ -5,6 +5,7 @@ use common::types::{Block, NodeDetails, Timestamp}; use common::util::{DenseMap}; use common::node::Payload; use std::iter::IntoIterator; +use crate::feed_message::FeedMessageSerializer; use crate::find_location; use super::chain::{ self, Chain }; @@ -52,9 +53,6 @@ pub struct NodeAddedToChain<'a> { pub has_chain_label_changed: bool } -/// During a node update, we get given various messages about the update -pub type OnUpdateNode<'a> = chain::OnUpdateNode<'a>; - /// if removing a node is successful, we get this information back. pub struct RemovedNode { /// How many nodes remain on the chain (0 if the chain was removed) @@ -217,20 +215,19 @@ impl State { } /// Attempt to update the best block seen, given a node and block. - pub fn update_node(&mut self, node_id: NodeId, payload: Payload, on_update: OnUpdate) - where OnUpdate: FnMut(OnUpdateNode) - { + /// Returns a boolean which denotes whether the output is for finalization feeds (true) or not (false). + pub fn update_node(&mut self, node_id: NodeId, payload: Payload, feed: &mut FeedMessageSerializer) -> bool { let chain_id = match self.chains_by_node.get(&node_id) { Some(chain_id) => *chain_id, - None => { log::error!("Cannot find chain_id for node with ID {}", node_id); return } + None => { log::error!("Cannot find chain_id for node with ID {}", node_id); return false } }; let chain = match self.chains.get_mut(chain_id) { Some(chain) => chain, - None => { log::error!("Cannot find chain for node with ID {}", node_id); return } + None => { log::error!("Cannot find chain for node with ID {}", node_id); return false } }; - chain.update_node(&mut self.nodes, node_id, payload, on_update); + chain.update_node(&mut self.nodes, node_id, payload, feed) } /// Update the location for a node. Return `false` if the node was not found.