Add periodic interval to core loop and print debug info

This commit is contained in:
James Wilson
2021-08-10 14:52:28 +01:00
parent f72f8c1fd5
commit 3319709f7b
@@ -37,6 +37,10 @@ pub enum ToAggregator {
FromShardWebsocket(ConnId, FromShardWebsocket),
FromFeedWebsocket(ConnId, FromFeedWebsocket),
FromFindLocation(NodeId, find_location::Location),
/// This message is sent periodically and allows us to monitor
/// or cleanup things in our inner loop. The channel provided
/// is notified when the interval has been handled.
Interval(flume::Sender<()>),
}
/// An incoming shard connection can send these messages to the aggregator.
@@ -193,11 +197,31 @@ impl InnerLoop {
}
ToAggregator::FromFindLocation(node_id, location) => {
self.handle_from_find_location(node_id, location)
},
ToAggregator::Interval(tx) => {
self.handle_interval(tx)
}
}
}
});
// Periodically send interval messages for cleanup/monitoring. At most 1
// every 60 seconds, but if the message queue is backed up it may take longer.
tokio::spawn({
let metered_tx = metered_tx.clone();
async move {
loop {
let now = tokio::time::Instant::now();
let (tx, rx) = flume::unbounded();
let _ = metered_tx.send_async(ToAggregator::Interval(tx)).await;
let _ = rx.recv_async().await;
tokio::time::sleep_until(now + tokio::time::Duration::from_secs(60)).await;
}
}
});
// TEMP: let's monitor message queue len out of interest
let tx_len = metered_tx.clone();
std::thread::spawn(move || {
@@ -233,6 +257,39 @@ impl InnerLoop {
}
}
/// The periodic interval message gives us a chance to do some tidy up or monitoring.
fn handle_interval(&mut self, rx: flume::Sender<()>) {
let node_ids = self.node_ids.len();
let node_count: usize = self.node_state
.iter_chains()
.map(|c| c.node_count())
.sum();
let shard_cound = self.shard_channels.len();
let connected_feeds = self.feed_channels.len();
let finality_feeds = self.feed_conn_id_finality.len();
let feed_to_chain = self.feed_conn_id_to_chain.len();
let num_subscribed_chains = self.chain_to_feed_conn_ids.len();
let num_subscribed_chain_feeds: usize = self.chain_to_feed_conn_ids
.values()
.map(|c| c.len())
.sum();
println!("Periodic update at {:?}:", std::time::SystemTime::now());
dbg!(node_ids);
dbg!(node_count);
dbg!(shard_cound);
dbg!(connected_feeds);
dbg!(finality_feeds);
dbg!(feed_to_chain);
dbg!(num_subscribed_chains);
dbg!(num_subscribed_chain_feeds);
drop(rx);
}
/// Handle messages that come from the node geographical locator.
fn handle_from_find_location(&mut self, node_id: NodeId, location: find_location::Location) {
self.node_state