From 3319709f7be7f5d20748cb0e755a9b5a70f687d9 Mon Sep 17 00:00:00 2001
From: James Wilson
Date: Tue, 10 Aug 2021 14:52:28 +0100
Subject: [PATCH] Add periodic interval to core loop and print debug info

---
 .../src/aggregator/inner_loop.rs | 57 +++++++++++++++++++
 1 file changed, 57 insertions(+)

diff --git a/backend/telemetry_core/src/aggregator/inner_loop.rs b/backend/telemetry_core/src/aggregator/inner_loop.rs
index 9ae511e..4a1e314 100644
--- a/backend/telemetry_core/src/aggregator/inner_loop.rs
+++ b/backend/telemetry_core/src/aggregator/inner_loop.rs
@@ -37,6 +37,10 @@ pub enum ToAggregator {
     FromShardWebsocket(ConnId, FromShardWebsocket),
     FromFeedWebsocket(ConnId, FromFeedWebsocket),
     FromFindLocation(NodeId, find_location::Location),
+    /// This message is sent periodically and allows us to monitor
+    /// or cleanup things in our inner loop. The channel provided
+    /// is notified when the interval has been handled.
+    Interval(flume::Sender<()>),
 }
 
 /// An incoming shard connection can send these messages to the aggregator.
@@ -193,11 +197,31 @@ impl InnerLoop {
                     }
                     ToAggregator::FromFindLocation(node_id, location) => {
                         self.handle_from_find_location(node_id, location)
+                    },
+                    ToAggregator::Interval(tx) => {
+                        self.handle_interval(tx)
                     }
                 }
             }
         });
 
+        // Periodically send interval messages for cleanup/monitoring. At most 1
+        // every 60 seconds, but if the message queue is backed up it may take longer.
+        tokio::spawn({
+            let metered_tx = metered_tx.clone();
+            async move {
+                loop {
+                    let now = tokio::time::Instant::now();
+                    let (tx, rx) = flume::unbounded();
+
+                    let _ = metered_tx.send_async(ToAggregator::Interval(tx)).await;
+                    let _ = rx.recv_async().await;
+
+                    tokio::time::sleep_until(now + tokio::time::Duration::from_secs(60)).await;
+                }
+            }
+        });
+
         // TEMP: let's monitor message queue len out of interest
         let tx_len = metered_tx.clone();
         std::thread::spawn(move || {
@@ -233,6 +257,39 @@ impl InnerLoop {
             }
         }
     }
+    /// The periodic interval message gives us a chance to do some tidy up or monitoring.
+    fn handle_interval(&mut self, rx: flume::Sender<()>) {
+
+        let node_ids = self.node_ids.len();
+        let node_count: usize = self.node_state
+            .iter_chains()
+            .map(|c| c.node_count())
+            .sum();
+
+        let shard_cound = self.shard_channels.len();
+        let connected_feeds = self.feed_channels.len();
+        let finality_feeds = self.feed_conn_id_finality.len();
+        let feed_to_chain = self.feed_conn_id_to_chain.len();
+
+        let num_subscribed_chains = self.chain_to_feed_conn_ids.len();
+        let num_subscribed_chain_feeds: usize = self.chain_to_feed_conn_ids
+            .values()
+            .map(|c| c.len())
+            .sum();
+
+        println!("Periodic update at {:?}:", std::time::SystemTime::now());
+        dbg!(node_ids);
+        dbg!(node_count);
+        dbg!(shard_cound);
+        dbg!(connected_feeds);
+        dbg!(finality_feeds);
+        dbg!(feed_to_chain);
+        dbg!(num_subscribed_chains);
+        dbg!(num_subscribed_chain_feeds);
+
+        drop(rx);
+    }
+
     /// Handle messages that come from the node geographical locator.
     fn handle_from_find_location(&mut self, node_id: NodeId, location: find_location::Location) {
         self.node_state