simplify feed sending a little

This commit is contained in:
James Wilson
2021-06-25 17:39:22 +01:00
parent 89dfad5bbe
commit 00c6e4f4c5
+23 -29
View File
@@ -197,17 +197,12 @@ impl InnerLoop {
&loc.city
));
if let Some(bytes) = feed_message_serializer.into_finalized() {
let chain_label = self.node_state
.get_node_chain(node_id)
.map(|chain| chain.label());
let chain_label = self.node_state
.get_node_chain(node_id)
.map(|chain| chain.label().to_owned());
if let Some(chain_label) = chain_label {
// Don't hold onto lifetime from self because we call a mut fn next:
let label = chain_label.to_owned();
// Update location for any feeds subscribed to the node's chain.
self.broadcast_to_chain_feeds(&label, ToFeedWebsocket::Bytes(bytes)).await;
}
if let Some(chain_label) = chain_label {
self.finalize_and_broadcast_to_chain_feeds(&chain_label, feed_message_serializer).await;
}
}
}
@@ -251,13 +246,7 @@ impl InnerLoop {
// Tell chain subscribers about the node we've just added:
let mut feed_messages_for_chain = FeedMessageSerializer::new();
feed_messages_for_chain.push(feed_message::AddedNode(node_id, &details.node));
if let Some(bytes) = feed_messages_for_chain.into_finalized() {
self.broadcast_to_chain_feeds(
&new_chain_label,
ToFeedWebsocket::Bytes(bytes)
).await;
}
self.finalize_and_broadcast_to_chain_feeds(&old_chain_label, feed_messages_for_chain).await;
// Tell everybody about the new node count and potential rename:
let mut feed_messages_for_all = FeedMessageSerializer::new();
@@ -265,11 +254,7 @@ impl InnerLoop {
feed_messages_for_all.push(feed_message::RemovedChain(&old_chain_label));
}
feed_messages_for_all.push(feed_message::AddedChain(&new_chain_label, chain_node_count));
if let Some(bytes) = feed_messages_for_all.into_finalized() {
let msg = ToFeedWebsocket::Bytes(bytes);
self.broadcast_to_all_feeds(msg).await;
}
self.finalize_and_broadcast_to_all_feeds(feed_messages_for_all).await;
// Ask for the grographical location of the node.
// Currently we only geographically locate IPV4 addresses so ignore IPV6.
@@ -486,14 +471,9 @@ impl InnerLoop {
&mut feed_messages_for_all
);
}
if let Some(bytes) = feed_messages_for_chain.into_finalized() {
self.broadcast_to_chain_feeds(&chain_label, ToFeedWebsocket::Bytes(bytes)).await;
}
}
if let Some(bytes) = feed_messages_for_all.into_finalized() {
self.broadcast_to_all_feeds(ToFeedWebsocket::Bytes(bytes)).await;
self.finalize_and_broadcast_to_chain_feeds(&chain_label, feed_messages_for_chain).await;
}
self.finalize_and_broadcast_to_all_feeds(feed_messages_for_all).await;
}
/// Remove a single node by its ID, pushing any messages we'd want to send
@@ -538,6 +518,13 @@ impl InnerLoop {
}
}
/// Finalize a [`FeedMessageSerializer`] and broadcast the result to feeds for the chain.
async fn finalize_and_broadcast_to_chain_feeds(&mut self, chain: &str, serializer: FeedMessageSerializer) {
if let Some(bytes) = serializer.into_finalized() {
self.broadcast_to_chain_feeds(chain, ToFeedWebsocket::Bytes(bytes)).await;
}
}
/// Send a message to all chain feeds.
async fn broadcast_to_chain_feeds(&mut self, chain: &str, message: ToFeedWebsocket) {
if let Some(feeds) = self.chain_to_feed_conn_ids.get(chain) {
@@ -551,6 +538,13 @@ impl InnerLoop {
}
}
/// Finalize a [`FeedMessageSerializer`] and broadcast the result to all feeds
async fn finalize_and_broadcast_to_all_feeds(&mut self, serializer: FeedMessageSerializer) {
if let Some(bytes) = serializer.into_finalized() {
self.broadcast_to_all_feeds(ToFeedWebsocket::Bytes(bytes)).await;
}
}
/// Send a message to everybody.
async fn broadcast_to_all_feeds(&mut self, message: ToFeedWebsocket) {
for chan in self.feed_channels.values_mut() {