diff --git a/backend/common/src/most_seen.rs b/backend/common/src/most_seen.rs index 776e5e7..5e8da1a 100644 --- a/backend/common/src/most_seen.rs +++ b/backend/common/src/most_seen.rs @@ -137,8 +137,9 @@ mod test { #[test] fn default_renames_instantly() { let mut a: MostSeen<&str> = MostSeen::default(); - a.insert(&"Hello"); + let res = a.insert(&"Hello"); assert_eq!(*a.best(), "Hello"); + assert!(res.has_changed()); } #[test] diff --git a/backend/telemetry/src/aggregator/inner_loop.rs b/backend/telemetry/src/aggregator/inner_loop.rs index 01aef94..45b9c9f 100644 --- a/backend/telemetry/src/aggregator/inner_loop.rs +++ b/backend/telemetry/src/aggregator/inner_loop.rs @@ -4,6 +4,7 @@ use common::{ LocalId, MuteReason }, + types::BlockHash, node, util::now }; @@ -88,7 +89,7 @@ pub enum FromFeedWebsocket { NoMoreFinality, /// An explicit ping message. Ping { - chain: Box + value: Box }, /// The feed is disconnected. Disconnected @@ -98,13 +99,13 @@ pub enum FromFeedWebsocket { impl FromStr for FromFeedWebsocket { type Err = anyhow::Error; fn from_str(s: &str) -> Result { - let (cmd, chain) = match s.find(':') { + let (cmd, value) = match s.find(':') { Some(idx) => (&s[..idx], s[idx+1..].into()), None => return Err(anyhow::anyhow!("Expecting format `CMD:CHAIN_NAME`")) }; match cmd { - "ping" => Ok(FromFeedWebsocket::Ping { chain }), - "subscribe" => Ok(FromFeedWebsocket::Subscribe { chain }), + "ping" => Ok(FromFeedWebsocket::Ping { value }), + "subscribe" => Ok(FromFeedWebsocket::Subscribe { chain: value }), "send-finality" => Ok(FromFeedWebsocket::SendFinality), "no-more-finality" => Ok(FromFeedWebsocket::NoMoreFinality), _ => return Err(anyhow::anyhow!("Command {} not recognised", cmd)) @@ -136,9 +137,11 @@ pub struct InnerLoop { shard_channels: HashMap>, /// Which chain is a feed subscribed to? 
- feed_conn_id_to_chain: HashMap>, + /// Feed Connection ID -> Chain Genesis Hash + feed_conn_id_to_chain: HashMap, /// Which feeds are subscribed to a given chain (needs to stay in sync with above)? - chain_to_feed_conn_ids: HashMap, HashSet>, + /// Chain Genesis Hash -> Feed Connection IDs + chain_to_feed_conn_ids: HashMap>, /// These feeds want finality info, too. feed_conn_id_finality: HashSet, @@ -197,18 +200,19 @@ impl InnerLoop { &loc.city )); - let chain_label = self.node_state + let chain_genesis_hash = self.node_state .get_node_chain(node_id) - .map(|chain| chain.label().to_owned()); + .map(|chain| *chain.genesis_hash()); - if let Some(chain_label) = chain_label { - self.finalize_and_broadcast_to_chain_feeds(&chain_label, feed_message_serializer).await; + if let Some(chain_genesis_hash) = chain_genesis_hash { + self.finalize_and_broadcast_to_chain_feeds(&chain_genesis_hash, feed_message_serializer).await; } } } /// Handle messages coming from shards. async fn handle_from_shard(&mut self, shard_conn_id: ConnId, msg: FromShardWebsocket) { + log::debug!("Message from shard ({}): {:?}", shard_conn_id, msg); match msg { FromShardWebsocket::Initialize { channel } => { self.shard_channels.insert(shard_conn_id, channel); @@ -246,7 +250,7 @@ impl InnerLoop { // Tell chain subscribers about the node we've just added: let mut feed_messages_for_chain = FeedMessageSerializer::new(); feed_messages_for_chain.push(feed_message::AddedNode(node_id, &details.node)); - self.finalize_and_broadcast_to_chain_feeds(&old_chain_label, feed_messages_for_chain).await; + self.finalize_and_broadcast_to_chain_feeds(&genesis_hash, feed_messages_for_chain).await; // Tell everybody about the new node count and potential rename: let mut feed_messages_for_all = FeedMessageSerializer::new(); @@ -334,6 +338,7 @@ impl InnerLoop { /// Handle messages coming from feeds. 
async fn handle_from_feed(&mut self, feed_conn_id: ConnId, msg: FromFeedWebsocket) { + log::debug!("Message from feed ({}): {:?}", feed_conn_id, msg); match msg { FromFeedWebsocket::Initialize { mut channel } => { self.feed_channels.insert(feed_conn_id, channel.clone()); @@ -353,7 +358,7 @@ impl InnerLoop { let _ = channel.send(ToFeedWebsocket::Bytes(bytes)).await; } }, - FromFeedWebsocket::Ping { chain } => { + FromFeedWebsocket::Ping { value } => { let feed_channel = match self.feed_channels.get_mut(&feed_conn_id) { Some(chan) => chan, None => return @@ -361,7 +366,7 @@ impl InnerLoop { // Pong! let mut feed_serializer = FeedMessageSerializer::new(); - feed_serializer.push(feed_message::Pong(&chain)); + feed_serializer.push(feed_message::Pong(&value)); if let Some(bytes) = feed_serializer.into_finalized() { let _ = feed_channel.send(ToFeedWebsocket::Bytes(bytes)).await; } @@ -373,9 +378,9 @@ impl InnerLoop { }; // Unsubscribe from previous chain if subscribed to one: - let old_chain_label = self.feed_conn_id_to_chain.remove(&feed_conn_id); - if let Some(old_chain_label) = &old_chain_label { - if let Some(map) = self.chain_to_feed_conn_ids.get_mut(old_chain_label) { + let old_genesis_hash = self.feed_conn_id_to_chain.remove(&feed_conn_id); + if let Some(old_genesis_hash) = &old_genesis_hash { + if let Some(map) = self.chain_to_feed_conn_ids.get_mut(old_genesis_hash) { map.remove(&feed_conn_id); } } @@ -383,29 +388,34 @@ impl InnerLoop { // Untoggle request for finality feeds: self.feed_conn_id_finality.remove(&feed_conn_id); - // Get the chain we're subscribing to, ignoring the rest if it doesn't exist. - let chain = match self.node_state.get_chain_by_label(&chain) { + // Get old chain if there was one: + let node_state = &self.node_state; + let old_chain = old_genesis_hash + .and_then(|hash| node_state.get_chain_by_genesis_hash(&hash)); + + // Get new chain, ignoring the rest if it doesn't exist. 
+ let new_chain = match self.node_state.get_chain_by_label(&chain) { Some(chain) => chain, None => return }; - // Send messages to the feed about the new chain: + // Send messages to the feed about this subscription: let mut feed_serializer = FeedMessageSerializer::new(); - if let Some(old_chain_label) = old_chain_label { - feed_serializer.push(feed_message::UnsubscribedFrom(&old_chain_label)); + if let Some(old_chain) = old_chain { + feed_serializer.push(feed_message::UnsubscribedFrom(old_chain.label())); } - feed_serializer.push(feed_message::SubscribedTo(chain.label())); + feed_serializer.push(feed_message::SubscribedTo(new_chain.label())); feed_serializer.push(feed_message::TimeSync(now())); feed_serializer.push(feed_message::BestBlock ( - chain.best_block().height, - chain.timestamp(), - chain.average_block_time() + new_chain.best_block().height, + new_chain.timestamp(), + new_chain.average_block_time() )); feed_serializer.push(feed_message::BestFinalized ( - chain.finalized_block().height, - chain.finalized_block().hash + new_chain.finalized_block().height, + new_chain.finalized_block().hash )); - for (idx, (node_id, node)) in chain.iter_nodes().enumerate() { + for (idx, (node_id, node)) in new_chain.iter_nodes().enumerate() { // Send subscription confirmation and chain head before doing all the nodes, // and continue sending batches of 32 nodes a time over the wire subsequently if idx % 32 == 0 { @@ -428,8 +438,9 @@ impl InnerLoop { } // Actually make a note of the new chain subsciption: - self.feed_conn_id_to_chain.insert(feed_conn_id, chain.label().into()); - self.chain_to_feed_conn_ids.entry(chain.label().into()).or_default().insert(feed_conn_id); + let new_genesis_hash = *new_chain.genesis_hash(); + self.feed_conn_id_to_chain.insert(feed_conn_id, new_genesis_hash); + self.chain_to_feed_conn_ids.entry(new_genesis_hash).or_default().insert(feed_conn_id); }, FromFeedWebsocket::SendFinality => { self.feed_conn_id_finality.insert(feed_conn_id); @@ -452,11 
+463,10 @@ impl InnerLoop { async fn remove_nodes_and_broadcast_result(&mut self, node_ids: impl IntoIterator) { // Group by chain to simplify the handling of feed messages: - let mut node_ids_per_chain: HashMap> = HashMap::new(); + let mut node_ids_per_chain: HashMap> = HashMap::new(); for node_id in node_ids.into_iter() { if let Some(chain) = self.node_state.get_node_chain(node_id) { - let chain_label = chain.label().to_owned(); - node_ids_per_chain.entry(chain_label).or_default().push(node_id); + node_ids_per_chain.entry(*chain.genesis_hash()).or_default().push(node_id); } } @@ -519,15 +529,15 @@ impl InnerLoop { } /// Finalize a [`FeedMessageSerializer`] and broadcast the result to feeds for the chain. - async fn finalize_and_broadcast_to_chain_feeds(&mut self, chain: &str, serializer: FeedMessageSerializer) { + async fn finalize_and_broadcast_to_chain_feeds(&mut self, genesis_hash: &BlockHash, serializer: FeedMessageSerializer) { if let Some(bytes) = serializer.into_finalized() { - self.broadcast_to_chain_feeds(chain, ToFeedWebsocket::Bytes(bytes)).await; + self.broadcast_to_chain_feeds(genesis_hash, ToFeedWebsocket::Bytes(bytes)).await; } } /// Send a message to all chain feeds. - async fn broadcast_to_chain_feeds(&mut self, chain: &str, message: ToFeedWebsocket) { - if let Some(feeds) = self.chain_to_feed_conn_ids.get(chain) { + async fn broadcast_to_chain_feeds(&mut self, genesis_hash: &BlockHash, message: ToFeedWebsocket) { + if let Some(feeds) = self.chain_to_feed_conn_ids.get(genesis_hash) { for &feed_id in feeds { // How much faster would it be if we processed these in parallel? // Is it practical to do so given lifetimes and such? 
diff --git a/backend/telemetry/src/main.rs b/backend/telemetry/src/main.rs index eb1fbad..17e7d9f 100644 --- a/backend/telemetry/src/main.rs +++ b/backend/telemetry/src/main.rs @@ -252,6 +252,7 @@ async fn handle_feed_websocket_connection(mut websocket: ws::WebSocket, mut t // pre-serialized bytes that we send as binary): match msg { ToFeedWebsocket::Bytes(bytes) => { + log::debug!("Message to feed: {}", std::str::from_utf8(&bytes).unwrap_or("INVALID UTF8")); let _ = websocket.send(ws::Message::binary(bytes)).await; } } diff --git a/backend/telemetry/src/state/state.rs b/backend/telemetry/src/state/state.rs index b5dc182..6490268 100644 --- a/backend/telemetry/src/state/state.rs +++ b/backend/telemetry/src/state/state.rs @@ -94,6 +94,13 @@ impl State { .map(move |(_,chain)| StateChain { state: self, chain }) } + pub fn get_chain_by_genesis_hash(&self, genesis_hash: &BlockHash) -> Option> { + self.chains_by_genesis_hash + .get(genesis_hash) + .and_then(|&chain_id| self.chains.get(chain_id)) + .map(|chain| StateChain { state: self, chain }) + } + pub fn get_chain_by_label(&self, label: &str) -> Option> { self.chains_by_label .get(label) @@ -107,6 +114,9 @@ impl State { } // Get the chain ID, creating a new empty chain if one doesn't exist. + // If we create a chain here, we are expecting that it will allow at + // least this node to be added, because we don't currently try and clean it up + // if the add fails. let chain_id = match self.chains_by_genesis_hash.get(&genesis_hash) { Some(id) => *id, None => { @@ -124,22 +134,25 @@ impl State { // to add it until we know whether the chain will accept it, but we want // an ID to give to the chain. 
let node_id = self.nodes.next_id(); - let chain_label = node_details.chain.clone(); + let node_chain_label = node_details.chain.clone(); + let old_chain_label = chain.label().into(); - match chain.add_node(node_id, &chain_label) { + match chain.add_node(node_id, &node_chain_label) { chain::AddNodeResult::Overquota => { AddNodeResult::ChainOverQuota }, chain::AddNodeResult::Added { chain_renamed } => { let chain = &*chain; - // Actually add the node if the chain accepts it: + // Actually add the node, and a reference to its chain, + // if the chain adds it successfully: self.nodes.add(Node::new(node_details)); + self.chains_by_node.insert(node_id, chain_id); // Update the label we use to reference the chain if // it changes (it'll always change first time a node's added): if chain_renamed { - self.chains_by_label.remove(&chain_label); + self.chains_by_label.remove(&old_chain_label); self.chains_by_label.insert(chain.label().to_string().into_boxed_str(), chain_id); } @@ -147,7 +160,7 @@ impl State { AddNodeResult::NodeAddedToChain(NodeAddedToChain { id: node_id, node: node, - old_chain_label: chain_label, + old_chain_label: old_chain_label, new_chain_label: chain.label(), chain_node_count: chain.node_count(), has_chain_label_changed: chain_renamed @@ -216,38 +229,13 @@ impl State { fn get_node_mut(&mut self, node_id: NodeId) -> Option<&mut Node> { self.nodes.get_mut(node_id) } - - // /// Add a new node to our state. - // pub fn add_node(&mut self, id: GlobalId, genesis_hash: BlockHash, node: &NodeDetails) -> AddNodeResult { - // if self.denylist.contains(&*node.chain) { - // return AddNodeResult::ChainOnDenyList; - // } - // let chain_id = self.chains.get_or_create(genesis_hash, &node.chain); - - // return Ok(()) - // } - - // /// Remove a node from our state. - // pub fn remove_node(&mut self, id: GlobalId) -> RemoveNodeResult { - - // } - - // /// Update a node with new data. 
This needs breaking down into parts so - // /// that we can emit a useful result in each case to inform the aggregator - // /// what messages it needs to emit. - // pub fn update_node(&mut self, id: GlobalId, payload: Payload) { - - // } - - // fn get_or_create_chain(genesis_hash: BlockHash, chain: &str) -> ChainId { - - // } } /// When we ask for a chain, we get this struct back. This ensures that we have /// a consistent public interface, and don't expose methods on [`Chain`] that -/// aren't really intended for use outside of [`State`] methods. +/// aren't really intended for use outside of [`State`] methods. Any modification +/// of a chain needs to go through [`State`]. pub struct StateChain<'a> { state: &'a State, chain: &'a Chain @@ -257,6 +245,9 @@ impl <'a> StateChain<'a> { pub fn label(&self) -> &'a str { self.chain.label() } + pub fn genesis_hash(&self) -> &'a BlockHash { + self.chain.genesis_hash() + } pub fn node_count(&self) -> usize { self.chain.node_count() } @@ -278,4 +269,116 @@ impl <'a> StateChain<'a> { Some((id, state.nodes.get(id)?)) }) } +} + +#[cfg(test)] +mod test { + use super::*; + + fn node(name: &str, chain: &str) -> NodeDetails { + NodeDetails { + chain: chain.into(), + name: name.into(), + implementation: "Bar".into(), + version: "0.1".into(), + validator: None, + network_id: None, + startup_time: None + } + } + + #[test] + fn adding_a_node_returns_expected_response() { + let mut state = State::new(None); + + let chain1_genesis = BlockHash::from_low_u64_be(1); + + let add_result = state.add_node( + chain1_genesis, + node("A", "Chain One") + ); + + let add_node_result = match add_result { + AddNodeResult::ChainOnDenyList => panic!("Chain not on deny list"), + AddNodeResult::ChainOverQuota => panic!("Chain not Overquota"), + AddNodeResult::NodeAddedToChain(details) => details + }; + + assert_eq!(add_node_result.id, 0); + assert_eq!(&*add_node_result.old_chain_label, ""); + assert_eq!(&*add_node_result.new_chain_label, "Chain One"); + 
assert_eq!(add_node_result.chain_node_count, 1); + assert_eq!(add_node_result.has_chain_label_changed, true); + + let add_result = state.add_node( + chain1_genesis, + node("A", "Chain One") + ); + + let add_node_result = match add_result { + AddNodeResult::ChainOnDenyList => panic!("Chain not on deny list"), + AddNodeResult::ChainOverQuota => panic!("Chain not Overquota"), + AddNodeResult::NodeAddedToChain(details) => details + }; + + assert_eq!(add_node_result.id, 1); + assert_eq!(&*add_node_result.old_chain_label, "Chain One"); + assert_eq!(&*add_node_result.new_chain_label, "Chain One"); + assert_eq!(add_node_result.chain_node_count, 2); + assert_eq!(add_node_result.has_chain_label_changed, false); + } + + #[test] + fn adding_and_removing_nodes_updates_chain_label_mapping() { + let mut state = State::new(None); + + let chain1_genesis = BlockHash::from_low_u64_be(1); + state.add_node(chain1_genesis, node("A", "Chain One")); // 0 + + assert_eq!(state.get_node_chain(0).expect("Chain should exist").label(), "Chain One"); + assert!(state.get_chain_by_label("Chain One").is_some()); + assert!(state.get_chain_by_genesis_hash(&chain1_genesis).is_some()); + + state.add_node(chain1_genesis, node("B", "Chain Two")); // 1 + + // Chain name hasn't changed yet; "Chain One" as common as "Chain Two".. + assert_eq!(state.get_node_chain(0).expect("Chain should exist").label(), "Chain One"); + assert!(state.get_chain_by_label("Chain One").is_some()); + assert!(state.get_chain_by_genesis_hash(&chain1_genesis).is_some()); + + state.add_node(chain1_genesis, node("B", "Chain Two")); // 2 + + // Chain name has changed; "Chain Two" the winner now.. 
+ assert_eq!(state.get_node_chain(0).expect("Chain should exist").label(), "Chain Two"); + assert!(state.get_chain_by_label("Chain One").is_none()); + assert!(state.get_chain_by_label("Chain Two").is_some()); + assert!(state.get_chain_by_genesis_hash(&chain1_genesis).is_some()); + + state.remove_node(1).expect("Removal OK (id: 1)"); + state.remove_node(2).expect("Removal OK (id: 2)"); + + // Removed both "Chain Two" nodes; dominant name now "Chain One" again.. + assert_eq!(state.get_node_chain(0).expect("Chain should exist").label(), "Chain One"); + assert!(state.get_chain_by_label("Chain One").is_some()); + assert!(state.get_chain_by_label("Chain Two").is_none()); + assert!(state.get_chain_by_genesis_hash(&chain1_genesis).is_some()); + } + + #[test] + fn chain_removed_when_last_node_is() { + let mut state = State::new(None); + + let chain1_genesis = BlockHash::from_low_u64_be(1); + state.add_node(chain1_genesis, node("A", "Chain One")); // 0 + + assert!(state.get_chain_by_label("Chain One").is_some()); + assert!(state.get_chain_by_genesis_hash(&chain1_genesis).is_some()); + assert_eq!(state.iter_chains().count(), 1); + + state.remove_node(0); + + assert!(state.get_chain_by_label("Chain One").is_none()); + assert!(state.get_chain_by_genesis_hash(&chain1_genesis).is_none()); + assert_eq!(state.iter_chains().count(), 0); + } } \ No newline at end of file