diff --git a/backend/common/src/lib.rs b/backend/common/src/lib.rs index 85e14fa..c0df1f6 100644 --- a/backend/common/src/lib.rs +++ b/backend/common/src/lib.rs @@ -30,6 +30,7 @@ mod dense_map; mod either_sink; mod mean_list; mod most_seen; +mod multi_map_unique; mod num_stats; // Export a bunch of common bits at the top level for ease of import: @@ -38,4 +39,5 @@ pub use dense_map::DenseMap; pub use either_sink::EitherSink; pub use mean_list::MeanList; pub use most_seen::MostSeen; +pub use multi_map_unique::MultiMapUnique; pub use num_stats::NumStats; diff --git a/backend/common/src/multi_map_unique.rs b/backend/common/src/multi_map_unique.rs new file mode 100644 index 0000000..51e8e9a --- /dev/null +++ b/backend/common/src/multi_map_unique.rs @@ -0,0 +1,166 @@ +// Source code for the Substrate Telemetry Server. +// Copyright (C) 2021 Parity Technologies (UK) Ltd. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . +use std::collections::{HashMap, HashSet}; +use std::hash::Hash; + +/// A map where each key can contain multiple values. We enforce that a value +/// only ever belongs to one key at a time (the latest key it was inserted +/// against). +pub struct MultiMapUnique { + value_to_key: HashMap, + key_to_values: HashMap>, +} + +impl MultiMapUnique { + /// Construct a new MultiMap + pub fn new() -> Self { + Self { + value_to_key: HashMap::new(), + key_to_values: HashMap::new(), + } + } + + /// Return the set of values associated with a key. + pub fn get_values(&self, key: &K) -> Option<&HashSet> + where + K: Eq + Hash, + { + self.key_to_values.get(key) + } + + /// Remove a value from the MultiMap, returning the key it was found + /// under, if it was found at all. + /// + /// ``` + /// let mut m = common::MultiMapUnique::new(); + /// + /// m.insert("a", 1); + /// m.insert("a", 2); + /// + /// m.insert("b", 3); + /// m.insert("b", 4); + /// + /// assert_eq!(m.num_keys(), 2); + /// assert_eq!(m.num_values(), 4); + /// + /// m.remove_value(&1); + /// + /// assert_eq!(m.num_keys(), 2); + /// assert_eq!(m.num_values(), 3); + /// + /// m.remove_value(&2); + /// + /// assert_eq!(m.num_keys(), 1); + /// assert_eq!(m.num_values(), 2); + /// ``` + pub fn remove_value(&mut self, value: &V) -> Option + where + V: Eq + Hash, + K: Eq + Hash, + { + if let Some(key) = self.value_to_key.remove(value) { + if let Some(m) = self.key_to_values.get_mut(&key) { + m.remove(value); + if m.is_empty() { + self.key_to_values.remove(&key); + } + } + return Some(key); + } + None + } + + /// Insert a key+value pair into the multimap. Multiple different + /// values can exist for a single key, but only one of each value can + /// exist in the MultiMap. + /// + /// If a previous value did exist, the old key it was inserted against + /// is returned. + /// + /// ``` + /// let mut m = common::MultiMapUnique::new(); + /// + /// let old_key = m.insert("a", 1); + /// assert_eq!(old_key, None); + /// + /// let old_key = m.insert("b", 1); + /// assert_eq!(old_key, Some("a")); + /// + /// let old_key = m.insert("c", 1); + /// assert_eq!(old_key, Some("b")); + /// + /// assert_eq!(m.num_keys(), 1); + /// assert_eq!(m.num_values(), 1); + /// + /// // The value `1` must be unique in the map, so it only exists + /// // in the last location it was inserted: + /// assert!(m.get_values(&"a").is_none()); + /// assert!(m.get_values(&"b").is_none()); + /// assert_eq!(m.get_values(&"c").unwrap().iter().collect::>(), vec![&1]); + /// ``` + pub fn insert(&mut self, key: K, value: V) -> Option + where + V: Clone + Eq + Hash, + K: Clone + Eq + Hash, + { + // Ensure that the value doesn't exist elsewhere already; + // values must be unique and only belong to one key: + let old_key = self.remove_value(&value); + + self.value_to_key.insert(value.clone(), key.clone()); + self.key_to_values.entry(key).or_default().insert(value); + + old_key + } + + /// Number of values stored in the map + pub fn num_values(&self) -> usize { + self.value_to_key.len() + } + + /// Number of keys stored in the map + pub fn num_keys(&self) -> usize { + self.key_to_values.len() + } +} + +#[cfg(test)] +mod test { + + use super::*; + + #[test] + fn multiple_values_allowed_per_key() { + let mut m = MultiMapUnique::new(); + + m.insert("a", 1); + m.insert("a", 2); + + m.insert("b", 3); + m.insert("b", 4); + + assert_eq!(m.num_keys(), 2); + assert_eq!(m.num_values(), 4); + + let a_vals = m.get_values(&"a").expect("a vals"); + assert!(a_vals.contains(&1)); + assert!(a_vals.contains(&2)); + + let b_vals = m.get_values(&"b").expect("b vals"); + assert!(b_vals.contains(&3)); + assert!(b_vals.contains(&4)); + } +} diff --git a/backend/telemetry_core/src/aggregator/inner_loop.rs b/backend/telemetry_core/src/aggregator/inner_loop.rs index 357137d..7b08cd2 100644 --- a/backend/telemetry_core/src/aggregator/inner_loop.rs +++ b/backend/telemetry_core/src/aggregator/inner_loop.rs @@ -23,7 +23,7 @@ use common::{ internal_messages::{self, MuteReason, ShardNodeId}, node_message, node_types::BlockHash, - time, + time, MultiMapUnique, }; use std::collections::{HashMap, HashSet}; use std::sync::{ @@ -169,12 +169,8 @@ pub struct InnerLoop { /// Keep track of how to send messages out to shards. shard_channels: HashMap>, - /// Which chain is a feed subscribed to? - /// Feed Connection ID -> Chain Genesis Hash - feed_conn_id_to_chain: HashMap, - /// Which feeds are subscribed to a given chain (needs to stay in sync with above)? - /// Chain Genesis Hash -> Feed Connection IDs - chain_to_feed_conn_ids: HashMap>, + /// Which feeds are subscribed to a given chain? + chain_to_feed_conn_ids: MultiMapUnique, /// These feeds want finality info, too. feed_conn_id_finality: HashSet, @@ -199,8 +195,7 @@ impl InnerLoop { node_ids: BiMap::new(), feed_channels: HashMap::new(), shard_channels: HashMap::new(), - feed_conn_id_to_chain: HashMap::new(), - chain_to_feed_conn_ids: HashMap::new(), + chain_to_feed_conn_ids: MultiMapUnique::new(), feed_conn_id_finality: HashSet::new(), tx_to_locator, max_queue_len, @@ -272,8 +267,8 @@ impl InnerLoop { ) { let timestamp_unix_ms = time::now(); let connected_nodes = self.node_ids.len(); - let subscribed_feeds = self.feed_conn_id_to_chain.len(); - let chains_subscribed_to = self.chain_to_feed_conn_ids.len(); + let subscribed_feeds = self.chain_to_feed_conn_ids.num_values(); + let chains_subscribed_to = self.chain_to_feed_conn_ids.num_keys(); let subscribed_finality_feeds = self.feed_conn_id_finality.len(); let connected_shards = self.shard_channels.len(); let connected_feeds = self.feed_channels.len(); @@ -492,12 +487,7 @@ impl InnerLoop { }; // Unsubscribe from previous chain if subscribed to one: - let old_genesis_hash = self.feed_conn_id_to_chain.remove(&feed_conn_id); - if let Some(old_genesis_hash) = &old_genesis_hash { - if let Some(map) = self.chain_to_feed_conn_ids.get_mut(old_genesis_hash) { - map.remove(&feed_conn_id); - } - } + let old_genesis_hash = self.chain_to_feed_conn_ids.remove_value(&feed_conn_id); // Untoggle request for finality feeds: self.feed_conn_id_finality.remove(&feed_conn_id); @@ -570,12 +560,8 @@ impl InnerLoop { // Actually make a note of the new chain subsciption: let new_genesis_hash = *new_chain.genesis_hash(); - self.feed_conn_id_to_chain - .insert(feed_conn_id, new_genesis_hash); self.chain_to_feed_conn_ids - .entry(new_genesis_hash) - .or_default() - .insert(feed_conn_id); + .insert(new_genesis_hash, feed_conn_id); } FromFeedWebsocket::SendFinality => { self.feed_conn_id_finality.insert(feed_conn_id); @@ -585,9 +571,7 @@ impl InnerLoop { } FromFeedWebsocket::Disconnected => { // The feed has disconnected; clean up references to it: - if let Some(chain) = self.feed_conn_id_to_chain.remove(&feed_conn_id) { - self.chain_to_feed_conn_ids.remove(&chain); - } + self.chain_to_feed_conn_ids.remove_value(&feed_conn_id); self.feed_channels.remove(&feed_conn_id); self.feed_conn_id_finality.remove(&feed_conn_id); } @@ -677,7 +661,7 @@ impl InnerLoop { /// Send a message to all chain feeds. fn broadcast_to_chain_feeds(&mut self, genesis_hash: &BlockHash, message: ToFeedWebsocket) { - if let Some(feeds) = self.chain_to_feed_conn_ids.get(genesis_hash) { + if let Some(feeds) = self.chain_to_feed_conn_ids.get_values(genesis_hash) { for &feed_id in feeds { if let Some(chan) = self.feed_channels.get_mut(&feed_id) { let _ = chan.send(message.clone()); @@ -717,7 +701,7 @@ impl InnerLoop { genesis_hash: &BlockHash, message: ToFeedWebsocket, ) { - if let Some(feeds) = self.chain_to_feed_conn_ids.get(genesis_hash) { + if let Some(feeds) = self.chain_to_feed_conn_ids.get_values(genesis_hash) { // Get all feeds for the chain, but only broadcast to those feeds that // are also subscribed to receive finality updates. for &feed_id in feeds.union(&self.feed_conn_id_finality) { diff --git a/backend/telemetry_core/tests/soak_tests.rs b/backend/telemetry_core/tests/soak_tests.rs index 2aef316..cf4c6db 100644 --- a/backend/telemetry_core/tests/soak_tests.rs +++ b/backend/telemetry_core/tests/soak_tests.rs @@ -127,7 +127,7 @@ async fn run_soak_test(opts: SoakTestOpts) { tokio::spawn(async move { let telemetry = test_utils::fake_telemetry::FakeTelemetry::new( Duration::from_secs(3), - format!("Node {}", idx + 1), + format!("Node {}", (ids_per_node * idx) + id + 1), "Polkadot".to_owned(), id + 1, );