diff --git a/backend/common/src/assign_id.rs b/backend/common/src/assign_id.rs new file mode 100644 index 0000000..ed60d78 --- /dev/null +++ b/backend/common/src/assign_id.rs @@ -0,0 +1,72 @@ +use std::{collections::HashMap, hash::Hash}; +use serde::{Serialize,Deserialize}; + +#[derive(Clone,Copy,Debug,Hash,PartialEq,Eq,Serialize,Deserialize)] +pub struct Id(usize); + +impl std::convert::From for usize { + fn from(id: Id) -> usize { + id.0 + } +} + +/// A struct that allows you to assign ID to an arbitrary set of +/// details (so long as they are Eq+Hash+Clone), and then access +/// the assigned ID given those details or access the details given +/// the ID. +#[derive(Debug)] +pub struct AssignId
{ + current_id: Id, + from_details: HashMap, + from_id: HashMap +} + +impl
AssignId
where Details: Eq + Hash + Clone { + pub fn new() -> Self { + Self { + current_id: Id(0), + from_details: HashMap::new(), + from_id: HashMap::new() + } + } + + pub fn assign_id(&mut self, details: Details) -> Id { + let this_id = self.current_id; + self.current_id.0 += 1; + + self.from_details.insert(details.clone(), this_id); + self.from_id.insert(this_id, details); + + this_id + } + + pub fn get_details(&mut self, id: Id) -> Option<&Details> { + self.from_id.get(&id) + } + + pub fn get_id(&mut self, details: &Details) -> Option { + self.from_details.get(details).map(|id| *id) + } + + pub fn remove_by_id(&mut self, id: Id) -> Option
{ + if let Some(details) = self.from_id.remove(&id) { + self.from_details.remove(&details); + Some(details) + } else { + None + } + } + + pub fn remove_by_details(&mut self, details: &Details) -> Option { + if let Some(id) = self.from_details.remove(&details) { + self.from_id.remove(&id); + Some(id) + } else { + None + } + } + + pub fn clear(&mut self) { + *self = AssignId::new() + } +} \ No newline at end of file diff --git a/backend/common/src/internal_messages.rs b/backend/common/src/internal_messages.rs index 593ea9b..1aebab0 100644 --- a/backend/common/src/internal_messages.rs +++ b/backend/common/src/internal_messages.rs @@ -2,14 +2,15 @@ use std::net::IpAddr; use crate::node::Payload; use crate::types::{NodeDetails}; +use crate::assign_id::Id; use serde::{Deserialize, Serialize}; /// The shard-local ID of a given node, where a single connection /// might send data on behalf of more than one chain. -pub type LocalId = u64; +pub type LocalId = Id; /// A global ID assigned to messages from each different pair of ConnId+LocalId. -pub type GlobalId = u64; +pub type GlobalId = Id; /// Message sent from the shard to the backend core #[derive(Deserialize, Serialize, Debug, Clone)] diff --git a/backend/common/src/lib.rs b/backend/common/src/lib.rs index 78c887a..ec5b967 100644 --- a/backend/common/src/lib.rs +++ b/backend/common/src/lib.rs @@ -3,4 +3,5 @@ pub mod internal_messages; pub mod types; pub mod util; pub mod json; -pub mod log_level; \ No newline at end of file +pub mod log_level; +pub mod assign_id; \ No newline at end of file diff --git a/backend/shard/src/aggregator.rs b/backend/shard/src/aggregator.rs index a2461a9..5c80a87 100644 --- a/backend/shard/src/aggregator.rs +++ b/backend/shard/src/aggregator.rs @@ -1,9 +1,9 @@ -use common::{internal_messages::{self, LocalId}, node}; +use common::{internal_messages::{self, LocalId}, node, assign_id::AssignId}; use std::sync::Arc; use std::sync::atomic::AtomicU64; use futures::{channel::mpsc, future}; use futures::{ Sink, SinkExt, StreamExt }; -use std::collections::{ HashMap, HashSet }; +use std::collections::{ HashSet }; use crate::connection::{ create_ws_connection, Message }; /// A unique Id is assigned per websocket connection (or more accurately, @@ -100,8 +100,6 @@ impl Aggregator { async fn handle_messages(mut rx_from_external: mpsc::Receiver, mut tx_to_telemetry_core: mpsc::Sender) { use internal_messages::{ FromShardAggregator, FromTelemetryCore }; - let mut next_local_id: LocalId = 1; - // Just as an optimisation, we can keep track of whether we're connected to the backend // or not, and ignore incoming messages while we aren't. let mut connected_to_telemetry_core = false; @@ -112,8 +110,7 @@ impl Aggregator { // Maintain mappings from the connection ID and node message ID to the "local ID" which we // broadcast to the telemetry core. - let mut to_local_id: HashMap<(ConnId, node::NodeMessageId), LocalId> = HashMap::new(); - let mut from_local_id: HashMap = HashMap::new(); + let mut to_local_id = AssignId::new(); // Any messages coming from nodes that have been muted are ignored: let mut muted: HashSet = HashSet::new(); @@ -132,9 +129,9 @@ impl Aggregator { // We've told everything to disconnect. Now, reset our state: close_connections = vec![]; - to_local_id = HashMap::new(); - from_local_id = HashMap::new(); - muted = HashSet::new(); + to_local_id.clear(); + muted.clear(); + connected_to_telemetry_core = true; log::info!("Connected to telemetry core"); }, @@ -151,12 +148,7 @@ impl Aggregator { if !connected_to_telemetry_core { continue } // Generate a new "local ID" for messages from this connection: - let local_id = next_local_id; - next_local_id += 1; - - // Store mapping to/from local_id to conn/message ID paid: - to_local_id.insert((conn_id, message_id), local_id); - from_local_id.insert(local_id, (conn_id, message_id)); + let local_id = to_local_id.assign_id((conn_id, message_id)); // Send the message to the telemetry core with this local ID: let _ = tx_to_telemetry_core.send(FromShardAggregator::AddNode { @@ -170,8 +162,8 @@ impl Aggregator { if !connected_to_telemetry_core { continue } // Get the local ID, ignoring the message if none match: - let local_id = match to_local_id.get(&(conn_id, message_id)) { - Some(id) => *id, + let local_id = match to_local_id.get_id(&(conn_id, message_id)) { + Some(id) => id, None => continue }; diff --git a/backend/telemetry/src/aggregator.rs b/backend/telemetry/src/aggregator.rs index ad86fc0..7f7c22c 100644 --- a/backend/telemetry/src/aggregator.rs +++ b/backend/telemetry/src/aggregator.rs @@ -1,4 +1,4 @@ -use common::{internal_messages::{self, LocalId}, node}; +use common::{internal_messages::{GlobalId, LocalId}, node, assign_id::AssignId}; use std::{str::FromStr, sync::Arc}; use std::sync::atomic::AtomicU64; use futures::channel::{ mpsc, oneshot }; @@ -6,6 +6,7 @@ use futures::{ Sink, SinkExt, StreamExt }; use tokio::net::TcpStream; use tokio_util::compat::{ TokioAsyncReadCompatExt }; use std::collections::{ HashMap, HashSet }; +use crate::state::State; /// A unique Id is assigned per websocket connection (or more accurately, /// per feed socket and per shard socket). This can be combined with the @@ -137,6 +138,12 @@ impl Aggregator { // any more, this task will gracefully end. async fn handle_messages(mut rx_from_external: mpsc::Receiver, denylist: Vec) { + let mut nodes_state = State::new(); + + // Maintain mappings from the shard connection ID and local ID of messages to a global ID + // that uniquely identifies nodes in our node state. + let mut to_global_id = AssignId::new(); + // Temporary: if we drop channels to shards, they will be booted: let mut to_shards = vec![]; @@ -162,10 +169,13 @@ impl Aggregator { to_shards.push(channel); }, ToAggregator::FromShardWebsocket(shard_conn_id, FromShardWebsocket::Add { local_id, ip, node }) => { - + let global_id = to_global_id.assign_id((shard_conn_id, local_id)); }, ToAggregator::FromShardWebsocket(shard_conn_id, FromShardWebsocket::Update { local_id, payload }) => { - + let global_id = match to_global_id.get_id(&(shard_conn_id, local_id)) { + Some(id) => id, + None => continue + }; }, } } diff --git a/backend/telemetry/src/main.rs b/backend/telemetry/src/main.rs index 23fc200..de8f0f5 100644 --- a/backend/telemetry/src/main.rs +++ b/backend/telemetry/src/main.rs @@ -1,6 +1,5 @@ mod aggregator; -mod feed_message; -mod node; +mod state; use std::net::SocketAddr; use std::str::FromStr; diff --git a/backend/telemetry/src/state/chain.rs b/backend/telemetry/src/state/chain.rs new file mode 100644 index 0000000..a44d50a --- /dev/null +++ b/backend/telemetry/src/state/chain.rs @@ -0,0 +1,3 @@ +pub struct Chain { + +} \ No newline at end of file diff --git a/backend/telemetry/src/feed_message.rs b/backend/telemetry/src/state/feed_message.rs similarity index 99% rename from backend/telemetry/src/feed_message.rs rename to backend/telemetry/src/state/feed_message.rs index e0ea5d6..9a02944 100644 --- a/backend/telemetry/src/feed_message.rs +++ b/backend/telemetry/src/state/feed_message.rs @@ -5,7 +5,7 @@ use serde::ser::{SerializeTuple, Serializer}; use serde::Serialize; use std::mem; -use crate::node::Node; +use super::node::Node; use serde_json::to_writer; use common::types::{ Address, BlockDetails, BlockHash, BlockNumber, NodeHardware, NodeIO, NodeId, NodeStats, diff --git a/backend/telemetry/src/state/mod.rs b/backend/telemetry/src/state/mod.rs new file mode 100644 index 0000000..d4c16a9 --- /dev/null +++ b/backend/telemetry/src/state/mod.rs @@ -0,0 +1,7 @@ +mod node; +mod chain; +mod feed_message; + +mod state; + +pub use state::State; \ No newline at end of file diff --git a/backend/telemetry/src/node.rs b/backend/telemetry/src/state/node.rs similarity index 100% rename from backend/telemetry/src/node.rs rename to backend/telemetry/src/state/node.rs diff --git a/backend/telemetry/src/state/state.rs b/backend/telemetry/src/state/state.rs new file mode 100644 index 0000000..2b7060e --- /dev/null +++ b/backend/telemetry/src/state/state.rs @@ -0,0 +1,14 @@ +use super::chain::Chain; +use std::collections::HashMap; + +pub struct State { + chains: HashMap, Chain> +} + +impl State { + pub fn new() -> State { + State { + chains: HashMap::new() + } + } +} \ No newline at end of file