Merge pull request #322 from paritytech/dp-mute-node-connector-when-denied-or-overquota

Mute denied nodes
This commit is contained in:
David
2021-03-29 22:15:04 +02:00
committed by GitHub
5 changed files with 70 additions and 42 deletions
+15 -5
View File
@@ -539,6 +539,16 @@ version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7a81dae078cea95a014a339291cec439d2f232ebe854a9d672b796c6afafa9b7"
[[package]]
name = "ctor"
version = "0.1.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5e98e2ad1a782e33928b96fc3948e7c355e5af34ba4de7670fe8bac2a3b2006d"
dependencies = [
"quote",
"syn",
]
[[package]]
name = "derive_more"
version = "0.99.11"
@@ -1232,9 +1242,9 @@ dependencies = [
[[package]]
name = "quote"
version = "1.0.7"
version = "1.0.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aa563d17ecb180e500da1cfd2b028310ac758de548efdd203e18f283af693f37"
checksum = "c3d0b9745dc2debf507c8422de05d7226cc1f0644216dfdfead988f9b1ab32a7"
dependencies = [
"proc-macro2",
]
@@ -1655,9 +1665,9 @@ checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623"
[[package]]
name = "syn"
version = "1.0.48"
version = "1.0.65"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cc371affeffc477f42a221a1e4297aedcea33d47d19b61455588bd9d8f6b19ac"
checksum = "f3a1d708c221c5a612956ef9f75b37e454e88d1f7b899fbd3a18d4252012d663"
dependencies = [
"proc-macro2",
"quote",
@@ -1681,8 +1691,8 @@ dependencies = [
"bytes",
"chrono",
"clap",
"ctor",
"fnv",
"lazy_static",
"log",
"num-traits",
"parking_lot",
+1 -1
View File
@@ -23,7 +23,7 @@ parking_lot = "0.11"
reqwest = { version = "0.11.1", features = ["blocking", "json"] }
rustc-hash = "1.1.0"
clap = "3.0.0-beta.2"
lazy_static = "1"
ctor = "0.1.20"
[profile.release]
lto = true
+25 -19
View File
@@ -1,8 +1,9 @@
use std::collections::{HashMap, HashSet};
use actix::prelude::*;
use lazy_static::lazy_static;
use actix_web_actors::ws::{CloseReason, CloseCode};
use ctor::ctor;
use crate::node::connector::Initialize;
use crate::node::connector::{Mute, NodeConnector};
use crate::feed::connector::{FeedConnector, Connected, FeedId};
use crate::util::DenseMap;
use crate::feed::{self, FeedMessageSerializer};
@@ -29,18 +30,18 @@ pub struct ChainEntry {
max_nodes: usize,
}
lazy_static! {
/// Labels of chains we consider "first party". These chains are allowed any
/// number of nodes to connect.
static ref FIRST_PARTY_NETWORKS: HashSet<&'static str> = {
let mut set = HashSet::new();
set.insert("Polkadot");
set.insert("Kusama");
set.insert("Westend");
set.insert("Rococo");
set
};
}
#[ctor]
/// Labels of chains we consider "first party". These chains allow any
/// number of nodes to connect.
static FIRST_PARTY_NETWORKS: HashSet<&'static str> = {
let mut set = HashSet::new();
set.insert("Polkadot");
set.insert("Kusama");
set.insert("Westend");
set.insert("Rococo");
set
};
/// Max number of nodes allowed to connect to the telemetry server.
const THIRD_PARTY_NETWORKS_MAX_NODES: usize = 500;
@@ -129,8 +130,8 @@ pub struct AddNode {
pub node: NodeDetails,
/// Connection id used by the node connector for multiplexing parachains
pub conn_id: ConnId,
/// Recipient for the initialization message
pub rec: Recipient<Initialize>,
/// Address of the NodeConnector actor
pub node_connector: Addr<NodeConnector>,
}
/// Message sent from the Chain to the Aggregator when the Chain loses all nodes
@@ -196,10 +197,13 @@ impl Handler<AddNode> for Aggregator {
fn handle(&mut self, msg: AddNode, ctx: &mut Self::Context) {
if self.denylist.contains(&*msg.node.chain) {
log::debug!(target: "Aggregator::AddNode", "'{}' is on the denylist.", msg.node.chain);
log::warn!(target: "Aggregator::AddNode", "'{}' is on the denylist.", msg.node.chain);
let AddNode { node_connector, .. } = msg;
let reason = CloseReason{ code: CloseCode::Abnormal, description: Some("Denied".into()) };
node_connector.do_send(Mute { reason });
return;
}
let AddNode { node, conn_id, rec } = msg;
let AddNode { node, conn_id, node_connector } = msg;
log::trace!(target: "Aggregator::AddNode", "New node connected. Chain '{}'", node.chain);
let cid = self.lazy_chain(&node.chain, ctx);
@@ -208,10 +212,12 @@ impl Handler<AddNode> for Aggregator {
chain.addr.do_send(chain::AddNode {
node,
conn_id,
rec,
node_connector,
});
} else {
log::warn!(target: "Aggregator::AddNode", "Chain {} is over quota ({})", chain.label, chain.max_nodes);
let reason = CloseReason{ code: CloseCode::Again, description: Some("Overquota".into()) };
node_connector.do_send(Mute { reason });
}
}
}
+6 -6
View File
@@ -5,7 +5,7 @@ use bytes::Bytes;
use rustc_hash::FxHashMap;
use crate::aggregator::{Aggregator, DropChain, RenameChain, NodeCount};
use crate::node::{Node, connector::Initialize, message::{NodeMessage, Payload}};
use crate::node::{Node, connector::{Initialize, NodeConnector}, message::{NodeMessage, Payload}};
use crate::feed::connector::{FeedId, FeedConnector, Subscribed, Unsubscribed};
use crate::feed::{self, FeedMessageSerializer};
use crate::util::{DenseMap, NumStats, now};
@@ -196,8 +196,8 @@ pub struct AddNode {
pub node: NodeDetails,
/// Connection id used by the node connector for multiplexing parachains
pub conn_id: ConnId,
/// Recipient for the initialization message
pub rec: Recipient<Initialize>,
/// Address of the NodeConnector actor to which we send [`Initialize`] or [`Mute`] messages.
pub node_connector: Addr<NodeConnector>,
}
/// Message sent from the NodeConnector to the Chain when it receives new telemetry data
@@ -250,14 +250,14 @@ impl Handler<AddNode> for Chain {
type Result = ();
fn handle(&mut self, msg: AddNode, ctx: &mut Self::Context) {
let AddNode { node, conn_id, rec } = msg;
let AddNode { node, conn_id, node_connector } = msg;
log::trace!(target: "Chain::AddNode", "New node connected. Chain '{}', node count goes from {} to {}", node.chain, self.nodes.len(), self.nodes.len() + 1);
self.increment_label_count(&node.chain);
let nid = self.nodes.add(Node::new(node));
let chain = ctx.address();
if rec.do_send(Initialize { nid, conn_id, chain }).is_err() {
if node_connector.try_send(Initialize { nid, conn_id, chain }).is_err() {
self.nodes.remove(nid);
} else if let Some(node) = self.nodes.get(nid) {
self.serializer.push(feed::AddedNode(nid, node));
@@ -284,7 +284,7 @@ impl Chain {
if node.update_block(*block) {
if block.height > self.best.height {
self.best = *block;
log::info!(
log::debug!(
"[{}] [nodes={}/feeds={}] new best block={}/{:?}",
self.label.0,
nodes_len,
+23 -11
View File
@@ -5,7 +5,7 @@ use std::mem;
use bytes::{Bytes, BytesMut};
use actix::prelude::*;
use actix_web_actors::ws;
use actix_web_actors::ws::{self, CloseReason};
use actix_http::ws::Item;
use crate::aggregator::{Aggregator, AddNode};
use crate::chain::{Chain, UpdateNode, RemoveNode};
@@ -24,7 +24,7 @@ const CONT_BUF_LIMIT: usize = 10 * 1024 * 1024;
pub struct NodeConnector {
/// Multiplexing connections by id
multiplex: BTreeMap<ConnId, ConnMultiplex>,
/// Client must send ping at least once per 10 seconds (CLIENT_TIMEOUT),
/// Client must send ping at least once every 60 seconds (CLIENT_TIMEOUT),
hb: Instant,
/// Aggregator actor address
aggregator: Addr<Aggregator>,
@@ -90,6 +90,7 @@ impl NodeConnector {
// check client heartbeats
if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT {
// stop actor
ctx.close(Some(CloseReason { code: ws::CloseCode::Abnormal, description: Some("Missed heartbeat".into())}));
ctx.stop();
}
});
@@ -109,21 +110,14 @@ impl NodeConnector {
ConnMultiplex::Waiting { backlog } => {
if let Payload::SystemConnected(connected) = msg.payload() {
let mut node = connected.node.clone();
let rec = ctx.address().recipient();
// FIXME: Use genesis hash instead of names to avoid this mess
match &*node.chain {
"Kusama CC3" => node.chain = "Kusama".into(),
"Polkadot CC1" => node.chain = "Polkadot".into(),
"Earth" => {
// Temp, there is too many of them
ctx.stop();
return;
},
_ => ()
}
self.aggregator.do_send(AddNode { node, conn_id, rec });
self.aggregator.do_send(AddNode { node, conn_id, node_connector: ctx.address() });
} else {
if backlog.len() >= 10 {
backlog.remove(0);
@@ -157,6 +151,23 @@ impl NodeConnector {
}
}
#[derive(Message)]
#[rtype(result = "()")]
pub struct Mute {
pub reason: CloseReason,
}
impl Handler<Mute> for NodeConnector {
type Result = ();
fn handle(&mut self, msg: Mute, ctx: &mut Self::Context) {
let Mute { reason } = msg;
log::debug!(target: "NodeConnector::Mute", "Muting a node. Reason: {:?}", reason.description);
ctx.close(Some(reason));
ctx.stop();
}
}
#[derive(Message)]
#[rtype(result = "()")]
pub struct Initialize {
@@ -203,7 +214,8 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for NodeConnector {
Ok(ws::Message::Pong(_)) => return,
Ok(ws::Message::Text(text)) => text.into_bytes(),
Ok(ws::Message::Binary(data)) => data,
Ok(ws::Message::Close(_)) => {
Ok(ws::Message::Close(reason)) => {
ctx.close(reason);
ctx.stop();
return;
}