Wrap message's content into node's message (#306)

This commit is contained in:
Cecile Tonglet
2021-01-20 12:32:20 +01:00
committed by GitHub
parent 6a258c115a
commit 88bf301735
5 changed files with 87 additions and 35 deletions
+7 -3
View File
@@ -1,8 +1,12 @@
root = true root = true
[*] [*]
charset = utf-8 charset = utf-8
indent_style = space
indent_size = 2
trim_trailing_whitespace = true trim_trailing_whitespace = true
insert_final_newline = true insert_final_newline = true
indent_style = space
[frontend/**]
indent_size = 2
[backend/**]
indent_size = 4
+12 -12
View File
@@ -5,7 +5,7 @@ use bytes::Bytes;
use rustc_hash::FxHashMap; use rustc_hash::FxHashMap;
use crate::aggregator::{Aggregator, DropChain, RenameChain, NodeCount}; use crate::aggregator::{Aggregator, DropChain, RenameChain, NodeCount};
use crate::node::{Node, connector::Initialize, message::{NodeMessage, Details}}; use crate::node::{Node, connector::Initialize, message::{NodeMessage, Payload}};
use crate::feed::connector::{FeedId, FeedConnector, Subscribed, Unsubscribed}; use crate::feed::connector::{FeedId, FeedConnector, Subscribed, Unsubscribed};
use crate::feed::{self, FeedMessageSerializer}; use crate::feed::{self, FeedMessageSerializer};
use crate::util::{DenseMap, NumStats, now}; use crate::util::{DenseMap, NumStats, now};
@@ -319,13 +319,13 @@ impl Handler<UpdateNode> for Chain {
fn handle(&mut self, msg: UpdateNode, _: &mut Self::Context) { fn handle(&mut self, msg: UpdateNode, _: &mut Self::Context) {
let UpdateNode { nid, msg, raw } = msg; let UpdateNode { nid, msg, raw } = msg;
if let Some(block) = msg.details.best_block() { if let Some(block) = msg.payload().best_block() {
self.handle_block(block, nid); self.handle_block(block, nid);
} }
if let Some(node) = self.nodes.get_mut(nid) { if let Some(node) = self.nodes.get_mut(nid) {
match msg.details { match msg.payload() {
Details::SystemInterval(ref interval) => { Payload::SystemInterval(ref interval) => {
if interval.network_state.is_some() { if interval.network_state.is_some() {
if let Some(raw) = raw { if let Some(raw) = raw {
node.set_network_state(raw); node.set_network_state(raw);
@@ -344,17 +344,17 @@ impl Handler<UpdateNode> for Chain {
self.serializer.push(feed::NodeIOUpdate(nid, io)); self.serializer.push(feed::NodeIOUpdate(nid, io));
} }
} }
Details::SystemNetworkState(_) => { Payload::SystemNetworkState(_) => {
if let Some(raw) = raw { if let Some(raw) = raw {
node.set_network_state(raw); node.set_network_state(raw);
} }
} }
Details::AfgAuthoritySet(authority) => { Payload::AfgAuthoritySet(authority) => {
node.set_validator_address(authority.authority_id); node.set_validator_address(authority.authority_id.clone());
self.broadcast(); self.broadcast();
return; return;
} }
Details::AfgFinalized(finalized) => { Payload::AfgFinalized(finalized) => {
if let Ok(finalized_number) = finalized.finalized_number.parse::<BlockNumber>() { if let Ok(finalized_number) = finalized.finalized_number.parse::<BlockNumber>() {
if let Some(addr) = node.details().validator.clone() { if let Some(addr) = node.details().validator.clone() {
self.serializer.push(feed::AfgFinalized(addr, finalized_number, self.serializer.push(feed::AfgFinalized(addr, finalized_number,
@@ -364,7 +364,7 @@ impl Handler<UpdateNode> for Chain {
} }
return; return;
} }
Details::AfgReceivedPrecommit(precommit) => { Payload::AfgReceivedPrecommit(precommit) => {
if let Ok(finalized_number) = precommit.received.target_number.parse::<BlockNumber>() { if let Ok(finalized_number) = precommit.received.target_number.parse::<BlockNumber>() {
if let Some(addr) = node.details().validator.clone() { if let Some(addr) = node.details().validator.clone() {
let voter = precommit.received.voter.clone(); let voter = precommit.received.voter.clone();
@@ -375,7 +375,7 @@ impl Handler<UpdateNode> for Chain {
} }
return; return;
} }
Details::AfgReceivedPrevote(prevote) => { Payload::AfgReceivedPrevote(prevote) => {
if let Ok(finalized_number) = prevote.received.target_number.parse::<BlockNumber>() { if let Ok(finalized_number) = prevote.received.target_number.parse::<BlockNumber>() {
if let Some(addr) = node.details().validator.clone() { if let Some(addr) = node.details().validator.clone() {
let voter = prevote.received.voter.clone(); let voter = prevote.received.voter.clone();
@@ -386,12 +386,12 @@ impl Handler<UpdateNode> for Chain {
} }
return; return;
} }
Details::AfgReceivedCommit(_) => { Payload::AfgReceivedCommit(_) => {
} }
_ => (), _ => (),
} }
if let Some(block) = msg.details.finalized_block() { if let Some(block) = msg.payload().finalized_block() {
if let Some(finalized) = node.update_finalized(block) { if let Some(finalized) = node.update_finalized(block) {
self.serializer.push(feed::FinalizedBlock(nid, finalized.height, finalized.hash)); self.serializer.push(feed::FinalizedBlock(nid, finalized.height, finalized.hash));
+5 -5
View File
@@ -10,7 +10,7 @@ use actix_http::ws::Item;
use crate::aggregator::{Aggregator, AddNode}; use crate::aggregator::{Aggregator, AddNode};
use crate::chain::{Chain, UpdateNode, RemoveNode}; use crate::chain::{Chain, UpdateNode, RemoveNode};
use crate::node::NodeId; use crate::node::NodeId;
use crate::node::message::{NodeMessage, Details, SystemConnected}; use crate::node::message::{NodeMessage, Payload};
use crate::util::LocateRequest; use crate::util::LocateRequest;
use crate::types::ConnId; use crate::types::ConnId;
@@ -96,7 +96,7 @@ impl NodeConnector {
} }
fn handle_message(&mut self, msg: NodeMessage, data: Bytes, ctx: &mut <Self as Actor>::Context) { fn handle_message(&mut self, msg: NodeMessage, data: Bytes, ctx: &mut <Self as Actor>::Context) {
let conn_id = msg.id.unwrap_or(0); let conn_id = msg.id();
match self.multiplex.entry(conn_id).or_default() { match self.multiplex.entry(conn_id).or_default() {
ConnMultiplex::Connected { nid, chain } => { ConnMultiplex::Connected { nid, chain } => {
@@ -107,8 +107,8 @@ impl NodeConnector {
}); });
} }
ConnMultiplex::Waiting { backlog } => { ConnMultiplex::Waiting { backlog } => {
if let Details::SystemConnected(connected) = msg.details { if let Payload::SystemConnected(connected) = msg.payload() {
let SystemConnected { network_id: _, mut node } = connected; let mut node = connected.node.clone();
let rec = ctx.address().recipient(); let rec = ctx.address().recipient();
// FIXME: Use genesis hash instead of names to avoid this mess // FIXME: Use genesis hash instead of names to avoid this mess
@@ -231,7 +231,7 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for NodeConnector {
#[cfg(debug)] #[cfg(debug)]
Err(err) => { Err(err) => {
let data: &[u8] = data.get(..512).unwrap_or_else(|| &data); let data: &[u8] = data.get(..512).unwrap_or_else(|| &data);
warn!("Failed to parse node message: {} {}", err, std::str::from_utf8(data).unwrap_or_else(|_| "INVALID UTF8")) log::warn!("Failed to parse node message: {} {}", err, std::str::from_utf8(data).unwrap_or_else(|_| "INVALID UTF8"))
}, },
#[cfg(not(debug))] #[cfg(not(debug))]
Err(_) => (), Err(_) => (),
+62 -14
View File
@@ -1,5 +1,4 @@
use actix::prelude::*; use actix::prelude::*;
use chrono::{DateTime, Utc};
use serde::Deserialize; use serde::Deserialize;
use serde::de::IgnoredAny; use serde::de::IgnoredAny;
use crate::node::NodeDetails; use crate::node::NodeDetails;
@@ -7,18 +6,38 @@ use crate::types::{Block, BlockNumber, BlockHash, ConnId};
#[derive(Deserialize, Debug, Message)] #[derive(Deserialize, Debug, Message)]
#[rtype(result = "()")] #[rtype(result = "()")]
pub struct NodeMessage { #[serde(untagged)]
pub ts: DateTime<Utc>, pub enum NodeMessage {
pub id: Option<ConnId>, V1 {
#[serde(flatten)] #[serde(flatten)]
pub details: Details, payload: Payload,
},
V2 {
id: ConnId,
payload: Payload,
},
}
impl NodeMessage {
/// Returns a reference to the payload.
pub fn payload(&self) -> &Payload {
match self {
NodeMessage::V1 { payload, .. } | NodeMessage::V2 { payload, .. } => payload,
}
}
/// Returns the connection ID or 0 if there is no ID.
pub fn id(&self) -> ConnId {
match self {
NodeMessage::V1 { .. } => 0,
NodeMessage::V2 { id, .. } => *id,
}
}
} }
#[derive(Deserialize, Debug)] #[derive(Deserialize, Debug)]
#[serde(tag = "msg")] #[serde(tag = "msg")]
pub enum Details { pub enum Payload {
#[serde(rename = "node.start")]
NodeStart(Block),
#[serde(rename = "system.connected")] #[serde(rename = "system.connected")]
SystemConnected(SystemConnected), SystemConnected(SystemConnected),
#[serde(rename = "system.interval")] #[serde(rename = "system.interval")]
@@ -124,24 +143,24 @@ impl Block {
} }
} }
impl Details { impl Payload {
pub fn best_block(&self) -> Option<&Block> { pub fn best_block(&self) -> Option<&Block> {
match self { match self {
Details::BlockImport(block) => Some(block), Payload::BlockImport(block) => Some(block),
Details::SystemInterval(SystemInterval { block, .. }) => block.as_ref(), Payload::SystemInterval(SystemInterval { block, .. }) => block.as_ref(),
_ => None, _ => None,
} }
} }
pub fn finalized_block(&self) -> Option<Block> { pub fn finalized_block(&self) -> Option<Block> {
match self { match self {
Details::SystemInterval(ref interval) => { Payload::SystemInterval(ref interval) => {
Some(Block { Some(Block {
hash: interval.finalized_hash?, hash: interval.finalized_hash?,
height: interval.finalized_height?, height: interval.finalized_height?,
}) })
}, },
Details::NotifyFinalized(ref finalized) => { Payload::NotifyFinalized(ref finalized) => {
Some(Block { Some(Block {
hash: finalized.hash, hash: finalized.hash,
height: finalized.height.parse().ok()? height: finalized.height.parse().ok()?
@@ -151,3 +170,32 @@ impl Details {
} }
} }
} }
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn message_v1() {
let json = r#"{"msg":"notify.finalized","level":"INFO","ts":"2021-01-13T12:38:25.410794650+01:00","best":"0x031c3521ca2f9c673812d692fc330b9a18e18a2781e3f9976992f861fd3ea0cb","height":"50"}"#;
assert!(
matches!(
serde_json::from_str::<NodeMessage>(json).unwrap(),
NodeMessage::V1 { .. },
),
"message did not match variant V1",
);
}
#[test]
fn message_v2() {
let json = r#"{"id":1,"ts":"2021-01-13T12:22:20.053527101+01:00","payload":{"best":"0xcc41708573f2acaded9dd75e07dac2d4163d136ca35b3061c558d7a35a09dd8d","height":"209","msg":"notify.finalized"}}"#;
assert!(
matches!(
serde_json::from_str::<NodeMessage>(json).unwrap(),
NodeMessage::V2 { .. },
),
"message did not match variant V2",
);
}
}
+1 -1
View File
@@ -10,7 +10,7 @@ pub type Timestamp = u64;
pub type Address = Box<str>; pub type Address = Box<str>;
pub use primitive_types::H256 as BlockHash; pub use primitive_types::H256 as BlockHash;
#[derive(Deserialize, Debug)] #[derive(Deserialize, Debug, Clone)]
pub struct NodeDetails { pub struct NodeDetails {
pub chain: Box<str>, pub chain: Box<str>,
pub name: Box<str>, pub name: Box<str>,