From 6221cbfd17d09697ea9b09dff4060a02e6a05f35 Mon Sep 17 00:00:00 2001 From: James Wilson Date: Wed, 4 Aug 2021 17:05:00 +0100 Subject: [PATCH] Dev tweaks for testing --- backend/telemetry_core/src/main.rs | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/backend/telemetry_core/src/main.rs b/backend/telemetry_core/src/main.rs index b342953..1088068 100644 --- a/backend/telemetry_core/src/main.rs +++ b/backend/telemetry_core/src/main.rs @@ -19,6 +19,7 @@ mod feed_message; mod find_location; mod state; use std::str::FromStr; +use std::sync::atomic::AtomicUsize; use tokio::time::{Duration, Instant}; use aggregator::{ @@ -367,19 +368,21 @@ where // Send messages to the feed: let send_handle = tokio::spawn(async move { 'outer: loop { - let debounce = tokio::time::sleep_until(Instant::now() + Duration::from_millis(75)); - let msgs = tokio::select! { msgs = rx_from_aggregator_chunks.next() => msgs, _ = &mut send_closer_rx => { break } }; - // End the loop when connection from aggregator ends: let msgs = match msgs { Some(msgs) => msgs, None => break, }; +let total_val = unsafe { total.load(std::sync::atomic::Ordering::Relaxed) }; +if msgs.len() > total_val { + unsafe { total.compare_exchange(total_val, msgs.len(), std::sync::atomic::Ordering::Relaxed, std::sync::atomic::Ordering::Relaxed); }; + println!("Max msgs: {}", msgs.len()); +} // There is only one message type at the mo; bytes to send // to the websocket. collect them all up to dispatch in one shot. let all_msg_bytes = msgs.into_iter().map(|msg| match msg { @@ -417,8 +420,6 @@ where } Ok(_) => {} } - - debounce.await; } drop(recv_closer_tx); // Kill the recv task if this send task ends @@ -433,3 +434,5 @@ where // loop ended; give socket back to parent: (tx_to_aggregator, ws_send) } + +static mut total: std::sync::atomic::AtomicUsize = AtomicUsize::new(0);