From 8d2441b7fb8d6824621f104207cfe87516198a99 Mon Sep 17 00:00:00 2001 From: David Palm Date: Sat, 27 Mar 2021 22:36:03 +0100 Subject: [PATCH] No need for duplicate refs to the NodeConnector Addr's version of `do_send` does not return `Result` --- backend/src/aggregator.rs | 18 ++++++++---------- backend/src/chain.rs | 10 +++++----- backend/src/node/connector.rs | 3 +-- 3 files changed, 14 insertions(+), 17 deletions(-) diff --git a/backend/src/aggregator.rs b/backend/src/aggregator.rs index 3d9c489..253a2b8 100644 --- a/backend/src/aggregator.rs +++ b/backend/src/aggregator.rs @@ -3,7 +3,7 @@ use actix::prelude::*; use actix_web_actors::ws::{CloseReason, CloseCode}; use lazy_static::lazy_static; -use crate::node::connector::{Initialize, Mute}; +use crate::node::connector::{Mute, NodeConnector}; use crate::feed::connector::{FeedConnector, Connected, FeedId}; use crate::util::DenseMap; use crate::feed::{self, FeedMessageSerializer}; @@ -130,10 +130,8 @@ pub struct AddNode { pub node: NodeDetails, /// Connection id used by the node connector for multiplexing parachains pub conn_id: ConnId, - /// Recipient for the initialization message - pub rec: Addr, - /// Recipient for the mute message - pub mute: Recipient, + /// Address of the NodeConnector actor + pub node_connector: Addr, } /// Message sent from the Chain to the Aggregator when the Chain loses all nodes @@ -200,12 +198,12 @@ impl Handler for Aggregator { fn handle(&mut self, msg: AddNode, ctx: &mut Self::Context) { if self.denylist.contains(&*msg.node.chain) { log::warn!(target: "Aggregator::AddNode", "'{}' is on the denylist.", msg.node.chain); - let AddNode { mute, .. } = msg; + let AddNode { node_connector, .. } = msg; let reason = CloseReason{ code: CloseCode::Abnormal, description: Some("Denied".into()) }; - let _ = mute.do_send(Mute { reason }); + node_connector.do_send(Mute { reason }); return; } - let AddNode { node, conn_id, rec } = msg; + let AddNode { node, conn_id, node_connector } = msg; log::trace!(target: "Aggregator::AddNode", "New node connected. Chain '{}'", node.chain); let cid = self.lazy_chain(&node.chain, ctx); @@ -214,12 +212,12 @@ impl Handler for Aggregator { chain.addr.do_send(chain::AddNode { node, conn_id, - rec, + node_connector, }); } else { log::warn!(target: "Aggregator::AddNode", "Chain {} is over quota ({})", chain.label, chain.max_nodes); let reason = CloseReason{ code: CloseCode::Again, description: Some("Overquota".into()) }; - let _ = rec.do_send(Mute { reason }); + node_connector.do_send(Mute { reason }); } } } diff --git a/backend/src/chain.rs b/backend/src/chain.rs index bcef302..1db3e11 100644 --- a/backend/src/chain.rs +++ b/backend/src/chain.rs @@ -5,7 +5,7 @@ use bytes::Bytes; use rustc_hash::FxHashMap; use crate::aggregator::{Aggregator, DropChain, RenameChain, NodeCount}; -use crate::node::{Node, connector::Initialize, message::{NodeMessage, Payload}}; +use crate::node::{Node, connector::{Initialize, NodeConnector}, message::{NodeMessage, Payload}}; use crate::feed::connector::{FeedId, FeedConnector, Subscribed, Unsubscribed}; use crate::feed::{self, FeedMessageSerializer}; use crate::util::{DenseMap, NumStats, now}; @@ -196,8 +196,8 @@ pub struct AddNode { pub node: NodeDetails, /// Connection id used by the node connector for multiplexing parachains pub conn_id: ConnId, - /// Recipient for the initialization message - pub rec: Recipient, + /// Address of the NodeConnector actor to which we send [`Initialize`] or [`Mute`] messages. + pub node_connector: Addr, } /// Message sent from the NodeConnector to the Chain when it receives new telemetry data @@ -250,14 +250,14 @@ impl Handler for Chain { type Result = (); fn handle(&mut self, msg: AddNode, ctx: &mut Self::Context) { - let AddNode { node, conn_id, rec } = msg; + let AddNode { node, conn_id, node_connector } = msg; log::trace!(target: "Chain::AddNode", "New node connected. Chain '{}', node count goes from {} to {}", node.chain, self.nodes.len(), self.nodes.len() + 1); self.increment_label_count(&node.chain); let nid = self.nodes.add(Node::new(node)); let chain = ctx.address(); - if rec.do_send(Initialize { nid, conn_id, chain }).is_err() { + if node_connector.try_send(Initialize { nid, conn_id, chain }).is_err() { self.nodes.remove(nid); } else if let Some(node) = self.nodes.get(nid) { self.serializer.push(feed::AddedNode(nid, node)); diff --git a/backend/src/node/connector.rs b/backend/src/node/connector.rs index 9a558da..d1aa5fd 100644 --- a/backend/src/node/connector.rs +++ b/backend/src/node/connector.rs @@ -121,8 +121,7 @@ impl NodeConnector { _ => () } - let rec = ctx.address(); - self.aggregator.do_send(AddNode { node, conn_id, rec }); + self.aggregator.do_send(AddNode { node, conn_id, node_connector: ctx.address() }); } else { if backlog.len() >= 10 { backlog.remove(0);