Don't remove all feeds subscribed to a chain when one disconnects (#383)

* Only remove the feed that disconnected to not break the rest...

* use multimap struct to avoid sync issues between feed and chain

* add a remove test, too

* cargo fmt

* fix name of test

* move multimap to common so we can doctest it and add 'unique' to name

* cargo fmt

* Return old key if value moved to make uniqueness more obvious
This commit is contained in:
James Wilson
2021-08-27 08:05:44 +01:00
committed by GitHub
parent 19db1a48ef
commit 7a3e30cb01
4 changed files with 180 additions and 28 deletions
+2
View File
@@ -30,6 +30,7 @@ mod dense_map;
mod either_sink;
mod mean_list;
mod most_seen;
mod multi_map_unique;
mod num_stats;
// Export a bunch of common bits at the top level for ease of import:
@@ -38,4 +39,5 @@ pub use dense_map::DenseMap;
pub use either_sink::EitherSink;
pub use mean_list::MeanList;
pub use most_seen::MostSeen;
pub use multi_map_unique::MultiMapUnique;
pub use num_stats::NumStats;
+166
View File
@@ -0,0 +1,166 @@
// Source code for the Substrate Telemetry Server.
// Copyright (C) 2021 Parity Technologies (UK) Ltd.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use std::collections::{HashMap, HashSet};
use std::hash::Hash;
/// A map where each key can contain multiple values. We enforce that a value
/// only ever belongs to one key at a time (the latest key it was inserted
/// against).
pub struct MultiMapUnique<K, V> {
value_to_key: HashMap<V, K>,
key_to_values: HashMap<K, HashSet<V>>,
}
impl<K, V> MultiMapUnique<K, V> {
/// Construct a new MultiMap
pub fn new() -> Self {
Self {
value_to_key: HashMap::new(),
key_to_values: HashMap::new(),
}
}
/// Return the set of values associated with a key.
pub fn get_values(&self, key: &K) -> Option<&HashSet<V>>
where
K: Eq + Hash,
{
self.key_to_values.get(key)
}
/// Remove a value from the MultiMap, returning the key it was found
/// under, if it was found at all.
///
/// ```
/// let mut m = common::MultiMapUnique::new();
///
/// m.insert("a", 1);
/// m.insert("a", 2);
///
/// m.insert("b", 3);
/// m.insert("b", 4);
///
/// assert_eq!(m.num_keys(), 2);
/// assert_eq!(m.num_values(), 4);
///
/// m.remove_value(&1);
///
/// assert_eq!(m.num_keys(), 2);
/// assert_eq!(m.num_values(), 3);
///
/// m.remove_value(&2);
///
/// assert_eq!(m.num_keys(), 1);
/// assert_eq!(m.num_values(), 2);
/// ```
pub fn remove_value(&mut self, value: &V) -> Option<K>
where
V: Eq + Hash,
K: Eq + Hash,
{
if let Some(key) = self.value_to_key.remove(value) {
if let Some(m) = self.key_to_values.get_mut(&key) {
m.remove(value);
if m.is_empty() {
self.key_to_values.remove(&key);
}
}
return Some(key);
}
None
}
/// Insert a key+value pair into the multimap. Multiple different
/// values can exist for a single key, but only one of each value can
/// exist in the MultiMap.
///
/// If a previous value did exist, the old key it was inserted against
/// is returned.
///
/// ```
/// let mut m = common::MultiMapUnique::new();
///
/// let old_key = m.insert("a", 1);
/// assert_eq!(old_key, None);
///
/// let old_key = m.insert("b", 1);
/// assert_eq!(old_key, Some("a"));
///
/// let old_key = m.insert("c", 1);
/// assert_eq!(old_key, Some("b"));
///
/// assert_eq!(m.num_keys(), 1);
/// assert_eq!(m.num_values(), 1);
///
/// // The value `1` must be unique in the map, so it only exists
/// // in the last location it was inserted:
/// assert!(m.get_values(&"a").is_none());
/// assert!(m.get_values(&"b").is_none());
/// assert_eq!(m.get_values(&"c").unwrap().iter().collect::<Vec<_>>(), vec![&1]);
/// ```
pub fn insert(&mut self, key: K, value: V) -> Option<K>
where
V: Clone + Eq + Hash,
K: Clone + Eq + Hash,
{
// Ensure that the value doesn't exist elsewhere already;
// values must be unique and only belong to one key:
let old_key = self.remove_value(&value);
self.value_to_key.insert(value.clone(), key.clone());
self.key_to_values.entry(key).or_default().insert(value);
old_key
}
/// Number of values stored in the map
pub fn num_values(&self) -> usize {
self.value_to_key.len()
}
/// Number of keys stored in the map
pub fn num_keys(&self) -> usize {
self.key_to_values.len()
}
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn multiple_values_allowed_per_key() {
let mut m = MultiMapUnique::new();
m.insert("a", 1);
m.insert("a", 2);
m.insert("b", 3);
m.insert("b", 4);
assert_eq!(m.num_keys(), 2);
assert_eq!(m.num_values(), 4);
let a_vals = m.get_values(&"a").expect("a vals");
assert!(a_vals.contains(&1));
assert!(a_vals.contains(&2));
let b_vals = m.get_values(&"b").expect("b vals");
assert!(b_vals.contains(&3));
assert!(b_vals.contains(&4));
}
}
@@ -23,7 +23,7 @@ use common::{
internal_messages::{self, MuteReason, ShardNodeId},
node_message,
node_types::BlockHash,
time,
time, MultiMapUnique,
};
use std::collections::{HashMap, HashSet};
use std::sync::{
@@ -169,12 +169,8 @@ pub struct InnerLoop {
/// Keep track of how to send messages out to shards.
shard_channels: HashMap<ConnId, flume::Sender<ToShardWebsocket>>,
/// Which chain is a feed subscribed to?
/// Feed Connection ID -> Chain Genesis Hash
feed_conn_id_to_chain: HashMap<ConnId, BlockHash>,
/// Which feeds are subscribed to a given chain (needs to stay in sync with above)?
/// Chain Genesis Hash -> Feed Connection IDs
chain_to_feed_conn_ids: HashMap<BlockHash, HashSet<ConnId>>,
/// Which feeds are subscribed to a given chain?
chain_to_feed_conn_ids: MultiMapUnique<BlockHash, ConnId>,
/// These feeds want finality info, too.
feed_conn_id_finality: HashSet<ConnId>,
@@ -199,8 +195,7 @@ impl InnerLoop {
node_ids: BiMap::new(),
feed_channels: HashMap::new(),
shard_channels: HashMap::new(),
feed_conn_id_to_chain: HashMap::new(),
chain_to_feed_conn_ids: HashMap::new(),
chain_to_feed_conn_ids: MultiMapUnique::new(),
feed_conn_id_finality: HashSet::new(),
tx_to_locator,
max_queue_len,
@@ -272,8 +267,8 @@ impl InnerLoop {
) {
let timestamp_unix_ms = time::now();
let connected_nodes = self.node_ids.len();
let subscribed_feeds = self.feed_conn_id_to_chain.len();
let chains_subscribed_to = self.chain_to_feed_conn_ids.len();
let subscribed_feeds = self.chain_to_feed_conn_ids.num_values();
let chains_subscribed_to = self.chain_to_feed_conn_ids.num_keys();
let subscribed_finality_feeds = self.feed_conn_id_finality.len();
let connected_shards = self.shard_channels.len();
let connected_feeds = self.feed_channels.len();
@@ -492,12 +487,7 @@ impl InnerLoop {
};
// Unsubscribe from previous chain if subscribed to one:
let old_genesis_hash = self.feed_conn_id_to_chain.remove(&feed_conn_id);
if let Some(old_genesis_hash) = &old_genesis_hash {
if let Some(map) = self.chain_to_feed_conn_ids.get_mut(old_genesis_hash) {
map.remove(&feed_conn_id);
}
}
let old_genesis_hash = self.chain_to_feed_conn_ids.remove_value(&feed_conn_id);
// Untoggle request for finality feeds:
self.feed_conn_id_finality.remove(&feed_conn_id);
@@ -570,12 +560,8 @@ impl InnerLoop {
// Actually make a note of the new chain subsciption:
let new_genesis_hash = *new_chain.genesis_hash();
self.feed_conn_id_to_chain
.insert(feed_conn_id, new_genesis_hash);
self.chain_to_feed_conn_ids
.entry(new_genesis_hash)
.or_default()
.insert(feed_conn_id);
.insert(new_genesis_hash, feed_conn_id);
}
FromFeedWebsocket::SendFinality => {
self.feed_conn_id_finality.insert(feed_conn_id);
@@ -585,9 +571,7 @@ impl InnerLoop {
}
FromFeedWebsocket::Disconnected => {
// The feed has disconnected; clean up references to it:
if let Some(chain) = self.feed_conn_id_to_chain.remove(&feed_conn_id) {
self.chain_to_feed_conn_ids.remove(&chain);
}
self.chain_to_feed_conn_ids.remove_value(&feed_conn_id);
self.feed_channels.remove(&feed_conn_id);
self.feed_conn_id_finality.remove(&feed_conn_id);
}
@@ -677,7 +661,7 @@ impl InnerLoop {
/// Send a message to all chain feeds.
fn broadcast_to_chain_feeds(&mut self, genesis_hash: &BlockHash, message: ToFeedWebsocket) {
if let Some(feeds) = self.chain_to_feed_conn_ids.get(genesis_hash) {
if let Some(feeds) = self.chain_to_feed_conn_ids.get_values(genesis_hash) {
for &feed_id in feeds {
if let Some(chan) = self.feed_channels.get_mut(&feed_id) {
let _ = chan.send(message.clone());
@@ -717,7 +701,7 @@ impl InnerLoop {
genesis_hash: &BlockHash,
message: ToFeedWebsocket,
) {
if let Some(feeds) = self.chain_to_feed_conn_ids.get(genesis_hash) {
if let Some(feeds) = self.chain_to_feed_conn_ids.get_values(genesis_hash) {
// Get all feeds for the chain, but only broadcast to those feeds that
// are also subscribed to receive finality updates.
for &feed_id in feeds.union(&self.feed_conn_id_finality) {
+1 -1
View File
@@ -127,7 +127,7 @@ async fn run_soak_test(opts: SoakTestOpts) {
tokio::spawn(async move {
let telemetry = test_utils::fake_telemetry::FakeTelemetry::new(
Duration::from_secs(3),
format!("Node {}", idx + 1),
format!("Node {}", (ids_per_node * idx) + id + 1),
"Polkadot".to_owned(),
id + 1,
);