From 88bf3017351dcd0025a6fe3e0a2d90f847ea978f Mon Sep 17 00:00:00 2001 From: Cecile Tonglet Date: Wed, 20 Jan 2021 12:32:20 +0100 Subject: [PATCH] Wrap message's content into node's message (#306) --- .editorconfig | 10 +++-- backend/src/chain.rs | 24 +++++------ backend/src/node/connector.rs | 10 ++--- backend/src/node/message.rs | 76 ++++++++++++++++++++++++++++------- backend/src/types.rs | 2 +- 5 files changed, 87 insertions(+), 35 deletions(-) diff --git a/.editorconfig b/.editorconfig index 0043c7c..75782ed 100644 --- a/.editorconfig +++ b/.editorconfig @@ -1,8 +1,12 @@ root = true - [*] charset = utf-8 -indent_style = space -indent_size = 2 trim_trailing_whitespace = true insert_final_newline = true +indent_style = space + +[frontend/**] +indent_size = 2 + +[backend/**] +indent_size = 4 diff --git a/backend/src/chain.rs b/backend/src/chain.rs index 51d4238..abd85ff 100644 --- a/backend/src/chain.rs +++ b/backend/src/chain.rs @@ -5,7 +5,7 @@ use bytes::Bytes; use rustc_hash::FxHashMap; use crate::aggregator::{Aggregator, DropChain, RenameChain, NodeCount}; -use crate::node::{Node, connector::Initialize, message::{NodeMessage, Details}}; +use crate::node::{Node, connector::Initialize, message::{NodeMessage, Payload}}; use crate::feed::connector::{FeedId, FeedConnector, Subscribed, Unsubscribed}; use crate::feed::{self, FeedMessageSerializer}; use crate::util::{DenseMap, NumStats, now}; @@ -319,13 +319,13 @@ impl Handler for Chain { fn handle(&mut self, msg: UpdateNode, _: &mut Self::Context) { let UpdateNode { nid, msg, raw } = msg; - if let Some(block) = msg.details.best_block() { + if let Some(block) = msg.payload().best_block() { self.handle_block(block, nid); } if let Some(node) = self.nodes.get_mut(nid) { - match msg.details { - Details::SystemInterval(ref interval) => { + match msg.payload() { + Payload::SystemInterval(ref interval) => { if interval.network_state.is_some() { if let Some(raw) = raw { node.set_network_state(raw); @@ -344,17 +344,17 @@ impl Handler for Chain { self.serializer.push(feed::NodeIOUpdate(nid, io)); } } - Details::SystemNetworkState(_) => { + Payload::SystemNetworkState(_) => { if let Some(raw) = raw { node.set_network_state(raw); } } - Details::AfgAuthoritySet(authority) => { - node.set_validator_address(authority.authority_id); + Payload::AfgAuthoritySet(authority) => { + node.set_validator_address(authority.authority_id.clone()); self.broadcast(); return; } - Details::AfgFinalized(finalized) => { + Payload::AfgFinalized(finalized) => { if let Ok(finalized_number) = finalized.finalized_number.parse::() { if let Some(addr) = node.details().validator.clone() { self.serializer.push(feed::AfgFinalized(addr, finalized_number, @@ -364,7 +364,7 @@ impl Handler for Chain { } return; } - Details::AfgReceivedPrecommit(precommit) => { + Payload::AfgReceivedPrecommit(precommit) => { if let Ok(finalized_number) = precommit.received.target_number.parse::() { if let Some(addr) = node.details().validator.clone() { let voter = precommit.received.voter.clone(); @@ -375,7 +375,7 @@ impl Handler for Chain { } return; } - Details::AfgReceivedPrevote(prevote) => { + Payload::AfgReceivedPrevote(prevote) => { if let Ok(finalized_number) = prevote.received.target_number.parse::() { if let Some(addr) = node.details().validator.clone() { let voter = prevote.received.voter.clone(); @@ -386,12 +386,12 @@ impl Handler for Chain { } return; } - Details::AfgReceivedCommit(_) => { + Payload::AfgReceivedCommit(_) => { } _ => (), } - if let Some(block) = msg.details.finalized_block() { + if let Some(block) = msg.payload().finalized_block() { if let Some(finalized) = node.update_finalized(block) { self.serializer.push(feed::FinalizedBlock(nid, finalized.height, finalized.hash)); diff --git a/backend/src/node/connector.rs b/backend/src/node/connector.rs index c182482..70c4e24 100644 --- a/backend/src/node/connector.rs +++ b/backend/src/node/connector.rs @@ -10,7 +10,7 @@ use actix_http::ws::Item; use crate::aggregator::{Aggregator, AddNode}; use crate::chain::{Chain, UpdateNode, RemoveNode}; use crate::node::NodeId; -use crate::node::message::{NodeMessage, Details, SystemConnected}; +use crate::node::message::{NodeMessage, Payload}; use crate::util::LocateRequest; use crate::types::ConnId; @@ -96,7 +96,7 @@ impl NodeConnector { } fn handle_message(&mut self, msg: NodeMessage, data: Bytes, ctx: &mut ::Context) { - let conn_id = msg.id.unwrap_or(0); + let conn_id = msg.id(); match self.multiplex.entry(conn_id).or_default() { ConnMultiplex::Connected { nid, chain } => { @@ -107,8 +107,8 @@ impl NodeConnector { }); } ConnMultiplex::Waiting { backlog } => { - if let Details::SystemConnected(connected) = msg.details { - let SystemConnected { network_id: _, mut node } = connected; + if let Payload::SystemConnected(connected) = msg.payload() { + let mut node = connected.node.clone(); let rec = ctx.address().recipient(); // FIXME: Use genesis hash instead of names to avoid this mess @@ -231,7 +231,7 @@ impl StreamHandler> for NodeConnector { #[cfg(debug)] Err(err) => { let data: &[u8] = data.get(..512).unwrap_or_else(|| &data); - warn!("Failed to parse node message: {} {}", err, std::str::from_utf8(data).unwrap_or_else(|_| "INVALID UTF8")) + log::warn!("Failed to parse node message: {} {}", err, std::str::from_utf8(data).unwrap_or_else(|_| "INVALID UTF8")) }, #[cfg(not(debug))] Err(_) => (), diff --git a/backend/src/node/message.rs b/backend/src/node/message.rs index 3b6ff66..03f0d8a 100644 --- a/backend/src/node/message.rs +++ b/backend/src/node/message.rs @@ -1,5 +1,4 @@ use actix::prelude::*; -use chrono::{DateTime, Utc}; use serde::Deserialize; use serde::de::IgnoredAny; use crate::node::NodeDetails; @@ -7,18 +6,38 @@ use crate::types::{Block, BlockNumber, BlockHash, ConnId}; #[derive(Deserialize, Debug, Message)] #[rtype(result = "()")] -pub struct NodeMessage { - pub ts: DateTime, - pub id: Option, - #[serde(flatten)] - pub details: Details, +#[serde(untagged)] +pub enum NodeMessage { + V1 { + #[serde(flatten)] + payload: Payload, + }, + V2 { + id: ConnId, + payload: Payload, + }, +} + +impl NodeMessage { + /// Returns a reference to the payload. + pub fn payload(&self) -> &Payload { + match self { + NodeMessage::V1 { payload, .. } | NodeMessage::V2 { payload, .. } => payload, + } + } + + /// Returns the connection ID or 0 if there is no ID. + pub fn id(&self) -> ConnId { + match self { + NodeMessage::V1 { .. } => 0, + NodeMessage::V2 { id, .. } => *id, + } + } } #[derive(Deserialize, Debug)] #[serde(tag = "msg")] -pub enum Details { - #[serde(rename = "node.start")] - NodeStart(Block), +pub enum Payload { #[serde(rename = "system.connected")] SystemConnected(SystemConnected), #[serde(rename = "system.interval")] @@ -124,24 +143,24 @@ impl Block { } } -impl Details { +impl Payload { pub fn best_block(&self) -> Option<&Block> { match self { - Details::BlockImport(block) => Some(block), - Details::SystemInterval(SystemInterval { block, .. }) => block.as_ref(), + Payload::BlockImport(block) => Some(block), + Payload::SystemInterval(SystemInterval { block, .. }) => block.as_ref(), _ => None, } } pub fn finalized_block(&self) -> Option { match self { - Details::SystemInterval(ref interval) => { + Payload::SystemInterval(ref interval) => { Some(Block { hash: interval.finalized_hash?, height: interval.finalized_height?, }) }, - Details::NotifyFinalized(ref finalized) => { + Payload::NotifyFinalized(ref finalized) => { Some(Block { hash: finalized.hash, height: finalized.height.parse().ok()? @@ -151,3 +170,32 @@ impl Details { } } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn message_v1() { + let json = r#"{"msg":"notify.finalized","level":"INFO","ts":"2021-01-13T12:38:25.410794650+01:00","best":"0x031c3521ca2f9c673812d692fc330b9a18e18a2781e3f9976992f861fd3ea0cb","height":"50"}"#; + assert!( + matches!( + serde_json::from_str::(json).unwrap(), + NodeMessage::V1 { .. }, + ), + "message did not match variant V1", + ); + } + + #[test] + fn message_v2() { + let json = r#"{"id":1,"ts":"2021-01-13T12:22:20.053527101+01:00","payload":{"best":"0xcc41708573f2acaded9dd75e07dac2d4163d136ca35b3061c558d7a35a09dd8d","height":"209","msg":"notify.finalized"}}"#; + assert!( + matches!( + serde_json::from_str::(json).unwrap(), + NodeMessage::V2 { .. }, + ), + "message did not match variant V2", + ); + } +} diff --git a/backend/src/types.rs b/backend/src/types.rs index 17d0a3b..42db6ac 100644 --- a/backend/src/types.rs +++ b/backend/src/types.rs @@ -10,7 +10,7 @@ pub type Timestamp = u64; pub type Address = Box; pub use primitive_types::H256 as BlockHash; -#[derive(Deserialize, Debug)] +#[derive(Deserialize, Debug, Clone)] pub struct NodeDetails { pub chain: Box, pub name: Box,