mirror of
https://github.com/pezkuwichain/pezkuwi-telemetry.git
synced 2026-04-30 08:28:00 +00:00
Set max_nodes in ChainEntry
Only add a new node if the current number of connected nodes is below the max Define "first party networks" to be: Polkadot, Kusama, Westend and Rococo. All other networks are capped to 500 nodes
This commit is contained in:
+46
-15
@@ -1,5 +1,6 @@
|
||||
use std::collections::HashMap;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use actix::prelude::*;
|
||||
use lazy_static::lazy_static;
|
||||
|
||||
use crate::node::connector::Initialize;
|
||||
use crate::feed::connector::{FeedConnector, Connected, FeedId};
|
||||
@@ -20,9 +21,27 @@ pub struct ChainEntry {
|
||||
addr: Addr<Chain>,
|
||||
label: Label,
|
||||
network_id: Option<Label>,
|
||||
/// Node count
|
||||
nodes: usize,
|
||||
/// Maximum allowed nodes
|
||||
max_nodes: usize,
|
||||
}
|
||||
|
||||
lazy_static! {
|
||||
/// Labels of chains we consider "first party". These chains are allowed any
|
||||
/// number of nodes to connect.
|
||||
static ref FIRST_PARTY_NETWORKS: HashSet<&'static str> = {
|
||||
let mut set = HashSet::new();
|
||||
set.insert("Polkadot");
|
||||
set.insert("Kusama");
|
||||
set.insert("Westend");
|
||||
set.insert("Rococo");
|
||||
set
|
||||
};
|
||||
}
|
||||
/// Max number of nodes allowed to connect to the telemetry server.
|
||||
const THIRD_PARTY_NETWORKS_MAX_NODES: usize = 500;
|
||||
|
||||
impl Aggregator {
|
||||
pub fn new() -> Self {
|
||||
Aggregator {
|
||||
@@ -39,31 +58,28 @@ impl Aggregator {
|
||||
pub fn lazy_chain(
|
||||
&mut self,
|
||||
label: &str,
|
||||
network: &Option<Label>,
|
||||
ctx: &mut <Self as Actor>::Context,
|
||||
) -> ChainId {
|
||||
let cid = match self.get_chain_id(label, network.as_ref()) {
|
||||
let cid = match self.get_chain_id(label, None.as_ref()) {
|
||||
Some(cid) => cid,
|
||||
None => {
|
||||
self.serializer.push(feed::AddedChain(&label, 1));
|
||||
|
||||
let addr = ctx.address();
|
||||
let max_nodes = max_nodes(label);
|
||||
let label: Label = label.into();
|
||||
let cid = self.chains.add_with(|cid| {
|
||||
ChainEntry {
|
||||
addr: Chain::new(cid, addr, label.clone()).start(),
|
||||
label: label.clone(),
|
||||
network_id: network.clone(),
|
||||
network_id: None, // TODO: this doesn't seem to be used anywhere. Can it be removed?
|
||||
nodes: 1,
|
||||
max_nodes,
|
||||
}
|
||||
});
|
||||
|
||||
self.labels.insert(label, cid);
|
||||
|
||||
if let Some(network) = network {
|
||||
self.networks.insert(network.clone(), cid);
|
||||
}
|
||||
|
||||
self.broadcast();
|
||||
|
||||
cid
|
||||
@@ -177,15 +193,19 @@ impl Handler<AddNode> for Aggregator {
|
||||
|
||||
fn handle(&mut self, msg: AddNode, ctx: &mut Self::Context) {
|
||||
let AddNode { node, conn_id, rec } = msg;
|
||||
log::trace!(target: "Aggregator::AddNode", "New node connected. Chain '{}'", node.chain);
|
||||
|
||||
let cid = self.lazy_chain(&node.chain, &None, ctx);
|
||||
let cid = self.lazy_chain(&node.chain, ctx);
|
||||
let chain = self.chains.get_mut(cid).expect("Entry just created above; qed");
|
||||
|
||||
chain.addr.do_send(chain::AddNode {
|
||||
node,
|
||||
conn_id,
|
||||
rec,
|
||||
});
|
||||
if chain.nodes < chain.max_nodes {
|
||||
chain.addr.do_send(chain::AddNode {
|
||||
node,
|
||||
conn_id,
|
||||
rec,
|
||||
});
|
||||
} else {
|
||||
log::warn!(target: "Aggregator::AddNode", "Chain {} is over quota ({})", chain.label, chain.max_nodes);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -345,3 +365,14 @@ impl Handler<GetHealth> for Aggregator {
|
||||
self.chains.len()
|
||||
}
|
||||
}
|
||||
|
||||
/// First party networks (Polkadot, Kusama etc) are allowed any number of nodes.
|
||||
/// Third party networks are allowed `THIRD_PARTY_NETWORKS_MAX_NODES` nodes and
|
||||
/// no more.
|
||||
fn max_nodes(label: &str) -> usize {
|
||||
if FIRST_PARTY_NETWORKS.contains(label) {
|
||||
usize::MAX
|
||||
} else {
|
||||
THIRD_PARTY_NETWORKS_MAX_NODES
|
||||
}
|
||||
}
|
||||
|
||||
@@ -253,6 +253,7 @@ impl Handler<AddNode> for Chain {
|
||||
|
||||
fn handle(&mut self, msg: AddNode, ctx: &mut Self::Context) {
|
||||
let AddNode { node, conn_id, rec } = msg;
|
||||
log::trace!(target: "Chain::AddNode", "New node connected. Chain '{}', node count goes from {} to {}", node.chain, self.nodes.len(), self.nodes.len() + 1);
|
||||
self.increment_label_count(&node.chain);
|
||||
|
||||
let nid = self.nodes.add(Node::new(node));
|
||||
@@ -286,7 +287,7 @@ impl Chain {
|
||||
if block.height > self.best.height {
|
||||
self.best = *block;
|
||||
log::info!(
|
||||
"[{}] [{}/{}] new best block ({}) {:?}",
|
||||
"[{}] [nodes={}/feeds={}] new best block={}/{:?}",
|
||||
self.label.0,
|
||||
nodes_len,
|
||||
self.feeds.len(),
|
||||
|
||||
+2
-1
@@ -127,7 +127,8 @@ async fn health(aggregator: web::Data<Addr<Aggregator>>) -> Result<HttpResponse,
|
||||
/// This can be changed using the `PORT` and `BIND` ENV variables.
|
||||
#[actix_web::main]
|
||||
async fn main() -> std::io::Result<()> {
|
||||
SimpleLogger::new().with_level(log::LevelFilter::Info).init().expect("Must be able to start a logger");
|
||||
// SimpleLogger::new().with_level(log::LevelFilter::Info).init().expect("Must be able to start a logger");
|
||||
SimpleLogger::new().with_level(log::LevelFilter::Debug).init().expect("Must be able to start a logger");
|
||||
|
||||
let opts: Opts = Opts::parse();
|
||||
let aggregator = Aggregator::new().start();
|
||||
|
||||
@@ -165,7 +165,7 @@ impl Handler<Initialize> for NodeConnector {
|
||||
|
||||
fn handle(&mut self, msg: Initialize, _: &mut Self::Context) {
|
||||
let Initialize { nid, conn_id, chain } = msg;
|
||||
|
||||
log::trace!(target: "NodeConnector::Initialize", "Initializing a node, nid={}, on conn_id={}", nid, conn_id);
|
||||
let mx = self.multiplex.entry(conn_id).or_default();
|
||||
|
||||
if let ConnMultiplex::Waiting { backlog } = mx {
|
||||
|
||||
Reference in New Issue
Block a user