diff --git a/backend/Cargo.lock b/backend/Cargo.lock index 11663d4..811aa41 100644 --- a/backend/Cargo.lock +++ b/backend/Cargo.lock @@ -539,6 +539,16 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7a81dae078cea95a014a339291cec439d2f232ebe854a9d672b796c6afafa9b7" +[[package]] +name = "ctor" +version = "0.1.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e98e2ad1a782e33928b96fc3948e7c355e5af34ba4de7670fe8bac2a3b2006d" +dependencies = [ + "quote", + "syn", +] + [[package]] name = "derive_more" version = "0.99.11" @@ -1232,9 +1242,9 @@ dependencies = [ [[package]] name = "quote" -version = "1.0.7" +version = "1.0.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aa563d17ecb180e500da1cfd2b028310ac758de548efdd203e18f283af693f37" +checksum = "c3d0b9745dc2debf507c8422de05d7226cc1f0644216dfdfead988f9b1ab32a7" dependencies = [ "proc-macro2", ] @@ -1655,9 +1665,9 @@ checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" [[package]] name = "syn" -version = "1.0.48" +version = "1.0.65" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cc371affeffc477f42a221a1e4297aedcea33d47d19b61455588bd9d8f6b19ac" +checksum = "f3a1d708c221c5a612956ef9f75b37e454e88d1f7b899fbd3a18d4252012d663" dependencies = [ "proc-macro2", "quote", @@ -1681,8 +1691,8 @@ dependencies = [ "bytes", "chrono", "clap", + "ctor", "fnv", - "lazy_static", "log", "num-traits", "parking_lot", diff --git a/backend/Cargo.toml b/backend/Cargo.toml index 681e40c..f5e05d3 100644 --- a/backend/Cargo.toml +++ b/backend/Cargo.toml @@ -23,7 +23,7 @@ parking_lot = "0.11" reqwest = { version = "0.11.1", features = ["blocking", "json"] } rustc-hash = "1.1.0" clap = "3.0.0-beta.2" -lazy_static = "1" +ctor = "0.1.20" [profile.release] lto = true diff --git a/backend/src/aggregator.rs b/backend/src/aggregator.rs index 10b02a0..205899d 100644 --- a/backend/src/aggregator.rs +++ b/backend/src/aggregator.rs @@ -1,8 +1,9 @@ use std::collections::{HashMap, HashSet}; use actix::prelude::*; -use lazy_static::lazy_static; +use actix_web_actors::ws::{CloseReason, CloseCode}; +use ctor::ctor; -use crate::node::connector::Initialize; +use crate::node::connector::{Mute, NodeConnector}; use crate::feed::connector::{FeedConnector, Connected, FeedId}; use crate::util::DenseMap; use crate::feed::{self, FeedMessageSerializer}; @@ -29,18 +30,18 @@ pub struct ChainEntry { max_nodes: usize, } -lazy_static! { - /// Labels of chains we consider "first party". These chains are allowed any - /// number of nodes to connect. - static ref FIRST_PARTY_NETWORKS: HashSet<&'static str> = { - let mut set = HashSet::new(); - set.insert("Polkadot"); - set.insert("Kusama"); - set.insert("Westend"); - set.insert("Rococo"); - set - }; -} +#[ctor] +/// Labels of chains we consider "first party". These chains allow any +/// number of nodes to connect. +static FIRST_PARTY_NETWORKS: HashSet<&'static str> = { + let mut set = HashSet::new(); + set.insert("Polkadot"); + set.insert("Kusama"); + set.insert("Westend"); + set.insert("Rococo"); + set +}; + /// Max number of nodes allowed to connect to the telemetry server. const THIRD_PARTY_NETWORKS_MAX_NODES: usize = 500; @@ -129,8 +130,8 @@ pub struct AddNode { pub node: NodeDetails, /// Connection id used by the node connector for multiplexing parachains pub conn_id: ConnId, - /// Recipient for the initialization message - pub rec: Recipient, + /// Address of the NodeConnector actor + pub node_connector: Addr, } /// Message sent from the Chain to the Aggregator when the Chain loses all nodes @@ -196,10 +197,13 @@ impl Handler for Aggregator { fn handle(&mut self, msg: AddNode, ctx: &mut Self::Context) { if self.denylist.contains(&*msg.node.chain) { - log::debug!(target: "Aggregator::AddNode", "'{}' is on the denylist.", msg.node.chain); + log::warn!(target: "Aggregator::AddNode", "'{}' is on the denylist.", msg.node.chain); + let AddNode { node_connector, .. } = msg; + let reason = CloseReason{ code: CloseCode::Abnormal, description: Some("Denied".into()) }; + node_connector.do_send(Mute { reason }); return; } - let AddNode { node, conn_id, rec } = msg; + let AddNode { node, conn_id, node_connector } = msg; log::trace!(target: "Aggregator::AddNode", "New node connected. Chain '{}'", node.chain); let cid = self.lazy_chain(&node.chain, ctx); @@ -208,10 +212,12 @@ impl Handler for Aggregator { chain.addr.do_send(chain::AddNode { node, conn_id, - rec, + node_connector, }); } else { log::warn!(target: "Aggregator::AddNode", "Chain {} is over quota ({})", chain.label, chain.max_nodes); + let reason = CloseReason{ code: CloseCode::Again, description: Some("Overquota".into()) }; + node_connector.do_send(Mute { reason }); } } } diff --git a/backend/src/chain.rs b/backend/src/chain.rs index 1f4ea4c..1db3e11 100644 --- a/backend/src/chain.rs +++ b/backend/src/chain.rs @@ -5,7 +5,7 @@ use bytes::Bytes; use rustc_hash::FxHashMap; use crate::aggregator::{Aggregator, DropChain, RenameChain, NodeCount}; -use crate::node::{Node, connector::Initialize, message::{NodeMessage, Payload}}; +use crate::node::{Node, connector::{Initialize, NodeConnector}, message::{NodeMessage, Payload}}; use crate::feed::connector::{FeedId, FeedConnector, Subscribed, Unsubscribed}; use crate::feed::{self, FeedMessageSerializer}; use crate::util::{DenseMap, NumStats, now}; @@ -196,8 +196,8 @@ pub struct AddNode { pub node: NodeDetails, /// Connection id used by the node connector for multiplexing parachains pub conn_id: ConnId, - /// Recipient for the initialization message - pub rec: Recipient, + /// Address of the NodeConnector actor to which we send [`Initialize`] or [`Mute`] messages. + pub node_connector: Addr, } /// Message sent from the NodeConnector to the Chain when it receives new telemetry data @@ -250,14 +250,14 @@ impl Handler for Chain { type Result = (); fn handle(&mut self, msg: AddNode, ctx: &mut Self::Context) { - let AddNode { node, conn_id, rec } = msg; + let AddNode { node, conn_id, node_connector } = msg; log::trace!(target: "Chain::AddNode", "New node connected. Chain '{}', node count goes from {} to {}", node.chain, self.nodes.len(), self.nodes.len() + 1); self.increment_label_count(&node.chain); let nid = self.nodes.add(Node::new(node)); let chain = ctx.address(); - if rec.do_send(Initialize { nid, conn_id, chain }).is_err() { + if node_connector.try_send(Initialize { nid, conn_id, chain }).is_err() { self.nodes.remove(nid); } else if let Some(node) = self.nodes.get(nid) { self.serializer.push(feed::AddedNode(nid, node)); @@ -284,7 +284,7 @@ impl Chain { if node.update_block(*block) { if block.height > self.best.height { self.best = *block; - log::info!( + log::debug!( "[{}] [nodes={}/feeds={}] new best block={}/{:?}", self.label.0, nodes_len, diff --git a/backend/src/node/connector.rs b/backend/src/node/connector.rs index c810b7b..fc71a19 100644 --- a/backend/src/node/connector.rs +++ b/backend/src/node/connector.rs @@ -5,7 +5,7 @@ use std::mem; use bytes::{Bytes, BytesMut}; use actix::prelude::*; -use actix_web_actors::ws; +use actix_web_actors::ws::{self, CloseReason}; use actix_http::ws::Item; use crate::aggregator::{Aggregator, AddNode}; use crate::chain::{Chain, UpdateNode, RemoveNode}; @@ -24,7 +24,7 @@ const CONT_BUF_LIMIT: usize = 10 * 1024 * 1024; pub struct NodeConnector { /// Multiplexing connections by id multiplex: BTreeMap, - /// Client must send ping at least once per 10 seconds (CLIENT_TIMEOUT), + /// Client must send ping at least once every 60 seconds (CLIENT_TIMEOUT), hb: Instant, /// Aggregator actor address aggregator: Addr, @@ -90,6 +90,7 @@ impl NodeConnector { // check client heartbeats if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT { // stop actor + ctx.close(Some(CloseReason { code: ws::CloseCode::Abnormal, description: Some("Missed heartbeat".into())})); ctx.stop(); } }); @@ -109,21 +110,14 @@ impl NodeConnector { ConnMultiplex::Waiting { backlog } => { if let Payload::SystemConnected(connected) = msg.payload() { let mut node = connected.node.clone(); - let rec = ctx.address().recipient(); - // FIXME: Use genesis hash instead of names to avoid this mess match &*node.chain { "Kusama CC3" => node.chain = "Kusama".into(), "Polkadot CC1" => node.chain = "Polkadot".into(), - "Earth" => { - // Temp, there is too many of them - ctx.stop(); - return; - }, _ => () } - self.aggregator.do_send(AddNode { node, conn_id, rec }); + self.aggregator.do_send(AddNode { node, conn_id, node_connector: ctx.address() }); } else { if backlog.len() >= 10 { backlog.remove(0); @@ -157,6 +151,23 @@ impl NodeConnector { } } +#[derive(Message)] +#[rtype(result = "()")] +pub struct Mute { + pub reason: CloseReason, +} + +impl Handler for NodeConnector { + type Result = (); + fn handle(&mut self, msg: Mute, ctx: &mut Self::Context) { + let Mute { reason } = msg; + log::debug!(target: "NodeConnector::Mute", "Muting a node. Reason: {:?}", reason.description); + + ctx.close(Some(reason)); + ctx.stop(); + } +} + #[derive(Message)] #[rtype(result = "()")] pub struct Initialize { @@ -203,7 +214,8 @@ impl StreamHandler> for NodeConnector { Ok(ws::Message::Pong(_)) => return, Ok(ws::Message::Text(text)) => text.into_bytes(), Ok(ws::Message::Binary(data)) => data, - Ok(ws::Message::Close(_)) => { + Ok(ws::Message::Close(reason)) => { + ctx.close(reason); ctx.stop(); return; }