diff --git a/backend/common/src/assign_id.rs b/backend/common/src/assign_id.rs index ddd7a1b..e6d489d 100644 --- a/backend/common/src/assign_id.rs +++ b/backend/common/src/assign_id.rs @@ -1,65 +1,52 @@ -use std::{fmt::Display, hash::Hash}; -use serde::{Serialize,Deserialize}; +use std::hash::Hash; use bimap::BiMap; -#[derive(Clone,Copy,Debug,Hash,PartialEq,Eq,Serialize,Deserialize)] -pub struct Id(usize); - -impl std::convert::From for usize { - fn from(id: Id) -> usize { - id.0 - } -} -impl std::convert::From for Id { - fn from(n: usize) -> Id { - Id(n) - } -} -impl Display for Id { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - self.0.fmt(f) - } -} - /// A struct that allows you to assign an ID to an arbitrary set of /// details (so long as they are Eq+Hash+Clone), and then access /// the assigned ID given those details or access the details given /// the ID. #[derive(Debug)] -pub struct AssignId
{ - current_id: Id, - mapping: BiMap +pub struct AssignId { + current_id: usize, + mapping: BiMap, + _id_type: std::marker::PhantomData } -impl
AssignId
where Details: Eq + Hash { +impl AssignId +where + Details: Eq + Hash, + Id: From + Copy, + usize: From +{ pub fn new() -> Self { Self { - current_id: Id(0), - mapping: BiMap::new() + current_id: 0, + mapping: BiMap::new(), + _id_type: std::marker::PhantomData } } pub fn assign_id(&mut self, details: Details) -> Id { let this_id = self.current_id; - self.current_id.0 += 1; + self.current_id += 1; self.mapping.insert(this_id, details); - this_id + this_id.into() } pub fn get_details(&mut self, id: Id) -> Option<&Details> { - self.mapping.get_by_left(&id) + self.mapping.get_by_left(&id.into()) } pub fn get_id(&mut self, details: &Details) -> Option { - self.mapping.get_by_right(details).map(|id| *id) + self.mapping.get_by_right(details).map(|&id| id.into()) } pub fn remove_by_id(&mut self, id: Id) -> Option
{ - self.mapping.remove_by_left(&id).map(|(_,details)| details) + self.mapping.remove_by_left(&id.into()).map(|(_,details)| details) } pub fn remove_by_details(&mut self, details: &Details) -> Option { - self.mapping.remove_by_right(&details).map(|(id,_)| id) + self.mapping.remove_by_right(&details).map(|(id,_)| id.into()) } pub fn clear(&mut self) { @@ -67,6 +54,6 @@ impl
AssignId
where Details: Eq + Hash { } pub fn iter(&self) -> impl Iterator { - self.mapping.iter().map(|(id, details)| (*id, details)) + self.mapping.iter().map(|(&id, details)| (id.into(), details)) } } \ No newline at end of file diff --git a/backend/common/src/id_type.rs b/backend/common/src/id_type.rs new file mode 100644 index 0000000..b573b98 --- /dev/null +++ b/backend/common/src/id_type.rs @@ -0,0 +1,56 @@ +/// Define a type that can be used as an ID, be converted from/to the inner type, +/// and serialized/deserialized transparently into the inner type. +#[macro_export] +macro_rules! id_type { + ($( #[$attrs:meta] )* $vis:vis $ty:ident ( $inner:ident ) $(;)? ) => { + #[derive(Debug,Clone,Copy,PartialEq,Eq,Hash)] + $( #[$attrs] )* + $vis struct $ty($inner); + + impl $ty { + #[allow(dead_code)] + pub fn new(inner: $inner) -> Self { + Self(inner) + } + } + + impl From<$inner> for $ty { + fn from(inner: $inner) -> Self { + Self(inner) + } + } + + impl From<$ty> for $inner { + fn from(ty: $ty) -> Self { + ty.0 + } + } + } +} + +#[cfg(test)] +mod test { + + #[test] + fn create_and_use_new_id_type() { + id_type!{ + Foo(usize) + }; + let _ = Foo::new(123); + let id = Foo::from(123); + let _: usize = id.into(); + + // Check that these don't lead to compile errors: + id_type!{ + Bar(usize); + }; + id_type!{ + pub Wibble(u64) + }; + id_type!{ + /// We can have doc strings, too + pub(crate) Wobble(u16) + }; + } + +} \ No newline at end of file diff --git a/backend/common/src/internal_messages.rs b/backend/common/src/internal_messages.rs index a9c1d7d..2325c3b 100644 --- a/backend/common/src/internal_messages.rs +++ b/backend/common/src/internal_messages.rs @@ -2,12 +2,16 @@ use std::net::IpAddr; use crate::node::Payload; use crate::types::{NodeDetails, BlockHash}; -use crate::assign_id::Id; +use crate::id_type; use serde::{Deserialize, Serialize}; -/// The shard-local ID of a given node, where a single connection -/// might send data on behalf of more than one chain. -pub type LocalId = Id; +id_type! { + /// The shard-local ID of a given node, where a single connection + /// might send data on behalf of more than one chain. + #[derive(serde::Serialize, serde::Deserialize)] + pub ShardNodeId(usize); +} + /// Message sent from the shard to the backend core #[derive(Deserialize, Serialize, Debug, Clone)] @@ -16,17 +20,17 @@ pub enum FromShardAggregator { AddNode { ip: Option, node: NodeDetails, - local_id: LocalId, + local_id: ShardNodeId, genesis_hash: BlockHash }, /// Send a message payload to update details for a node UpdateNode { - local_id: LocalId, + local_id: ShardNodeId, payload: Payload, }, /// Inform the core that a node has been removed RemoveNode { - local_id: LocalId + local_id: ShardNodeId } } @@ -34,7 +38,7 @@ pub enum FromShardAggregator { #[derive(Deserialize, Serialize, Debug, Clone)] pub enum FromTelemetryCore { Mute { - local_id: LocalId, + local_id: ShardNodeId, reason: MuteReason } } diff --git a/backend/common/src/lib.rs b/backend/common/src/lib.rs index f0c068c..2b1fb40 100644 --- a/backend/common/src/lib.rs +++ b/backend/common/src/lib.rs @@ -5,4 +5,5 @@ pub mod util; pub mod json; pub mod log_level; pub mod assign_id; -pub mod most_seen; \ No newline at end of file +pub mod most_seen; +pub mod id_type; \ No newline at end of file diff --git a/backend/common/src/types.rs b/backend/common/src/types.rs index bdd25ae..fb4b07d 100644 --- a/backend/common/src/types.rs +++ b/backend/common/src/types.rs @@ -4,10 +4,8 @@ use serde::{Deserialize, Serialize}; use crate::util::{now, MeanList}; use crate::json; -pub type NodeId = usize; pub type BlockNumber = u64; pub type Timestamp = u64; -pub type Address = Box; pub use primitive_types::H256 as BlockHash; #[derive(Serialize, Deserialize, Debug, Clone)] @@ -106,21 +104,6 @@ pub struct NodeLocation { pub city: Box, } -// impl Serialize for NodeDetails { -// fn serialize(&self, serializer: S) -> Result -// where -// S: Serializer, -// { -// let mut tup = serializer.serialize_tuple(6)?; -// tup.serialize_element(&self.name)?; -// tup.serialize_element(&self.implementation)?; -// tup.serialize_element(&self.version)?; -// tup.serialize_element(&self.validator)?; -// tup.serialize_element(&self.network_id)?; -// tup.end() -// } -// } - impl Serialize for NodeStats { fn serialize(&self, serializer: S) -> Result where diff --git a/backend/common/src/util.rs b/backend/common/src/util.rs index 66dddae..8061edd 100644 --- a/backend/common/src/util.rs +++ b/backend/common/src/util.rs @@ -1,11 +1,9 @@ mod dense_map; mod mean_list; -mod null; mod num_stats; pub use dense_map::DenseMap; pub use mean_list::MeanList; -pub use null::NullAny; pub use num_stats::NumStats; pub fn fnv>(data: D) -> u64 { diff --git a/backend/common/src/util/dense_map.rs b/backend/common/src/util/dense_map.rs index 3d24669..7805c1e 100644 --- a/backend/common/src/util/dense_map.rs +++ b/backend/common/src/util/dense_map.rs @@ -1,17 +1,22 @@ -pub type Id = usize; - -pub struct DenseMap { +pub struct DenseMap { /// List of retired indexes that can be re-used - retired: Vec, + retired: Vec, /// All items items: Vec>, + /// Our ID type + _id_ty: std::marker::PhantomData } -impl DenseMap { +impl DenseMap +where + Id: From + Copy, + usize: From +{ pub fn new() -> Self { DenseMap { retired: Vec::new(), items: Vec::new(), + _id_ty: std::marker::PhantomData } } @@ -25,11 +30,12 @@ impl DenseMap { { match self.retired.pop() { Some(id) => { - self.items[id] = Some(f(id)); - id + let id_out = id.into(); + self.items[id] = Some(f(id_out)); + id_out } None => { - let id = self.items.len(); + let id = self.items.len().into(); self.items.push(Some(f(id))); id } @@ -37,14 +43,17 @@ impl DenseMap { } pub fn get(&self, id: Id) -> Option<&T> { + let id: usize = id.into(); self.items.get(id).and_then(|item| item.as_ref()) } pub fn get_mut(&mut self, id: Id) -> Option<&mut T> { + let id: usize = id.into(); self.items.get_mut(id).and_then(|item| item.as_mut()) } pub fn remove(&mut self, id: Id) -> Option { + let id: usize = id.into(); let old = self.items.get_mut(id).and_then(|item| item.take()); if old.is_some() { @@ -60,14 +69,14 @@ impl DenseMap { self.items .iter() .enumerate() - .filter_map(|(id, item)| Some((id, item.as_ref()?))) + .filter_map(|(id, item)| Some((id.into(), item.as_ref()?))) } pub fn iter_mut(&mut self) -> impl Iterator + '_ { self.items .iter_mut() .enumerate() - .filter_map(|(id, item)| Some((id, item.as_mut()?))) + .filter_map(|(id, item)| Some((id.into(), item.as_mut()?))) } pub fn len(&self) -> usize { diff --git a/backend/common/src/util/null.rs b/backend/common/src/util/null.rs deleted file mode 100644 index baf6e81..0000000 --- a/backend/common/src/util/null.rs +++ /dev/null @@ -1,136 +0,0 @@ -use serde::de::{Deserialize, Deserializer, IgnoredAny}; -use serde::ser::{Serialize, Serializer}; - -/// Alternative to `serde::de::IgnoreAny` that implements `Serialize`. -/// Will serialize to `null` in JSON, or empty data in bincode. -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub struct NullAny; - -impl<'de> Deserialize<'de> for NullAny { - #[inline] - fn deserialize(deserializer: D) -> Result - where - D: Deserializer<'de>, - { - // `bincode` is going to throw an error here as it does not support `IgnoredAny`. - // - // When using `bincode` `NullAny` will always serialize to unit (aka no data), so - // this safely becomes a no-op. - let _ = deserializer.deserialize_ignored_any(IgnoredAny); - - Ok(NullAny) - } -} - -impl Serialize for NullAny { - fn serialize(&self, serializer: S) -> Result - where - S: Serializer, - { - serializer.serialize_unit() - } -} - -#[cfg(test)] -mod tests { - use super::NullAny; - use bincode::Options; - use serde::{Deserialize, Serialize}; - - #[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] - struct Dummy { - ignore: NullAny, - } - - #[test] - fn deserialize_json_null() { - let dummy: Dummy = serde_json::from_str(r#"{"ignore":null}"#).unwrap(); - - assert_eq!(dummy, Dummy { ignore: NullAny }); - } - - #[test] - fn deserialize_json_struct() { - let dummy: Dummy = serde_json::from_str(r#"{"ignore":{"foo":"bar"}}"#).unwrap(); - - assert_eq!(dummy, Dummy { ignore: NullAny }); - } - - #[test] - fn deserialize_json_struct_invalid() { - let dummy = serde_json::from_str::(r#"{"ignore":{"foo":"bar"}"#); - - assert!(dummy.is_err()); - } - - #[test] - fn deserialize_json_vec_any() { - let raw = [NullAny; 10]; - let json = r#"[null,true,false,10,{},[],[null],{"foo":"bar"},[9,9,9],"ten"]"#; - - let deserialized: Vec = serde_json::from_str(json).unwrap(); - - assert_eq!(&raw[..], &deserialized); - } - - #[test] - fn serialize_json_null() { - let dummy = Dummy { ignore: NullAny }; - - let json = serde_json::to_string(&dummy).unwrap(); - - assert_eq!(json, r#"{"ignore":null}"#); - } - - #[test] - fn bincode_vec() { - let raw = vec![NullAny; 10]; - - let bytes = bincode::options().serialize(&raw).unwrap(); - - assert_eq!(bytes, &[10u8]); - - let deserialized: Vec = bincode::options().deserialize(&bytes).unwrap(); - - assert_eq!(raw, deserialized); - } - - #[test] - fn bincode_tuple() { - let raw = (NullAny, "Hello world".to_string()); - - let bytes = bincode::options().serialize(&raw).unwrap(); - - assert_eq!(bytes, b"\x0BHello world"); // 0B = 11 = length of string - - let deserialized: (NullAny, String) = bincode::options().deserialize(&bytes).unwrap(); - - assert_eq!(raw, deserialized); - } - - #[test] - fn json_vec() { - let raw = vec![NullAny; 10]; - - let json = serde_json::to_string(&raw).unwrap(); - - assert_eq!(json, "[null,null,null,null,null,null,null,null,null,null]"); - - let deserialized: Vec = serde_json::from_str(&json).unwrap(); - - assert_eq!(raw, deserialized); - } - - #[test] - fn json_tuple() { - let raw = (NullAny, "Hello world".to_string()); - - let json = serde_json::to_string(&raw).unwrap(); - - assert_eq!(json, r#"[null,"Hello world"]"#); - - let deserialized: (NullAny, String) = serde_json::from_str(&json).unwrap(); - - assert_eq!(raw, deserialized); - } -} diff --git a/backend/shard/src/aggregator.rs b/backend/shard/src/aggregator.rs index c80df6e..bfeeec4 100644 --- a/backend/shard/src/aggregator.rs +++ b/backend/shard/src/aggregator.rs @@ -1,4 +1,4 @@ -use common::{internal_messages::{self, LocalId}, node, assign_id::AssignId, types::BlockHash}; +use common::{internal_messages::{self, ShardNodeId}, node, assign_id::AssignId, types::BlockHash}; use std::sync::Arc; use std::sync::atomic::AtomicU64; use futures::{channel::mpsc, future}; @@ -119,7 +119,7 @@ impl Aggregator { let mut to_local_id = AssignId::new(); // Any messages coming from nodes that have been muted are ignored: - let mut muted: HashSet = HashSet::new(); + let mut muted: HashSet = HashSet::new(); // Now, loop and receive messages to handle. while let Some(msg) = rx_from_external.next().await { diff --git a/backend/telemetry/src/aggregator/aggregator.rs b/backend/telemetry/src/aggregator/aggregator.rs index 6bc1963..dcba57c 100644 --- a/backend/telemetry/src/aggregator/aggregator.rs +++ b/backend/telemetry/src/aggregator/aggregator.rs @@ -5,12 +5,15 @@ use futures::{ future, Sink, SinkExt }; use super::inner_loop; use crate::find_location::find_location; use crate::state::NodeId; +use common::id_type; use std::net::Ipv4Addr; -/// A unique Id is assigned per websocket connection (or more accurately, -/// per feed socket and per shard socket). This can be combined with the -/// [`LocalId`] of messages to give us a global ID. -type ConnId = u64; +id_type! { + /// A unique Id is assigned per websocket connection (or more accurately, + /// per feed socket and per shard socket). This can be combined with the + /// [`LocalId`] of messages to give us a global ID. + pub ConnId(u64) +} #[derive(Clone)] pub struct Aggregator(Arc); @@ -64,13 +67,13 @@ impl Aggregator { pub fn subscribe_shard(&self) -> impl Sink + Unpin { // Assign a unique aggregator-local ID to each connection that subscribes, and pass // that along with every message to the aggregator loop: - let shard_conn_id: ConnId = self.0.shard_conn_id.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + let shard_conn_id = self.0.shard_conn_id.fetch_add(1, std::sync::atomic::Ordering::Relaxed); let tx_to_aggregator = self.0.tx_to_aggregator.clone(); // Calling `send` on this Sink requires Unpin. There may be a nicer way than this, // but pinning by boxing is the easy solution for now: Box::pin(tx_to_aggregator.with(move |msg| async move { - Ok(inner_loop::ToAggregator::FromShardWebsocket(shard_conn_id, msg)) + Ok(inner_loop::ToAggregator::FromShardWebsocket(shard_conn_id.into(), msg)) })) } @@ -78,13 +81,13 @@ impl Aggregator { pub fn subscribe_feed(&self) -> impl Sink + Unpin { // Assign a unique aggregator-local ID to each connection that subscribes, and pass // that along with every message to the aggregator loop: - let feed_conn_id: ConnId = self.0.feed_conn_id.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + let feed_conn_id = self.0.feed_conn_id.fetch_add(1, std::sync::atomic::Ordering::Relaxed); let tx_to_aggregator = self.0.tx_to_aggregator.clone(); // Calling `send` on this Sink requires Unpin. There may be a nicer way than this, // but pinning by boxing is the easy solution for now: Box::pin(tx_to_aggregator.with(move |msg| async move { - Ok(inner_loop::ToAggregator::FromFeedWebsocket(feed_conn_id, msg)) + Ok(inner_loop::ToAggregator::FromFeedWebsocket(feed_conn_id.into(), msg)) })) } diff --git a/backend/telemetry/src/aggregator/inner_loop.rs b/backend/telemetry/src/aggregator/inner_loop.rs index 2d72acf..8094c24 100644 --- a/backend/telemetry/src/aggregator/inner_loop.rs +++ b/backend/telemetry/src/aggregator/inner_loop.rs @@ -1,7 +1,7 @@ use common::{ internal_messages::{ self, - LocalId, + ShardNodeId, MuteReason }, types::BlockHash, @@ -16,11 +16,7 @@ use std::collections::{ HashMap, HashSet }; use crate::state::{ self, State, NodeId }; use crate::feed_message::{ self, FeedMessageSerializer }; use crate::find_location; - -/// A unique Id is assigned per websocket connection (or more accurately, -/// per feed socket and per shard socket). This can be combined with the -/// [`LocalId`] of messages to give us a global ID. -type ConnId = u64; +use super::aggregator::ConnId; /// Incoming messages come via subscriptions, and end up looking like this. #[derive(Clone,Debug)] @@ -40,19 +36,19 @@ pub enum FromShardWebsocket { }, /// Tell the aggregator about a new node. Add { - local_id: LocalId, + local_id: ShardNodeId, ip: Option, node: common::types::NodeDetails, genesis_hash: common::types::BlockHash }, /// Update/pass through details about a node. Update { - local_id: LocalId, + local_id: ShardNodeId, payload: node::Payload }, /// Tell the aggregator that a node has been removed when it disconnects. Remove { - local_id: LocalId, + local_id: ShardNodeId, }, /// The shard is disconnected. Disconnected @@ -63,7 +59,7 @@ pub enum FromShardWebsocket { pub enum ToShardWebsocket { /// Mute messages to the core by passing the shard-local ID of them. Mute { - local_id: LocalId, + local_id: ShardNodeId, reason: internal_messages::MuteReason } } @@ -129,7 +125,7 @@ pub struct InnerLoop { node_state: State, /// We maintain a mapping between NodeId and ConnId+LocalId, so that we know /// which messages are about which nodes. - node_ids: BiMap, + node_ids: BiMap, /// Keep track of how to send messages out to feeds. feed_channels: HashMap>, @@ -194,7 +190,7 @@ impl InnerLoop { if let Some(loc) = location { let mut feed_message_serializer = FeedMessageSerializer::new(); feed_message_serializer.push(feed_message::LocatedNode( - node_id, + node_id.get_chain_node_id().into(), loc.latitude, loc.longitude, &loc.city @@ -212,7 +208,8 @@ impl InnerLoop { /// Handle messages coming from shards. async fn handle_from_shard(&mut self, shard_conn_id: ConnId, msg: FromShardWebsocket) { - log::debug!("Message from shard ({}): {:?}", shard_conn_id, msg); + log::debug!("Message from shard ({:?}): {:?}", shard_conn_id, msg); + match msg { FromShardWebsocket::Initialize { channel } => { self.shard_channels.insert(shard_conn_id, channel); @@ -249,7 +246,7 @@ impl InnerLoop { // Tell chain subscribers about the node we've just added: let mut feed_messages_for_chain = FeedMessageSerializer::new(); - feed_messages_for_chain.push(feed_message::AddedNode(node_id, &details.node)); + feed_messages_for_chain.push(feed_message::AddedNode(node_id.get_chain_node_id().into(), &details.node)); self.finalize_and_broadcast_to_chain_feeds(&genesis_hash, feed_messages_for_chain).await; // Tell everybody about the new node count and potential rename: @@ -272,7 +269,7 @@ impl InnerLoop { let node_id = match self.node_ids.remove_by_right(&(shard_conn_id, local_id)) { Some((node_id, _)) => node_id, None => { - log::error!("Cannot find ID for node with shard/connectionId of {}/{}", shard_conn_id, local_id); + log::error!("Cannot find ID for node with shard/connectionId of {:?}/{:?}", shard_conn_id, local_id); return } }; @@ -282,11 +279,22 @@ impl InnerLoop { let node_id = match self.node_ids.get_by_right(&(shard_conn_id, local_id)) { Some(id) => *id, None => { - log::error!("Cannot find ID for node with shard/connectionId of {}/{}", shard_conn_id, local_id); + log::error!("Cannot find ID for node with shard/connectionId of {:?}/{:?}", shard_conn_id, local_id); return } }; - self.handle_from_shard_update(node_id, payload).await; + + let mut feed_message_serializer = FeedMessageSerializer::new(); + let broadcast_finality = self.node_state.update_node(node_id, payload, &mut feed_message_serializer); + + if let Some(chain) = self.node_state.get_chain_by_node_id(node_id) { + let genesis_hash = *chain.genesis_hash(); + if broadcast_finality { + self.finalize_and_broadcast_to_chain_finality_feeds(&genesis_hash, feed_message_serializer).await; + } else { + self.finalize_and_broadcast_to_chain_feeds(&genesis_hash, feed_message_serializer).await; + } + } }, FromShardWebsocket::Disconnected => { // Find all nodes associated with this shard connection ID: @@ -302,24 +310,9 @@ impl InnerLoop { } } - async fn handle_from_shard_update(&mut self, node_id: NodeId, payload: node::Payload) { - let mut feed_message_serializer = FeedMessageSerializer::new(); - - let broadcast_finality = self.node_state.update_node(node_id, payload, &mut feed_message_serializer); - - if let Some(chain) = self.node_state.get_chain_by_node_id(node_id) { - let genesis_hash = *chain.genesis_hash(); - if broadcast_finality { - self.finalize_and_broadcast_to_chain_finality_feeds(&genesis_hash, feed_message_serializer).await; - } else { - self.finalize_and_broadcast_to_chain_feeds(&genesis_hash, feed_message_serializer).await; - } - } - } - /// Handle messages coming from feeds. async fn handle_from_feed(&mut self, feed_conn_id: ConnId, msg: FromFeedWebsocket) { - log::debug!("Message from feed ({}): {:?}", feed_conn_id, msg); + log::debug!("Message from feed ({:?}): {:?}", feed_conn_id, msg); match msg { FromFeedWebsocket::Initialize { mut channel } => { self.feed_channels.insert(feed_conn_id, channel.clone()); @@ -396,7 +389,9 @@ impl InnerLoop { new_chain.finalized_block().height, new_chain.finalized_block().hash )); - for (idx, (node_id, node)) in new_chain.iter_nodes().enumerate() { + for (idx, (chain_node_id, node)) in new_chain.iter_nodes().enumerate() { + let chain_node_id = chain_node_id.into(); + // Send subscription confirmation and chain head before doing all the nodes, // and continue sending batches of 32 nodes a time over the wire subsequently if idx % 32 == 0 { @@ -404,14 +399,14 @@ impl InnerLoop { let _ = feed_channel.send(ToFeedWebsocket::Bytes(bytes)).await; } } - feed_serializer.push(feed_message::AddedNode(node_id, node)); + feed_serializer.push(feed_message::AddedNode(chain_node_id, node)); feed_serializer.push(feed_message::FinalizedBlock( - node_id, + chain_node_id, node.finalized().height, node.finalized().hash, )); if node.stale() { - feed_serializer.push(feed_message::StaleNode(node_id)); + feed_serializer.push(feed_message::StaleNode(chain_node_id)); } } if let Some(bytes) = feed_serializer.into_finalized() { @@ -480,9 +475,9 @@ impl InnerLoop { self.node_ids.remove_by_left(&node_id); let removed_details = match self.node_state.remove_node(node_id) { - Ok(remove_details) => remove_details, - Err(err) => { - log::error!("Error removing node {}: {}", node_id, err); + Some(remove_details) => remove_details, + None => { + log::error!("Could not find node {:?}", node_id); return } }; @@ -504,7 +499,7 @@ impl InnerLoop { // Assuming the chain hasn't gone away, tell chain subscribers about the node removal if removed_details.chain_node_count != 0 { feed_for_chain.push( - feed_message::RemovedNode(node_id) + feed_message::RemovedNode(node_id.get_chain_node_id().into()) ); } } diff --git a/backend/telemetry/src/feed_message.rs b/backend/telemetry/src/feed_message.rs index 80d422e..40c34c0 100644 --- a/backend/telemetry/src/feed_message.rs +++ b/backend/telemetry/src/feed_message.rs @@ -7,10 +7,13 @@ use std::mem; use crate::state::Node; use serde_json::to_writer; use common::types::{ - Address, BlockDetails, BlockHash, BlockNumber, NodeHardware, NodeIO, NodeId, NodeStats, + BlockDetails, BlockHash, BlockNumber, NodeHardware, NodeIO, NodeStats, Timestamp }; +type Address = Box; +type FeedNodeId = usize; + pub trait FeedMessage { const ACTION: u8; } @@ -133,28 +136,28 @@ pub struct BestBlock(pub BlockNumber, pub Timestamp, pub Option); #[derive(Serialize)] pub struct BestFinalized(pub BlockNumber, pub BlockHash); -pub struct AddedNode<'a>(pub NodeId, pub &'a Node); +pub struct AddedNode<'a>(pub FeedNodeId, pub &'a Node); #[derive(Serialize)] -pub struct RemovedNode(pub NodeId); +pub struct RemovedNode(pub FeedNodeId); #[derive(Serialize)] -pub struct LocatedNode<'a>(pub NodeId, pub f32, pub f32, pub &'a str); +pub struct LocatedNode<'a>(pub FeedNodeId, pub f32, pub f32, pub &'a str); #[derive(Serialize)] -pub struct ImportedBlock<'a>(pub NodeId, pub &'a BlockDetails); +pub struct ImportedBlock<'a>(pub FeedNodeId, pub &'a BlockDetails); #[derive(Serialize)] -pub struct FinalizedBlock(pub NodeId, pub BlockNumber, pub BlockHash); +pub struct FinalizedBlock(pub FeedNodeId, pub BlockNumber, pub BlockHash); #[derive(Serialize)] -pub struct NodeStatsUpdate<'a>(pub NodeId, pub &'a NodeStats); +pub struct NodeStatsUpdate<'a>(pub FeedNodeId, pub &'a NodeStats); #[derive(Serialize)] -pub struct NodeIOUpdate<'a>(pub NodeId, pub &'a NodeIO); +pub struct NodeIOUpdate<'a>(pub FeedNodeId, pub &'a NodeIO); #[derive(Serialize)] -pub struct Hardware<'a>(pub NodeId, pub &'a NodeHardware); +pub struct Hardware<'a>(pub FeedNodeId, pub &'a NodeHardware); #[derive(Serialize)] pub struct TimeSync(pub u64); @@ -203,7 +206,7 @@ pub struct AfgAuthoritySet( ); #[derive(Serialize)] -pub struct StaleNode(pub NodeId); +pub struct StaleNode(pub FeedNodeId); impl FeedMessageWrite for AddedNode<'_> { fn write_to_feed(&self, ser: &mut FeedMessageSerializer) { diff --git a/backend/telemetry/src/state/chain.rs b/backend/telemetry/src/state/chain.rs index 8bdd8b9..be3b77e 100644 --- a/backend/telemetry/src/state/chain.rs +++ b/backend/telemetry/src/state/chain.rs @@ -5,11 +5,17 @@ use common::util::{now, DenseMap, NumStats}; use common::most_seen::MostSeen; use common::node::Payload; use once_cell::sync::Lazy; +use common::id_type; use crate::feed_message::{self, FeedMessageSerializer}; +use crate::find_location; use super::node::Node; -use super::NodeId; + +id_type!{ + /// A Node ID that is unique to the chain it's in. + pub ChainNodeId(usize) +} pub type Label = Box; @@ -20,7 +26,7 @@ pub struct Chain { /// the most commonly used label as nodes are added/removed. labels: MostSeen