mirror of
https://github.com/pezkuwichain/pezkuwi-telemetry.git
synced 2026-04-25 18:48:00 +00:00
wrap assigning local/global IDs into struct to avoid things getting out of sync
This commit is contained in:
@@ -0,0 +1,72 @@
|
||||
use std::{collections::HashMap, hash::Hash};
|
||||
use serde::{Serialize,Deserialize};
|
||||
|
||||
#[derive(Clone,Copy,Debug,Hash,PartialEq,Eq,Serialize,Deserialize)]
|
||||
pub struct Id(usize);
|
||||
|
||||
impl std::convert::From<Id> for usize {
|
||||
fn from(id: Id) -> usize {
|
||||
id.0
|
||||
}
|
||||
}
|
||||
|
||||
/// A struct that allows you to assign ID to an arbitrary set of
|
||||
/// details (so long as they are Eq+Hash+Clone), and then access
|
||||
/// the assigned ID given those details or access the details given
|
||||
/// the ID.
|
||||
#[derive(Debug)]
|
||||
pub struct AssignId<Details> {
|
||||
current_id: Id,
|
||||
from_details: HashMap<Details, Id>,
|
||||
from_id: HashMap<Id, Details>
|
||||
}
|
||||
|
||||
impl <Details> AssignId<Details> where Details: Eq + Hash + Clone {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
current_id: Id(0),
|
||||
from_details: HashMap::new(),
|
||||
from_id: HashMap::new()
|
||||
}
|
||||
}
|
||||
|
||||
pub fn assign_id(&mut self, details: Details) -> Id {
|
||||
let this_id = self.current_id;
|
||||
self.current_id.0 += 1;
|
||||
|
||||
self.from_details.insert(details.clone(), this_id);
|
||||
self.from_id.insert(this_id, details);
|
||||
|
||||
this_id
|
||||
}
|
||||
|
||||
pub fn get_details(&mut self, id: Id) -> Option<&Details> {
|
||||
self.from_id.get(&id)
|
||||
}
|
||||
|
||||
pub fn get_id(&mut self, details: &Details) -> Option<Id> {
|
||||
self.from_details.get(details).map(|id| *id)
|
||||
}
|
||||
|
||||
pub fn remove_by_id(&mut self, id: Id) -> Option<Details> {
|
||||
if let Some(details) = self.from_id.remove(&id) {
|
||||
self.from_details.remove(&details);
|
||||
Some(details)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
pub fn remove_by_details(&mut self, details: &Details) -> Option<Id> {
|
||||
if let Some(id) = self.from_details.remove(&details) {
|
||||
self.from_id.remove(&id);
|
||||
Some(id)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
pub fn clear(&mut self) {
|
||||
*self = AssignId::new()
|
||||
}
|
||||
}
|
||||
@@ -2,14 +2,15 @@ use std::net::IpAddr;
|
||||
|
||||
use crate::node::Payload;
|
||||
use crate::types::{NodeDetails};
|
||||
use crate::assign_id::Id;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
/// The shard-local ID of a given node, where a single connection
|
||||
/// might send data on behalf of more than one chain.
|
||||
pub type LocalId = u64;
|
||||
pub type LocalId = Id;
|
||||
|
||||
/// A global ID assigned to messages from each different pair of ConnId+LocalId.
|
||||
pub type GlobalId = u64;
|
||||
pub type GlobalId = Id;
|
||||
|
||||
/// Message sent from the shard to the backend core
|
||||
#[derive(Deserialize, Serialize, Debug, Clone)]
|
||||
|
||||
@@ -3,4 +3,5 @@ pub mod internal_messages;
|
||||
pub mod types;
|
||||
pub mod util;
|
||||
pub mod json;
|
||||
pub mod log_level;
|
||||
pub mod log_level;
|
||||
pub mod assign_id;
|
||||
@@ -1,9 +1,9 @@
|
||||
use common::{internal_messages::{self, LocalId}, node};
|
||||
use common::{internal_messages::{self, LocalId}, node, assign_id::AssignId};
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::AtomicU64;
|
||||
use futures::{channel::mpsc, future};
|
||||
use futures::{ Sink, SinkExt, StreamExt };
|
||||
use std::collections::{ HashMap, HashSet };
|
||||
use std::collections::{ HashSet };
|
||||
use crate::connection::{ create_ws_connection, Message };
|
||||
|
||||
/// A unique Id is assigned per websocket connection (or more accurately,
|
||||
@@ -100,8 +100,6 @@ impl Aggregator {
|
||||
async fn handle_messages(mut rx_from_external: mpsc::Receiver<ToAggregator>, mut tx_to_telemetry_core: mpsc::Sender<FromAggregator>) {
|
||||
use internal_messages::{ FromShardAggregator, FromTelemetryCore };
|
||||
|
||||
let mut next_local_id: LocalId = 1;
|
||||
|
||||
// Just as an optimisation, we can keep track of whether we're connected to the backend
|
||||
// or not, and ignore incoming messages while we aren't.
|
||||
let mut connected_to_telemetry_core = false;
|
||||
@@ -112,8 +110,7 @@ impl Aggregator {
|
||||
|
||||
// Maintain mappings from the connection ID and node message ID to the "local ID" which we
|
||||
// broadcast to the telemetry core.
|
||||
let mut to_local_id: HashMap<(ConnId, node::NodeMessageId), LocalId> = HashMap::new();
|
||||
let mut from_local_id: HashMap<LocalId, (ConnId, node::NodeMessageId)> = HashMap::new();
|
||||
let mut to_local_id = AssignId::new();
|
||||
|
||||
// Any messages coming from nodes that have been muted are ignored:
|
||||
let mut muted: HashSet<LocalId> = HashSet::new();
|
||||
@@ -132,9 +129,9 @@ impl Aggregator {
|
||||
|
||||
// We've told everything to disconnect. Now, reset our state:
|
||||
close_connections = vec![];
|
||||
to_local_id = HashMap::new();
|
||||
from_local_id = HashMap::new();
|
||||
muted = HashSet::new();
|
||||
to_local_id.clear();
|
||||
muted.clear();
|
||||
|
||||
connected_to_telemetry_core = true;
|
||||
log::info!("Connected to telemetry core");
|
||||
},
|
||||
@@ -151,12 +148,7 @@ impl Aggregator {
|
||||
if !connected_to_telemetry_core { continue }
|
||||
|
||||
// Generate a new "local ID" for messages from this connection:
|
||||
let local_id = next_local_id;
|
||||
next_local_id += 1;
|
||||
|
||||
// Store mapping to/from local_id to conn/message ID paid:
|
||||
to_local_id.insert((conn_id, message_id), local_id);
|
||||
from_local_id.insert(local_id, (conn_id, message_id));
|
||||
let local_id = to_local_id.assign_id((conn_id, message_id));
|
||||
|
||||
// Send the message to the telemetry core with this local ID:
|
||||
let _ = tx_to_telemetry_core.send(FromShardAggregator::AddNode {
|
||||
@@ -170,8 +162,8 @@ impl Aggregator {
|
||||
if !connected_to_telemetry_core { continue }
|
||||
|
||||
// Get the local ID, ignoring the message if none match:
|
||||
let local_id = match to_local_id.get(&(conn_id, message_id)) {
|
||||
Some(id) => *id,
|
||||
let local_id = match to_local_id.get_id(&(conn_id, message_id)) {
|
||||
Some(id) => id,
|
||||
None => continue
|
||||
};
|
||||
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use common::{internal_messages::{self, LocalId}, node};
|
||||
use common::{internal_messages::{GlobalId, LocalId}, node, assign_id::AssignId};
|
||||
use std::{str::FromStr, sync::Arc};
|
||||
use std::sync::atomic::AtomicU64;
|
||||
use futures::channel::{ mpsc, oneshot };
|
||||
@@ -6,6 +6,7 @@ use futures::{ Sink, SinkExt, StreamExt };
|
||||
use tokio::net::TcpStream;
|
||||
use tokio_util::compat::{ TokioAsyncReadCompatExt };
|
||||
use std::collections::{ HashMap, HashSet };
|
||||
use crate::state::State;
|
||||
|
||||
/// A unique Id is assigned per websocket connection (or more accurately,
|
||||
/// per feed socket and per shard socket). This can be combined with the
|
||||
@@ -137,6 +138,12 @@ impl Aggregator {
|
||||
// any more, this task will gracefully end.
|
||||
async fn handle_messages(mut rx_from_external: mpsc::Receiver<ToAggregator>, denylist: Vec<String>) {
|
||||
|
||||
let mut nodes_state = State::new();
|
||||
|
||||
// Maintain mappings from the shard connection ID and local ID of messages to a global ID
|
||||
// that uniquely identifies nodes in our node state.
|
||||
let mut to_global_id = AssignId::new();
|
||||
|
||||
// Temporary: if we drop channels to shards, they will be booted:
|
||||
let mut to_shards = vec![];
|
||||
|
||||
@@ -162,10 +169,13 @@ impl Aggregator {
|
||||
to_shards.push(channel);
|
||||
},
|
||||
ToAggregator::FromShardWebsocket(shard_conn_id, FromShardWebsocket::Add { local_id, ip, node }) => {
|
||||
|
||||
let global_id = to_global_id.assign_id((shard_conn_id, local_id));
|
||||
},
|
||||
ToAggregator::FromShardWebsocket(shard_conn_id, FromShardWebsocket::Update { local_id, payload }) => {
|
||||
|
||||
let global_id = match to_global_id.get_id(&(shard_conn_id, local_id)) {
|
||||
Some(id) => id,
|
||||
None => continue
|
||||
};
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
mod aggregator;
|
||||
mod feed_message;
|
||||
mod node;
|
||||
mod state;
|
||||
|
||||
use std::net::SocketAddr;
|
||||
use std::str::FromStr;
|
||||
|
||||
@@ -0,0 +1,3 @@
|
||||
pub struct Chain {
|
||||
|
||||
}
|
||||
@@ -5,7 +5,7 @@ use serde::ser::{SerializeTuple, Serializer};
|
||||
use serde::Serialize;
|
||||
use std::mem;
|
||||
|
||||
use crate::node::Node;
|
||||
use super::node::Node;
|
||||
use serde_json::to_writer;
|
||||
use common::types::{
|
||||
Address, BlockDetails, BlockHash, BlockNumber, NodeHardware, NodeIO, NodeId, NodeStats,
|
||||
@@ -0,0 +1,7 @@
|
||||
mod node;
|
||||
mod chain;
|
||||
mod feed_message;
|
||||
|
||||
mod state;
|
||||
|
||||
pub use state::State;
|
||||
@@ -0,0 +1,14 @@
|
||||
use super::chain::Chain;
|
||||
use std::collections::HashMap;
|
||||
|
||||
pub struct State {
|
||||
chains: HashMap<Box<str>, Chain>
|
||||
}
|
||||
|
||||
impl State {
|
||||
pub fn new() -> State {
|
||||
State {
|
||||
chains: HashMap::new()
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user