mirror of
https://github.com/pezkuwichain/pezkuwi-telemetry.git
synced 2026-05-08 23:57:59 +00:00
First/Third party by genesis hash, not label. Make limit configurable and default to 1k (#424)
* First/Third party by genesis hash, not label. Make limit configurable * Fix a test that relies on not being a node limit * remove a now-invalid comment * Cargo fmt * Fix another naff comment * Update backend/telemetry_core/src/state/chain.rs Comment tweak Co-authored-by: David <dvdplm@gmail.com> Co-authored-by: David <dvdplm@gmail.com>
This commit is contained in:
@@ -41,6 +41,9 @@ pub struct AggregatorOpts {
|
||||
/// If our incoming message queue exceeds this length, we start
|
||||
/// dropping non-essential messages.
|
||||
pub max_queue_len: usize,
|
||||
/// How many nodes from third party chains are allowed to connect
|
||||
/// before we prevent connections from them.
|
||||
pub max_third_party_nodes: usize,
|
||||
}
|
||||
|
||||
struct AggregatorInternal {
|
||||
@@ -75,6 +78,7 @@ impl Aggregator {
|
||||
tx_to_locator,
|
||||
opts.max_queue_len,
|
||||
opts.denylist,
|
||||
opts.max_third_party_nodes,
|
||||
));
|
||||
|
||||
// Return a handle to our aggregator:
|
||||
@@ -93,10 +97,16 @@ impl Aggregator {
|
||||
tx_to_aggregator: flume::Sender<(NodeId, Ipv4Addr)>,
|
||||
max_queue_len: usize,
|
||||
denylist: Vec<String>,
|
||||
max_third_party_nodes: usize,
|
||||
) {
|
||||
inner_loop::InnerLoop::new(tx_to_aggregator, denylist, max_queue_len)
|
||||
.handle(rx_from_external)
|
||||
.await;
|
||||
inner_loop::InnerLoop::new(
|
||||
tx_to_aggregator,
|
||||
denylist,
|
||||
max_queue_len,
|
||||
max_third_party_nodes,
|
||||
)
|
||||
.handle(rx_from_external)
|
||||
.await;
|
||||
}
|
||||
|
||||
/// Gather metrics from our aggregator loop
|
||||
|
||||
@@ -195,9 +195,10 @@ impl InnerLoop {
|
||||
tx_to_locator: flume::Sender<(NodeId, Ipv4Addr)>,
|
||||
denylist: Vec<String>,
|
||||
max_queue_len: usize,
|
||||
max_third_party_nodes: usize,
|
||||
) -> Self {
|
||||
InnerLoop {
|
||||
node_state: State::new(denylist),
|
||||
node_state: State::new(denylist, max_third_party_nodes),
|
||||
node_ids: BiMap::new(),
|
||||
feed_channels: HashMap::new(),
|
||||
shard_channels: HashMap::new(),
|
||||
|
||||
@@ -79,6 +79,9 @@ struct Opts {
|
||||
/// messages in an attempt to let it reduce?
|
||||
#[structopt(long)]
|
||||
aggregator_queue_len: Option<usize>,
|
||||
/// How many nodes from third party chains are allowed to connect before we prevent connections from them.
|
||||
#[structopt(long, default_value = "1000")]
|
||||
max_third_party_nodes: usize,
|
||||
}
|
||||
|
||||
fn main() {
|
||||
@@ -128,6 +131,7 @@ async fn start_server(num_aggregators: usize, opts: Opts) -> anyhow::Result<()>
|
||||
AggregatorOpts {
|
||||
max_queue_len: aggregator_queue_len,
|
||||
denylist: opts.denylist,
|
||||
max_third_party_nodes: opts.max_third_party_nodes,
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -20,6 +20,7 @@ use common::node_types::{BlockHash, BlockNumber};
|
||||
use common::{id_type, time, DenseMap, MostSeen, NumStats};
|
||||
use once_cell::sync::Lazy;
|
||||
use std::collections::HashSet;
|
||||
use std::str::FromStr;
|
||||
|
||||
use crate::feed_message::{self, FeedMessageSerializer};
|
||||
use crate::find_location;
|
||||
@@ -53,6 +54,8 @@ pub struct Chain {
|
||||
timestamp: Option<Timestamp>,
|
||||
/// Genesis hash of this chain
|
||||
genesis_hash: BlockHash,
|
||||
/// Maximum number of nodes allowed to connect from this chain
|
||||
max_nodes: usize,
|
||||
}
|
||||
|
||||
pub enum AddNodeResult {
|
||||
@@ -67,23 +70,31 @@ pub struct RemoveNodeResult {
|
||||
pub chain_renamed: bool,
|
||||
}
|
||||
|
||||
/// Labels of chains we consider "first party". These chains allow any
|
||||
/// Genesis hashes of chains we consider "first party". These chains allow any
|
||||
/// number of nodes to connect.
|
||||
static FIRST_PARTY_NETWORKS: Lazy<HashSet<&'static str>> = Lazy::new(|| {
|
||||
let mut set = HashSet::new();
|
||||
set.insert("Polkadot");
|
||||
set.insert("Kusama");
|
||||
set.insert("Westend");
|
||||
set.insert("Rococo");
|
||||
set
|
||||
static FIRST_PARTY_NETWORKS: Lazy<HashSet<BlockHash>> = Lazy::new(|| {
|
||||
let genesis_hash_strs = &[
|
||||
"0x91b171bb158e2d3848fa23a9f1c25182fb8e20313b2c1eb49219da7a70ce90c3", // Polkadot
|
||||
"0xb0a8d493285c2df73290dfb7e61f870f17b41801197a149ca93654499ea3dafe", // Kusama
|
||||
"0xe143f23803ac50e8f6f8e62695d1ce9e4e1d68aa36c1cd2cfd15340213f3423e", // Westend
|
||||
"0xf6e9983c37baf68846fedafe21e56718790e39fb1c582abc408b81bc7b208f9a", // Rococo
|
||||
];
|
||||
|
||||
genesis_hash_strs
|
||||
.iter()
|
||||
.map(|h| BlockHash::from_str(h).expect("hardcoded hash str should be valid"))
|
||||
.collect()
|
||||
});
|
||||
|
||||
/// Max number of nodes allowed to connect to the telemetry server.
|
||||
const THIRD_PARTY_NETWORKS_MAX_NODES: usize = 500;
|
||||
/// When we construct a chain, we want to check to see whether or not it's a "first party"
|
||||
/// network first, and assign a `max_nodes` accordingly. This helps us do that.
|
||||
pub fn is_first_party_network(genesis_hash: &BlockHash) -> bool {
|
||||
FIRST_PARTY_NETWORKS.contains(genesis_hash)
|
||||
}
|
||||
|
||||
impl Chain {
|
||||
/// Create a new chain with an initial label.
|
||||
pub fn new(genesis_hash: BlockHash) -> Self {
|
||||
pub fn new(genesis_hash: BlockHash, max_nodes: usize) -> Self {
|
||||
Chain {
|
||||
labels: MostSeen::default(),
|
||||
nodes: DenseMap::new(),
|
||||
@@ -93,14 +104,13 @@ impl Chain {
|
||||
average_block_time: None,
|
||||
timestamp: None,
|
||||
genesis_hash,
|
||||
max_nodes,
|
||||
}
|
||||
}
|
||||
|
||||
/// Is the chain the node belongs to overquota?
|
||||
pub fn is_overquota(&self) -> bool {
|
||||
// Dynamically determine the max nodes based on the most common
|
||||
// label so far, in case it changes to something with a different limit.
|
||||
self.nodes.len() >= max_nodes(self.labels.best())
|
||||
self.nodes.len() >= self.max_nodes
|
||||
}
|
||||
|
||||
/// Assign a node to this chain.
|
||||
@@ -373,14 +383,3 @@ impl Chain {
|
||||
self.genesis_hash
|
||||
}
|
||||
}
|
||||
|
||||
/// First party networks (Polkadot, Kusama etc) are allowed any number of nodes.
|
||||
/// Third party networks are allowed `THIRD_PARTY_NETWORKS_MAX_NODES` nodes and
|
||||
/// no more.
|
||||
fn max_nodes(label: &str) -> usize {
|
||||
if FIRST_PARTY_NETWORKS.contains(label) {
|
||||
usize::MAX
|
||||
} else {
|
||||
THIRD_PARTY_NETWORKS_MAX_NODES
|
||||
}
|
||||
}
|
||||
|
||||
@@ -50,6 +50,10 @@ pub struct State {
|
||||
|
||||
/// Chain labels that we do not want to allow connecting.
|
||||
denylist: HashSet<String>,
|
||||
|
||||
/// How many nodes from third party chains are allowed to connect
|
||||
/// before we prevent connections from them.
|
||||
max_third_party_nodes: usize,
|
||||
}
|
||||
|
||||
/// Adding a node to a chain leads to this node_idult
|
||||
@@ -102,11 +106,12 @@ pub struct RemovedNode {
|
||||
}
|
||||
|
||||
impl State {
|
||||
pub fn new<T: IntoIterator<Item = String>>(denylist: T) -> State {
|
||||
pub fn new<T: IntoIterator<Item = String>>(denylist: T, max_third_party_nodes: usize) -> State {
|
||||
State {
|
||||
chains: DenseMap::new(),
|
||||
chains_by_genesis_hash: HashMap::new(),
|
||||
denylist: denylist.into_iter().collect(),
|
||||
max_third_party_nodes,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -143,7 +148,11 @@ impl State {
|
||||
let chain_id = match self.chains_by_genesis_hash.get(&genesis_hash) {
|
||||
Some(id) => *id,
|
||||
None => {
|
||||
let chain_id = self.chains.add(Chain::new(genesis_hash));
|
||||
let max_nodes = match chain::is_first_party_network(&genesis_hash) {
|
||||
true => usize::MAX,
|
||||
false => self.max_third_party_nodes,
|
||||
};
|
||||
let chain_id = self.chains.add(Chain::new(genesis_hash, max_nodes));
|
||||
self.chains_by_genesis_hash.insert(genesis_hash, chain_id);
|
||||
chain_id
|
||||
}
|
||||
@@ -290,7 +299,7 @@ mod test {
|
||||
|
||||
#[test]
|
||||
fn adding_a_node_returns_expected_response() {
|
||||
let mut state = State::new(None);
|
||||
let mut state = State::new(None, 1000);
|
||||
|
||||
let chain1_genesis = BlockHash::from_low_u64_be(1);
|
||||
|
||||
@@ -325,7 +334,7 @@ mod test {
|
||||
|
||||
#[test]
|
||||
fn adding_and_removing_nodes_updates_chain_label_mapping() {
|
||||
let mut state = State::new(None);
|
||||
let mut state = State::new(None, 1000);
|
||||
|
||||
let chain1_genesis = BlockHash::from_low_u64_be(1);
|
||||
let node_id0 = state
|
||||
@@ -385,7 +394,7 @@ mod test {
|
||||
|
||||
#[test]
|
||||
fn chain_removed_when_last_node_is() {
|
||||
let mut state = State::new(None);
|
||||
let mut state = State::new(None, 1000);
|
||||
|
||||
let chain1_genesis = BlockHash::from_low_u64_be(1);
|
||||
let node_id = state
|
||||
|
||||
@@ -39,13 +39,18 @@ cargo test e2e -- --ignored
|
||||
use common::node_types::BlockHash;
|
||||
use common::ws_client::SentMessage;
|
||||
use serde_json::json;
|
||||
use std::time::Duration;
|
||||
use std::{str::FromStr, time::Duration};
|
||||
use test_utils::{
|
||||
assert_contains_matches,
|
||||
feed_message_de::{FeedMessage, NodeDetails},
|
||||
workspace::{start_server, start_server_debug, CoreOpts, ServerOpts, ShardOpts},
|
||||
};
|
||||
|
||||
fn polkadot_genesis_hash() -> BlockHash {
|
||||
BlockHash::from_str("0x91b171bb158e2d3848fa23a9f1c25182fb8e20313b2c1eb49219da7a70ce90c3")
|
||||
.expect("valid polkadot genesis hash")
|
||||
}
|
||||
|
||||
/// Helper for concise testing
|
||||
fn ghash(id: u64) -> BlockHash {
|
||||
BlockHash::from_low_u64_be(id)
|
||||
@@ -641,7 +646,7 @@ async fn e2e_slow_feeds_are_disconnected() {
|
||||
"authority":true,
|
||||
"chain":"Polkadot",
|
||||
"config":"",
|
||||
"genesis_hash": ghash(1),
|
||||
"genesis_hash": polkadot_genesis_hash(), // First party node connections aren't limited.
|
||||
"implementation":"Substrate Node",
|
||||
"msg":"system.connected",
|
||||
"name": format!("Alice {}", n),
|
||||
|
||||
Reference in New Issue
Block a user