Merge pull request #314 from paritytech/dp-cap-third-party-nodes

Cap number of nodes allowed to connect for third party networks
This commit is contained in:
David
2021-03-25 13:36:44 +01:00
committed by GitHub
9 changed files with 102 additions and 41 deletions
+3
View File
@@ -1,5 +1,7 @@
# This file is automatically @generated by Cargo.
# It is not intended for manual editing.
version = 3
[[package]]
name = "actix"
version = "0.10.0"
@@ -2383,6 +2385,7 @@ dependencies = [
"chrono",
"clap",
"fnv",
"lazy_static",
"log",
"num-traits",
"parking_lot 0.11.0",
+2 -1
View File
@@ -23,10 +23,11 @@ parking_lot = "0.11"
reqwest = "0.9.18"
rustc-hash = "1.1.0"
clap = "3.0.0-beta.2"
lazy_static = "1"
[profile.release]
lto = true
panic = "abort"
[patch.crates-io]
actix-web = { git = "https://github.com/maciejhirsz/actix-web", branch = "no-panic-normalize" }
actix-web = { git = "https://github.com/maciejhirsz/actix-web", branch = "no-panic-normalize" }
+46 -15
View File
@@ -1,5 +1,6 @@
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use actix::prelude::*;
use lazy_static::lazy_static;
use crate::node::connector::Initialize;
use crate::feed::connector::{FeedConnector, Connected, FeedId};
@@ -20,9 +21,27 @@ pub struct ChainEntry {
addr: Addr<Chain>,
label: Label,
network_id: Option<Label>,
/// Node count
nodes: usize,
/// Maximum allowed nodes
max_nodes: usize,
}
lazy_static! {
/// Labels of chains we consider "first party". These chains are allowed any
/// number of nodes to connect.
static ref FIRST_PARTY_NETWORKS: HashSet<&'static str> = {
let mut set = HashSet::new();
set.insert("Polkadot");
set.insert("Kusama");
set.insert("Westend");
set.insert("Rococo");
set
};
}
/// Max number of nodes allowed to connect to the telemetry server.
const THIRD_PARTY_NETWORKS_MAX_NODES: usize = 500;
impl Aggregator {
pub fn new() -> Self {
Aggregator {
@@ -39,31 +58,28 @@ impl Aggregator {
pub fn lazy_chain(
&mut self,
label: &str,
network: &Option<Label>,
ctx: &mut <Self as Actor>::Context,
) -> ChainId {
let cid = match self.get_chain_id(label, network.as_ref()) {
let cid = match self.get_chain_id(label, None.as_ref()) {
Some(cid) => cid,
None => {
self.serializer.push(feed::AddedChain(&label, 1));
let addr = ctx.address();
let max_nodes = max_nodes(label);
let label: Label = label.into();
let cid = self.chains.add_with(|cid| {
ChainEntry {
addr: Chain::new(cid, addr, label.clone()).start(),
label: label.clone(),
network_id: network.clone(),
network_id: None, // TODO: this doesn't seem to be used anywhere. Can it be removed?
nodes: 1,
max_nodes,
}
});
self.labels.insert(label, cid);
if let Some(network) = network {
self.networks.insert(network.clone(), cid);
}
self.broadcast();
cid
@@ -177,15 +193,19 @@ impl Handler<AddNode> for Aggregator {
fn handle(&mut self, msg: AddNode, ctx: &mut Self::Context) {
let AddNode { node, conn_id, rec } = msg;
log::trace!(target: "Aggregator::AddNode", "New node connected. Chain '{}'", node.chain);
let cid = self.lazy_chain(&node.chain, &None, ctx);
let cid = self.lazy_chain(&node.chain, ctx);
let chain = self.chains.get_mut(cid).expect("Entry just created above; qed");
chain.addr.do_send(chain::AddNode {
node,
conn_id,
rec,
});
if chain.nodes < chain.max_nodes {
chain.addr.do_send(chain::AddNode {
node,
conn_id,
rec,
});
} else {
log::warn!(target: "Aggregator::AddNode", "Chain {} is over quota ({})", chain.label, chain.max_nodes);
}
}
}
@@ -345,3 +365,14 @@ impl Handler<GetHealth> for Aggregator {
self.chains.len()
}
}
/// First party networks (Polkadot, Kusama etc) are allowed any number of nodes.
/// Third party networks are allowed `THIRD_PARTY_NETWORKS_MAX_NODES` nodes and
/// no more.
fn max_nodes(label: &str) -> usize {
if FIRST_PARTY_NETWORKS.contains(label) {
usize::MAX
} else {
THIRD_PARTY_NETWORKS_MAX_NODES
}
}
+7 -8
View File
@@ -79,10 +79,8 @@ impl Chain {
if &*self.label.0 == label {
self.label.1 += 1;
} else {
if count > self.label.1 {
self.rename(label.into(), count);
}
} else if count > self.label.1 {
self.rename(label.into(), count);
}
}
@@ -172,7 +170,7 @@ impl Chain {
self.block_times.reset();
self.timestamp = timestamp;
self.serializer.push(feed::BestBlock(self.best.height, timestamp.unwrap_or_else(|| now), None));
self.serializer.push(feed::BestBlock(self.best.height, timestamp.unwrap_or(now), None));
self.serializer.push(feed::BestFinalized(finalized.height, finalized.hash));
}
}
@@ -253,12 +251,13 @@ impl Handler<AddNode> for Chain {
fn handle(&mut self, msg: AddNode, ctx: &mut Self::Context) {
let AddNode { node, conn_id, rec } = msg;
log::trace!(target: "Chain::AddNode", "New node connected. Chain '{}', node count goes from {} to {}", node.chain, self.nodes.len(), self.nodes.len() + 1);
self.increment_label_count(&node.chain);
let nid = self.nodes.add(Node::new(node));
let chain = ctx.address();
if let Err(_) = rec.do_send(Initialize { nid, conn_id, chain }) {
if rec.do_send(Initialize { nid, conn_id, chain }).is_err() {
self.nodes.remove(nid);
} else if let Some(node) = self.nodes.get(nid) {
self.serializer.push(feed::AddedNode(nid, node));
@@ -286,7 +285,7 @@ impl Chain {
if block.height > self.best.height {
self.best = *block;
log::info!(
"[{}] [{}/{}] new best block ({}) {:?}",
"[{}] [nodes={}/feeds={}] new best block={}/{:?}",
self.label.0,
nodes_len,
self.feeds.len(),
@@ -455,7 +454,7 @@ impl Handler<Subscribe> for Chain {
self.serializer.push(feed::TimeSync(now()));
self.serializer.push(feed::BestBlock(
self.best.height,
self.timestamp.unwrap_or_else(|| 0),
self.timestamp.unwrap_or(0),
self.average_block_time,
));
self.serializer.push(feed::BestFinalized(self.finalized.height, self.finalized.hash));
+1 -1
View File
@@ -46,7 +46,7 @@ impl FeedMessageSerializer {
}
pub fn finalize(&mut self) -> Option<Serialized> {
if self.buffer.len() == 0 {
if self.buffer.is_empty() {
return None;
}
+37 -7
View File
@@ -20,10 +20,10 @@ use node::connector::NodeConnector;
use types::NodeId;
use util::{Locator, LocatorFactory};
const VERSION: &'static str = env!("CARGO_PKG_VERSION");
const AUTHORS: &'static str = env!("CARGO_PKG_AUTHORS");
const NAME: &'static str = "Substrate Telemetry Backend";
const ABOUT: &'static str = "This is the Telemetry Backend that injects and provide the data sent by Substrate/Polkadot nodes";
const VERSION: &str = env!("CARGO_PKG_VERSION");
const AUTHORS: &str = env!("CARGO_PKG_AUTHORS");
const NAME: &str = "Substrate Telemetry Backend";
const ABOUT: &str = "This is the Telemetry Backend that injects and provide the data sent by Substrate/Polkadot nodes";
#[derive(Clap)]
#[clap(name = NAME, version = VERSION, author = AUTHORS, about = ABOUT)]
@@ -35,6 +35,35 @@ struct Opts {
about = "This is the socket address Telemetry is listening to. This is restricted localhost (127.0.0.1) by default and should be fine for most use cases. If you are using Telemetry in a container, you likely want to set this to '0.0.0.0:8000'"
)]
socket: std::net::SocketAddr,
#[clap(
arg_enum,
required = false,
long = "log",
default_value = "info",
about = "Log level."
)]
log_level: LogLevel,
}
#[derive(Clap, Debug, PartialEq)]
enum LogLevel {
Error,
Warn,
Info,
Debug,
Trace,
}
impl From<&LogLevel> for log::LevelFilter {
fn from(log_level: &LogLevel) -> Self {
match log_level {
LogLevel::Error => log::LevelFilter::Error,
LogLevel::Warn => log::LevelFilter::Warn,
LogLevel::Info => log::LevelFilter::Info,
LogLevel::Debug => log::LevelFilter::Debug,
LogLevel::Trace => log::LevelFilter::Trace,
}
}
}
/// Entry point for connecting nodes
@@ -46,7 +75,7 @@ async fn node_route(
locator: web::Data<Addr<Locator>>,
) -> Result<HttpResponse, Error> {
let ip = req.connection_info().realip_remote_addr().and_then(|mut addr| {
if let Some(port_idx) = addr.find(":") {
if let Some(port_idx) = addr.find(':') {
addr = &addr[..port_idx];
}
addr.parse::<Ipv4Addr>().ok()
@@ -127,9 +156,10 @@ async fn health(aggregator: web::Data<Addr<Aggregator>>) -> Result<HttpResponse,
/// This can be changed using the `PORT` and `BIND` ENV variables.
#[actix_web::main]
async fn main() -> std::io::Result<()> {
SimpleLogger::new().with_level(log::LevelFilter::Info).init().expect("Must be able to start a logger");
let opts: Opts = Opts::parse();
let log_level = &opts.log_level;
SimpleLogger::new().with_level(log_level.into()).init().expect("Must be able to start a logger");
let aggregator = Aggregator::new().start();
let factory = LocatorFactory::new();
let locator = SyncArbiter::start(4, move || factory.create());
+1 -4
View File
@@ -89,10 +89,7 @@ impl Node {
}
pub fn location(&self) -> Option<&NodeLocation> {
match self.location {
Some(ref location) => Some(&**location),
None => None
}
self.location.as_deref()
}
pub fn update_location(&mut self, location: Arc<NodeLocation>) {
+2 -2
View File
@@ -118,7 +118,7 @@ impl NodeConnector {
_ => (),
}
self.aggregator.do_send(AddNode { rec, conn_id, node });
self.aggregator.do_send(AddNode { node, conn_id, rec });
} else {
if backlog.len() >= 10 {
backlog.remove(0);
@@ -165,7 +165,7 @@ impl Handler<Initialize> for NodeConnector {
fn handle(&mut self, msg: Initialize, _: &mut Self::Context) {
let Initialize { nid, conn_id, chain } = msg;
log::trace!(target: "NodeConnector::Initialize", "Initializing a node, nid={}, on conn_id={}", nid, conn_id);
let mx = self.multiplex.entry(conn_id).or_default();
if let ConnMultiplex::Waiting { backlog } = mx {
+3 -3
View File
@@ -26,7 +26,7 @@ impl LocatorFactory {
// Default entry for localhost
cache.insert(
Ipv4Addr::new(127, 0, 0, 1),
Some(Arc::new(NodeLocation { latitude: 52.5166667, longitude: 13.4, city: "Berlin".into() })),
Some(Arc::new(NodeLocation { latitude: 52.516_6667, longitude: 13.4, city: "Berlin".into() })),
);
LocatorFactory {
@@ -64,7 +64,7 @@ impl IPApiLocate {
fn into_node_location(self) -> Option<NodeLocation> {
let IPApiLocate { city, loc } = self;
let mut loc = loc.split(",").map(|n| n.parse());
let mut loc = loc.split(',').map(|n| n.parse());
let latitude = loc.next()?.ok()?;
let longitude = loc.next()?.ok()?;
@@ -176,4 +176,4 @@ mod tests {
assert!(location.is_none());
}
}
}