Give things unique ID types, not aliases, to prevent mixups

This commit is contained in:
James Wilson
2021-06-30 16:59:03 +01:00
parent 06bd660599
commit 4308359feb
14 changed files with 308 additions and 394 deletions
+36 -41
View File
@@ -1,7 +1,7 @@
use common::{
internal_messages::{
self,
LocalId,
ShardNodeId,
MuteReason
},
types::BlockHash,
@@ -16,11 +16,7 @@ use std::collections::{ HashMap, HashSet };
use crate::state::{ self, State, NodeId };
use crate::feed_message::{ self, FeedMessageSerializer };
use crate::find_location;
/// A unique Id is assigned per websocket connection (or more accurately,
/// per feed socket and per shard socket). This can be combined with the
/// [`LocalId`] of messages to give us a global ID.
type ConnId = u64;
use super::aggregator::ConnId;
/// Incoming messages come via subscriptions, and end up looking like this.
#[derive(Clone,Debug)]
@@ -40,19 +36,19 @@ pub enum FromShardWebsocket {
},
/// Tell the aggregator about a new node.
Add {
local_id: LocalId,
local_id: ShardNodeId,
ip: Option<std::net::IpAddr>,
node: common::types::NodeDetails,
genesis_hash: common::types::BlockHash
},
/// Update/pass through details about a node.
Update {
local_id: LocalId,
local_id: ShardNodeId,
payload: node::Payload
},
/// Tell the aggregator that a node has been removed when it disconnects.
Remove {
local_id: LocalId,
local_id: ShardNodeId,
},
/// The shard is disconnected.
Disconnected
@@ -63,7 +59,7 @@ pub enum FromShardWebsocket {
pub enum ToShardWebsocket {
/// Mute messages to the core by passing the shard-local ID of them.
Mute {
local_id: LocalId,
local_id: ShardNodeId,
reason: internal_messages::MuteReason
}
}
@@ -129,7 +125,7 @@ pub struct InnerLoop {
node_state: State,
/// We maintain a mapping between NodeId and ConnId+LocalId, so that we know
/// which messages are about which nodes.
node_ids: BiMap<NodeId, (ConnId, LocalId)>,
node_ids: BiMap<NodeId, (ConnId, ShardNodeId)>,
/// Keep track of how to send messages out to feeds.
feed_channels: HashMap<ConnId, mpsc::UnboundedSender<ToFeedWebsocket>>,
@@ -194,7 +190,7 @@ impl InnerLoop {
if let Some(loc) = location {
let mut feed_message_serializer = FeedMessageSerializer::new();
feed_message_serializer.push(feed_message::LocatedNode(
node_id,
node_id.get_chain_node_id().into(),
loc.latitude,
loc.longitude,
&loc.city
@@ -212,7 +208,8 @@ impl InnerLoop {
/// Handle messages coming from shards.
async fn handle_from_shard(&mut self, shard_conn_id: ConnId, msg: FromShardWebsocket) {
log::debug!("Message from shard ({}): {:?}", shard_conn_id, msg);
log::debug!("Message from shard ({:?}): {:?}", shard_conn_id, msg);
match msg {
FromShardWebsocket::Initialize { channel } => {
self.shard_channels.insert(shard_conn_id, channel);
@@ -249,7 +246,7 @@ impl InnerLoop {
// Tell chain subscribers about the node we've just added:
let mut feed_messages_for_chain = FeedMessageSerializer::new();
feed_messages_for_chain.push(feed_message::AddedNode(node_id, &details.node));
feed_messages_for_chain.push(feed_message::AddedNode(node_id.get_chain_node_id().into(), &details.node));
self.finalize_and_broadcast_to_chain_feeds(&genesis_hash, feed_messages_for_chain).await;
// Tell everybody about the new node count and potential rename:
@@ -272,7 +269,7 @@ impl InnerLoop {
let node_id = match self.node_ids.remove_by_right(&(shard_conn_id, local_id)) {
Some((node_id, _)) => node_id,
None => {
log::error!("Cannot find ID for node with shard/connectionId of {}/{}", shard_conn_id, local_id);
log::error!("Cannot find ID for node with shard/connectionId of {:?}/{:?}", shard_conn_id, local_id);
return
}
};
@@ -282,11 +279,22 @@ impl InnerLoop {
let node_id = match self.node_ids.get_by_right(&(shard_conn_id, local_id)) {
Some(id) => *id,
None => {
log::error!("Cannot find ID for node with shard/connectionId of {}/{}", shard_conn_id, local_id);
log::error!("Cannot find ID for node with shard/connectionId of {:?}/{:?}", shard_conn_id, local_id);
return
}
};
self.handle_from_shard_update(node_id, payload).await;
let mut feed_message_serializer = FeedMessageSerializer::new();
let broadcast_finality = self.node_state.update_node(node_id, payload, &mut feed_message_serializer);
if let Some(chain) = self.node_state.get_chain_by_node_id(node_id) {
let genesis_hash = *chain.genesis_hash();
if broadcast_finality {
self.finalize_and_broadcast_to_chain_finality_feeds(&genesis_hash, feed_message_serializer).await;
} else {
self.finalize_and_broadcast_to_chain_feeds(&genesis_hash, feed_message_serializer).await;
}
}
},
FromShardWebsocket::Disconnected => {
// Find all nodes associated with this shard connection ID:
@@ -302,24 +310,9 @@ impl InnerLoop {
}
}
async fn handle_from_shard_update(&mut self, node_id: NodeId, payload: node::Payload) {
let mut feed_message_serializer = FeedMessageSerializer::new();
let broadcast_finality = self.node_state.update_node(node_id, payload, &mut feed_message_serializer);
if let Some(chain) = self.node_state.get_chain_by_node_id(node_id) {
let genesis_hash = *chain.genesis_hash();
if broadcast_finality {
self.finalize_and_broadcast_to_chain_finality_feeds(&genesis_hash, feed_message_serializer).await;
} else {
self.finalize_and_broadcast_to_chain_feeds(&genesis_hash, feed_message_serializer).await;
}
}
}
/// Handle messages coming from feeds.
async fn handle_from_feed(&mut self, feed_conn_id: ConnId, msg: FromFeedWebsocket) {
log::debug!("Message from feed ({}): {:?}", feed_conn_id, msg);
log::debug!("Message from feed ({:?}): {:?}", feed_conn_id, msg);
match msg {
FromFeedWebsocket::Initialize { mut channel } => {
self.feed_channels.insert(feed_conn_id, channel.clone());
@@ -396,7 +389,9 @@ impl InnerLoop {
new_chain.finalized_block().height,
new_chain.finalized_block().hash
));
for (idx, (node_id, node)) in new_chain.iter_nodes().enumerate() {
for (idx, (chain_node_id, node)) in new_chain.iter_nodes().enumerate() {
let chain_node_id = chain_node_id.into();
// Send subscription confirmation and chain head before doing all the nodes,
// and continue sending batches of 32 nodes a time over the wire subsequently
if idx % 32 == 0 {
@@ -404,14 +399,14 @@ impl InnerLoop {
let _ = feed_channel.send(ToFeedWebsocket::Bytes(bytes)).await;
}
}
feed_serializer.push(feed_message::AddedNode(node_id, node));
feed_serializer.push(feed_message::AddedNode(chain_node_id, node));
feed_serializer.push(feed_message::FinalizedBlock(
node_id,
chain_node_id,
node.finalized().height,
node.finalized().hash,
));
if node.stale() {
feed_serializer.push(feed_message::StaleNode(node_id));
feed_serializer.push(feed_message::StaleNode(chain_node_id));
}
}
if let Some(bytes) = feed_serializer.into_finalized() {
@@ -480,9 +475,9 @@ impl InnerLoop {
self.node_ids.remove_by_left(&node_id);
let removed_details = match self.node_state.remove_node(node_id) {
Ok(remove_details) => remove_details,
Err(err) => {
log::error!("Error removing node {}: {}", node_id, err);
Some(remove_details) => remove_details,
None => {
log::error!("Could not find node {:?}", node_id);
return
}
};
@@ -504,7 +499,7 @@ impl InnerLoop {
// Assuming the chain hasn't gone away, tell chain subscribers about the node removal
if removed_details.chain_node_count != 0 {
feed_for_chain.push(
feed_message::RemovedNode(node_id)
feed_message::RemovedNode(node_id.get_chain_node_id().into())
);
}
}