No need for duplicate refs to the NodeConnector

Addr's version of `do_send` does not return `Result`
This commit is contained in:
David Palm
2021-03-27 22:36:03 +01:00
parent 47f5df8104
commit 8d2441b7fb
3 changed files with 14 additions and 17 deletions
+8 -10
View File
@@ -3,7 +3,7 @@ use actix::prelude::*;
use actix_web_actors::ws::{CloseReason, CloseCode};
use lazy_static::lazy_static;
use crate::node::connector::{Initialize, Mute};
use crate::node::connector::{Mute, NodeConnector};
use crate::feed::connector::{FeedConnector, Connected, FeedId};
use crate::util::DenseMap;
use crate::feed::{self, FeedMessageSerializer};
@@ -130,10 +130,8 @@ pub struct AddNode {
pub node: NodeDetails,
/// Connection id used by the node connector for multiplexing parachains
pub conn_id: ConnId,
/// Recipient for the initialization message
pub rec: Addr<NodeConnector>,
/// Recipient for the mute message
pub mute: Recipient<Mute>,
/// Address of the NodeConnector actor
pub node_connector: Addr<NodeConnector>,
}
/// Message sent from the Chain to the Aggregator when the Chain loses all nodes
@@ -200,12 +198,12 @@ impl Handler<AddNode> for Aggregator {
fn handle(&mut self, msg: AddNode, ctx: &mut Self::Context) {
if self.denylist.contains(&*msg.node.chain) {
log::warn!(target: "Aggregator::AddNode", "'{}' is on the denylist.", msg.node.chain);
let AddNode { mute, .. } = msg;
let AddNode { node_connector, .. } = msg;
let reason = CloseReason{ code: CloseCode::Abnormal, description: Some("Denied".into()) };
let _ = mute.do_send(Mute { reason });
node_connector.do_send(Mute { reason });
return;
}
let AddNode { node, conn_id, rec } = msg;
let AddNode { node, conn_id, node_connector } = msg;
log::trace!(target: "Aggregator::AddNode", "New node connected. Chain '{}'", node.chain);
let cid = self.lazy_chain(&node.chain, ctx);
@@ -214,12 +212,12 @@ impl Handler<AddNode> for Aggregator {
chain.addr.do_send(chain::AddNode {
node,
conn_id,
rec,
node_connector,
});
} else {
log::warn!(target: "Aggregator::AddNode", "Chain {} is over quota ({})", chain.label, chain.max_nodes);
let reason = CloseReason{ code: CloseCode::Again, description: Some("Overquota".into()) };
let _ = rec.do_send(Mute { reason });
node_connector.do_send(Mute { reason });
}
}
}
+5 -5
View File
@@ -5,7 +5,7 @@ use bytes::Bytes;
use rustc_hash::FxHashMap;
use crate::aggregator::{Aggregator, DropChain, RenameChain, NodeCount};
use crate::node::{Node, connector::Initialize, message::{NodeMessage, Payload}};
use crate::node::{Node, connector::{Initialize, NodeConnector}, message::{NodeMessage, Payload}};
use crate::feed::connector::{FeedId, FeedConnector, Subscribed, Unsubscribed};
use crate::feed::{self, FeedMessageSerializer};
use crate::util::{DenseMap, NumStats, now};
@@ -196,8 +196,8 @@ pub struct AddNode {
pub node: NodeDetails,
/// Connection id used by the node connector for multiplexing parachains
pub conn_id: ConnId,
/// Recipient for the initialization message
pub rec: Recipient<Initialize>,
/// Address of the NodeConnector actor to which we send [`Initialize`] or [`Mute`] messages.
pub node_connector: Addr<NodeConnector>,
}
/// Message sent from the NodeConnector to the Chain when it receives new telemetry data
@@ -250,14 +250,14 @@ impl Handler<AddNode> for Chain {
type Result = ();
fn handle(&mut self, msg: AddNode, ctx: &mut Self::Context) {
let AddNode { node, conn_id, rec } = msg;
let AddNode { node, conn_id, node_connector } = msg;
log::trace!(target: "Chain::AddNode", "New node connected. Chain '{}', node count goes from {} to {}", node.chain, self.nodes.len(), self.nodes.len() + 1);
self.increment_label_count(&node.chain);
let nid = self.nodes.add(Node::new(node));
let chain = ctx.address();
if rec.do_send(Initialize { nid, conn_id, chain }).is_err() {
if node_connector.try_send(Initialize { nid, conn_id, chain }).is_err() {
self.nodes.remove(nid);
} else if let Some(node) = self.nodes.get(nid) {
self.serializer.push(feed::AddedNode(nid, node));
+1 -2
View File
@@ -121,8 +121,7 @@ impl NodeConnector {
_ => ()
}
let rec = ctx.address();
self.aggregator.do_send(AddNode { node, conn_id, rec });
self.aggregator.do_send(AddNode { node, conn_id, node_connector: ctx.address() });
} else {
if backlog.len() >= 10 {
backlog.remove(0);