mirror of
https://github.com/pezkuwichain/pezkuwi-telemetry.git
synced 2026-05-31 15:11:05 +00:00
Merge branch 'master' into dp-add-denylist-for-unwanted-networks
This commit is contained in:
Generated
+3
@@ -1,5 +1,7 @@
|
|||||||
# This file is automatically @generated by Cargo.
|
# This file is automatically @generated by Cargo.
|
||||||
# It is not intended for manual editing.
|
# It is not intended for manual editing.
|
||||||
|
version = 3
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "actix"
|
name = "actix"
|
||||||
version = "0.10.0"
|
version = "0.10.0"
|
||||||
@@ -2383,6 +2385,7 @@ dependencies = [
|
|||||||
"chrono",
|
"chrono",
|
||||||
"clap",
|
"clap",
|
||||||
"fnv",
|
"fnv",
|
||||||
|
"lazy_static",
|
||||||
"log",
|
"log",
|
||||||
"num-traits",
|
"num-traits",
|
||||||
"parking_lot 0.11.0",
|
"parking_lot 0.11.0",
|
||||||
|
|||||||
@@ -23,6 +23,7 @@ parking_lot = "0.11"
|
|||||||
reqwest = "0.9.18"
|
reqwest = "0.9.18"
|
||||||
rustc-hash = "1.1.0"
|
rustc-hash = "1.1.0"
|
||||||
clap = "3.0.0-beta.2"
|
clap = "3.0.0-beta.2"
|
||||||
|
lazy_static = "1"
|
||||||
|
|
||||||
[profile.release]
|
[profile.release]
|
||||||
lto = true
|
lto = true
|
||||||
|
|||||||
+45
-14
@@ -1,5 +1,6 @@
|
|||||||
use std::collections::{HashMap, HashSet};
|
use std::collections::{HashMap, HashSet};
|
||||||
use actix::prelude::*;
|
use actix::prelude::*;
|
||||||
|
use lazy_static::lazy_static;
|
||||||
|
|
||||||
use crate::node::connector::Initialize;
|
use crate::node::connector::Initialize;
|
||||||
use crate::feed::connector::{FeedConnector, Connected, FeedId};
|
use crate::feed::connector::{FeedConnector, Connected, FeedId};
|
||||||
@@ -22,9 +23,27 @@ pub struct ChainEntry {
|
|||||||
addr: Addr<Chain>,
|
addr: Addr<Chain>,
|
||||||
label: Label,
|
label: Label,
|
||||||
network_id: Option<Label>,
|
network_id: Option<Label>,
|
||||||
|
/// Node count
|
||||||
nodes: usize,
|
nodes: usize,
|
||||||
|
/// Maximum allowed nodes
|
||||||
|
max_nodes: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
lazy_static! {
|
||||||
|
/// Labels of chains we consider "first party". These chains are allowed any
|
||||||
|
/// number of nodes to connect.
|
||||||
|
static ref FIRST_PARTY_NETWORKS: HashSet<&'static str> = {
|
||||||
|
let mut set = HashSet::new();
|
||||||
|
set.insert("Polkadot");
|
||||||
|
set.insert("Kusama");
|
||||||
|
set.insert("Westend");
|
||||||
|
set.insert("Rococo");
|
||||||
|
set
|
||||||
|
};
|
||||||
|
}
|
||||||
|
/// Max number of nodes allowed to connect to the telemetry server.
|
||||||
|
const THIRD_PARTY_NETWORKS_MAX_NODES: usize = 500;
|
||||||
|
|
||||||
impl Aggregator {
|
impl Aggregator {
|
||||||
pub fn new(denylist: HashSet<String>) -> Self {
|
pub fn new(denylist: HashSet<String>) -> Self {
|
||||||
Aggregator {
|
Aggregator {
|
||||||
@@ -42,31 +61,28 @@ impl Aggregator {
|
|||||||
pub fn lazy_chain(
|
pub fn lazy_chain(
|
||||||
&mut self,
|
&mut self,
|
||||||
label: &str,
|
label: &str,
|
||||||
network: &Option<Label>,
|
|
||||||
ctx: &mut <Self as Actor>::Context,
|
ctx: &mut <Self as Actor>::Context,
|
||||||
) -> ChainId {
|
) -> ChainId {
|
||||||
let cid = match self.get_chain_id(label, network.as_ref()) {
|
let cid = match self.get_chain_id(label, None.as_ref()) {
|
||||||
Some(cid) => cid,
|
Some(cid) => cid,
|
||||||
None => {
|
None => {
|
||||||
self.serializer.push(feed::AddedChain(&label, 1));
|
self.serializer.push(feed::AddedChain(&label, 1));
|
||||||
|
|
||||||
let addr = ctx.address();
|
let addr = ctx.address();
|
||||||
|
let max_nodes = max_nodes(label);
|
||||||
let label: Label = label.into();
|
let label: Label = label.into();
|
||||||
let cid = self.chains.add_with(|cid| {
|
let cid = self.chains.add_with(|cid| {
|
||||||
ChainEntry {
|
ChainEntry {
|
||||||
addr: Chain::new(cid, addr, label.clone()).start(),
|
addr: Chain::new(cid, addr, label.clone()).start(),
|
||||||
label: label.clone(),
|
label: label.clone(),
|
||||||
network_id: network.clone(),
|
network_id: None, // TODO: this doesn't seem to be used anywhere. Can it be removed?
|
||||||
nodes: 1,
|
nodes: 1,
|
||||||
|
max_nodes,
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
self.labels.insert(label, cid);
|
self.labels.insert(label, cid);
|
||||||
|
|
||||||
if let Some(network) = network {
|
|
||||||
self.networks.insert(network.clone(), cid);
|
|
||||||
}
|
|
||||||
|
|
||||||
self.broadcast();
|
self.broadcast();
|
||||||
|
|
||||||
cid
|
cid
|
||||||
@@ -184,15 +200,19 @@ impl Handler<AddNode> for Aggregator {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
let AddNode { node, conn_id, rec } = msg;
|
let AddNode { node, conn_id, rec } = msg;
|
||||||
|
log::trace!(target: "Aggregator::AddNode", "New node connected. Chain '{}'", node.chain);
|
||||||
|
|
||||||
let cid = self.lazy_chain(&node.chain, &None, ctx);
|
let cid = self.lazy_chain(&node.chain, ctx);
|
||||||
let chain = self.chains.get_mut(cid).expect("Entry just created above; qed");
|
let chain = self.chains.get_mut(cid).expect("Entry just created above; qed");
|
||||||
|
if chain.nodes < chain.max_nodes {
|
||||||
chain.addr.do_send(chain::AddNode {
|
chain.addr.do_send(chain::AddNode {
|
||||||
node,
|
node,
|
||||||
conn_id,
|
conn_id,
|
||||||
rec,
|
rec,
|
||||||
});
|
});
|
||||||
|
} else {
|
||||||
|
log::warn!(target: "Aggregator::AddNode", "Chain {} is over quota ({})", chain.label, chain.max_nodes);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -352,3 +372,14 @@ impl Handler<GetHealth> for Aggregator {
|
|||||||
self.chains.len()
|
self.chains.len()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// First party networks (Polkadot, Kusama etc) are allowed any number of nodes.
|
||||||
|
/// Third party networks are allowed `THIRD_PARTY_NETWORKS_MAX_NODES` nodes and
|
||||||
|
/// no more.
|
||||||
|
fn max_nodes(label: &str) -> usize {
|
||||||
|
if FIRST_PARTY_NETWORKS.contains(label) {
|
||||||
|
usize::MAX
|
||||||
|
} else {
|
||||||
|
THIRD_PARTY_NETWORKS_MAX_NODES
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -79,10 +79,8 @@ impl Chain {
|
|||||||
|
|
||||||
if &*self.label.0 == label {
|
if &*self.label.0 == label {
|
||||||
self.label.1 += 1;
|
self.label.1 += 1;
|
||||||
} else {
|
} else if count > self.label.1 {
|
||||||
if count > self.label.1 {
|
self.rename(label.into(), count);
|
||||||
self.rename(label.into(), count);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -172,7 +170,7 @@ impl Chain {
|
|||||||
self.block_times.reset();
|
self.block_times.reset();
|
||||||
self.timestamp = timestamp;
|
self.timestamp = timestamp;
|
||||||
|
|
||||||
self.serializer.push(feed::BestBlock(self.best.height, timestamp.unwrap_or_else(|| now), None));
|
self.serializer.push(feed::BestBlock(self.best.height, timestamp.unwrap_or(now), None));
|
||||||
self.serializer.push(feed::BestFinalized(finalized.height, finalized.hash));
|
self.serializer.push(feed::BestFinalized(finalized.height, finalized.hash));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -253,12 +251,13 @@ impl Handler<AddNode> for Chain {
|
|||||||
|
|
||||||
fn handle(&mut self, msg: AddNode, ctx: &mut Self::Context) {
|
fn handle(&mut self, msg: AddNode, ctx: &mut Self::Context) {
|
||||||
let AddNode { node, conn_id, rec } = msg;
|
let AddNode { node, conn_id, rec } = msg;
|
||||||
|
log::trace!(target: "Chain::AddNode", "New node connected. Chain '{}', node count goes from {} to {}", node.chain, self.nodes.len(), self.nodes.len() + 1);
|
||||||
self.increment_label_count(&node.chain);
|
self.increment_label_count(&node.chain);
|
||||||
|
|
||||||
let nid = self.nodes.add(Node::new(node));
|
let nid = self.nodes.add(Node::new(node));
|
||||||
let chain = ctx.address();
|
let chain = ctx.address();
|
||||||
|
|
||||||
if let Err(_) = rec.do_send(Initialize { nid, conn_id, chain }) {
|
if rec.do_send(Initialize { nid, conn_id, chain }).is_err() {
|
||||||
self.nodes.remove(nid);
|
self.nodes.remove(nid);
|
||||||
} else if let Some(node) = self.nodes.get(nid) {
|
} else if let Some(node) = self.nodes.get(nid) {
|
||||||
self.serializer.push(feed::AddedNode(nid, node));
|
self.serializer.push(feed::AddedNode(nid, node));
|
||||||
@@ -286,7 +285,7 @@ impl Chain {
|
|||||||
if block.height > self.best.height {
|
if block.height > self.best.height {
|
||||||
self.best = *block;
|
self.best = *block;
|
||||||
log::info!(
|
log::info!(
|
||||||
"[{}] [{}/{}] new best block ({}) {:?}",
|
"[{}] [nodes={}/feeds={}] new best block={}/{:?}",
|
||||||
self.label.0,
|
self.label.0,
|
||||||
nodes_len,
|
nodes_len,
|
||||||
self.feeds.len(),
|
self.feeds.len(),
|
||||||
@@ -455,7 +454,7 @@ impl Handler<Subscribe> for Chain {
|
|||||||
self.serializer.push(feed::TimeSync(now()));
|
self.serializer.push(feed::TimeSync(now()));
|
||||||
self.serializer.push(feed::BestBlock(
|
self.serializer.push(feed::BestBlock(
|
||||||
self.best.height,
|
self.best.height,
|
||||||
self.timestamp.unwrap_or_else(|| 0),
|
self.timestamp.unwrap_or(0),
|
||||||
self.average_block_time,
|
self.average_block_time,
|
||||||
));
|
));
|
||||||
self.serializer.push(feed::BestFinalized(self.finalized.height, self.finalized.hash));
|
self.serializer.push(feed::BestFinalized(self.finalized.height, self.finalized.hash));
|
||||||
|
|||||||
+1
-1
@@ -46,7 +46,7 @@ impl FeedMessageSerializer {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn finalize(&mut self) -> Option<Serialized> {
|
pub fn finalize(&mut self) -> Option<Serialized> {
|
||||||
if self.buffer.len() == 0 {
|
if self.buffer.is_empty() {
|
||||||
return None;
|
return None;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
+9
-9
@@ -22,10 +22,10 @@ use node::connector::NodeConnector;
|
|||||||
use types::NodeId;
|
use types::NodeId;
|
||||||
use util::{Locator, LocatorFactory};
|
use util::{Locator, LocatorFactory};
|
||||||
|
|
||||||
const VERSION: &'static str = env!("CARGO_PKG_VERSION");
|
const VERSION: &str = env!("CARGO_PKG_VERSION");
|
||||||
const AUTHORS: &'static str = env!("CARGO_PKG_AUTHORS");
|
const AUTHORS: &str = env!("CARGO_PKG_AUTHORS");
|
||||||
const NAME: &'static str = "Substrate Telemetry Backend";
|
const NAME: &str = "Substrate Telemetry Backend";
|
||||||
const ABOUT: &'static str = "This is the Telemetry Backend that injects and provide the data sent by Substrate/Polkadot nodes";
|
const ABOUT: &str = "This is the Telemetry Backend that injects and provide the data sent by Substrate/Polkadot nodes";
|
||||||
|
|
||||||
#[derive(Clap, Debug)]
|
#[derive(Clap, Debug)]
|
||||||
#[clap(name = NAME, version = VERSION, author = AUTHORS, about = ABOUT)]
|
#[clap(name = NAME, version = VERSION, author = AUTHORS, about = ABOUT)]
|
||||||
@@ -49,7 +49,7 @@ struct Opts {
|
|||||||
required = false,
|
required = false,
|
||||||
long = "log",
|
long = "log",
|
||||||
default_value = "info",
|
default_value = "info",
|
||||||
about = "Log level. Defaults to 'info'. Valid values are: error, warn, info, debug and trace"
|
about = "Log level."
|
||||||
)]
|
)]
|
||||||
log_level: LogLevel,
|
log_level: LogLevel,
|
||||||
}
|
}
|
||||||
@@ -63,9 +63,9 @@ enum LogLevel {
|
|||||||
Trace,
|
Trace,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Into<log::LevelFilter> for &LogLevel {
|
impl From<&LogLevel> for log::LevelFilter {
|
||||||
fn into(self) -> log::LevelFilter {
|
fn from(log_level: &LogLevel) -> Self {
|
||||||
match self {
|
match log_level {
|
||||||
LogLevel::Error => log::LevelFilter::Error,
|
LogLevel::Error => log::LevelFilter::Error,
|
||||||
LogLevel::Warn => log::LevelFilter::Warn,
|
LogLevel::Warn => log::LevelFilter::Warn,
|
||||||
LogLevel::Info => log::LevelFilter::Info,
|
LogLevel::Info => log::LevelFilter::Info,
|
||||||
@@ -84,7 +84,7 @@ async fn node_route(
|
|||||||
locator: web::Data<Addr<Locator>>,
|
locator: web::Data<Addr<Locator>>,
|
||||||
) -> Result<HttpResponse, Error> {
|
) -> Result<HttpResponse, Error> {
|
||||||
let ip = req.connection_info().realip_remote_addr().and_then(|mut addr| {
|
let ip = req.connection_info().realip_remote_addr().and_then(|mut addr| {
|
||||||
if let Some(port_idx) = addr.find(":") {
|
if let Some(port_idx) = addr.find(':') {
|
||||||
addr = &addr[..port_idx];
|
addr = &addr[..port_idx];
|
||||||
}
|
}
|
||||||
addr.parse::<Ipv4Addr>().ok()
|
addr.parse::<Ipv4Addr>().ok()
|
||||||
|
|||||||
+1
-4
@@ -89,10 +89,7 @@ impl Node {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn location(&self) -> Option<&NodeLocation> {
|
pub fn location(&self) -> Option<&NodeLocation> {
|
||||||
match self.location {
|
self.location.as_deref()
|
||||||
Some(ref location) => Some(&**location),
|
|
||||||
None => None
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn update_location(&mut self, location: Arc<NodeLocation>) {
|
pub fn update_location(&mut self, location: Arc<NodeLocation>) {
|
||||||
|
|||||||
@@ -118,7 +118,7 @@ impl NodeConnector {
|
|||||||
_ => (),
|
_ => (),
|
||||||
}
|
}
|
||||||
|
|
||||||
self.aggregator.do_send(AddNode { rec, conn_id, node });
|
self.aggregator.do_send(AddNode { node, conn_id, rec });
|
||||||
} else {
|
} else {
|
||||||
if backlog.len() >= 10 {
|
if backlog.len() >= 10 {
|
||||||
backlog.remove(0);
|
backlog.remove(0);
|
||||||
@@ -165,7 +165,7 @@ impl Handler<Initialize> for NodeConnector {
|
|||||||
|
|
||||||
fn handle(&mut self, msg: Initialize, _: &mut Self::Context) {
|
fn handle(&mut self, msg: Initialize, _: &mut Self::Context) {
|
||||||
let Initialize { nid, conn_id, chain } = msg;
|
let Initialize { nid, conn_id, chain } = msg;
|
||||||
|
log::trace!(target: "NodeConnector::Initialize", "Initializing a node, nid={}, on conn_id={}", nid, conn_id);
|
||||||
let mx = self.multiplex.entry(conn_id).or_default();
|
let mx = self.multiplex.entry(conn_id).or_default();
|
||||||
|
|
||||||
if let ConnMultiplex::Waiting { backlog } = mx {
|
if let ConnMultiplex::Waiting { backlog } = mx {
|
||||||
|
|||||||
@@ -26,7 +26,7 @@ impl LocatorFactory {
|
|||||||
// Default entry for localhost
|
// Default entry for localhost
|
||||||
cache.insert(
|
cache.insert(
|
||||||
Ipv4Addr::new(127, 0, 0, 1),
|
Ipv4Addr::new(127, 0, 0, 1),
|
||||||
Some(Arc::new(NodeLocation { latitude: 52.5166667, longitude: 13.4, city: "Berlin".into() })),
|
Some(Arc::new(NodeLocation { latitude: 52.516_6667, longitude: 13.4, city: "Berlin".into() })),
|
||||||
);
|
);
|
||||||
|
|
||||||
LocatorFactory {
|
LocatorFactory {
|
||||||
@@ -64,7 +64,7 @@ impl IPApiLocate {
|
|||||||
fn into_node_location(self) -> Option<NodeLocation> {
|
fn into_node_location(self) -> Option<NodeLocation> {
|
||||||
let IPApiLocate { city, loc } = self;
|
let IPApiLocate { city, loc } = self;
|
||||||
|
|
||||||
let mut loc = loc.split(",").map(|n| n.parse());
|
let mut loc = loc.split(',').map(|n| n.parse());
|
||||||
|
|
||||||
let latitude = loc.next()?.ok()?;
|
let latitude = loc.next()?.ok()?;
|
||||||
let longitude = loc.next()?.ok()?;
|
let longitude = loc.next()?.ok()?;
|
||||||
|
|||||||
Reference in New Issue
Block a user