diff --git a/backend/src/aggregator.rs b/backend/src/aggregator.rs index 459118b..05d9970 100644 --- a/backend/src/aggregator.rs +++ b/backend/src/aggregator.rs @@ -1,5 +1,6 @@ use std::collections::{HashMap, HashSet}; use actix::prelude::*; +use actix_web_actors::ws::{CloseReason, CloseCode}; use lazy_static::lazy_static; use crate::node::connector::{Initialize, Mute}; @@ -42,7 +43,7 @@ lazy_static! { }; } /// Max number of nodes allowed to connect to the telemetry server. -const THIRD_PARTY_NETWORKS_MAX_NODES: usize = 500; +const THIRD_PARTY_NETWORKS_MAX_NODES: usize = 2; impl Aggregator { pub fn new(denylist: HashSet) -> Self { @@ -200,7 +201,8 @@ impl Handler for Aggregator { if self.denylist.contains(&*msg.node.chain) { log::warn!(target: "Aggregator::AddNode", "'{}' is on the denylist.", msg.node.chain); let AddNode { mute, .. } = msg; - let _ = mute.do_send(Mute {}); + let reason = CloseReason{ code: CloseCode::Abnormal, description: Some("Denied".into()) }; + let _ = mute.do_send(Mute { reason }); return; } let AddNode { node, conn_id, rec, mute } = msg; @@ -216,7 +218,8 @@ impl Handler for Aggregator { }); } else { log::warn!(target: "Aggregator::AddNode", "Chain {} is over quota ({})", chain.label, chain.max_nodes); - let _ = mute.do_send(Mute {}); + let reason = CloseReason{ code: CloseCode::Again, description: Some("Overquota".into()) }; + let _ = mute.do_send(Mute { reason }); } } } diff --git a/backend/src/node/connector.rs b/backend/src/node/connector.rs index dbc3fe2..eef8f2b 100644 --- a/backend/src/node/connector.rs +++ b/backend/src/node/connector.rs @@ -5,7 +5,7 @@ use std::mem; use bytes::{Bytes, BytesMut}; use actix::prelude::*; -use actix_web_actors::ws; +use actix_web_actors::ws::{self, CloseReason}; use actix_http::ws::Item; use crate::aggregator::{Aggregator, AddNode}; use crate::chain::{Chain, UpdateNode, RemoveNode}; @@ -160,12 +160,17 @@ impl NodeConnector { #[derive(Message)] #[rtype(result = "()")] -pub struct Mute; +pub struct Mute { + pub reason: CloseReason, +} impl Handler for NodeConnector { type Result = (); - fn handle(&mut self, _msg: Mute, ctx: &mut Self::Context) { - log::trace!(target: "NodeConnector::Mute", "Muting a node"); + fn handle(&mut self, msg: Mute, ctx: &mut Self::Context) { + let Mute { reason } = msg; + log::debug!(target: "NodeConnector::Mute", "Muting a node. Reason: {:?}", reason.description); + + ctx.close(Some(reason)); ctx.stop(); } } @@ -216,7 +221,8 @@ impl StreamHandler> for NodeConnector { Ok(ws::Message::Pong(_)) => return, Ok(ws::Message::Text(text)) => text.into_bytes(), Ok(ws::Message::Binary(data)) => data, - Ok(ws::Message::Close(_)) => { + Ok(ws::Message::Close(reason)) => { + ctx.close(reason); ctx.stop(); return; }