Remove network_state handling from the BE

This commit is contained in:
Maciej Hirsz
2021-06-04 09:54:36 +02:00
parent e1daa07c51
commit 7f6bb3057b
6 changed files with 5 additions and 122 deletions
+2 -17
View File
@@ -3,11 +3,11 @@ use actix_web_actors::ws::{CloseCode, CloseReason};
use ctor::ctor; use ctor::ctor;
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use crate::chain::{self, Chain, ChainId, GetNodeNetworkState, Label}; use crate::chain::{self, Chain, ChainId, Label};
use crate::feed::connector::{Connected, FeedConnector, FeedId}; use crate::feed::connector::{Connected, FeedConnector, FeedId};
use crate::feed::{self, FeedMessageSerializer}; use crate::feed::{self, FeedMessageSerializer};
use crate::node::connector::{Mute, NodeConnector}; use crate::node::connector::{Mute, NodeConnector};
use crate::types::{ConnId, NodeDetails, NodeId}; use crate::types::{ConnId, NodeDetails};
use crate::util::{DenseMap, Hash}; use crate::util::{DenseMap, Hash};
pub struct Aggregator { pub struct Aggregator {
@@ -178,11 +178,6 @@ pub struct Disconnect(pub FeedId);
#[rtype(result = "()")] #[rtype(result = "()")]
pub struct NodeCount(pub ChainId, pub usize); pub struct NodeCount(pub ChainId, pub usize);
/// Message sent to the Aggregator to get the network state of a particular node
#[derive(Message)]
#[rtype(result = "Option<Request<Chain, GetNodeNetworkState>>")]
pub struct GetNetworkState(pub Box<str>, pub NodeId);
/// Message sent to the Aggregator to get a health check /// Message sent to the Aggregator to get a health check
#[derive(Message)] #[derive(Message)]
#[rtype(result = "usize")] #[rtype(result = "usize")]
@@ -368,16 +363,6 @@ impl Handler<NodeCount> for Aggregator {
} }
} }
impl Handler<GetNetworkState> for Aggregator {
type Result = <GetNetworkState as Message>::Result;
fn handle(&mut self, msg: GetNetworkState, _: &mut Self::Context) -> Self::Result {
let GetNetworkState(chain, nid) = msg;
Some(self.get_chain(&*chain)?.addr.send(GetNodeNetworkState(nid)))
}
}
impl Handler<GetHealth> for Aggregator { impl Handler<GetHealth> for Aggregator {
type Result = usize; type Result = usize;
+1 -30
View File
@@ -1,5 +1,4 @@
use actix::prelude::*; use actix::prelude::*;
use bytes::Bytes;
use rustc_hash::FxHashMap; use rustc_hash::FxHashMap;
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
@@ -216,7 +215,6 @@ pub struct AddNode {
#[rtype(result = "()")] #[rtype(result = "()")]
pub struct UpdateNode { pub struct UpdateNode {
pub nid: NodeId, pub nid: NodeId,
pub raw: Option<Bytes>,
pub payload: Payload, pub payload: Payload,
} }
@@ -251,12 +249,6 @@ pub struct LocateNode {
pub location: Arc<NodeLocation>, pub location: Arc<NodeLocation>,
} }
pub struct GetNodeNetworkState(pub NodeId);
impl Message for GetNodeNetworkState {
type Result = Option<Bytes>;
}
impl Handler<AddNode> for Chain { impl Handler<AddNode> for Chain {
type Result = (); type Result = ();
@@ -342,7 +334,7 @@ impl Handler<UpdateNode> for Chain {
type Result = (); type Result = ();
fn handle(&mut self, msg: UpdateNode, _: &mut Self::Context) { fn handle(&mut self, msg: UpdateNode, _: &mut Self::Context) {
let UpdateNode { nid, payload, raw } = msg; let UpdateNode { nid, payload } = msg;
if let Some(block) = payload.best_block() { if let Some(block) = payload.best_block() {
self.handle_block(block, nid); self.handle_block(block, nid);
@@ -351,12 +343,6 @@ impl Handler<UpdateNode> for Chain {
if let Some(node) = self.nodes.get_mut(nid) { if let Some(node) = self.nodes.get_mut(nid) {
match payload { match payload {
Payload::SystemInterval(ref interval) => { Payload::SystemInterval(ref interval) => {
if interval.network_state.is_some() {
if let Some(raw) = raw {
node.set_network_state(raw);
}
}
if node.update_hardware(interval) { if node.update_hardware(interval) {
self.serializer.push(feed::Hardware(nid, node.hardware())); self.serializer.push(feed::Hardware(nid, node.hardware()));
} }
@@ -369,11 +355,6 @@ impl Handler<UpdateNode> for Chain {
self.serializer.push(feed::NodeIOUpdate(nid, io)); self.serializer.push(feed::NodeIOUpdate(nid, io));
} }
} }
Payload::SystemNetworkState(_) => {
if let Some(raw) = raw {
node.set_network_state(raw);
}
}
Payload::AfgAuthoritySet(authority) => { Payload::AfgAuthoritySet(authority) => {
node.set_validator_address(authority.authority_id.clone()); node.set_validator_address(authority.authority_id.clone());
self.broadcast(); self.broadcast();
@@ -581,13 +562,3 @@ impl Handler<Unsubscribe> for Chain {
self.finality_feeds.remove(&fid); self.finality_feeds.remove(&fid);
} }
} }
impl Handler<GetNodeNetworkState> for Chain {
type Result = <GetNodeNetworkState as Message>::Result;
fn handle(&mut self, msg: GetNodeNetworkState, _: &mut Self::Context) -> Self::Result {
let GetNodeNetworkState(nid) = msg;
self.nodes.get(nid)?.network_state()
}
}
+1 -37
View File
@@ -17,11 +17,10 @@ mod shard;
mod types; mod types;
mod util; mod util;
use aggregator::{Aggregator, GetHealth, GetNetworkState}; use aggregator::{Aggregator, GetHealth};
use feed::connector::FeedConnector; use feed::connector::FeedConnector;
use node::connector::NodeConnector; use node::connector::NodeConnector;
use shard::connector::ShardConnector; use shard::connector::ShardConnector;
use types::NodeId;
use util::{Locator, LocatorFactory}; use util::{Locator, LocatorFactory};
const VERSION: &str = env!("CARGO_PKG_VERSION"); const VERSION: &str = env!("CARGO_PKG_VERSION");
@@ -140,40 +139,6 @@ async fn feed_route(
) )
} }
/// Entry point for network state dump
#[get("/network_state/{chain}/{nid}")]
async fn state_route(
path: web::Path<(Box<str>, NodeId)>,
aggregator: web::Data<Addr<Aggregator>>,
) -> Result<HttpResponse, Error> {
let (chain, nid) = path.into_inner();
let res = match aggregator.send(GetNetworkState(chain, nid)).await {
Ok(Some(res)) => res.await,
Ok(None) => Ok(None),
Err(error) => Err(error),
};
match res {
Ok(Some(body)) => {
HttpResponse::Ok()
.content_type("application/json")
.body(body)
.await
}
Ok(None) => {
HttpResponse::Ok()
.body("Node has disconnected or has not submitted its network state yet")
.await
}
Err(error) => {
log::error!("Network state mailbox error: {:?}", error);
HttpResponse::InternalServerError().await
}
}
}
/// Entry point for health check monitoring bots /// Entry point for health check monitoring bots
#[get("/health")] #[get("/health")]
async fn health(aggregator: web::Data<Addr<Aggregator>>) -> Result<HttpResponse, Error> { async fn health(aggregator: web::Data<Addr<Aggregator>>) -> Result<HttpResponse, Error> {
@@ -214,7 +179,6 @@ async fn main() -> std::io::Result<()> {
.data(locator.clone()) .data(locator.clone())
.service(node_route) .service(node_route)
.service(feed_route) .service(feed_route)
.service(state_route)
.service(health) .service(health)
}) })
.bind(opts.socket)? .bind(opts.socket)?
-31
View File
@@ -1,4 +1,3 @@
use bytes::Bytes;
use std::sync::Arc; use std::sync::Arc;
use crate::types::{ use crate::types::{
@@ -38,8 +37,6 @@ pub struct Node {
stale: bool, stale: bool,
/// Unix timestamp for when node started up (falls back to connection time) /// Unix timestamp for when node started up (falls back to connection time)
startup_time: Option<Timestamp>, startup_time: Option<Timestamp>,
/// Network state
network_state: Option<Bytes>,
} }
impl Node { impl Node {
@@ -60,7 +57,6 @@ impl Node {
location: None, location: None,
stale: false, stale: false,
startup_time, startup_time,
network_state: None,
} }
} }
@@ -211,33 +207,6 @@ impl Node {
self.details.validator = Some(addr); self.details.validator = Some(addr);
} }
pub fn set_network_state(&mut self, state: Bytes) {
self.network_state = Some(state);
}
pub fn network_state(&self) -> Option<Bytes> {
use serde::Deserialize;
use serde_json::value::RawValue;
#[derive(Deserialize)]
struct Wrapper<'a> {
#[serde(borrow)]
#[serde(alias = "network_state")]
state: &'a RawValue,
}
let raw = self.network_state.as_ref()?;
let wrap: Wrapper = serde_json::from_slice(raw).ok()?;
let json = wrap.state.get();
// Handle old nodes that exposed network_state as stringified JSON
if let Ok(stringified) = serde_json::from_str::<String>(json) {
Some(stringified.into())
} else {
Some(json.to_owned().into())
}
}
pub fn startup_time(&self) -> Option<Timestamp> { pub fn startup_time(&self) -> Option<Timestamp> {
self.startup_time self.startup_time
} }
+1 -4
View File
@@ -106,7 +106,6 @@ impl NodeConnector {
fn handle_message( fn handle_message(
&mut self, &mut self,
msg: NodeMessage, msg: NodeMessage,
data: Bytes,
ctx: &mut <Self as Actor>::Context, ctx: &mut <Self as Actor>::Context,
) { ) {
let conn_id = msg.id(); let conn_id = msg.id();
@@ -116,7 +115,6 @@ impl NodeConnector {
ConnMultiplex::Connected { nid, chain } => { ConnMultiplex::Connected { nid, chain } => {
chain.do_send(UpdateNode { chain.do_send(UpdateNode {
nid: *nid, nid: *nid,
raw: Some(data),
payload, payload,
}); });
} }
@@ -202,7 +200,6 @@ impl Handler<Initialize> for NodeConnector {
for payload in backlog.drain(..) { for payload in backlog.drain(..) {
chain.do_send(UpdateNode { chain.do_send(UpdateNode {
nid, nid,
raw: None,
payload, payload,
}); });
} }
@@ -260,7 +257,7 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for NodeConnector {
}; };
match serde_json::from_slice(&data) { match serde_json::from_slice(&data) {
Ok(msg) => self.handle_message(msg, data, ctx), Ok(msg) => self.handle_message(msg, ctx),
#[cfg(debug)] #[cfg(debug)]
Err(err) => { Err(err) => {
let data: &[u8] = data.get(..512).unwrap_or_else(|| &data); let data: &[u8] = data.get(..512).unwrap_or_else(|| &data);
-3
View File
@@ -44,8 +44,6 @@ pub enum Payload {
SystemConnected(SystemConnected), SystemConnected(SystemConnected),
#[serde(rename = "system.interval")] #[serde(rename = "system.interval")]
SystemInterval(SystemInterval), SystemInterval(SystemInterval),
#[serde(rename = "system.network_state")]
SystemNetworkState(IgnoredAny),
#[serde(rename = "block.import")] #[serde(rename = "block.import")]
BlockImport(Block), BlockImport(Block),
#[serde(rename = "notify.finalized")] #[serde(rename = "notify.finalized")]
@@ -87,7 +85,6 @@ pub struct SystemInterval {
pub finalized_hash: Option<BlockHash>, pub finalized_hash: Option<BlockHash>,
#[serde(flatten)] #[serde(flatten)]
pub block: Option<Block>, pub block: Option<Block>,
pub network_state: Option<IgnoredAny>,
pub used_state_cache_size: Option<f32>, pub used_state_cache_size: Option<f32>,
} }