Dev tweaks for testing

This commit is contained in:
James Wilson
2021-08-04 17:05:00 +01:00
parent 7d00d72baa
commit 6221cbfd17
+8 -5
View File
@@ -19,6 +19,7 @@ mod feed_message;
mod find_location; mod find_location;
mod state; mod state;
use std::str::FromStr; use std::str::FromStr;
use std::sync::atomic::AtomicUsize;
use tokio::time::{Duration, Instant}; use tokio::time::{Duration, Instant};
use aggregator::{ use aggregator::{
@@ -367,19 +368,21 @@ where
// Send messages to the feed: // Send messages to the feed:
let send_handle = tokio::spawn(async move { let send_handle = tokio::spawn(async move {
'outer: loop { 'outer: loop {
let debounce = tokio::time::sleep_until(Instant::now() + Duration::from_millis(75));
let msgs = tokio::select! { let msgs = tokio::select! {
msgs = rx_from_aggregator_chunks.next() => msgs, msgs = rx_from_aggregator_chunks.next() => msgs,
_ = &mut send_closer_rx => { break } _ = &mut send_closer_rx => { break }
}; };
// End the loop when connection from aggregator ends: // End the loop when connection from aggregator ends:
let msgs = match msgs { let msgs = match msgs {
Some(msgs) => msgs, Some(msgs) => msgs,
None => break, None => break,
}; };
let total_val = unsafe { total.load(std::sync::atomic::Ordering::Relaxed) };
if msgs.len() > total_val {
unsafe { total.compare_exchange(total_val, msgs.len(), std::sync::atomic::Ordering::Relaxed, std::sync::atomic::Ordering::Relaxed); };
println!("Max msgs: {}", msgs.len());
}
// There is only one message type at the mo; bytes to send // There is only one message type at the mo; bytes to send
// to the websocket. collect them all up to dispatch in one shot. // to the websocket. collect them all up to dispatch in one shot.
let all_msg_bytes = msgs.into_iter().map(|msg| match msg { let all_msg_bytes = msgs.into_iter().map(|msg| match msg {
@@ -417,8 +420,6 @@ where
} }
Ok(_) => {} Ok(_) => {}
} }
debounce.await;
} }
drop(recv_closer_tx); // Kill the recv task if this send task ends drop(recv_closer_tx); // Kill the recv task if this send task ends
@@ -433,3 +434,5 @@ where
// loop ended; give socket back to parent: // loop ended; give socket back to parent:
(tx_to_aggregator, ws_send) (tx_to_aggregator, ws_send)
} }
static mut total: std::sync::atomic::AtomicUsize = AtomicUsize::new(0);