Handle removing a node, and a shard disconnecting (bulk remove)

This commit is contained in:
James Wilson
2021-06-25 17:21:24 +01:00
parent 4f60453689
commit 89dfad5bbe
6 changed files with 350 additions and 81 deletions
+134 -18
View File
@@ -238,21 +238,41 @@ impl InnerLoop {
},
state::AddNodeResult::NodeAddedToChain(details) => {
let node_id = details.id;
// Note the ID so that we know what node other messages are referring to:
// Record ID <-> (shardId,localId) for future messages:
self.node_ids.insert(node_id, (shard_conn_id, local_id));
let mut feed_serializer = FeedMessageSerializer::new();
feed_serializer.push(feed_message::AddedNode(node_id, details.node));
let chain_label = details.chain.label().to_owned();
// Don't hold onto details too long because we want &mut self later:
let old_chain_label = details.old_chain_label.to_owned();
let new_chain_label = details.new_chain_label.to_owned();
let chain_node_count = details.chain_node_count;
let has_chain_label_changed = details.has_chain_label_changed;
if let Some(bytes) = feed_serializer.into_finalized() {
// Tell chain subscribers about the node we've just added:
let mut feed_messages_for_chain = FeedMessageSerializer::new();
feed_messages_for_chain.push(feed_message::AddedNode(node_id, &details.node));
if let Some(bytes) = feed_messages_for_chain.into_finalized() {
self.broadcast_to_chain_feeds(
&chain_label,
&new_chain_label,
ToFeedWebsocket::Bytes(bytes)
).await
).await;
}
// Currently we only geographically locate IPV4 addresses so ignore IPV6;
// Tell everybody about the new node count and potential rename:
let mut feed_messages_for_all = FeedMessageSerializer::new();
if has_chain_label_changed {
feed_messages_for_all.push(feed_message::RemovedChain(&old_chain_label));
}
feed_messages_for_all.push(feed_message::AddedChain(&new_chain_label, chain_node_count));
if let Some(bytes) = feed_messages_for_all.into_finalized() {
let msg = ToFeedWebsocket::Bytes(bytes);
self.broadcast_to_all_feeds(msg).await;
}
// Ask for the geographical location of the node.
// Currently we only geographically locate IPV4 addresses so ignore IPV6.
if let Some(IpAddr::V4(ip_v4)) = ip {
let _ = self.tx_to_locator.send((node_id, ip_v4)).await;
}
@@ -260,9 +280,14 @@ impl InnerLoop {
}
},
FromShardWebsocket::Remove { local_id } => {
if let Some(node_id) = self.node_ids.remove_by_right(&(shard_conn_id, local_id)) {
// TODO: node_state.remove_node, Every feed should know about node count changes.
}
let node_id = match self.node_ids.remove_by_right(&(shard_conn_id, local_id)) {
Some((node_id, _)) => node_id,
None => {
log::error!("Cannot find ID for node with shard/connectionId of {}/{}", shard_conn_id, local_id);
return
}
};
self.remove_nodes_and_broadcast_result(Some(node_id)).await;
},
FromShardWebsocket::Update { local_id, payload } => {
// TODO: Fill this all in...
@@ -309,8 +334,15 @@ impl InnerLoop {
// TODO: node_state.update_node, then handle returned diffs
},
FromShardWebsocket::Disconnected => {
// The shard has disconnected; remove the shard channel, but also
// remove any nodes associated with the shard, firing the relevant feed messages.
// Find all nodes associated with this shard connection ID:
let node_ids_to_remove: Vec<NodeId> = self.node_ids
.iter()
.filter(|(_, &(this_shard_conn_id, _))| shard_conn_id == this_shard_conn_id)
.map(|(&node_id,_)| node_id)
.collect();
// ... and remove them:
self.remove_nodes_and_broadcast_result(node_ids_to_remove).await;
}
}
}
@@ -388,7 +420,7 @@ impl InnerLoop {
chain.finalized_block().height,
chain.finalized_block().hash
));
for (idx, (gid, node)) in chain.nodes().enumerate() {
for (idx, (node_id, node)) in chain.iter_nodes().enumerate() {
// Send subscription confirmation and chain head before doing all the nodes,
// and continue sending batches of 32 nodes a time over the wire subsequently
if idx % 32 == 0 {
@@ -396,14 +428,14 @@ impl InnerLoop {
let _ = feed_channel.send(ToFeedWebsocket::Bytes(bytes)).await;
}
}
feed_serializer.push(feed_message::AddedNode(gid, node));
feed_serializer.push(feed_message::AddedNode(node_id, node));
feed_serializer.push(feed_message::FinalizedBlock(
gid,
node_id,
node.finalized().height,
node.finalized().hash,
));
if node.stale() {
feed_serializer.push(feed_message::StaleNode(gid));
feed_serializer.push(feed_message::StaleNode(node_id));
}
}
if let Some(bytes) = feed_serializer.into_finalized() {
@@ -431,6 +463,81 @@ impl InnerLoop {
}
}
/// Remove every node in `node_ids` from our state and tell the feeds what changed.
///
/// The IDs are first bucketed by the label of the chain each node currently
/// belongs to; IDs for which `node_state` reports no chain are skipped. Each
/// bucket is then removed node by node, and the messages gathered for that
/// chain are sent to the feeds subscribed to that (pre-removal) label. Messages
/// intended for every feed are accumulated across all chains and sent once,
/// after all removals have been processed.
async fn remove_nodes_and_broadcast_result(&mut self, node_ids: impl IntoIterator<Item=NodeId>) {
    // Bucket the node IDs by chain label so that messages can be batched per chain.
    let mut ids_by_chain: HashMap<String, Vec<NodeId>> = HashMap::new();
    for id in node_ids {
        let label = match self.node_state.get_node_chain(id) {
            Some(chain) => chain.label().to_owned(),
            // No chain known for this node; nothing to group it under.
            None => continue,
        };
        ids_by_chain.entry(label).or_default().push(id);
    }

    // One serializer is shared by all chains for the "everybody" messages;
    // a fresh one is made per chain for that chain's subscribers.
    let mut for_everyone = FeedMessageSerializer::new();
    for (label, ids) in ids_by_chain {
        let mut for_subscribers = FeedMessageSerializer::new();
        for id in ids {
            self.remove_node(id, &mut for_subscribers, &mut for_everyone);
        }

        // Only broadcast to the chain if something was actually serialized.
        if let Some(bytes) = for_subscribers.into_finalized() {
            let msg = ToFeedWebsocket::Bytes(bytes);
            self.broadcast_to_chain_feeds(&label, msg).await;
        }
    }

    // Finally, send the accumulated global messages (if any) to every feed.
    if let Some(bytes) = for_everyone.into_finalized() {
        let msg = ToFeedWebsocket::Bytes(bytes);
        self.broadcast_to_all_feeds(msg).await;
    }
}
/// Remove one node, identified by `node_id`, from our state.
///
/// Nothing is sent from here. Instead, the messages that should eventually go
/// out are pushed onto the two serializers handed in:
///
/// - `feed_for_all` receives `RemovedChain` when the chain ended up empty or
///   its label changed, and `AddedChain` (new label plus node count) when the
///   chain still has nodes.
/// - `feed_for_chain` receives `RemovedNode` when the chain still has nodes.
///
/// If `node_state` fails to remove the node, the error is logged and no
/// messages are pushed.
fn remove_node(
    &mut self,
    node_id: NodeId,
    feed_for_chain: &mut FeedMessageSerializer,
    feed_for_all: &mut FeedMessageSerializer
) {
    // Drop the top level ID association first (this may already have been done).
    self.node_ids.remove_by_left(&node_id);

    let details = match self.node_state.remove_node(node_id) {
        Err(err) => {
            log::error!("Error removing node {}: {}", node_id, err);
            return;
        }
        Ok(details) => details,
    };

    let remaining_nodes = details.chain_node_count;
    let chain_survives = remaining_nodes != 0;

    // The old chain entry is gone if it has no nodes left or it was renamed.
    if !chain_survives || details.has_chain_label_changed {
        feed_for_all.push(feed_message::RemovedChain(&details.old_chain_label));
    }

    if chain_survives {
        // Tell everybody about the (possibly new) label and the updated node count...
        feed_for_all.push(feed_message::AddedChain(&details.new_chain_label, remaining_nodes));
        // ...and tell subscribers of this chain that the node has gone.
        feed_for_chain.push(feed_message::RemovedNode(node_id));
    }
}
/// Send a message to all chain feeds.
async fn broadcast_to_chain_feeds(&mut self, chain: &str, message: ToFeedWebsocket) {
if let Some(feeds) = self.chain_to_feed_conn_ids.get(chain) {
@@ -438,9 +545,18 @@ impl InnerLoop {
// How much faster would it be if we processed these in parallel?
// Is it practical to do so given lifetimes and such?
if let Some(chan) = self.feed_channels.get_mut(&feed_id) {
chan.send(message.clone()).await;
let _ = chan.send(message.clone()).await;
}
}
}
}
/// Send a copy of `message` to every feed channel we currently hold.
///
/// Channels are sent to one at a time, and the result of each send is
/// deliberately discarded.
async fn broadcast_to_all_feeds(&mut self, message: ToFeedWebsocket) {
    // Open question carried over from the original: would sending in parallel
    // be noticeably faster, and is it practical given the lifetimes involved?
    for feed_channel in self.feed_channels.values_mut() {
        let outgoing = message.clone();
        let _ = feed_channel.send(outgoing).await;
    }
}
}