bimap to store global ID mappings: we'll assign them in node state

This commit is contained in:
James Wilson
2021-06-23 10:12:50 +01:00
parent 7dfc582a20
commit 2db2677217
5 changed files with 31 additions and 30 deletions
+8
View File
@@ -44,6 +44,12 @@ version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "904dfeac50f3cdaba28fc6f57fdcddb75f49ed61346676a78c4ffe55877802fd"
[[package]]
name = "bimap"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "50ae17cabbc8a38a1e3e4c1a6a664e9a09672dc14d0896fa8d865d3a5a446b07"
[[package]]
name = "bincode"
version = "1.3.3"
@@ -157,6 +163,7 @@ dependencies = [
name = "common"
version = "0.1.0"
dependencies = [
"bimap",
"bincode",
"bytes",
"fnv",
@@ -1187,6 +1194,7 @@ name = "telemetry"
version = "0.1.0"
dependencies = [
"anyhow",
"bimap",
"bincode",
"common",
"futures",
+1
View File
@@ -5,6 +5,7 @@ authors = ["Parity Technologies Ltd. <admin@parity.io>"]
edition = "2018"
[dependencies]
bimap = "0.6.1"
bytes = "1.0.1"
fnv = "1.0.7"
hex = "0.4.3"
+17 -26
View File
@@ -1,5 +1,6 @@
use std::{collections::HashMap, hash::Hash};
use std::hash::Hash;
use serde::{Serialize,Deserialize};
use bimap::BiMap;
#[derive(Clone,Copy,Debug,Hash,PartialEq,Eq,Serialize,Deserialize)]
pub struct Id(usize);
@@ -9,6 +10,11 @@ impl std::convert::From<Id> for usize {
id.0
}
}
impl std::convert::From<usize> for Id {
fn from(n: usize) -> Id {
Id(n)
}
}
/// A struct that allows you to assign ID to an arbitrary set of
/// details (so long as they are Eq+Hash+Clone), and then access
@@ -17,60 +23,45 @@ impl std::convert::From<Id> for usize {
#[derive(Debug)]
pub struct AssignId<Details> {
current_id: Id,
from_details: HashMap<Details, Id>,
from_id: HashMap<Id, Details>
mapping: BiMap<Id, Details>
}
impl <Details> AssignId<Details> where Details: Eq + Hash + Clone {
impl <Details> AssignId<Details> where Details: Eq + Hash {
pub fn new() -> Self {
Self {
current_id: Id(0),
from_details: HashMap::new(),
from_id: HashMap::new()
mapping: BiMap::new()
}
}
pub fn assign_id(&mut self, details: Details) -> Id {
let this_id = self.current_id;
self.current_id.0 += 1;
self.from_details.insert(details.clone(), this_id);
self.from_id.insert(this_id, details);
self.mapping.insert(this_id, details);
this_id
}
pub fn get_details(&mut self, id: Id) -> Option<&Details> {
self.from_id.get(&id)
self.mapping.get_by_left(&id)
}
pub fn get_id(&mut self, details: &Details) -> Option<Id> {
self.from_details.get(details).map(|id| *id)
self.mapping.get_by_right(details).map(|id| *id)
}
pub fn remove_by_id(&mut self, id: Id) -> Option<Details> {
if let Some(details) = self.from_id.remove(&id) {
self.from_details.remove(&details);
Some(details)
} else {
None
}
self.mapping.remove_by_left(&id).map(|(_,details)| details)
}
pub fn remove_by_details(&mut self, details: &Details) -> Option<Id> {
if let Some(id) = self.from_details.remove(&details) {
self.from_id.remove(&id);
Some(id)
} else {
None
}
self.mapping.remove_by_right(&details).map(|(id,_)| id)
}
pub fn clear(&mut self) {
*self = AssignId::new()
*self = AssignId::new();
}
pub fn iter(&self) -> impl Iterator<Item = (Id, &Details)> {
self.from_id.iter().map(|(id, details)| (*id, details))
self.mapping.iter().map(|(id, details)| (*id, details))
}
}
+1
View File
@@ -6,6 +6,7 @@ edition = "2018"
[dependencies]
anyhow = "1.0.41"
bimap = "0.6.1"
bincode = "1.3.3"
common = { path = "../common" }
futures = "0.3.15"
+4 -4
View File
@@ -1,9 +1,9 @@
use common::{
internal_messages::{GlobalId, LocalId},
node,
assign_id::AssignId,
util::now
};
use bimap::BiMap;
use std::{str::FromStr, sync::Arc};
use std::sync::atomic::AtomicU64;
use futures::channel::{ mpsc, oneshot };
@@ -155,7 +155,7 @@ impl Aggregator {
// Maintain mappings from the shard connection ID and local ID of messages to a global ID
// that uniquely identifies nodes in our node state.
let mut to_global_node_id = AssignId::new();
let mut global_ids: BiMap<GlobalId, (u64, LocalId)> = BiMap::new();
// Keep track of channels to communicate with feeds and shards:
let mut feed_channels = HashMap::new();
@@ -294,13 +294,13 @@ impl Aggregator {
// TODO: node_state.add_node. Every feed should know about node count changes.
},
ToAggregator::FromShardWebsocket(shard_conn_id, FromShardWebsocket::Remove { local_id }) => {
if let Some(id) = to_global_node_id.remove_by_details(&(shard_conn_id, local_id)) {
if let Some(id) = global_ids.remove_by_right(&(shard_conn_id, local_id)) {
// TODO: node_state.remove_node, Every feed should know about node count changes.
}
},
ToAggregator::FromShardWebsocket(shard_conn_id, FromShardWebsocket::Update { local_id, payload }) => {
// TODO: Fill this all in...
let global_node_id = match to_global_node_id.get_id(&(shard_conn_id, local_id)) {
let global_node_id = match global_ids.get_by_right(&(shard_conn_id, local_id)) {
Some(id) => id,
None => continue
};