diff --git a/backend/Cargo.lock b/backend/Cargo.lock index db29d1c..4d1fffd 100644 --- a/backend/Cargo.lock +++ b/backend/Cargo.lock @@ -1658,6 +1658,7 @@ dependencies = [ "clap", "common", "log", + "primitive-types", "rustc-hash", "serde", "serde_json", diff --git a/backend/common/src/util/hash.rs b/backend/common/src/json/hash.rs similarity index 96% rename from backend/common/src/util/hash.rs rename to backend/common/src/json/hash.rs index 23b10be..01fbe1e 100644 --- a/backend/common/src/util/hash.rs +++ b/backend/common/src/json/hash.rs @@ -8,10 +8,16 @@ use serde::de::{self, Deserialize, Deserializer, Unexpected, Visitor, SeqAccess} const HASH_BYTES: usize = 32; /// Newtype wrapper for 32-byte hash values, implementing readable `Debug` and `serde::Deserialize`. -// We could use primitive_types::H256 here, but opted for a custom type to avoid more dependencies. +/// This can deserialize from a JSON string or array. #[derive(Hash, PartialEq, Eq, Clone, Copy)] pub struct Hash([u8; HASH_BYTES]); +impl From for crate::types::BlockHash { + fn from(hash: Hash) -> Self { + hash.0.into() + } +} + struct HashVisitor; impl<'de> Visitor<'de> for HashVisitor { diff --git a/backend/common/src/json/mod.rs b/backend/common/src/json/mod.rs new file mode 100644 index 0000000..22f233f --- /dev/null +++ b/backend/common/src/json/mod.rs @@ -0,0 +1,7 @@ +//! This module contains the types we need to deserialize JSON messages from nodes + +mod hash; +mod node_message; + +pub use node_message::*; +pub use hash::Hash; \ No newline at end of file diff --git a/backend/common/src/json/node_message.rs b/backend/common/src/json/node_message.rs new file mode 100644 index 0000000..b23ece5 --- /dev/null +++ b/backend/common/src/json/node_message.rs @@ -0,0 +1,192 @@ +//! The structs and enums defined in this module are largely identical to those +//! we'll use elsewhere internally, but are kept separate so that the JSON structure +//! is defined (almost) from just this file, and we don't have to worry about breaking +//! compatibility with the input data when we make changes to our internal data +//! structures (for example, to support bincode better). +use super::hash::Hash; +use serde::{Deserialize}; + +/// This struct represents a telemetry message sent from a node as +/// a JSON payload. Since JSON is self describing, we can use attributes +/// like serde(untagged) and serde(flatten) without issue. +/// +/// Internally, we want to minimise the amount of data sent from shards to +/// the core node. For that reason, we use a non-self-describing serialization +/// format like bincode, which doesn't support things like `[serde(flatten)]` (which +/// internally wants to serialize to a map of unknown length) or `[serde(tag/untagged)]` +/// (which relies on the data to know which variant to deserialize to.) +/// +/// So, this can be converted fairly cheaply into an enum we'll use internally +/// which is compatible with formats like bincode. +#[derive(Deserialize, Debug)] +#[serde(untagged)] +pub enum NodeMessage { + V1 { + #[serde(flatten)] + payload: Payload, + }, + V2 { + id: ConnId, + payload: Payload, + }, +} + +#[derive(Deserialize, Debug)] +#[serde(tag = "msg")] +pub enum Payload { + #[serde(rename = "system.connected")] + SystemConnected(SystemConnected), + #[serde(rename = "system.interval")] + SystemInterval(SystemInterval), + #[serde(rename = "block.import")] + BlockImport(Block), + #[serde(rename = "notify.finalized")] + NotifyFinalized(Finalized), + #[serde(rename = "txpool.import")] + TxPoolImport, + #[serde(rename = "afg.finalized")] + AfgFinalized(AfgFinalized), + #[serde(rename = "afg.received_precommit")] + AfgReceivedPrecommit(AfgReceivedPrecommit), + #[serde(rename = "afg.received_prevote")] + AfgReceivedPrevote(AfgReceivedPrevote), + #[serde(rename = "afg.received_commit")] + AfgReceivedCommit(AfgReceivedCommit), + #[serde(rename = "afg.authority_set")] + AfgAuthoritySet(AfgAuthoritySet), + #[serde(rename = "afg.finalized_blocks_up_to")] + AfgFinalizedBlocksUpTo, + #[serde(rename = "aura.pre_sealed_block")] + AuraPreSealedBlock, + #[serde(rename = "prepared_block_for_proposing")] + PreparedBlockForProposing, +} + +#[derive(Deserialize, Debug)] +pub struct SystemConnected { + pub genesis_hash: Hash, + #[serde(flatten)] + pub node: NodeDetails, +} + +#[derive(Deserialize, Debug)] +pub struct SystemInterval { + pub peers: Option, + pub txcount: Option, + pub bandwidth_upload: Option, + pub bandwidth_download: Option, + pub finalized_height: Option, + pub finalized_hash: Option, + #[serde(flatten)] + pub block: Option, + pub used_state_cache_size: Option, +} + +#[derive(Deserialize, Debug)] +pub struct Finalized { + #[serde(rename = "best")] + pub hash: Hash, + pub height: Box, +} + +#[derive(Deserialize, Debug)] +pub struct AfgAuthoritySet { + pub authority_id: Box, + pub authorities: Box, + pub authority_set_id: Box, +} + +#[derive(Deserialize, Debug, Clone)] +pub struct AfgFinalized { + pub finalized_hash: Hash, + pub finalized_number: Box, +} + +#[derive(Deserialize, Debug, Clone)] +pub struct AfgReceived { + pub target_hash: Hash, + pub target_number: Box, + pub voter: Option>, +} + +#[derive(Deserialize, Debug, Clone)] +pub struct AfgReceivedPrecommit { + #[serde(flatten)] + pub received: AfgReceived, +} + +#[derive(Deserialize, Debug, Clone)] +pub struct AfgReceivedPrevote { + #[serde(flatten)] + pub received: AfgReceived, +} + +#[derive(Deserialize, Debug, Clone)] +pub struct AfgReceivedCommit { + #[serde(flatten)] + pub received: AfgReceived, +} + +#[derive(Deserialize, Debug, Clone, Copy)] +pub struct Block { + #[serde(rename = "best")] + pub hash: Hash, + pub height: BlockNumber, +} + +#[derive(Deserialize, Debug, Clone)] +pub struct NodeDetails { + pub chain: Box, + pub name: Box, + pub implementation: Box, + pub version: Box, + pub validator: Option>, + pub network_id: Option>, + pub startup_time: Option>, +} + +type ConnId = u64; +type BlockNumber = u64; + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn message_v1() { + let json = r#"{ + "msg":"notify.finalized", + "level":"INFO", + "ts":"2021-01-13T12:38:25.410794650+01:00", + "best":"0x031c3521ca2f9c673812d692fc330b9a18e18a2781e3f9976992f861fd3ea0cb", + "height":"50" + }"#; + assert!( + matches!( + serde_json::from_str::(json).unwrap(), + NodeMessage::V1 { .. }, + ), + "message did not match variant V1", + ); + } + + #[test] + fn message_v2() { + let json = r#"{ + "id":1, + "ts":"2021-01-13T12:22:20.053527101+01:00", + "payload":{ + "best":"0xcc41708573f2acaded9dd75e07dac2d4163d136ca35b3061c558d7a35a09dd8d", + "height":"209", + "msg":"notify.finalized" + } + }"#; + assert!( + matches!( + serde_json::from_str::(json).unwrap(), + NodeMessage::V2 { .. }, + ), + "message did not match variant V2", + ); + } +} diff --git a/backend/common/src/lib.rs b/backend/common/src/lib.rs index dc147b4..a18980d 100644 --- a/backend/common/src/lib.rs +++ b/backend/common/src/lib.rs @@ -3,3 +3,4 @@ pub mod shard; pub mod types; pub mod util; pub mod ws; +pub mod json; \ No newline at end of file diff --git a/backend/common/src/node.rs b/backend/common/src/node.rs index 420e568..ad66da2 100644 --- a/backend/common/src/node.rs +++ b/backend/common/src/node.rs @@ -1,15 +1,13 @@ use crate::types::{Block, BlockHash, BlockNumber, ConnId, NodeDetails}; -use crate::util::{Hash, NullAny}; +use crate::json; + use actix::prelude::*; use serde::{Deserialize, Serialize}; -use serde::ser::Serializer; -#[derive(Deserialize, Debug, Message)] +#[derive(Serialize, Deserialize, Debug, Message)] #[rtype(result = "()")] -#[serde(untagged)] pub enum NodeMessage { V1 { - #[serde(flatten)] payload: Payload, }, V2 { @@ -36,64 +34,98 @@ impl From for Payload { } } -#[derive(Deserialize, Debug)] -#[serde(tag = "msg")] -pub enum Payload { - #[serde(rename = "system.connected")] - SystemConnected(SystemConnected), - #[serde(rename = "system.interval")] - SystemInterval(SystemInterval), - #[serde(rename = "block.import")] - BlockImport(Block), - #[serde(rename = "notify.finalized")] - NotifyFinalized(Finalized), - #[serde(rename = "txpool.import")] - TxPoolImport(NullAny), - // #[serde(rename = "afg.finalized")] - // AfgFinalized(AfgFinalized), - // #[serde(rename = "afg.received_precommit")] - // AfgReceivedPrecommit(AfgReceivedPrecommit), - // #[serde(rename = "afg.received_prevote")] - // AfgReceivedPrevote(AfgReceivedPrevote), - // #[serde(rename = "afg.received_commit")] - // AfgReceivedCommit(AfgReceivedCommit), - // #[serde(rename = "afg.authority_set")] - // AfgAuthoritySet(AfgAuthoritySet), - // #[serde(rename = "afg.finalized_blocks_up_to")] - // AfgFinalizedBlocksUpTo(NullAny), - // #[serde(rename = "aura.pre_sealed_block")] - // AuraPreSealedBlock(NullAny), - #[serde(rename = "prepared_block_for_proposing")] - PreparedBlockForProposing(NullAny), -} - -impl Serialize for Payload { - fn serialize(&self, serializer: S) -> Result - where - S: Serializer, - { - use Payload::*; - - match self { - SystemConnected(val) => serializer.serialize_newtype_variant("Payload", 0, "system.connected", val), - SystemInterval(val) => serializer.serialize_newtype_variant("Payload", 1, "system.interval", val), - BlockImport(val) => serializer.serialize_newtype_variant("Payload", 3, "block.import", val), - NotifyFinalized(val) => serializer.serialize_newtype_variant("Payload", 4, "notify.finalized", val), - TxPoolImport(_) => serializer.serialize_unit_variant("Payload", 3, "txpool.import"), - PreparedBlockForProposing(_) => serializer.serialize_unit_variant("Payload", 4, "prepared_block_for_proposing"), - _ => unimplemented!() +impl From for NodeMessage { + fn from(msg: json::NodeMessage) -> Self { + match msg { + json::NodeMessage::V1 { payload } => { + NodeMessage::V1 { payload: payload.into() } + }, + json::NodeMessage::V2 { id, payload } => { + NodeMessage::V2 { id, payload: payload.into() } + }, } } } -#[derive(Deserialize, Serialize, Debug)] +#[derive(Serialize, Deserialize, Debug)] +pub enum Payload { + SystemConnected(SystemConnected), + SystemInterval(SystemInterval), + BlockImport(Block), + NotifyFinalized(Finalized), + TxPoolImport, + AfgFinalized(AfgFinalized), + AfgReceivedPrecommit(AfgReceived), + AfgReceivedPrevote(AfgReceived), + AfgReceivedCommit(AfgReceived), + AfgAuthoritySet(AfgAuthoritySet), + AfgFinalizedBlocksUpTo, + AuraPreSealedBlock, + PreparedBlockForProposing, +} + +impl From for Payload { + fn from(msg: json::Payload) -> Self { + match msg { + json::Payload::SystemConnected(m) => { + Payload::SystemConnected(m.into()) + }, + json::Payload::SystemInterval(m) => { + Payload::SystemInterval(m.into()) + }, + json::Payload::BlockImport(m) => { + Payload::BlockImport(m.into()) + }, + json::Payload::NotifyFinalized(m) => { + Payload::NotifyFinalized(m.into()) + }, + json::Payload::TxPoolImport => { + Payload::TxPoolImport + }, + json::Payload::AfgFinalized(m) => { + Payload::AfgFinalized(m.into()) + }, + json::Payload::AfgReceivedPrecommit(m) => { + Payload::AfgReceivedPrecommit(m.received.into()) + }, + json::Payload::AfgReceivedPrevote(m) => { + Payload::AfgReceivedPrevote(m.received.into()) + }, + json::Payload::AfgReceivedCommit(m) => { + Payload::AfgReceivedCommit(m.received.into()) + }, + json::Payload::AfgAuthoritySet(m) => { + Payload::AfgAuthoritySet(m.into()) + }, + json::Payload::AfgFinalizedBlocksUpTo => { + Payload::AfgFinalizedBlocksUpTo + }, + json::Payload::AuraPreSealedBlock => { + Payload::AuraPreSealedBlock + }, + json::Payload::PreparedBlockForProposing => { + Payload::PreparedBlockForProposing + }, + } + } +} + +#[derive(Serialize, Deserialize, Debug)] pub struct SystemConnected { - pub genesis_hash: Hash, - #[serde(flatten)] + pub genesis_hash: BlockHash, pub node: NodeDetails, } -#[derive(Deserialize, Serialize, Debug)] +impl From for SystemConnected { + fn from(msg: json::SystemConnected) -> Self { + SystemConnected { + genesis_hash: msg.genesis_hash.into(), + node: msg.node.into() + } + } +} + +#[derive(Serialize, Deserialize, Debug)] pub struct SystemInterval { pub peers: Option, pub txcount: Option, @@ -101,54 +133,87 @@ pub struct SystemInterval { pub bandwidth_download: Option, pub finalized_height: Option, pub finalized_hash: Option, - #[serde(flatten)] pub block: Option, pub used_state_cache_size: Option, } -#[derive(Deserialize, Serialize, Debug)] +impl From for SystemInterval { + fn from(msg: json::SystemInterval) -> Self { + SystemInterval { + peers: msg.peers, + txcount: msg.txcount, + bandwidth_upload: msg.bandwidth_upload, + bandwidth_download: msg.bandwidth_download, + finalized_height: msg.finalized_height, + finalized_hash: msg.finalized_hash.map(|h| h.into()), + block: msg.block.map(|b| b.into()), + used_state_cache_size: msg.used_state_cache_size, + } + } +} + +#[derive(Serialize, Deserialize, Debug)] pub struct Finalized { - #[serde(rename = "best")] pub hash: BlockHash, pub height: Box, } -#[derive(Deserialize, Serialize, Debug)] -pub struct AfgAuthoritySet { - pub authority_id: Box, - pub authorities: Box, - pub authority_set_id: Box, +impl From for Finalized { + fn from(msg: json::Finalized) -> Self { + Finalized { + hash: msg.hash.into(), + height: msg.height, + } + } } -#[derive(Deserialize, Serialize, Debug, Clone)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct AfgFinalized { pub finalized_hash: BlockHash, pub finalized_number: Box, } -#[derive(Deserialize, Serialize, Debug, Clone)] +impl From for AfgFinalized { + fn from(msg: json::AfgFinalized) -> Self { + AfgFinalized { + finalized_hash: msg.finalized_hash.into(), + finalized_number: msg.finalized_number, + } + } +} + +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct AfgReceived { pub target_hash: BlockHash, pub target_number: Box, pub voter: Option>, } -#[derive(Deserialize, Serialize, Debug, Clone)] -pub struct AfgReceivedPrecommit { - #[serde(flatten)] - pub received: AfgReceived, +impl From for AfgReceived { + fn from(msg: json::AfgReceived) -> Self { + AfgReceived { + target_hash: msg.target_hash.into(), + target_number: msg.target_number, + voter: msg.voter, + } + } } -#[derive(Deserialize, Serialize, Debug, Clone)] -pub struct AfgReceivedPrevote { - #[serde(flatten)] - pub received: AfgReceived, +#[derive(Serialize, Deserialize, Debug)] +pub struct AfgAuthoritySet { + pub authority_id: Box, + pub authorities: Box, + pub authority_set_id: Box, } -#[derive(Deserialize, Serialize, Debug, Clone)] -pub struct AfgReceivedCommit { - #[serde(flatten)] - pub received: AfgReceived, +impl From for AfgAuthoritySet { + fn from(msg: json::AfgAuthoritySet) -> Self { + AfgAuthoritySet { + authority_id: msg.authority_id, + authorities: msg.authorities, + authority_set_id: msg.authority_set_id, + } + } } impl Payload { @@ -180,27 +245,123 @@ mod tests { use super::*; use bincode::Options; + // Without adding a derive macro and marker trait (and enforcing their use), we don't really + // know whether things can (de)serialize to bincode or not at runtime without failing unless + // we test the different types we want to (de)serialize ourselves. We just need to test each + // type, not each variant. + fn bincode_can_serialize_and_deserialize<'de, T>(item: T) + where T: Serialize + serde::de::DeserializeOwned + { + let bytes = bincode::serialize(&item).expect("Serialization should work"); + let _: T = bincode::deserialize(&bytes).expect("Deserialization should work"); + } + #[test] - fn message_v1() { - let json = r#"{"msg":"notify.finalized","level":"INFO","ts":"2021-01-13T12:38:25.410794650+01:00","best":"0x031c3521ca2f9c673812d692fc330b9a18e18a2781e3f9976992f861fd3ea0cb","height":"50"}"#; - assert!( - matches!( - serde_json::from_str::(json).unwrap(), - NodeMessage::V1 { .. }, - ), - "message did not match variant V1", + fn bincode_can_serialize_and_deserialize_node_message_system_connected() { + bincode_can_serialize_and_deserialize( + NodeMessage::V1 { + payload: Payload::SystemConnected(SystemConnected { + genesis_hash: BlockHash::zero(), + node: NodeDetails { + chain: "foo".into(), + name: "foo".into(), + implementation: "foo".into(), + version: "foo".into(), + validator: None, + network_id: None, + startup_time: None, + }, + }) + } ); } #[test] - fn message_v2() { - let json = r#"{"id":1,"ts":"2021-01-13T12:22:20.053527101+01:00","payload":{"best":"0xcc41708573f2acaded9dd75e07dac2d4163d136ca35b3061c558d7a35a09dd8d","height":"209","msg":"notify.finalized"}}"#; - assert!( - matches!( - serde_json::from_str::(json).unwrap(), - NodeMessage::V2 { .. }, - ), - "message did not match variant V2", + fn bincode_can_serialize_and_deserialize_node_message_system_interval() { + bincode_can_serialize_and_deserialize( + NodeMessage::V1 { + payload: Payload::SystemInterval(SystemInterval { + peers: None, + txcount: None, + bandwidth_upload: None, + bandwidth_download: None, + finalized_height: None, + finalized_hash: None, + block: None, + used_state_cache_size: None, + }) + } + ); + } + + #[test] + fn bincode_can_serialize_and_deserialize_node_message_block_import() { + bincode_can_serialize_and_deserialize( + NodeMessage::V1 { + payload: Payload::BlockImport(Block { + hash: BlockHash([0; 32]), + height: 0, + }) + } + ); + } + + #[test] + fn bincode_can_serialize_and_deserialize_node_message_notify_finalized() { + bincode_can_serialize_and_deserialize( + NodeMessage::V1 { + payload: Payload::NotifyFinalized(Finalized { + hash: BlockHash::zero(), + height: "foo".into(), + }) + } + ); + } + + #[test] + fn bincode_can_serialize_and_deserialize_node_message_tx_pool_import() { + bincode_can_serialize_and_deserialize( + NodeMessage::V1 { + payload: Payload::TxPoolImport + } + ); + } + + #[test] + fn bincode_can_serialize_and_deserialize_node_message_afg_finalized() { + bincode_can_serialize_and_deserialize( + NodeMessage::V1 { + payload: Payload::AfgFinalized(AfgFinalized { + finalized_hash: BlockHash::zero(), + finalized_number: "foo".into(), + }) + } + ); + } + + #[test] + fn bincode_can_serialize_and_deserialize_node_message_afg_received() { + bincode_can_serialize_and_deserialize( + NodeMessage::V1 { + payload: Payload::AfgReceivedPrecommit(AfgReceived { + target_hash: BlockHash::zero(), + target_number: "foo".into(), + voter: None, + }) + } + ); + } + + #[test] + fn bincode_can_serialize_and_deserialize_node_message_afg_authority_set() { + bincode_can_serialize_and_deserialize( + NodeMessage::V1 { + payload: Payload::AfgAuthoritySet(AfgAuthoritySet { + authority_id: "foo".into(), + authorities: "foo".into(), + authority_set_id: "foo".into(), + }) + } ); } diff --git a/backend/common/src/types.rs b/backend/common/src/types.rs index a16ce5a..aa85996 100644 --- a/backend/common/src/types.rs +++ b/backend/common/src/types.rs @@ -2,6 +2,7 @@ use serde::ser::{SerializeTuple, Serializer}; use serde::{Deserialize, Serialize}; use crate::util::{now, MeanList}; +use crate::json; pub type NodeId = usize; pub type ConnId = u64; @@ -21,6 +22,20 @@ pub struct NodeDetails { pub startup_time: Option>, } +impl From for NodeDetails { + fn from(details: json::NodeDetails) -> Self { + NodeDetails { + chain: details.chain, + name: details.name, + implementation: details.implementation, + version: details.version, + validator: details.validator, + network_id: details.network_id, + startup_time: details.startup_time, + } + } +} + #[derive(Deserialize, Debug, Clone, Copy, PartialEq, Eq, Default)] pub struct NodeStats { pub peers: u64, @@ -34,11 +49,19 @@ pub struct NodeIO { #[derive(Deserialize, Serialize, Debug, Clone, Copy)] pub struct Block { - #[serde(rename = "best")] pub hash: BlockHash, pub height: BlockNumber, } +impl From for Block { + fn from(block: json::Block) -> Self { + Block { + hash: block.hash.into(), + height: block.height + } + } +} + impl Block { pub fn zero() -> Self { Block { diff --git a/backend/common/src/util.rs b/backend/common/src/util.rs index a78310e..66dddae 100644 --- a/backend/common/src/util.rs +++ b/backend/common/src/util.rs @@ -1,11 +1,9 @@ mod dense_map; -mod hash; mod mean_list; mod null; mod num_stats; pub use dense_map::DenseMap; -pub use hash::Hash; pub use mean_list::MeanList; pub use null::NullAny; pub use num_stats::NumStats; diff --git a/backend/core/src/aggregator.rs b/backend/core/src/aggregator.rs index 4e1a289..8dd3d98 100644 --- a/backend/core/src/aggregator.rs +++ b/backend/core/src/aggregator.rs @@ -9,11 +9,11 @@ use crate::feed::{self, FeedMessageSerializer}; use crate::node::connector::NodeConnector; use common::ws::MuteReason; use common::shard::ShardConnId; -use common::types::{ConnId, NodeDetails}; -use common::util::{DenseMap, Hash}; +use common::types::{ConnId, NodeDetails, BlockHash}; +use common::util::{DenseMap}; pub struct Aggregator { - genesis_hashes: HashMap, + genesis_hashes: HashMap, labels: HashMap, chains: DenseMap, feeds: DenseMap>, @@ -26,7 +26,7 @@ pub struct ChainEntry { /// Address to the `Chain` agent addr: Addr, /// Genesis [`Hash`] of the chain - genesis_hash: Hash, + genesis_hash: BlockHash, /// String name of the chain label: Label, /// Node count @@ -66,7 +66,7 @@ impl Aggregator { /// or the address is disconnected (actor dropped), create a new one. pub fn lazy_chain( &mut self, - genesis_hash: Hash, + genesis_hash: BlockHash, label: &str, ctx: &mut ::Context, ) -> ChainId { @@ -125,7 +125,7 @@ pub struct AddNode { /// Details of the node being added to the aggregator pub node: NodeDetails, /// Genesis [`Hash`] of the chain the node is being added to. - pub genesis_hash: Hash, + pub genesis_hash: BlockHash, /// Source from which this node is being added (Direct | Shard) pub source: NodeSource, } diff --git a/backend/core/src/main.rs b/backend/core/src/main.rs index 241a577..958af9d 100644 --- a/backend/core/src/main.rs +++ b/backend/core/src/main.rs @@ -1,6 +1,5 @@ use std::collections::HashSet; use std::iter::FromIterator; -use std::net::Ipv4Addr; use actix::prelude::*; use actix_http::ws::Codec; @@ -19,7 +18,6 @@ mod shard; use aggregator::{Aggregator, GetHealth}; use feed::connector::FeedConnector; use location::{Locator, LocatorFactory}; -use node::connector::NodeConnector; use shard::connector::ShardConnector; const VERSION: &str = env!("CARGO_PKG_VERSION"); @@ -74,35 +72,6 @@ impl From<&LogLevel> for log::LevelFilter { } } -/// Entry point for connecting nodes -#[get("/submit")] -async fn node_route( - req: HttpRequest, - stream: web::Payload, - aggregator: web::Data>, - locator: web::Data>, -) -> Result { - let ip = req - .connection_info() - .realip_remote_addr() - .and_then(|mut addr| { - if let Some(port_idx) = addr.find(':') { - addr = &addr[..port_idx]; - } - addr.parse::().ok() - }); - - let mut res = ws::handshake(&req)?; - let aggregator = aggregator.get_ref().clone(); - let locator = locator.get_ref().clone().recipient(); - - Ok(res.streaming(ws::WebsocketContext::with_codec( - NodeConnector::new(aggregator, locator, ip), - stream, - Codec::new().max_size(10 * 1024 * 1024), // 10mb frame limit - ))) -} - #[get("/shard_submit/{chain_hash}")] async fn shard_route( req: HttpRequest, @@ -112,7 +81,7 @@ async fn shard_route( path: web::Path>, ) -> Result { let hash_str = path.into_inner(); - let genesis_hash = hash_str.parse()?; + let genesis_hash = hash_str.parse::()?.into(); println!("Genesis hash {}", genesis_hash); @@ -180,7 +149,6 @@ async fn main() -> std::io::Result<()> { .wrap(middleware::NormalizePath::default()) .data(aggregator.clone()) .data(locator.clone()) - .service(node_route) .service(feed_route) .service(shard_route) .service(health) diff --git a/backend/core/src/shard/connector.rs b/backend/core/src/shard/connector.rs index 62414cd..e83912a 100644 --- a/backend/core/src/shard/connector.rs +++ b/backend/core/src/shard/connector.rs @@ -8,8 +8,7 @@ use crate::location::LocateRequest; use actix::prelude::*; use actix_web_actors::ws::{self, CloseReason}; use bincode::Options; -use common::types::NodeId; -use common::util::Hash; +use common::types::{BlockHash, NodeId}; use common::ws::{MultipartHandler, WsMessage}; use common::shard::{ShardMessage, ShardConnId, BackendMessage}; @@ -24,7 +23,7 @@ pub struct ShardConnector { /// Aggregator actor address aggregator: Addr, /// Genesis hash of the chain this connection will be submitting data for - genesis_hash: Hash, + genesis_hash: BlockHash, /// Chain address to which this shard connector is delegating messages chain: Option>, /// Transient mapping of `ShardConnId` to external IP address. @@ -57,7 +56,7 @@ impl ShardConnector { pub fn new( aggregator: Addr, locator: Recipient, - genesis_hash: Hash, + genesis_hash: BlockHash, ) -> Self { Self { hb: Instant::now(), diff --git a/backend/shard/Cargo.toml b/backend/shard/Cargo.toml index b327178..ccfc2bf 100644 --- a/backend/shard/Cargo.toml +++ b/backend/shard/Cargo.toml @@ -17,6 +17,7 @@ log = "0.4" rustc-hash = "1.1.0" serde = { version = "1.0", features = ["derive"] } serde_json = { version = "1.0", features = ["raw_value"] } +primitive-types = { version = "0.9.0", features = ["serde"] } common = { path = "../common" } simple_logger = "1.11.0" soketto = "0.4.2" diff --git a/backend/shard/src/aggregator.rs b/backend/shard/src/aggregator.rs index 1557c32..031a44d 100644 --- a/backend/shard/src/aggregator.rs +++ b/backend/shard/src/aggregator.rs @@ -6,8 +6,8 @@ use actix::prelude::*; use actix_http::http::Uri; use bincode::Options; use rustc_hash::FxHashMap; -use common::util::{Hash, DenseMap}; -use common::types::{ConnId, NodeDetails, NodeId}; +use common::util::{DenseMap}; +use common::types::{ConnId, NodeDetails, NodeId, BlockHash}; use common::node::Payload; use common::shard::{ShardConnId, ShardMessage, BackendMessage}; use soketto::handshake::{Client, ServerResponse}; @@ -22,7 +22,7 @@ type WsReceiver = soketto::Receiver>; #[derive(Default)] pub struct Aggregator { url: Uri, - chains: FxHashMap>, + chains: FxHashMap>, } impl Actor for Aggregator { @@ -42,13 +42,13 @@ pub struct Chain { /// Base URL of Backend Core url: Uri, /// Genesis hash of the chain, required to construct the URL to connect to the Backend Core - genesis_hash: Hash, + genesis_hash: BlockHash, /// Dense mapping of SharedConnId -> Addr + multiplexing ConnId sent from the node. nodes: DenseMap<(Addr, ConnId)>, } impl Chain { - pub fn new(url: Uri, genesis_hash: Hash) -> Self { + pub fn new(url: Uri, genesis_hash: BlockHash) -> Self { Chain { url, genesis_hash, @@ -188,7 +188,7 @@ impl Actor for Chain { #[rtype(result = "()")] pub struct AddNode { pub ip: Option, - pub genesis_hash: Hash, + pub genesis_hash: BlockHash, pub node: NodeDetails, pub conn_id: ConnId, pub node_connector: Addr, diff --git a/backend/shard/src/node.rs b/backend/shard/src/node.rs index 9a985a7..f6f3474 100644 --- a/backend/shard/src/node.rs +++ b/backend/shard/src/node.rs @@ -8,6 +8,7 @@ use actix::prelude::*; use actix_web_actors::ws::{self, CloseReason}; use common::node::{NodeMessage, Payload}; use common::types::{ConnId, NodeId}; +use common::json; use common::ws::{MultipartHandler, WsMessage}; use tokio::sync::mpsc::UnboundedSender; @@ -93,9 +94,10 @@ impl NodeConnector { fn handle_message( &mut self, - msg: NodeMessage, + msg: json::NodeMessage, ctx: &mut ::Context, ) { + let msg: NodeMessage = msg.into(); let conn_id = msg.id(); let payload = msg.into();