Close websocket with a reason when muting

This commit is contained in:
David Palm
2021-03-26 23:06:52 +01:00
parent 8ed38deaa8
commit 9b42bf4af0
2 changed files with 17 additions and 8 deletions
+6 -3
View File
@@ -1,5 +1,6 @@
use std::collections::{HashMap, HashSet};
use actix::prelude::*;
use actix_web_actors::ws::{CloseReason, CloseCode};
use lazy_static::lazy_static;
use crate::node::connector::{Initialize, Mute};
@@ -42,7 +43,7 @@ lazy_static! {
};
}
/// Max number of nodes allowed to connect to the telemetry server.
const THIRD_PARTY_NETWORKS_MAX_NODES: usize = 500;
const THIRD_PARTY_NETWORKS_MAX_NODES: usize = 2;
impl Aggregator {
pub fn new(denylist: HashSet<String>) -> Self {
@@ -200,7 +201,8 @@ impl Handler<AddNode> for Aggregator {
if self.denylist.contains(&*msg.node.chain) {
log::warn!(target: "Aggregator::AddNode", "'{}' is on the denylist.", msg.node.chain);
let AddNode { mute, .. } = msg;
let _ = mute.do_send(Mute {});
let reason = CloseReason{ code: CloseCode::Abnormal, description: Some("Denied".into()) };
let _ = mute.do_send(Mute { reason });
return;
}
let AddNode { node, conn_id, rec, mute } = msg;
@@ -216,7 +218,8 @@ impl Handler<AddNode> for Aggregator {
});
} else {
log::warn!(target: "Aggregator::AddNode", "Chain {} is over quota ({})", chain.label, chain.max_nodes);
let _ = mute.do_send(Mute {});
let reason = CloseReason{ code: CloseCode::Again, description: Some("Overquota".into()) };
let _ = mute.do_send(Mute { reason });
}
}
}
+11 -5
View File
@@ -5,7 +5,7 @@ use std::mem;
use bytes::{Bytes, BytesMut};
use actix::prelude::*;
use actix_web_actors::ws;
use actix_web_actors::ws::{self, CloseReason};
use actix_http::ws::Item;
use crate::aggregator::{Aggregator, AddNode};
use crate::chain::{Chain, UpdateNode, RemoveNode};
@@ -160,12 +160,17 @@ impl NodeConnector {
#[derive(Message)]
#[rtype(result = "()")]
pub struct Mute;
pub struct Mute {
pub reason: CloseReason,
}
impl Handler<Mute> for NodeConnector {
type Result = ();
fn handle(&mut self, _msg: Mute, ctx: &mut Self::Context) {
log::trace!(target: "NodeConnector::Mute", "Muting a node");
fn handle(&mut self, msg: Mute, ctx: &mut Self::Context) {
let Mute { reason } = msg;
log::debug!(target: "NodeConnector::Mute", "Muting a node. Reason: {:?}", reason.description);
ctx.close(Some(reason));
ctx.stop();
}
}
@@ -216,7 +221,8 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for NodeConnector {
Ok(ws::Message::Pong(_)) => return,
Ok(ws::Message::Text(text)) => text.into_bytes(),
Ok(ws::Message::Binary(data)) => data,
Ok(ws::Message::Close(_)) => {
Ok(ws::Message::Close(reason)) => {
ctx.close(reason);
ctx.stop();
return;
}