From f449dc66670db030903b46ec07293bb4e63f5096 Mon Sep 17 00:00:00 2001 From: David Palm Date: Fri, 26 Mar 2021 13:24:54 +0100 Subject: [PATCH 01/10] Add a `Mute` message Send a `Mute` message to `NodeConnector` when a node is from a chain on the denylist OR if the chain is overquota. (Also: dial down logging of finalized blocks a bit) --- backend/src/aggregator.rs | 13 +++++++++---- backend/src/chain.rs | 2 +- backend/src/node/connector.rs | 19 ++++++++++++++++--- 3 files changed, 26 insertions(+), 8 deletions(-) diff --git a/backend/src/aggregator.rs b/backend/src/aggregator.rs index 10b02a0..459118b 100644 --- a/backend/src/aggregator.rs +++ b/backend/src/aggregator.rs @@ -2,7 +2,7 @@ use std::collections::{HashMap, HashSet}; use actix::prelude::*; use lazy_static::lazy_static; -use crate::node::connector::Initialize; +use crate::node::connector::{Initialize, Mute}; use crate::feed::connector::{FeedConnector, Connected, FeedId}; use crate::util::DenseMap; use crate::feed::{self, FeedMessageSerializer}; @@ -30,7 +30,7 @@ pub struct ChainEntry { } lazy_static! { - /// Labels of chains we consider "first party". These chains are allowed any + /// Labels of chains we consider "first party". These chains allow any /// number of nodes to connect. static ref FIRST_PARTY_NETWORKS: HashSet<&'static str> = { let mut set = HashSet::new(); @@ -131,6 +131,8 @@ pub struct AddNode { pub conn_id: ConnId, /// Recipient for the initialization message pub rec: Recipient, + /// Recipient for the mute message + pub mute: Recipient, } /// Message sent from the Chain to the Aggregator when the Chain loses all nodes @@ -196,10 +198,12 @@ impl Handler for Aggregator { fn handle(&mut self, msg: AddNode, ctx: &mut Self::Context) { if self.denylist.contains(&*msg.node.chain) { - log::debug!(target: "Aggregator::AddNode", "'{}' is on the denylist.", msg.node.chain); + log::warn!(target: "Aggregator::AddNode", "'{}' is on the denylist.", msg.node.chain); + let AddNode { mute, .. } = msg; + let _ = mute.do_send(Mute {}); return; } - let AddNode { node, conn_id, rec } = msg; + let AddNode { node, conn_id, rec, mute } = msg; log::trace!(target: "Aggregator::AddNode", "New node connected. Chain '{}'", node.chain); let cid = self.lazy_chain(&node.chain, ctx); @@ -212,6 +216,7 @@ impl Handler for Aggregator { }); } else { log::warn!(target: "Aggregator::AddNode", "Chain {} is over quota ({})", chain.label, chain.max_nodes); + let _ = mute.do_send(Mute {}); } } } diff --git a/backend/src/chain.rs b/backend/src/chain.rs index 1f4ea4c..bcef302 100644 --- a/backend/src/chain.rs +++ b/backend/src/chain.rs @@ -284,7 +284,7 @@ impl Chain { if node.update_block(*block) { if block.height > self.best.height { self.best = *block; - log::info!( + log::debug!( "[{}] [nodes={}/feeds={}] new best block={}/{:?}", self.label.0, nodes_len, diff --git a/backend/src/node/connector.rs b/backend/src/node/connector.rs index c810b7b..3363751 100644 --- a/backend/src/node/connector.rs +++ b/backend/src/node/connector.rs @@ -109,8 +109,6 @@ impl NodeConnector { ConnMultiplex::Waiting { backlog } => { if let Payload::SystemConnected(connected) = msg.payload() { let mut node = connected.node.clone(); - let rec = ctx.address().recipient(); - // FIXME: Use genesis hash instead of names to avoid this mess match &*node.chain { "Kusama CC3" => node.chain = "Kusama".into(), @@ -123,7 +121,10 @@ impl NodeConnector { _ => () } - self.aggregator.do_send(AddNode { node, conn_id, rec }); + let rec = ctx.address().recipient(); + let mute = ctx.address().recipient(); + + self.aggregator.do_send(AddNode { node, conn_id, rec, mute }); } else { if backlog.len() >= 10 { backlog.remove(0); @@ -157,6 +158,18 @@ impl NodeConnector { } } +#[derive(Message)] +#[rtype(result = "()")] +pub struct Mute; + +impl Handler for NodeConnector { + type Result = (); + fn handle(&mut self, _msg: Mute, ctx: &mut Self::Context) { + log::trace!(target: "NodeConnector::Mute", "Muting a node"); + ctx.stop(); + } +} + #[derive(Message)] #[rtype(result = "()")] pub struct Initialize { From d39f6b561769f5b1d0564782fc394e283442f688 Mon Sep 17 00:00:00 2001 From: David Palm Date: Fri, 26 Mar 2021 21:59:36 +0100 Subject: [PATCH 02/10] Fix doc --- backend/src/node/connector.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/src/node/connector.rs b/backend/src/node/connector.rs index 3363751..dbc3fe2 100644 --- a/backend/src/node/connector.rs +++ b/backend/src/node/connector.rs @@ -24,7 +24,7 @@ const CONT_BUF_LIMIT: usize = 10 * 1024 * 1024; pub struct NodeConnector { /// Multiplexing connections by id multiplex: BTreeMap, - /// Client must send ping at least once per 10 seconds (CLIENT_TIMEOUT), + /// Client must send ping at least once every 60 seconds (CLIENT_TIMEOUT), hb: Instant, /// Aggregator actor address aggregator: Addr, From 9b42bf4af00bac745c6a9514cd8d4e7d970e5a81 Mon Sep 17 00:00:00 2001 From: David Palm Date: Fri, 26 Mar 2021 23:06:52 +0100 Subject: [PATCH 03/10] Close websocket with a reason when muting --- backend/src/aggregator.rs | 9 ++++++--- backend/src/node/connector.rs | 16 +++++++++++----- 2 files changed, 17 insertions(+), 8 deletions(-) diff --git a/backend/src/aggregator.rs b/backend/src/aggregator.rs index 459118b..05d9970 100644 --- a/backend/src/aggregator.rs +++ b/backend/src/aggregator.rs @@ -1,5 +1,6 @@ use std::collections::{HashMap, HashSet}; use actix::prelude::*; +use actix_web_actors::ws::{CloseReason, CloseCode}; use lazy_static::lazy_static; use crate::node::connector::{Initialize, Mute}; @@ -42,7 +43,7 @@ lazy_static! { }; } /// Max number of nodes allowed to connect to the telemetry server. -const THIRD_PARTY_NETWORKS_MAX_NODES: usize = 500; +const THIRD_PARTY_NETWORKS_MAX_NODES: usize = 2; impl Aggregator { pub fn new(denylist: HashSet) -> Self { @@ -200,7 +201,8 @@ impl Handler for Aggregator { if self.denylist.contains(&*msg.node.chain) { log::warn!(target: "Aggregator::AddNode", "'{}' is on the denylist.", msg.node.chain); let AddNode { mute, .. } = msg; - let _ = mute.do_send(Mute {}); + let reason = CloseReason{ code: CloseCode::Abnormal, description: Some("Denied".into()) }; + let _ = mute.do_send(Mute { reason }); return; } let AddNode { node, conn_id, rec, mute } = msg; @@ -216,7 +218,8 @@ impl Handler for Aggregator { }); } else { log::warn!(target: "Aggregator::AddNode", "Chain {} is over quota ({})", chain.label, chain.max_nodes); - let _ = mute.do_send(Mute {}); + let reason = CloseReason{ code: CloseCode::Again, description: Some("Overquota".into()) }; + let _ = mute.do_send(Mute { reason }); } } } diff --git a/backend/src/node/connector.rs b/backend/src/node/connector.rs index dbc3fe2..eef8f2b 100644 --- a/backend/src/node/connector.rs +++ b/backend/src/node/connector.rs @@ -5,7 +5,7 @@ use std::mem; use bytes::{Bytes, BytesMut}; use actix::prelude::*; -use actix_web_actors::ws; +use actix_web_actors::ws::{self, CloseReason}; use actix_http::ws::Item; use crate::aggregator::{Aggregator, AddNode}; use crate::chain::{Chain, UpdateNode, RemoveNode}; @@ -160,12 +160,17 @@ impl NodeConnector { #[derive(Message)] #[rtype(result = "()")] -pub struct Mute; +pub struct Mute { + pub reason: CloseReason, +} impl Handler for NodeConnector { type Result = (); - fn handle(&mut self, _msg: Mute, ctx: &mut Self::Context) { - log::trace!(target: "NodeConnector::Mute", "Muting a node"); + fn handle(&mut self, msg: Mute, ctx: &mut Self::Context) { + let Mute { reason } = msg; + log::debug!(target: "NodeConnector::Mute", "Muting a node. Reason: {:?}", reason.description); + + ctx.close(Some(reason)); ctx.stop(); } } @@ -216,7 +221,8 @@ impl StreamHandler> for NodeConnector { Ok(ws::Message::Pong(_)) => return, Ok(ws::Message::Text(text)) => text.into_bytes(), Ok(ws::Message::Binary(data)) => data, - Ok(ws::Message::Close(_)) => { + Ok(ws::Message::Close(reason)) => { + ctx.close(reason); ctx.stop(); return; } From 3d6bdbcef6a3299afcc210f587ef88f3204c6e3a Mon Sep 17 00:00:00 2001 From: David Date: Sat, 27 Mar 2021 22:08:12 +0100 Subject: [PATCH 04/10] Update backend/src/aggregator.rs Co-authored-by: Maciej Hirsz <1096222+maciejhirsz@users.noreply.github.com> --- backend/src/aggregator.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/src/aggregator.rs b/backend/src/aggregator.rs index 05d9970..94c4df3 100644 --- a/backend/src/aggregator.rs +++ b/backend/src/aggregator.rs @@ -131,7 +131,7 @@ pub struct AddNode { /// Connection id used by the node connector for multiplexing parachains pub conn_id: ConnId, /// Recipient for the initialization message - pub rec: Recipient, + pub rec: Addr, /// Recipient for the mute message pub mute: Recipient, } From 5571cf85e5d2a5000e4547c83b8bcc73bcc7c84b Mon Sep 17 00:00:00 2001 From: David Date: Sat, 27 Mar 2021 22:08:25 +0100 Subject: [PATCH 05/10] Update backend/src/aggregator.rs Co-authored-by: Maciej Hirsz <1096222+maciejhirsz@users.noreply.github.com> --- backend/src/aggregator.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/src/aggregator.rs b/backend/src/aggregator.rs index 94c4df3..ad27b90 100644 --- a/backend/src/aggregator.rs +++ b/backend/src/aggregator.rs @@ -205,7 +205,7 @@ impl Handler for Aggregator { let _ = mute.do_send(Mute { reason }); return; } - let AddNode { node, conn_id, rec, mute } = msg; + let AddNode { node, conn_id, rec } = msg; log::trace!(target: "Aggregator::AddNode", "New node connected. Chain '{}'", node.chain); let cid = self.lazy_chain(&node.chain, ctx); From 41a6060be70b09379691f2a0cc3d550352283b89 Mon Sep 17 00:00:00 2001 From: David Date: Sat, 27 Mar 2021 22:08:32 +0100 Subject: [PATCH 06/10] Update backend/src/aggregator.rs Co-authored-by: Maciej Hirsz <1096222+maciejhirsz@users.noreply.github.com> --- backend/src/aggregator.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/src/aggregator.rs b/backend/src/aggregator.rs index ad27b90..3d9c489 100644 --- a/backend/src/aggregator.rs +++ b/backend/src/aggregator.rs @@ -219,7 +219,7 @@ impl Handler for Aggregator { } else { log::warn!(target: "Aggregator::AddNode", "Chain {} is over quota ({})", chain.label, chain.max_nodes); let reason = CloseReason{ code: CloseCode::Again, description: Some("Overquota".into()) }; - let _ = mute.do_send(Mute { reason }); + let _ = rec.do_send(Mute { reason }); } } } From 47f5df81047086fd813fbda43201931561b57efe Mon Sep 17 00:00:00 2001 From: David Date: Sat, 27 Mar 2021 22:08:43 +0100 Subject: [PATCH 07/10] Update backend/src/node/connector.rs Co-authored-by: Maciej Hirsz <1096222+maciejhirsz@users.noreply.github.com> --- backend/src/node/connector.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/backend/src/node/connector.rs b/backend/src/node/connector.rs index eef8f2b..9a558da 100644 --- a/backend/src/node/connector.rs +++ b/backend/src/node/connector.rs @@ -121,10 +121,8 @@ impl NodeConnector { _ => () } - let rec = ctx.address().recipient(); - let mute = ctx.address().recipient(); - - self.aggregator.do_send(AddNode { node, conn_id, rec, mute }); + let rec = ctx.address(); + self.aggregator.do_send(AddNode { node, conn_id, rec }); } else { if backlog.len() >= 10 { backlog.remove(0); From 8d2441b7fb8d6824621f104207cfe87516198a99 Mon Sep 17 00:00:00 2001 From: David Palm Date: Sat, 27 Mar 2021 22:36:03 +0100 Subject: [PATCH 08/10] No need for duplicate refs to the NodeConnector Addr's version of `do_send` does not return `Result` --- backend/src/aggregator.rs | 18 ++++++++---------- backend/src/chain.rs | 10 +++++----- backend/src/node/connector.rs | 3 +-- 3 files changed, 14 insertions(+), 17 deletions(-) diff --git a/backend/src/aggregator.rs b/backend/src/aggregator.rs index 3d9c489..253a2b8 100644 --- a/backend/src/aggregator.rs +++ b/backend/src/aggregator.rs @@ -3,7 +3,7 @@ use actix::prelude::*; use actix_web_actors::ws::{CloseReason, CloseCode}; use lazy_static::lazy_static; -use crate::node::connector::{Initialize, Mute}; +use crate::node::connector::{Mute, NodeConnector}; use crate::feed::connector::{FeedConnector, Connected, FeedId}; use crate::util::DenseMap; use crate::feed::{self, FeedMessageSerializer}; @@ -130,10 +130,8 @@ pub struct AddNode { pub node: NodeDetails, /// Connection id used by the node connector for multiplexing parachains pub conn_id: ConnId, - /// Recipient for the initialization message - pub rec: Addr, - /// Recipient for the mute message - pub mute: Recipient, + /// Address of the NodeConnector actor + pub node_connector: Addr, } /// Message sent from the Chain to the Aggregator when the Chain loses all nodes @@ -200,12 +198,12 @@ impl Handler for Aggregator { fn handle(&mut self, msg: AddNode, ctx: &mut Self::Context) { if self.denylist.contains(&*msg.node.chain) { log::warn!(target: "Aggregator::AddNode", "'{}' is on the denylist.", msg.node.chain); - let AddNode { mute, .. } = msg; + let AddNode { node_connector, .. } = msg; let reason = CloseReason{ code: CloseCode::Abnormal, description: Some("Denied".into()) }; - let _ = mute.do_send(Mute { reason }); + node_connector.do_send(Mute { reason }); return; } - let AddNode { node, conn_id, rec } = msg; + let AddNode { node, conn_id, node_connector } = msg; log::trace!(target: "Aggregator::AddNode", "New node connected. Chain '{}'", node.chain); let cid = self.lazy_chain(&node.chain, ctx); @@ -214,12 +212,12 @@ impl Handler for Aggregator { chain.addr.do_send(chain::AddNode { node, conn_id, - rec, + node_connector, }); } else { log::warn!(target: "Aggregator::AddNode", "Chain {} is over quota ({})", chain.label, chain.max_nodes); let reason = CloseReason{ code: CloseCode::Again, description: Some("Overquota".into()) }; - let _ = rec.do_send(Mute { reason }); + node_connector.do_send(Mute { reason }); } } } diff --git a/backend/src/chain.rs b/backend/src/chain.rs index bcef302..1db3e11 100644 --- a/backend/src/chain.rs +++ b/backend/src/chain.rs @@ -5,7 +5,7 @@ use bytes::Bytes; use rustc_hash::FxHashMap; use crate::aggregator::{Aggregator, DropChain, RenameChain, NodeCount}; -use crate::node::{Node, connector::Initialize, message::{NodeMessage, Payload}}; +use crate::node::{Node, connector::{Initialize, NodeConnector}, message::{NodeMessage, Payload}}; use crate::feed::connector::{FeedId, FeedConnector, Subscribed, Unsubscribed}; use crate::feed::{self, FeedMessageSerializer}; use crate::util::{DenseMap, NumStats, now}; @@ -196,8 +196,8 @@ pub struct AddNode { pub node: NodeDetails, /// Connection id used by the node connector for multiplexing parachains pub conn_id: ConnId, - /// Recipient for the initialization message - pub rec: Recipient, + /// Address of the NodeConnector actor to which we send [`Initialize`] or [`Mute`] messages. + pub node_connector: Addr, } /// Message sent from the NodeConnector to the Chain when it receives new telemetry data @@ -250,14 +250,14 @@ impl Handler for Chain { type Result = (); fn handle(&mut self, msg: AddNode, ctx: &mut Self::Context) { - let AddNode { node, conn_id, rec } = msg; + let AddNode { node, conn_id, node_connector } = msg; log::trace!(target: "Chain::AddNode", "New node connected. Chain '{}', node count goes from {} to {}", node.chain, self.nodes.len(), self.nodes.len() + 1); self.increment_label_count(&node.chain); let nid = self.nodes.add(Node::new(node)); let chain = ctx.address(); - if rec.do_send(Initialize { nid, conn_id, chain }).is_err() { + if node_connector.try_send(Initialize { nid, conn_id, chain }).is_err() { self.nodes.remove(nid); } else if let Some(node) = self.nodes.get(nid) { self.serializer.push(feed::AddedNode(nid, node)); diff --git a/backend/src/node/connector.rs b/backend/src/node/connector.rs index 9a558da..d1aa5fd 100644 --- a/backend/src/node/connector.rs +++ b/backend/src/node/connector.rs @@ -121,8 +121,7 @@ impl NodeConnector { _ => () } - let rec = ctx.address(); - self.aggregator.do_send(AddNode { node, conn_id, rec }); + self.aggregator.do_send(AddNode { node, conn_id, node_connector: ctx.address() }); } else { if backlog.len() >= 10 { backlog.remove(0); From bbfe118965cdf4b8d1f4b6a7d217799c2e7ed75f Mon Sep 17 00:00:00 2001 From: David Palm Date: Sat, 27 Mar 2021 23:00:36 +0100 Subject: [PATCH 09/10] Use ctor instead of lazy_static --- backend/Cargo.lock | 20 +++++++++++++++----- backend/Cargo.toml | 2 +- backend/src/aggregator.rs | 28 ++++++++++++++-------------- 3 files changed, 30 insertions(+), 20 deletions(-) diff --git a/backend/Cargo.lock b/backend/Cargo.lock index 8e11065..5a459e1 100644 --- a/backend/Cargo.lock +++ b/backend/Cargo.lock @@ -588,6 +588,16 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7a81dae078cea95a014a339291cec439d2f232ebe854a9d672b796c6afafa9b7" +[[package]] +name = "ctor" +version = "0.1.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e98e2ad1a782e33928b96fc3948e7c355e5af34ba4de7670fe8bac2a3b2006d" +dependencies = [ + "quote", + "syn", +] + [[package]] name = "derive_more" version = "0.99.11" @@ -1303,9 +1313,9 @@ dependencies = [ [[package]] name = "quote" -version = "1.0.7" +version = "1.0.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aa563d17ecb180e500da1cfd2b028310ac758de548efdd203e18f283af693f37" +checksum = "c3d0b9745dc2debf507c8422de05d7226cc1f0644216dfdfead988f9b1ab32a7" dependencies = [ "proc-macro2", ] @@ -1726,9 +1736,9 @@ checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" [[package]] name = "syn" -version = "1.0.48" +version = "1.0.65" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cc371affeffc477f42a221a1e4297aedcea33d47d19b61455588bd9d8f6b19ac" +checksum = "f3a1d708c221c5a612956ef9f75b37e454e88d1f7b899fbd3a18d4252012d663" dependencies = [ "proc-macro2", "quote", @@ -1752,8 +1762,8 @@ dependencies = [ "bytes", "chrono", "clap", + "ctor", "fnv", - "lazy_static", "log", "num-traits", "parking_lot", diff --git a/backend/Cargo.toml b/backend/Cargo.toml index 70f5030..be0dcb6 100644 --- a/backend/Cargo.toml +++ b/backend/Cargo.toml @@ -23,7 +23,7 @@ parking_lot = "0.11" reqwest = { version = "0.11.1", features = ["blocking", "json"] } rustc-hash = "1.1.0" clap = "3.0.0-beta.2" -lazy_static = "1" +ctor = "0.1.20" [profile.release] lto = true diff --git a/backend/src/aggregator.rs b/backend/src/aggregator.rs index 253a2b8..205899d 100644 --- a/backend/src/aggregator.rs +++ b/backend/src/aggregator.rs @@ -1,7 +1,7 @@ use std::collections::{HashMap, HashSet}; use actix::prelude::*; use actix_web_actors::ws::{CloseReason, CloseCode}; -use lazy_static::lazy_static; +use ctor::ctor; use crate::node::connector::{Mute, NodeConnector}; use crate::feed::connector::{FeedConnector, Connected, FeedId}; @@ -30,20 +30,20 @@ pub struct ChainEntry { max_nodes: usize, } -lazy_static! { - /// Labels of chains we consider "first party". These chains allow any - /// number of nodes to connect. - static ref FIRST_PARTY_NETWORKS: HashSet<&'static str> = { - let mut set = HashSet::new(); - set.insert("Polkadot"); - set.insert("Kusama"); - set.insert("Westend"); - set.insert("Rococo"); - set - }; -} +#[ctor] +/// Labels of chains we consider "first party". These chains allow any +/// number of nodes to connect. +static FIRST_PARTY_NETWORKS: HashSet<&'static str> = { + let mut set = HashSet::new(); + set.insert("Polkadot"); + set.insert("Kusama"); + set.insert("Westend"); + set.insert("Rococo"); + set +}; + /// Max number of nodes allowed to connect to the telemetry server. -const THIRD_PARTY_NETWORKS_MAX_NODES: usize = 2; +const THIRD_PARTY_NETWORKS_MAX_NODES: usize = 500; impl Aggregator { pub fn new(denylist: HashSet) -> Self { From f328b5f5d3f2c61a0621d25b9b59157fe38a07a7 Mon Sep 17 00:00:00 2001 From: David Palm Date: Mon, 29 Mar 2021 22:13:14 +0200 Subject: [PATCH 10/10] =?UTF-8?q?No=20use=20denying=20"Earth"=20=E2=80=93?= =?UTF-8?q?=20they=20renamed=20Send=20Close=20frame=20with=20reason=20when?= =?UTF-8?q?=20the=20remote=20missed=20a=20heartbeat?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/src/node/connector.rs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/backend/src/node/connector.rs b/backend/src/node/connector.rs index d1aa5fd..fc71a19 100644 --- a/backend/src/node/connector.rs +++ b/backend/src/node/connector.rs @@ -90,6 +90,7 @@ impl NodeConnector { // check client heartbeats if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT { // stop actor + ctx.close(Some(CloseReason { code: ws::CloseCode::Abnormal, description: Some("Missed heartbeat".into())})); ctx.stop(); } }); @@ -113,11 +114,6 @@ impl NodeConnector { match &*node.chain { "Kusama CC3" => node.chain = "Kusama".into(), "Polkadot CC1" => node.chain = "Polkadot".into(), - "Earth" => { - // Temp, there is too many of them - ctx.stop(); - return; - }, _ => () }