From 7f6bb3057b8b20ec344f4deceb5054031043602f Mon Sep 17 00:00:00 2001 From: Maciej Hirsz Date: Fri, 4 Jun 2021 09:54:36 +0200 Subject: [PATCH] Remove network_state handling from the BE --- backend/core/src/aggregator.rs | 19 ++------------- backend/core/src/chain.rs | 31 +----------------------- backend/core/src/main.rs | 38 +----------------------------- backend/core/src/node.rs | 31 ------------------------ backend/core/src/node/connector.rs | 5 +--- backend/core/src/node/message.rs | 3 --- 6 files changed, 5 insertions(+), 122 deletions(-) diff --git a/backend/core/src/aggregator.rs b/backend/core/src/aggregator.rs index e34a9ba..0694c8a 100644 --- a/backend/core/src/aggregator.rs +++ b/backend/core/src/aggregator.rs @@ -3,11 +3,11 @@ use actix_web_actors::ws::{CloseCode, CloseReason}; use ctor::ctor; use std::collections::{HashMap, HashSet}; -use crate::chain::{self, Chain, ChainId, GetNodeNetworkState, Label}; +use crate::chain::{self, Chain, ChainId, Label}; use crate::feed::connector::{Connected, FeedConnector, FeedId}; use crate::feed::{self, FeedMessageSerializer}; use crate::node::connector::{Mute, NodeConnector}; -use crate::types::{ConnId, NodeDetails, NodeId}; +use crate::types::{ConnId, NodeDetails}; use crate::util::{DenseMap, Hash}; pub struct Aggregator { @@ -178,11 +178,6 @@ pub struct Disconnect(pub FeedId); #[rtype(result = "()")] pub struct NodeCount(pub ChainId, pub usize); -/// Message sent to the Aggregator to get the network state of a particular node -#[derive(Message)] -#[rtype(result = "Option>")] -pub struct GetNetworkState(pub Box, pub NodeId); - /// Message sent to the Aggregator to get a health check #[derive(Message)] #[rtype(result = "usize")] @@ -368,16 +363,6 @@ impl Handler for Aggregator { } } -impl Handler for Aggregator { - type Result = ::Result; - - fn handle(&mut self, msg: GetNetworkState, _: &mut Self::Context) -> Self::Result { - let GetNetworkState(chain, nid) = msg; - - Some(self.get_chain(&*chain)?.addr.send(GetNodeNetworkState(nid))) - } -} - impl Handler for Aggregator { type Result = usize; diff --git a/backend/core/src/chain.rs b/backend/core/src/chain.rs index b4a1be1..aa5488f 100644 --- a/backend/core/src/chain.rs +++ b/backend/core/src/chain.rs @@ -1,5 +1,4 @@ use actix::prelude::*; -use bytes::Bytes; use rustc_hash::FxHashMap; use std::collections::HashMap; use std::sync::Arc; @@ -216,7 +215,6 @@ pub struct AddNode { #[rtype(result = "()")] pub struct UpdateNode { pub nid: NodeId, - pub raw: Option, pub payload: Payload, } @@ -251,12 +249,6 @@ pub struct LocateNode { pub location: Arc, } -pub struct GetNodeNetworkState(pub NodeId); - -impl Message for GetNodeNetworkState { - type Result = Option; -} - impl Handler for Chain { type Result = (); @@ -342,7 +334,7 @@ impl Handler for Chain { type Result = (); fn handle(&mut self, msg: UpdateNode, _: &mut Self::Context) { - let UpdateNode { nid, payload, raw } = msg; + let UpdateNode { nid, payload } = msg; if let Some(block) = payload.best_block() { self.handle_block(block, nid); @@ -351,12 +343,6 @@ impl Handler for Chain { if let Some(node) = self.nodes.get_mut(nid) { match payload { Payload::SystemInterval(ref interval) => { - if interval.network_state.is_some() { - if let Some(raw) = raw { - node.set_network_state(raw); - } - } - if node.update_hardware(interval) { self.serializer.push(feed::Hardware(nid, node.hardware())); } @@ -369,11 +355,6 @@ impl Handler for Chain { self.serializer.push(feed::NodeIOUpdate(nid, io)); } } - Payload::SystemNetworkState(_) => { - if let Some(raw) = raw { - node.set_network_state(raw); - } - } Payload::AfgAuthoritySet(authority) => { node.set_validator_address(authority.authority_id.clone()); self.broadcast(); @@ -581,13 +562,3 @@ impl Handler for Chain { self.finality_feeds.remove(&fid); } } - -impl Handler for Chain { - type Result = ::Result; - - fn handle(&mut self, msg: GetNodeNetworkState, _: &mut Self::Context) -> Self::Result { - let GetNodeNetworkState(nid) = msg; - - self.nodes.get(nid)?.network_state() - } -} diff --git a/backend/core/src/main.rs b/backend/core/src/main.rs index 3064cf1..9e373f9 100644 --- a/backend/core/src/main.rs +++ b/backend/core/src/main.rs @@ -17,11 +17,10 @@ mod shard; mod types; mod util; -use aggregator::{Aggregator, GetHealth, GetNetworkState}; +use aggregator::{Aggregator, GetHealth}; use feed::connector::FeedConnector; use node::connector::NodeConnector; use shard::connector::ShardConnector; -use types::NodeId; use util::{Locator, LocatorFactory}; const VERSION: &str = env!("CARGO_PKG_VERSION"); @@ -140,40 +139,6 @@ async fn feed_route( ) } -/// Entry point for network state dump -#[get("/network_state/{chain}/{nid}")] -async fn state_route( - path: web::Path<(Box, NodeId)>, - aggregator: web::Data>, -) -> Result { - let (chain, nid) = path.into_inner(); - - let res = match aggregator.send(GetNetworkState(chain, nid)).await { - Ok(Some(res)) => res.await, - Ok(None) => Ok(None), - Err(error) => Err(error), - }; - - match res { - Ok(Some(body)) => { - HttpResponse::Ok() - .content_type("application/json") - .body(body) - .await - } - Ok(None) => { - HttpResponse::Ok() - .body("Node has disconnected or has not submitted its network state yet") - .await - } - Err(error) => { - log::error!("Network state mailbox error: {:?}", error); - - HttpResponse::InternalServerError().await - } - } -} - /// Entry point for health check monitoring bots #[get("/health")] async fn health(aggregator: web::Data>) -> Result { @@ -214,7 +179,6 @@ async fn main() -> std::io::Result<()> { .data(locator.clone()) .service(node_route) .service(feed_route) - .service(state_route) .service(health) }) .bind(opts.socket)? diff --git a/backend/core/src/node.rs b/backend/core/src/node.rs index 23f3def..ac2ef2f 100644 --- a/backend/core/src/node.rs +++ b/backend/core/src/node.rs @@ -1,4 +1,3 @@ -use bytes::Bytes; use std::sync::Arc; use crate::types::{ @@ -38,8 +37,6 @@ pub struct Node { stale: bool, /// Unix timestamp for when node started up (falls back to connection time) startup_time: Option, - /// Network state - network_state: Option, } impl Node { @@ -60,7 +57,6 @@ impl Node { location: None, stale: false, startup_time, - network_state: None, } } @@ -211,33 +207,6 @@ impl Node { self.details.validator = Some(addr); } - pub fn set_network_state(&mut self, state: Bytes) { - self.network_state = Some(state); - } - - pub fn network_state(&self) -> Option { - use serde::Deserialize; - use serde_json::value::RawValue; - - #[derive(Deserialize)] - struct Wrapper<'a> { - #[serde(borrow)] - #[serde(alias = "network_state")] - state: &'a RawValue, - } - - let raw = self.network_state.as_ref()?; - let wrap: Wrapper = serde_json::from_slice(raw).ok()?; - let json = wrap.state.get(); - - // Handle old nodes that exposed network_state as stringified JSON - if let Ok(stringified) = serde_json::from_str::(json) { - Some(stringified.into()) - } else { - Some(json.to_owned().into()) - } - } - pub fn startup_time(&self) -> Option { self.startup_time } diff --git a/backend/core/src/node/connector.rs b/backend/core/src/node/connector.rs index 819d0f3..81fddd8 100644 --- a/backend/core/src/node/connector.rs +++ b/backend/core/src/node/connector.rs @@ -106,7 +106,6 @@ impl NodeConnector { fn handle_message( &mut self, msg: NodeMessage, - data: Bytes, ctx: &mut ::Context, ) { let conn_id = msg.id(); @@ -116,7 +115,6 @@ impl NodeConnector { ConnMultiplex::Connected { nid, chain } => { chain.do_send(UpdateNode { nid: *nid, - raw: Some(data), payload, }); } @@ -202,7 +200,6 @@ impl Handler for NodeConnector { for payload in backlog.drain(..) { chain.do_send(UpdateNode { nid, - raw: None, payload, }); } @@ -260,7 +257,7 @@ impl StreamHandler> for NodeConnector { }; match serde_json::from_slice(&data) { - Ok(msg) => self.handle_message(msg, data, ctx), + Ok(msg) => self.handle_message(msg, ctx), #[cfg(debug)] Err(err) => { let data: &[u8] = data.get(..512).unwrap_or_else(|| &data); diff --git a/backend/core/src/node/message.rs b/backend/core/src/node/message.rs index 8738c8a..64cd8a5 100644 --- a/backend/core/src/node/message.rs +++ b/backend/core/src/node/message.rs @@ -44,8 +44,6 @@ pub enum Payload { SystemConnected(SystemConnected), #[serde(rename = "system.interval")] SystemInterval(SystemInterval), - #[serde(rename = "system.network_state")] - SystemNetworkState(IgnoredAny), #[serde(rename = "block.import")] BlockImport(Block), #[serde(rename = "notify.finalized")] @@ -87,7 +85,6 @@ pub struct SystemInterval { pub finalized_hash: Option, #[serde(flatten)] pub block: Option, - pub network_state: Option, pub used_state_cache_size: Option, }