From 7ff9a316d1e97c68e8db371bd9ea8c5dd2f7f87a Mon Sep 17 00:00:00 2001 From: James Wilson Date: Wed, 13 Oct 2021 13:08:53 +0100 Subject: [PATCH] First/Third party by genesis hash, not label. Make limit configurable and default to 1k (#424) * First/Third party by genesis hash, not label. Make limit configurable * Fix a test that relies on there not being a node limit * remove a now-invalid comment * Cargo fmt * Fix another naff comment * Update backend/telemetry_core/src/state/chain.rs Comment tweak Co-authored-by: David Co-authored-by: David --- .../src/aggregator/aggregator.rs | 16 ++++-- .../src/aggregator/inner_loop.rs | 3 +- backend/telemetry_core/src/main.rs | 4 ++ backend/telemetry_core/src/state/chain.rs | 49 +++++++++---------- backend/telemetry_core/src/state/state.rs | 19 +++++-- backend/telemetry_core/tests/e2e_tests.rs | 9 +++- 6 files changed, 64 insertions(+), 36 deletions(-) diff --git a/backend/telemetry_core/src/aggregator/aggregator.rs b/backend/telemetry_core/src/aggregator/aggregator.rs index c172680..f46bdca 100644 --- a/backend/telemetry_core/src/aggregator/aggregator.rs +++ b/backend/telemetry_core/src/aggregator/aggregator.rs @@ -41,6 +41,9 @@ pub struct AggregatorOpts { /// If our incoming message queue exceeds this length, we start /// dropping non-essential messages. pub max_queue_len: usize, + /// How many nodes from third party chains are allowed to connect + /// before we prevent connections from them. 
+ pub max_third_party_nodes: usize, } struct AggregatorInternal { @@ -75,6 +78,7 @@ impl Aggregator { tx_to_locator, opts.max_queue_len, opts.denylist, + opts.max_third_party_nodes, )); // Return a handle to our aggregator: @@ -93,10 +97,16 @@ impl Aggregator { tx_to_aggregator: flume::Sender<(NodeId, Ipv4Addr)>, max_queue_len: usize, denylist: Vec, + max_third_party_nodes: usize, ) { - inner_loop::InnerLoop::new(tx_to_aggregator, denylist, max_queue_len) - .handle(rx_from_external) - .await; + inner_loop::InnerLoop::new( + tx_to_aggregator, + denylist, + max_queue_len, + max_third_party_nodes, + ) + .handle(rx_from_external) + .await; } /// Gather metrics from our aggregator loop diff --git a/backend/telemetry_core/src/aggregator/inner_loop.rs b/backend/telemetry_core/src/aggregator/inner_loop.rs index d6c1f40..8aaf632 100644 --- a/backend/telemetry_core/src/aggregator/inner_loop.rs +++ b/backend/telemetry_core/src/aggregator/inner_loop.rs @@ -195,9 +195,10 @@ impl InnerLoop { tx_to_locator: flume::Sender<(NodeId, Ipv4Addr)>, denylist: Vec, max_queue_len: usize, + max_third_party_nodes: usize, ) -> Self { InnerLoop { - node_state: State::new(denylist), + node_state: State::new(denylist, max_third_party_nodes), node_ids: BiMap::new(), feed_channels: HashMap::new(), shard_channels: HashMap::new(), diff --git a/backend/telemetry_core/src/main.rs b/backend/telemetry_core/src/main.rs index 3af3f6f..977b048 100644 --- a/backend/telemetry_core/src/main.rs +++ b/backend/telemetry_core/src/main.rs @@ -79,6 +79,9 @@ struct Opts { /// messages in an attempt to let it reduce? #[structopt(long)] aggregator_queue_len: Option, + /// How many nodes from third party chains are allowed to connect before we prevent connections from them. 
+ #[structopt(long, default_value = "1000")] + max_third_party_nodes: usize, } fn main() { @@ -128,6 +131,7 @@ async fn start_server(num_aggregators: usize, opts: Opts) -> anyhow::Result<()> AggregatorOpts { max_queue_len: aggregator_queue_len, denylist: opts.denylist, + max_third_party_nodes: opts.max_third_party_nodes, }, ) .await?; diff --git a/backend/telemetry_core/src/state/chain.rs b/backend/telemetry_core/src/state/chain.rs index 39f1334..3f7e322 100644 --- a/backend/telemetry_core/src/state/chain.rs +++ b/backend/telemetry_core/src/state/chain.rs @@ -20,6 +20,7 @@ use common::node_types::{BlockHash, BlockNumber}; use common::{id_type, time, DenseMap, MostSeen, NumStats}; use once_cell::sync::Lazy; use std::collections::HashSet; +use std::str::FromStr; use crate::feed_message::{self, FeedMessageSerializer}; use crate::find_location; @@ -53,6 +54,8 @@ pub struct Chain { timestamp: Option, /// Genesis hash of this chain genesis_hash: BlockHash, + /// Maximum number of nodes allowed to connect from this chain + max_nodes: usize, } pub enum AddNodeResult { @@ -67,23 +70,31 @@ pub struct RemoveNodeResult { pub chain_renamed: bool, } -/// Labels of chains we consider "first party". These chains allow any +/// Genesis hashes of chains we consider "first party". These chains allow any /// number of nodes to connect. 
-static FIRST_PARTY_NETWORKS: Lazy> = Lazy::new(|| { - let mut set = HashSet::new(); - set.insert("Polkadot"); - set.insert("Kusama"); - set.insert("Westend"); - set.insert("Rococo"); - set +static FIRST_PARTY_NETWORKS: Lazy> = Lazy::new(|| { + let genesis_hash_strs = &[ + "0x91b171bb158e2d3848fa23a9f1c25182fb8e20313b2c1eb49219da7a70ce90c3", // Polkadot + "0xb0a8d493285c2df73290dfb7e61f870f17b41801197a149ca93654499ea3dafe", // Kusama + "0xe143f23803ac50e8f6f8e62695d1ce9e4e1d68aa36c1cd2cfd15340213f3423e", // Westend + "0xf6e9983c37baf68846fedafe21e56718790e39fb1c582abc408b81bc7b208f9a", // Rococo + ]; + + genesis_hash_strs + .iter() + .map(|h| BlockHash::from_str(h).expect("hardcoded hash str should be valid")) + .collect() }); -/// Max number of nodes allowed to connect to the telemetry server. -const THIRD_PARTY_NETWORKS_MAX_NODES: usize = 500; +/// When we construct a chain, we want to check to see whether or not it's a "first party" +/// network first, and assign a `max_nodes` accordingly. This helps us do that. +pub fn is_first_party_network(genesis_hash: &BlockHash) -> bool { + FIRST_PARTY_NETWORKS.contains(genesis_hash) +} impl Chain { /// Create a new chain with an initial label. - pub fn new(genesis_hash: BlockHash) -> Self { + pub fn new(genesis_hash: BlockHash, max_nodes: usize) -> Self { Chain { labels: MostSeen::default(), nodes: DenseMap::new(), @@ -93,14 +104,13 @@ impl Chain { average_block_time: None, timestamp: None, genesis_hash, + max_nodes, } } /// Is the chain the node belongs to overquota? pub fn is_overquota(&self) -> bool { - // Dynamically determine the max nodes based on the most common - // label so far, in case it changes to something with a different limit. - self.nodes.len() >= max_nodes(self.labels.best()) + self.nodes.len() >= self.max_nodes } /// Assign a node to this chain. @@ -373,14 +383,3 @@ impl Chain { self.genesis_hash } } - -/// First party networks (Polkadot, Kusama etc) are allowed any number of nodes. 
-/// Third party networks are allowed `THIRD_PARTY_NETWORKS_MAX_NODES` nodes and -/// no more. -fn max_nodes(label: &str) -> usize { - if FIRST_PARTY_NETWORKS.contains(label) { - usize::MAX - } else { - THIRD_PARTY_NETWORKS_MAX_NODES - } -} diff --git a/backend/telemetry_core/src/state/state.rs b/backend/telemetry_core/src/state/state.rs index f507cd7..e5ca5ec 100644 --- a/backend/telemetry_core/src/state/state.rs +++ b/backend/telemetry_core/src/state/state.rs @@ -50,6 +50,10 @@ pub struct State { /// Chain labels that we do not want to allow connecting. denylist: HashSet, + + /// How many nodes from third party chains are allowed to connect + /// before we prevent connections from them. + max_third_party_nodes: usize, } /// Adding a node to a chain leads to this node_idult @@ -102,11 +106,12 @@ pub struct RemovedNode { } impl State { - pub fn new>(denylist: T) -> State { + pub fn new>(denylist: T, max_third_party_nodes: usize) -> State { State { chains: DenseMap::new(), chains_by_genesis_hash: HashMap::new(), denylist: denylist.into_iter().collect(), + max_third_party_nodes, } } @@ -143,7 +148,11 @@ impl State { let chain_id = match self.chains_by_genesis_hash.get(&genesis_hash) { Some(id) => *id, None => { - let chain_id = self.chains.add(Chain::new(genesis_hash)); + let max_nodes = match chain::is_first_party_network(&genesis_hash) { + true => usize::MAX, + false => self.max_third_party_nodes, + }; + let chain_id = self.chains.add(Chain::new(genesis_hash, max_nodes)); self.chains_by_genesis_hash.insert(genesis_hash, chain_id); chain_id } @@ -290,7 +299,7 @@ mod test { #[test] fn adding_a_node_returns_expected_response() { - let mut state = State::new(None); + let mut state = State::new(None, 1000); let chain1_genesis = BlockHash::from_low_u64_be(1); @@ -325,7 +334,7 @@ mod test { #[test] fn adding_and_removing_nodes_updates_chain_label_mapping() { - let mut state = State::new(None); + let mut state = State::new(None, 1000); let chain1_genesis = 
BlockHash::from_low_u64_be(1); let node_id0 = state @@ -385,7 +394,7 @@ mod test { #[test] fn chain_removed_when_last_node_is() { - let mut state = State::new(None); + let mut state = State::new(None, 1000); let chain1_genesis = BlockHash::from_low_u64_be(1); let node_id = state diff --git a/backend/telemetry_core/tests/e2e_tests.rs b/backend/telemetry_core/tests/e2e_tests.rs index b6afd61..a9e27ca 100644 --- a/backend/telemetry_core/tests/e2e_tests.rs +++ b/backend/telemetry_core/tests/e2e_tests.rs @@ -39,13 +39,18 @@ cargo test e2e -- --ignored use common::node_types::BlockHash; use common::ws_client::SentMessage; use serde_json::json; -use std::time::Duration; +use std::{str::FromStr, time::Duration}; use test_utils::{ assert_contains_matches, feed_message_de::{FeedMessage, NodeDetails}, workspace::{start_server, start_server_debug, CoreOpts, ServerOpts, ShardOpts}, }; +fn polkadot_genesis_hash() -> BlockHash { + BlockHash::from_str("0x91b171bb158e2d3848fa23a9f1c25182fb8e20313b2c1eb49219da7a70ce90c3") + .expect("valid polkadot genesis hash") +} + /// Helper for concise testing fn ghash(id: u64) -> BlockHash { BlockHash::from_low_u64_be(id) @@ -641,7 +646,7 @@ async fn e2e_slow_feeds_are_disconnected() { "authority":true, "chain":"Polkadot", "config":"", - "genesis_hash": ghash(1), + "genesis_hash": polkadot_genesis_hash(), // First party node connections aren't limited. "implementation":"Substrate Node", "msg":"system.connected", "name": format!("Alice {}", n),