From 00c6e4f4c59c6109fa6d929aa7269b0001d4451b Mon Sep 17 00:00:00 2001 From: James Wilson Date: Fri, 25 Jun 2021 17:39:22 +0100 Subject: [PATCH] simplify feed sending a little --- .../telemetry/src/aggregator/inner_loop.rs | 52 ++++++++----------- 1 file changed, 23 insertions(+), 29 deletions(-) diff --git a/backend/telemetry/src/aggregator/inner_loop.rs b/backend/telemetry/src/aggregator/inner_loop.rs index 8ebb584..01aef94 100644 --- a/backend/telemetry/src/aggregator/inner_loop.rs +++ b/backend/telemetry/src/aggregator/inner_loop.rs @@ -197,17 +197,12 @@ impl InnerLoop { &loc.city )); - if let Some(bytes) = feed_message_serializer.into_finalized() { - let chain_label = self.node_state - .get_node_chain(node_id) - .map(|chain| chain.label()); + let chain_label = self.node_state + .get_node_chain(node_id) + .map(|chain| chain.label().to_owned()); - if let Some(chain_label) = chain_label { - // Don't hold onto lifetime from self because we call a mut fn next: - let label = chain_label.to_owned(); - // Update location for any feeds subscribed to the node's chain. - self.broadcast_to_chain_feeds(&label, ToFeedWebsocket::Bytes(bytes)).await; - } + if let Some(chain_label) = chain_label { + self.finalize_and_broadcast_to_chain_feeds(&chain_label, feed_message_serializer).await; } } } @@ -251,13 +246,7 @@ impl InnerLoop { // Tell chain subscribers about the node we've just added: let mut feed_messages_for_chain = FeedMessageSerializer::new(); feed_messages_for_chain.push(feed_message::AddedNode(node_id, &details.node)); - - if let Some(bytes) = feed_messages_for_chain.into_finalized() { - self.broadcast_to_chain_feeds( - &new_chain_label, - ToFeedWebsocket::Bytes(bytes) - ).await; - } + self.finalize_and_broadcast_to_chain_feeds(&old_chain_label, feed_messages_for_chain).await; // Tell everybody about the new node count and potential rename: let mut feed_messages_for_all = FeedMessageSerializer::new(); @@ -265,11 +254,7 @@ impl InnerLoop { feed_messages_for_all.push(feed_message::RemovedChain(&old_chain_label)); } feed_messages_for_all.push(feed_message::AddedChain(&new_chain_label, chain_node_count)); - - if let Some(bytes) = feed_messages_for_all.into_finalized() { - let msg = ToFeedWebsocket::Bytes(bytes); - self.broadcast_to_all_feeds(msg).await; - } + self.finalize_and_broadcast_to_all_feeds(feed_messages_for_all).await; // Ask for the grographical location of the node. // Currently we only geographically locate IPV4 addresses so ignore IPV6. @@ -486,14 +471,9 @@ impl InnerLoop { &mut feed_messages_for_all ); } - if let Some(bytes) = feed_messages_for_chain.into_finalized() { - self.broadcast_to_chain_feeds(&chain_label, ToFeedWebsocket::Bytes(bytes)).await; - } - } - - if let Some(bytes) = feed_messages_for_all.into_finalized() { - self.broadcast_to_all_feeds(ToFeedWebsocket::Bytes(bytes)).await; + self.finalize_and_broadcast_to_chain_feeds(&chain_label, feed_messages_for_chain).await; } + self.finalize_and_broadcast_to_all_feeds(feed_messages_for_all).await; } /// Remove a single node by its ID, pushing any messages we'd want to send @@ -538,6 +518,13 @@ impl InnerLoop { } } + /// Finalize a [`FeedMessageSerializer`] and broadcast the result to feeds for the chain. + async fn finalize_and_broadcast_to_chain_feeds(&mut self, chain: &str, serializer: FeedMessageSerializer) { + if let Some(bytes) = serializer.into_finalized() { + self.broadcast_to_chain_feeds(chain, ToFeedWebsocket::Bytes(bytes)).await; + } + } + /// Send a message to all chain feeds. async fn broadcast_to_chain_feeds(&mut self, chain: &str, message: ToFeedWebsocket) { if let Some(feeds) = self.chain_to_feed_conn_ids.get(chain) { @@ -551,6 +538,13 @@ impl InnerLoop { } } + /// Finalize a [`FeedMessageSerializer`] and broadcast the result to all feeds + async fn finalize_and_broadcast_to_all_feeds(&mut self, serializer: FeedMessageSerializer) { + if let Some(bytes) = serializer.into_finalized() { + self.broadcast_to_all_feeds(ToFeedWebsocket::Bytes(bytes)).await; + } + } + /// Send a message to everybody. async fn broadcast_to_all_feeds(&mut self, message: ToFeedWebsocket) { for chan in self.feed_channels.values_mut() {