diff --git a/backend/common/src/assign_id.rs b/backend/common/src/assign_id.rs index e52c41f..ddd7a1b 100644 --- a/backend/common/src/assign_id.rs +++ b/backend/common/src/assign_id.rs @@ -1,4 +1,4 @@ -use std::hash::Hash; +use std::{fmt::Display, hash::Hash}; use serde::{Serialize,Deserialize}; use bimap::BiMap; @@ -15,6 +15,11 @@ impl std::convert::From for Id { Id(n) } } +impl Display for Id { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.0.fmt(f) + } +} /// A struct that allows you to assign an ID to an arbitrary set of /// details (so long as they are Eq+Hash+Clone), and then access diff --git a/backend/common/src/most_seen.rs b/backend/common/src/most_seen.rs index e2b0c64..8735cfb 100644 --- a/backend/common/src/most_seen.rs +++ b/backend/common/src/most_seen.rs @@ -10,7 +10,13 @@ pub struct MostSeen { others: HashMap } -impl MostSeen { +impl Default for MostSeen { + fn default() -> Self { + MostSeen::new(T::default()) + } +} + +impl MostSeen { pub fn new(item: T) -> Self { Self { current_best: item, diff --git a/backend/common/src/util/dense_map.rs b/backend/common/src/util/dense_map.rs index 8c06af5..3d24669 100644 --- a/backend/common/src/util/dense_map.rs +++ b/backend/common/src/util/dense_map.rs @@ -77,4 +77,16 @@ impl DenseMap { pub fn is_empty(&self) -> bool { self.len() == 0 } + + /// Return the next Id that will be assigned. + pub fn next_id(&self) -> usize { + match self.retired.last() { + Some(id) => { + *id + } + None => { + self.items.len() + } + } + } } diff --git a/backend/telemetry/src/aggregator/inner_loop.rs b/backend/telemetry/src/aggregator/inner_loop.rs index 4a21640..8ebb584 100644 --- a/backend/telemetry/src/aggregator/inner_loop.rs +++ b/backend/telemetry/src/aggregator/inner_loop.rs @@ -238,21 +238,41 @@ impl InnerLoop { }, state::AddNodeResult::NodeAddedToChain(details) => { let node_id = details.id; - // Note the ID so that we know what node other messages are referring to: + + // Record ID <-> (shardId,localId) for future messages: self.node_ids.insert(node_id, (shard_conn_id, local_id)); - let mut feed_serializer = FeedMessageSerializer::new(); - feed_serializer.push(feed_message::AddedNode(node_id, details.node)); - let chain_label = details.chain.label().to_owned(); + // Don't hold onto details too long because we want &mut self later: + let old_chain_label = details.old_chain_label.to_owned(); + let new_chain_label = details.new_chain_label.to_owned(); + let chain_node_count = details.chain_node_count; + let has_chain_label_changed = details.has_chain_label_changed; - if let Some(bytes) = feed_serializer.into_finalized() { + // Tell chain subscribers about the node we've just added: + let mut feed_messages_for_chain = FeedMessageSerializer::new(); + feed_messages_for_chain.push(feed_message::AddedNode(node_id, &details.node)); + + if let Some(bytes) = feed_messages_for_chain.into_finalized() { self.broadcast_to_chain_feeds( - &chain_label, + &new_chain_label, ToFeedWebsocket::Bytes(bytes) - ).await + ).await; } - // Currently we only geographically locate IPV4 addresses so ignore IPV6; + // Tell everybody about the new node count and potential rename: + let mut feed_messages_for_all = FeedMessageSerializer::new(); + if has_chain_label_changed { + feed_messages_for_all.push(feed_message::RemovedChain(&old_chain_label)); + } + feed_messages_for_all.push(feed_message::AddedChain(&new_chain_label, chain_node_count)); + + if let Some(bytes) = feed_messages_for_all.into_finalized() { + let msg = ToFeedWebsocket::Bytes(bytes); + self.broadcast_to_all_feeds(msg).await; + } + + // Ask for the grographical location of the node. + // Currently we only geographically locate IPV4 addresses so ignore IPV6. if let Some(IpAddr::V4(ip_v4)) = ip { let _ = self.tx_to_locator.send((node_id, ip_v4)).await; } @@ -260,9 +280,14 @@ impl InnerLoop { } }, FromShardWebsocket::Remove { local_id } => { - if let Some(node_id) = self.node_ids.remove_by_right(&(shard_conn_id, local_id)) { - // TODO: node_state.remove_node, Every feed should know about node count changes. - } + let node_id = match self.node_ids.remove_by_right(&(shard_conn_id, local_id)) { + Some((node_id, _)) => node_id, + None => { + log::error!("Cannot find ID for node with shard/connectionId of {}/{}", shard_conn_id, local_id); + return + } + }; + self.remove_nodes_and_broadcast_result(Some(node_id)).await; }, FromShardWebsocket::Update { local_id, payload } => { // TODO: Fill this all in... @@ -309,8 +334,15 @@ impl InnerLoop { // TODO: node_state.update_node, then handle returned diffs }, FromShardWebsocket::Disconnected => { - // The shard has disconnected; remove the shard channel, but also - // remove any nodes associated with the shard, firing the relevant feed messages. + // Find all nodes associated with this shard connection ID: + let node_ids_to_remove: Vec = self.node_ids + .iter() + .filter(|(_, &(this_shard_conn_id, _))| shard_conn_id == this_shard_conn_id) + .map(|(&node_id,_)| node_id) + .collect(); + + // ... and remove them: + self.remove_nodes_and_broadcast_result(node_ids_to_remove).await; } } } @@ -388,7 +420,7 @@ impl InnerLoop { chain.finalized_block().height, chain.finalized_block().hash )); - for (idx, (gid, node)) in chain.nodes().enumerate() { + for (idx, (node_id, node)) in chain.iter_nodes().enumerate() { // Send subscription confirmation and chain head before doing all the nodes, // and continue sending batches of 32 nodes a time over the wire subsequently if idx % 32 == 0 { @@ -396,14 +428,14 @@ impl InnerLoop { let _ = feed_channel.send(ToFeedWebsocket::Bytes(bytes)).await; } } - feed_serializer.push(feed_message::AddedNode(gid, node)); + feed_serializer.push(feed_message::AddedNode(node_id, node)); feed_serializer.push(feed_message::FinalizedBlock( - gid, + node_id, node.finalized().height, node.finalized().hash, )); if node.stale() { - feed_serializer.push(feed_message::StaleNode(gid)); + feed_serializer.push(feed_message::StaleNode(node_id)); } } if let Some(bytes) = feed_serializer.into_finalized() { @@ -431,6 +463,81 @@ impl InnerLoop { } } + /// Remove all of the node IDs provided and broadcast messages to feeds as needed. + async fn remove_nodes_and_broadcast_result(&mut self, node_ids: impl IntoIterator) { + + // Group by chain to simplify the handling of feed messages: + let mut node_ids_per_chain: HashMap> = HashMap::new(); + for node_id in node_ids.into_iter() { + if let Some(chain) = self.node_state.get_node_chain(node_id) { + let chain_label = chain.label().to_owned(); + node_ids_per_chain.entry(chain_label).or_default().push(node_id); + } + } + + // Remove the nodes for each chain + let mut feed_messages_for_all = FeedMessageSerializer::new(); + for (chain_label, node_ids) in node_ids_per_chain { + let mut feed_messages_for_chain = FeedMessageSerializer::new(); + for node_id in node_ids { + self.remove_node( + node_id, + &mut feed_messages_for_chain, + &mut feed_messages_for_all + ); + } + if let Some(bytes) = feed_messages_for_chain.into_finalized() { + self.broadcast_to_chain_feeds(&chain_label, ToFeedWebsocket::Bytes(bytes)).await; + } + } + + if let Some(bytes) = feed_messages_for_all.into_finalized() { + self.broadcast_to_all_feeds(ToFeedWebsocket::Bytes(bytes)).await; + } + } + + /// Remove a single node by its ID, pushing any messages we'd want to send + /// out to feeds onto the provided feed serializers. Doesn't actually send + /// anything to the feeds; just updates state as needed. + fn remove_node( + &mut self, + node_id: NodeId, + feed_for_chain: &mut FeedMessageSerializer, + feed_for_all: &mut FeedMessageSerializer + ) { + // Remove our top level association (this may already have been done). + self.node_ids.remove_by_left(&node_id); + + let removed_details = match self.node_state.remove_node(node_id) { + Ok(remove_details) => remove_details, + Err(err) => { + log::error!("Error removing node {}: {}", node_id, err); + return + } + }; + + // The chain has been removed (no nodes left in it, or it was renamed): + if removed_details.chain_node_count == 0 || removed_details.has_chain_label_changed { + feed_for_all.push(feed_message::RemovedChain( + &removed_details.old_chain_label + )); + } + + // If the chain still exists, tell everybody about the new label or updated node count: + if removed_details.chain_node_count != 0 { + feed_for_all.push( + feed_message::AddedChain(&removed_details.new_chain_label, removed_details.chain_node_count) + ); + } + + // Assuming the chain hasn't gone away, tell chain subscribers about the node removal + if removed_details.chain_node_count != 0 { + feed_for_chain.push( + feed_message::RemovedNode(node_id) + ); + } + } + /// Send a message to all chain feeds. async fn broadcast_to_chain_feeds(&mut self, chain: &str, message: ToFeedWebsocket) { if let Some(feeds) = self.chain_to_feed_conn_ids.get(chain) { @@ -438,9 +545,18 @@ impl InnerLoop { // How much faster would it be if we processed these in parallel? // Is it practical to do so given lifetimes and such? if let Some(chan) = self.feed_channels.get_mut(&feed_id) { - chan.send(message.clone()).await; + let _ = chan.send(message.clone()).await; } } } } + + /// Send a message to everybody. + async fn broadcast_to_all_feeds(&mut self, message: ToFeedWebsocket) { + for chan in self.feed_channels.values_mut() { + // How much faster would it be if we processed these in parallel? + // Is it practical to do so given lifetimes and such? + let _ = chan.send(message.clone()).await; + } + } } \ No newline at end of file diff --git a/backend/telemetry/src/state/chain.rs b/backend/telemetry/src/state/chain.rs index 874d888..0e8e286 100644 --- a/backend/telemetry/src/state/chain.rs +++ b/backend/telemetry/src/state/chain.rs @@ -19,7 +19,7 @@ pub struct Chain { /// the most commonly used label as nodes are added/removed. labels: MostSeen