diff --git a/backend/telemetry/src/aggregator/inner_loop.rs b/backend/telemetry/src/aggregator/inner_loop.rs index 45b9c9f..65fb478 100644 --- a/backend/telemetry/src/aggregator/inner_loop.rs +++ b/backend/telemetry/src/aggregator/inner_loop.rs @@ -13,7 +13,7 @@ use std::{net::{IpAddr, Ipv4Addr}, str::FromStr}; use futures::channel::{ mpsc }; use futures::{ SinkExt, StreamExt }; use std::collections::{ HashMap, HashSet }; -use crate::state::{ self, State, NodeId }; +use crate::state::{ self, State, NodeId, OnUpdateNode }; use crate::feed_message::{ self, FeedMessageSerializer }; use crate::find_location; @@ -201,7 +201,7 @@ impl InnerLoop { )); let chain_genesis_hash = self.node_state - .get_node_chain(node_id) + .get_chain_by_node_id(node_id) .map(|chain| *chain.genesis_hash()); if let Some(chain_genesis_hash) = chain_genesis_hash { @@ -279,48 +279,14 @@ impl InnerLoop { self.remove_nodes_and_broadcast_result(Some(node_id)).await; }, FromShardWebsocket::Update { local_id, payload } => { - // TODO: Fill this all in... let node_id = match self.node_ids.get_by_right(&(shard_conn_id, local_id)) { - Some(id) => id, - None => return + Some(id) => *id, + None => { + log::error!("Cannot find ID for node with shard/connectionId of {}/{}", shard_conn_id, local_id); + return + } }; - - if let Some(block) = payload.best_block() { - - } - - match payload { - node::Payload::SystemInterval(system_interval) => { - - }, - node::Payload::AfgAuthoritySet(_) => { - - }, - node::Payload::AfgFinalized(_) => { - - }, - node::Payload::AfgReceivedPrecommit(_) => { - - }, - node::Payload::AfgReceivedPrevote(_) => { - - }, - // This message should have been handled before the payload made it this far: - node::Payload::SystemConnected(_) => { - unreachable!("SystemConnected message seen in Telemetry Core, but should have been handled in shard"); - }, - // The following messages aren't handled at the moment. List them explicitly so - // that we have to make an explicit choice for any new messages: - node::Payload::BlockImport(_) | - node::Payload::NotifyFinalized(_) | - node::Payload::AfgReceivedCommit(_) | - node::Payload::TxPoolImport | - node::Payload::AfgFinalizedBlocksUpTo | - node::Payload::AuraPreSealedBlock | - node::Payload::PreparedBlockForProposing => {}, - } - - // TODO: node_state.update_node, then handle returned diffs + self.handle_from_shard_update(node_id, payload).await; }, FromShardWebsocket::Disconnected => { // Find all nodes associated with this shard connection ID: @@ -336,6 +302,67 @@ impl InnerLoop { } } + async fn handle_from_shard_update(&mut self, node_id: NodeId, payload: node::Payload) { + let mut feed_message_serializer = FeedMessageSerializer::new(); + let mut broadcast_finality = false; + + self.node_state.update_node(node_id, payload, |msg| { + match msg { + OnUpdateNode::StaleNode(to_feed) => { + feed_message_serializer.push(to_feed); + } + OnUpdateNode::BestBlock(to_feed) => { + feed_message_serializer.push(to_feed); + } + OnUpdateNode::BestFinalized(to_feed) => { + feed_message_serializer.push(to_feed); + } + OnUpdateNode::ImportedBlock(to_feed) => { + feed_message_serializer.push(to_feed); + } + OnUpdateNode::Hardware(to_feed) => { + feed_message_serializer.push(to_feed); + } + OnUpdateNode::NodeStatsUpdate(to_feed) => { + feed_message_serializer.push(to_feed); + } + OnUpdateNode::NodeIOUpdate(to_feed) => { + feed_message_serializer.push(to_feed); + } + OnUpdateNode::AfgFinalized(to_feed) => { + feed_message_serializer.push(to_feed); + // All messages sent in an update leading to this message are only + // broadcast to feeds subscribed to chain finality info + broadcast_finality = true; + } + OnUpdateNode::AfgReceivedPrecommit(to_feed) => { + feed_message_serializer.push(to_feed); + // All messages sent in an update leading to this message are only + // broadcast to feeds subscribed to chain finality info + broadcast_finality = true; + } + OnUpdateNode::AfgReceivedPrevote(to_feed) => { + feed_message_serializer.push(to_feed); + // All messages sent in an update leading to this message are only + // broadcast to feeds subscribed to chain finality info + broadcast_finality = true; + } + OnUpdateNode::FinalizedBlock(to_feed) => { + feed_message_serializer.push(to_feed); + } + } + }); + + if let Some(chain) = self.node_state.get_chain_by_node_id(node_id) { + let genesis_hash = *chain.genesis_hash(); + if broadcast_finality { + self.finalize_and_broadcast_to_chain_finality_feeds(&genesis_hash, feed_message_serializer).await; + } else { + self.finalize_and_broadcast_to_chain_feeds(&genesis_hash, feed_message_serializer).await; + } + } + } + /// Handle messages coming from feeds. async fn handle_from_feed(&mut self, feed_conn_id: ConnId, msg: FromFeedWebsocket) { log::debug!("Message from feed ({}): {:?}", feed_conn_id, msg); @@ -465,7 +492,7 @@ impl InnerLoop { // Group by chain to simplify the handling of feed messages: let mut node_ids_per_chain: HashMap> = HashMap::new(); for node_id in node_ids.into_iter() { - if let Some(chain) = self.node_state.get_node_chain(node_id) { + if let Some(chain) = self.node_state.get_chain_by_node_id(node_id) { node_ids_per_chain.entry(*chain.genesis_hash()).or_default().push(node_id); } } @@ -537,13 +564,8 @@ impl InnerLoop { /// Send a message to all chain feeds. async fn broadcast_to_chain_feeds(&mut self, genesis_hash: &BlockHash, message: ToFeedWebsocket) { - -println!("BROADCAST TO CHAIN FEEDS, {}, \n\n{:?}\n\n{:?}\n\n", genesis_hash, self.chain_to_feed_conn_ids, self.feed_conn_id_to_chain); - if let Some(feeds) = self.chain_to_feed_conn_ids.get(genesis_hash) { for &feed_id in feeds { - // How much faster would it be if we processed these in parallel? - // Is it practical to do so given lifetimes and such? if let Some(chan) = self.feed_channels.get_mut(&feed_id) { let _ = chan.send(message.clone()).await; } @@ -561,9 +583,27 @@ println!("BROADCAST TO CHAIN FEEDS, {}, \n\n{:?}\n\n{:?}\n\n", genesis_hash, sel /// Send a message to everybody. async fn broadcast_to_all_feeds(&mut self, message: ToFeedWebsocket) { for chan in self.feed_channels.values_mut() { - // How much faster would it be if we processed these in parallel? - // Is it practical to do so given lifetimes and such? let _ = chan.send(message.clone()).await; } } + + /// Finalize a [`FeedMessageSerializer`] and broadcast the result to chain finality feeds + async fn finalize_and_broadcast_to_chain_finality_feeds(&mut self, genesis_hash: &BlockHash, serializer: FeedMessageSerializer) { + if let Some(bytes) = serializer.into_finalized() { + self.broadcast_to_chain_finality_feeds(genesis_hash, ToFeedWebsocket::Bytes(bytes)).await; + } + } + + /// Send a message to all chain finality feeds. + async fn broadcast_to_chain_finality_feeds(&mut self, genesis_hash: &BlockHash, message: ToFeedWebsocket) { + if let Some(feeds) = self.chain_to_feed_conn_ids.get(genesis_hash) { + // Get all feeds for the chain, but only broadcast to those feeds that + // are also subscribed to receive finality updates. + for &feed_id in feeds.union(&self.feed_conn_id_finality) { + if let Some(chan) = self.feed_channels.get_mut(&feed_id) { + let _ = chan.send(message.clone()).await; + } + } + } + } } \ No newline at end of file diff --git a/backend/telemetry/src/state/chain.rs b/backend/telemetry/src/state/chain.rs index 0e8e286..5046a32 100644 --- a/backend/telemetry/src/state/chain.rs +++ b/backend/telemetry/src/state/chain.rs @@ -1,19 +1,20 @@ -use std::sync::Arc; -use std::collections::{ HashSet, HashMap }; -use common::types::{ BlockHash }; -use common::types::{Block, NodeDetails, NodeLocation, Timestamp}; +use std::collections::{ HashSet }; +use common::types::{ BlockHash, BlockNumber }; +use common::types::{Block, Timestamp}; use common::util::{now, DenseMap, NumStats}; -use common::most_seen::{ MostSeen, self }; +use common::most_seen::MostSeen; use common::node::Payload; -use std::iter::IntoIterator; use once_cell::sync::Lazy; +use crate::feed_message; + use super::node::Node; use super::NodeId; -pub type ChainId = usize; pub type Label = Box; +const STALE_TIMEOUT: u64 = 2 * 60 * 1000; // 2 minutes + pub struct Chain { /// Labels that nodes use for this chain. We keep track of /// the most commonly used label as nodes are added/removed. @@ -45,6 +46,54 @@ pub struct RemoveNodeResult { pub chain_renamed: bool } +/// When we update a node, we subscribe and receive various messages +/// about the update that take this form. The expectation is that we'll +/// push and broadcast these messages to feeds. This allows the caller to +/// retain control over exactly how/when that will happen. +pub enum OnUpdateNode<'a> { + StaleNode(feed_message::StaleNode), + BestBlock(feed_message::BestBlock), + BestFinalized(feed_message::BestFinalized), + ImportedBlock(feed_message::ImportedBlock<'a>), + Hardware(feed_message::Hardware<'a>), + NodeStatsUpdate(feed_message::NodeStatsUpdate<'a>), + NodeIOUpdate(feed_message::NodeIOUpdate<'a>), + AfgFinalized(feed_message::AfgFinalized), + AfgReceivedPrecommit(feed_message::AfgReceivedPrecommit), + AfgReceivedPrevote(feed_message::AfgReceivedPrevote), + FinalizedBlock(feed_message::FinalizedBlock) +} + +macro_rules! into_on_update { + ($name:ident) => { + impl <'a> From for OnUpdateNode<'a> { + fn from(val: feed_message::$name) -> Self { + OnUpdateNode::$name(val) + } + } + } +} +macro_rules! into_on_update_lt { + ($name:ident) => { + impl <'a> From> for OnUpdateNode<'a> { + fn from(val: feed_message::$name<'a>) -> Self { + OnUpdateNode::$name(val) + } + } + } +} +into_on_update!(StaleNode); +into_on_update!(BestBlock); +into_on_update!(BestFinalized); +into_on_update_lt!(ImportedBlock); +into_on_update_lt!(Hardware); +into_on_update_lt!(NodeStatsUpdate); +into_on_update_lt!(NodeIOUpdate); +into_on_update!(AfgFinalized); +into_on_update!(AfgReceivedPrecommit); +into_on_update!(AfgReceivedPrevote); +into_on_update!(FinalizedBlock); + /// Labels of chains we consider "first party". These chains allow any /// number of nodes to connect. static FIRST_PARTY_NETWORKS: Lazy> = Lazy::new(|| { @@ -107,6 +156,200 @@ impl Chain { } } + /// Attempt to update the best block seen in this chain. + pub fn update_node(&mut self, all_nodes: &mut DenseMap, nid: NodeId, payload: Payload, mut on_update: OnUpdate) + where OnUpdate: FnMut(OnUpdateNode) + { + if let Some(block) = payload.best_block() { + self.handle_block(all_nodes, block, nid, &mut on_update); + } + + if let Some(node) = all_nodes.get_mut(nid) { + match payload { + Payload::SystemInterval(ref interval) => { + if node.update_hardware(interval) { + on_update(feed_message::Hardware(nid, node.hardware()).into()); + } + + if let Some(stats) = node.update_stats(interval) { + on_update(feed_message::NodeStatsUpdate(nid, stats).into()); + } + + if let Some(io) = node.update_io(interval) { + on_update(feed_message::NodeIOUpdate(nid, io).into()); + } + } + Payload::AfgAuthoritySet(authority) => { + node.set_validator_address(authority.authority_id.clone()); + return; + } + Payload::AfgFinalized(finalized) => { + if let Ok(finalized_number) = finalized.finalized_number.parse::() + { + if let Some(addr) = node.details().validator.clone() { + on_update(feed_message::AfgFinalized( + addr, + finalized_number, + finalized.finalized_hash, + ).into()); + } + } + return; + } + Payload::AfgReceivedPrecommit(precommit) => { + if let Ok(finalized_number) = + precommit.target_number.parse::() + { + if let Some(addr) = node.details().validator.clone() { + let voter = precommit.voter.clone(); + on_update(feed_message::AfgReceivedPrecommit( + addr, + finalized_number, + precommit.target_hash, + voter, + ).into()); + } + } + return; + } + Payload::AfgReceivedPrevote(prevote) => { + if let Ok(finalized_number) = + prevote.target_number.parse::() + { + if let Some(addr) = node.details().validator.clone() { + let voter = prevote.voter.clone(); + on_update(feed_message::AfgReceivedPrevote( + addr, + finalized_number, + prevote.target_hash, + voter, + ).into()); + } + } + return; + } + Payload::AfgReceivedCommit(_) => {} + _ => (), + } + + if let Some(block) = payload.finalized_block() { + if let Some(finalized) = node.update_finalized(block) { + on_update(feed_message::FinalizedBlock( + nid, + finalized.height, + finalized.hash, + ).into()); + + if finalized.height > self.finalized.height { + self.finalized = *finalized; + on_update(feed_message::BestFinalized(finalized.height, finalized.hash).into()); + } + } + } + } + } + + fn handle_block(&mut self, all_nodes: &mut DenseMap, block: &Block, nid: NodeId, mut on_update: OnUpdate) + where OnUpdate: FnMut(OnUpdateNode) + { + let mut propagation_time = None; + let now = now(); + let nodes_len = self.node_ids.len(); + + self.update_stale_nodes(all_nodes, now, &mut on_update); + + let node = match all_nodes.get_mut(nid) { + Some(node) => node, + None => return, + }; + + if node.update_block(*block) { + if block.height > self.best.height { + self.best = *block; + log::debug!( + "[{}] [nodes={}] new best block={}/{:?}", + self.labels.best(), + nodes_len, + self.best.height, + self.best.hash, + ); + if let Some(timestamp) = self.timestamp { + self.block_times.push(now - timestamp); + self.average_block_time = Some(self.block_times.average()); + } + self.timestamp = Some(now); + on_update(feed_message::BestBlock( + self.best.height, + now, + self.average_block_time, + ).into()); + propagation_time = Some(0); + } else if block.height == self.best.height { + if let Some(timestamp) = self.timestamp { + propagation_time = Some(now - timestamp); + } + } + + if let Some(details) = node.update_details(now, propagation_time) { + on_update(feed_message::ImportedBlock(nid, details).into()); + } + } + } + + /// Check if the chain is stale (has not received a new best block in a while). + /// If so, find a new best block, ignoring any stale nodes and marking them as such. + fn update_stale_nodes(&mut self, all_nodes: &mut DenseMap, now: u64, mut on_update: OnUpdate) + where OnUpdate: FnMut(OnUpdateNode) + { + let threshold = now - STALE_TIMEOUT; + let timestamp = match self.timestamp { + Some(ts) => ts, + None => return, + }; + + if timestamp > threshold { + // Timestamp is in range, nothing to do + return; + } + + let mut best = Block::zero(); + let mut finalized = Block::zero(); + let mut timestamp = None; + + for &nid in self.node_ids.iter() { + let node = match all_nodes.get_mut(nid) { + Some(node) => node, + None => continue + }; + if !node.update_stale(threshold) { + if node.best().height > best.height { + best = *node.best(); + timestamp = Some(node.best_timestamp()); + } + + if node.finalized().height > finalized.height { + finalized = *node.finalized(); + } + } else { + on_update(feed_message::StaleNode(nid).into()); + } + } + + if self.best.height != 0 || self.finalized.height != 0 { + self.best = best; + self.finalized = finalized; + self.block_times.reset(); + self.timestamp = timestamp; + + on_update(feed_message::BestBlock( + self.best.height, + timestamp.unwrap_or(now), + None, + ).into()); + on_update(feed_message::BestFinalized(finalized.height, finalized.hash).into()); + } + } + pub fn label(&self) -> &str { &self.labels.best() } @@ -119,8 +362,8 @@ impl Chain { pub fn best_block(&self) -> &Block { &self.best } - pub fn timestamp(&self) -> Timestamp { - self.timestamp.unwrap_or(0) + pub fn timestamp(&self) -> Option { + self.timestamp } pub fn average_block_time(&self) -> Option { self.average_block_time diff --git a/backend/telemetry/src/state/node.rs b/backend/telemetry/src/state/node.rs index cc92af3..cc53965 100644 --- a/backend/telemetry/src/state/node.rs +++ b/backend/telemetry/src/state/node.rs @@ -1,7 +1,5 @@ -use std::sync::Arc; - use common::types::{ - Block, BlockDetails, NodeDetails, NodeHardware, NodeIO, NodeId, NodeLocation, NodeStats, + Block, BlockDetails, NodeDetails, NodeHardware, NodeIO, NodeLocation, NodeStats, Timestamp, }; use common::util::now; diff --git a/backend/telemetry/src/state/state.rs b/backend/telemetry/src/state/state.rs index 6490268..1fcfcae 100644 --- a/backend/telemetry/src/state/state.rs +++ b/backend/telemetry/src/state/state.rs @@ -1,9 +1,8 @@ -use std::sync::Arc; use std::collections::{ HashSet, HashMap }; use common::types::{ BlockHash }; use super::node::Node; -use common::types::{Block, NodeDetails, NodeLocation, Timestamp}; -use common::util::{now, DenseMap, NumStats}; +use common::types::{Block, NodeDetails, Timestamp}; +use common::util::{DenseMap}; use common::node::Payload; use std::iter::IntoIterator; use crate::find_location; @@ -53,7 +52,10 @@ pub struct NodeAddedToChain<'a> { pub has_chain_label_changed: bool } -// if removing a node is successful, we get this information back. +/// During a node update, we get given various messages about the update +pub type OnUpdateNode<'a> = chain::OnUpdateNode<'a>; + +/// if removing a node is successful, we get this information back. pub struct RemovedNode { /// How many nodes remain on the chain (0 if the chain was removed) pub chain_node_count: usize, @@ -94,6 +96,13 @@ impl State { .map(move |(_,chain)| StateChain { state: self, chain }) } + pub fn get_chain_by_node_id(&self, node_id: NodeId) -> Option> { + self.chains_by_node + .get(&node_id) + .and_then(|&chain_id| self.chains.get(chain_id)) + .map(|chain| StateChain { state: self, chain }) + } + pub fn get_chain_by_genesis_hash(&self, genesis_hash: &BlockHash) -> Option> { self.chains_by_genesis_hash .get(genesis_hash) @@ -207,28 +216,32 @@ impl State { }) } + /// Attempt to update the best block seen, given a node and block. + pub fn update_node(&mut self, node_id: NodeId, payload: Payload, on_update: OnUpdate) + where OnUpdate: FnMut(OnUpdateNode) + { + let chain_id = match self.chains_by_node.get(&node_id) { + Some(chain_id) => *chain_id, + None => { log::error!("Cannot find chain_id for node with ID {}", node_id); return } + }; + + let chain = match self.chains.get_mut(chain_id) { + Some(chain) => chain, + None => { log::error!("Cannot find chain for node with ID {}", node_id); return } + }; + + chain.update_node(&mut self.nodes, node_id, payload, on_update); + } + /// Update the location for a node. Return `false` if the node was not found. pub fn update_node_location(&mut self, node_id: NodeId, location: find_location::Location) -> bool { - if let Some(node) = self.get_node_mut(node_id) { + if let Some(node) = self.nodes.get_mut(node_id) { node.update_location(location); true } else { false } } - - /// Get the chain that a node belongs to. - pub fn get_node_chain(&self, node_id: NodeId) -> Option> { - self.chains_by_node - .get(&node_id) - .and_then(|&chain_id| self.chains.get(chain_id)) - .map(|chain| StateChain { state: self, chain }) - } - - /// Obtain mutable access to a node, if it's found. - fn get_node_mut(&mut self, node_id: NodeId) -> Option<&mut Node> { - self.nodes.get_mut(node_id) - } } @@ -255,7 +268,7 @@ impl <'a> StateChain<'a> { self.chain.best_block() } pub fn timestamp(&self) -> Timestamp { - self.chain.timestamp() + self.chain.timestamp().unwrap_or(0) } pub fn average_block_time(&self) -> Option { self.chain.average_block_time() @@ -335,21 +348,21 @@ mod test { let chain1_genesis = BlockHash::from_low_u64_be(1); state.add_node(chain1_genesis, node("A", "Chain One")); // 0 - assert_eq!(state.get_node_chain(0).expect("Chain should exist").label(), "Chain One"); + assert_eq!(state.get_chain_by_node_id(0).expect("Chain should exist").label(), "Chain One"); assert!(state.get_chain_by_label("Chain One").is_some()); assert!(state.get_chain_by_genesis_hash(&chain1_genesis).is_some()); state.add_node(chain1_genesis, node("B", "Chain Two")); // 1 // Chain name hasn't changed yet; "Chain One" as common as "Chain Two".. - assert_eq!(state.get_node_chain(0).expect("Chain should exist").label(), "Chain One"); + assert_eq!(state.get_chain_by_node_id(0).expect("Chain should exist").label(), "Chain One"); assert!(state.get_chain_by_label("Chain One").is_some()); assert!(state.get_chain_by_genesis_hash(&chain1_genesis).is_some()); state.add_node(chain1_genesis, node("B", "Chain Two")); // 2 // Chain name has changed; "Chain Two" the winner now.. - assert_eq!(state.get_node_chain(0).expect("Chain should exist").label(), "Chain Two"); + assert_eq!(state.get_chain_by_node_id(0).expect("Chain should exist").label(), "Chain Two"); assert!(state.get_chain_by_label("Chain One").is_none()); assert!(state.get_chain_by_label("Chain Two").is_some()); assert!(state.get_chain_by_genesis_hash(&chain1_genesis).is_some()); @@ -358,7 +371,7 @@ mod test { state.remove_node(2).expect("Removal OK (id: 2"); // Removed both "Chain Two" nodes; dominant name now "Chain One" again.. - assert_eq!(state.get_node_chain(0).expect("Chain should exist").label(), "Chain One"); + assert_eq!(state.get_chain_by_node_id(0).expect("Chain should exist").label(), "Chain One"); assert!(state.get_chain_by_label("Chain One").is_some()); assert!(state.get_chain_by_label("Chain Two").is_none()); assert!(state.get_chain_by_genesis_hash(&chain1_genesis).is_some());