diff --git a/backend/core/src/aggregator.rs b/backend/core/src/aggregator.rs index 8dd3d98..0c0b693 100644 --- a/backend/core/src/aggregator.rs +++ b/backend/core/src/aggregator.rs @@ -6,7 +6,6 @@ use crate::shard::connector::ShardConnector; use crate::chain::{self, Chain, ChainId, Label}; use crate::feed::connector::{Connected, FeedConnector, FeedId}; use crate::feed::{self, FeedMessageSerializer}; -use crate::node::connector::NodeConnector; use common::ws::MuteReason; use common::shard::ShardConnId; use common::types::{ConnId, NodeDetails, BlockHash}; @@ -131,13 +130,6 @@ pub struct AddNode { } pub enum NodeSource { - Direct { - /// Connection id used by the node connector for multiplexing parachains - conn_id: ConnId, - /// Address of the NodeConnector actor - node_connector: Addr, - }, - // TODO Shard { /// `ShardConnId` that identifies the node connection within a shard. sid: ShardConnId, @@ -202,9 +194,6 @@ pub struct GetHealth; impl NodeSource { pub fn mute(&self, reason: MuteReason) { match self { - NodeSource::Direct { node_connector, .. } => { - node_connector.do_send(reason); - }, // TODO NodeSource::Shard { shard_connector, .. } => { // shard_connector.do_send(Mute { reason }); diff --git a/backend/core/src/chain.rs b/backend/core/src/chain.rs index 6a0c4d7..c27393a 100644 --- a/backend/core/src/chain.rs +++ b/backend/core/src/chain.rs @@ -247,15 +247,6 @@ pub struct LocateNode { impl NodeSource { pub fn init(self, nid: NodeId, chain: Addr) -> bool { match self { - NodeSource::Direct { conn_id, node_connector } => { - node_connector - .try_send(crate::node::connector::Initialize { - nid, - conn_id, - chain, - }) - .is_ok() - }, NodeSource::Shard { sid, shard_connector } => { shard_connector .try_send(crate::shard::connector::Initialize { diff --git a/backend/core/src/feed/connector.rs b/backend/core/src/feed/connector.rs index 16e3a8e..8d7f984 100644 --- a/backend/core/src/feed/connector.rs +++ b/backend/core/src/feed/connector.rs @@ -78,6 +78,8 @@ impl FeedConnector { fn handle_cmd(&mut self, cmd: &str, payload: &str, ctx: &mut ::Context) { match cmd { "subscribe" => { + // Hash the chain label the frontend wants to subscribe to. + // If it's already subscribed to the same chain, nothing to do. match fnv(payload) { hash if hash == self.chain_label_hash => return, hash => self.chain_label_hash = hash, diff --git a/backend/core/src/feed.rs b/backend/core/src/feed/mod.rs similarity index 100% rename from backend/core/src/feed.rs rename to backend/core/src/feed/mod.rs diff --git a/backend/core/src/main.rs b/backend/core/src/main.rs index 958af9d..a0fb567 100644 --- a/backend/core/src/main.rs +++ b/backend/core/src/main.rs @@ -104,8 +104,10 @@ async fn feed_route( stream: web::Payload, aggregator: web::Data>, ) -> Result { + let aggregator = aggregator.get_ref().clone(); + ws::start( - FeedConnector::new(aggregator.get_ref().clone()), + FeedConnector::new(aggregator), &req, stream, ) diff --git a/backend/core/src/node.rs b/backend/core/src/node.rs index 666049e..613b5ab 100644 --- a/backend/core/src/node.rs +++ b/backend/core/src/node.rs @@ -7,8 +7,6 @@ use common::types::{ use common::util::now; use common::node::SystemInterval; -pub mod connector; - /// Minimum time between block below broadcasting updates to the browser gets throttled, in ms. const THROTTLE_THRESHOLD: u64 = 100; /// Minimum time of intervals for block updates sent to the browser when throttled, in ms. diff --git a/backend/core/src/node/connector.rs b/backend/core/src/node/connector.rs deleted file mode 100644 index 7015fcd..0000000 --- a/backend/core/src/node/connector.rs +++ /dev/null @@ -1,229 +0,0 @@ -use std::collections::BTreeMap; -use std::net::Ipv4Addr; -use std::time::{Duration, Instant}; - -use crate::aggregator::{AddNode, Aggregator, NodeSource}; -use crate::chain::{Chain, RemoveNode, UpdateNode}; -use crate::location::LocateRequest; -use crate::node::NodeId; -use actix::prelude::*; -use actix_web_actors::ws::{self, CloseReason}; -use bytes::Bytes; -use common::types::ConnId; -use common::ws::{MultipartHandler, WsMessage, MuteReason}; -use common::node::{NodeMessage, Payload}; - -/// How often heartbeat pings are sent -const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(20); -/// How long before lack of client response causes a timeout -const CLIENT_TIMEOUT: Duration = Duration::from_secs(60); - -pub struct NodeConnector { - /// Multiplexing connections by id - multiplex: BTreeMap, - /// Client must send ping at least once every 60 seconds (CLIENT_TIMEOUT), - hb: Instant, - /// Aggregator actor address - aggregator: Addr, - /// IP address of the node this connector is responsible for - ip: Option, - /// Actix address of location services - locator: Recipient, - /// Helper for handling continuation messages - multipart: MultipartHandler, -} - -enum ConnMultiplex { - Connected { - /// Id of the node this multiplex connector is responsible for handling - nid: NodeId, - /// Chain address to which this multiplex connector is delegating messages - chain: Addr, - }, - Waiting { - /// Backlog of messages to be sent once we get a recipient handle to the chain - backlog: Vec, - }, -} - -impl Default for ConnMultiplex { - fn default() -> Self { - ConnMultiplex::Waiting { - backlog: Vec::new(), - } - } -} - -impl Actor for NodeConnector { - type Context = ws::WebsocketContext; - - fn started(&mut self, ctx: &mut Self::Context) { - self.heartbeat(ctx); - } - - fn stopped(&mut self, _: &mut Self::Context) { - for mx in self.multiplex.values() { - if let ConnMultiplex::Connected { chain, nid } = mx { - chain.do_send(RemoveNode(*nid)); - } - } - } -} - -impl NodeConnector { - pub fn new( - aggregator: Addr, - locator: Recipient, - ip: Option, - ) -> Self { - Self { - multiplex: BTreeMap::new(), - hb: Instant::now(), - aggregator, - ip, - locator, - multipart: MultipartHandler::default(), - } - } - - fn heartbeat(&self, ctx: &mut ::Context) { - ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| { - // check client heartbeats - if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT { - // stop actor - ctx.close(Some(CloseReason { - code: ws::CloseCode::Abnormal, - description: Some("Missed heartbeat".into()), - })); - ctx.stop(); - } - }); - } - - fn handle_message( - &mut self, - msg: NodeMessage, - ctx: &mut ::Context, - ) { - let conn_id = msg.id(); - let payload = msg.into(); - - match self.multiplex.entry(conn_id).or_default() { - ConnMultiplex::Connected { nid, chain } => { - chain.do_send(UpdateNode { - nid: *nid, - payload, - }); - } - ConnMultiplex::Waiting { backlog } => { - if let Payload::SystemConnected(connected) = payload { - self.aggregator.do_send(AddNode { - node: connected.node, - genesis_hash: connected.genesis_hash, - source: NodeSource::Direct { - conn_id, - node_connector: ctx.address(), - }, - }); - } else { - if backlog.len() >= 10 { - backlog.remove(0); - } - - backlog.push(payload); - } - } - } - } -} - -impl Handler for NodeConnector { - type Result = (); - fn handle(&mut self, msg: MuteReason, ctx: &mut Self::Context) { - log::debug!(target: "NodeConnector::Mute", "Muting a node. Reason: {:?}", msg); - - ctx.close(Some(msg.into())); - ctx.stop(); - } -} - -#[derive(Message)] -#[rtype(result = "()")] -pub struct Initialize { - pub nid: NodeId, - pub conn_id: ConnId, - pub chain: Addr, -} - -impl Handler for NodeConnector { - type Result = (); - - fn handle(&mut self, msg: Initialize, _: &mut Self::Context) { - let Initialize { - nid, - conn_id, - chain, - } = msg; - log::trace!(target: "NodeConnector::Initialize", "Initializing a node, nid={}, on conn_id={}", nid, conn_id); - let mx = self.multiplex.entry(conn_id).or_default(); - - if let ConnMultiplex::Waiting { backlog } = mx { - for payload in backlog.drain(..) { - chain.do_send(UpdateNode { - nid, - payload, - }); - } - - *mx = ConnMultiplex::Connected { - nid, - chain: chain.clone(), - }; - }; - - // Acquire the node's physical location - if let Some(ip) = self.ip { - let _ = self.locator.do_send(LocateRequest { ip, nid, chain }); - } - } -} - -impl StreamHandler> for NodeConnector { - fn handle(&mut self, msg: Result, ctx: &mut Self::Context) { - self.hb = Instant::now(); - - let data = match msg.map(|msg| self.multipart.handle(msg)) { - Ok(WsMessage::Nop) => return, - Ok(WsMessage::Ping(msg)) => { - ctx.pong(&msg); - return; - } - Ok(WsMessage::Data(data)) => data, - Ok(WsMessage::Close(reason)) => { - ctx.close(reason); - ctx.stop(); - return; - } - Err(error) => { - log::error!("{:?}", error); - ctx.stop(); - return; - } - }; - - match serde_json::from_slice(&data) { - Ok(msg) => self.handle_message(msg, ctx), - #[cfg(debug)] - Err(err) => { - let data: &[u8] = data.get(..512).unwrap_or_else(|| &data); - log::warn!( - "Failed to parse node message: {} {}", - err, - std::str::from_utf8(data).unwrap_or_else(|_| "INVALID UTF8") - ) - } - #[cfg(not(debug))] - Err(_) => (), - } - } -} diff --git a/backend/core/src/shard.rs b/backend/core/src/shard.rs deleted file mode 100644 index 77bba25..0000000 --- a/backend/core/src/shard.rs +++ /dev/null @@ -1 +0,0 @@ -pub mod connector; diff --git a/backend/core/src/shard/mod.rs b/backend/core/src/shard/mod.rs new file mode 100644 index 0000000..563893c --- /dev/null +++ b/backend/core/src/shard/mod.rs @@ -0,0 +1 @@ +pub mod connector; \ No newline at end of file diff --git a/backend/shard/src/aggregator.rs b/backend/shard/src/aggregator.rs index ba19559..8a7cd42 100644 --- a/backend/shard/src/aggregator.rs +++ b/backend/shard/src/aggregator.rs @@ -100,8 +100,6 @@ impl Chain { payload, }; - println!("Serialize {:?}", msg); - let bytes = bincode::options().serialize(&msg).unwrap(); println!("Sending update: {} bytes", bytes.len());