diff --git a/backend/common/src/ws_client/connect.rs b/backend/common/src/ws_client/connect.rs index 9cf3f1a..f9225c6 100644 --- a/backend/common/src/ws_client/connect.rs +++ b/backend/common/src/ws_client/connect.rs @@ -33,6 +33,11 @@ impl Connection { /// Get hold of send and receive channels for this connection. /// These channels are cancel-safe. /// + /// This spawns a couple of tasks for pulling/pushing messages onto the + /// connection, and so messages will be pushed onto the receiving channel + /// without any further polling. use [`Connection::into_raw`] if you need + /// more precise control over when messages are pulled from the socket. + /// /// # Panics /// /// This will panic if not called within the context of a tokio runtime. diff --git a/backend/telemetry_core/src/main.rs b/backend/telemetry_core/src/main.rs index 02060f3..251a3a1 100644 --- a/backend/telemetry_core/src/main.rs +++ b/backend/telemetry_core/src/main.rs @@ -3,6 +3,7 @@ mod feed_message; mod find_location; mod state; use std::str::FromStr; +use tokio::time::{ Duration, Instant }; use aggregator::{ Aggregator, FromFeedWebsocket, FromShardWebsocket, ToFeedWebsocket, ToShardWebsocket, @@ -304,7 +305,7 @@ where // Send messages to the feed: let send_handle = tokio::spawn(async move { loop { - let debounce = tokio::time::sleep_until(tokio::time::Instant::now() + std::time::Duration::from_millis(75)); + let debounce = tokio::time::sleep_until(Instant::now() + Duration::from_millis(75)); let msgs = tokio::select! { msgs = rx_from_aggregator_chunks.next() => msgs, @@ -325,6 +326,11 @@ where } }); + // We have 10 seconds to send and flush messages. If the client isn't keeping up with our + // messages, the number we obtain from `ReadyChunksAll` will gradually increase and eventually + // we'll hit this deadline and the client will be booted. + let message_send_deadline = Instant::now() + Duration::from_secs(10); + for bytes in all_msg_bytes { if let Err(e) = ws_send.send_binary(&bytes).await { log::warn!("Closing feed websocket due to error sending data: {}", e); @@ -332,7 +338,7 @@ where } } - if let Err(e) = ws_send.flush().await { + if let Err(e) = tokio::time::timeout_at(message_send_deadline, ws_send.flush()).await { log::warn!("Closing feed websocket due to error flushing data: {}", e); break; }