diff --git a/backend/telemetry_core/src/main.rs b/backend/telemetry_core/src/main.rs index fa6f466..6f1d00e 100644 --- a/backend/telemetry_core/src/main.rs +++ b/backend/telemetry_core/src/main.rs @@ -240,9 +240,19 @@ where // Loop, handling new messages from the shard or from the aggregator: loop { - // Without any special handling, if messages come in every ~10ms to each feed, the select! loop - // has to wake up 100 times a second to poll things. If we have 1000 feeds, that's 100,000 waksups - // per second. Even without any work in the loop, that uses a lot of CPU. + // Without any special handling, if messages come in every ~2.5ms to each feed, the select! loop + // has to wake up 400 times a second to poll things. If we have 1000 feeds, that's 400,000 wakeups + // per second. Even without any work in the loop, that uses a bunch of CPU. As an example, try + // replacing the loop with this: + // + // ``` + // let s = tokio::time::sleep(tokio::time::Duration::from_micros(2500)); + // tokio::select! { + // _ = s => {}, + // _ = websocket.next() => {} + // } + // continue; + // ``` // // To combat this, we add a small wait to reduce how often the select loop will be woken up under high load. We // buffer messages to feeds so that we do as much work as possible during each wakeup, and if the @@ -253,31 +263,10 @@ where // Increasing the wait to 100ms or more doesn't seem to have much more of a positive impact anyway. let debounce = tokio::time::sleep_until(tokio::time::Instant::now() + std::time::Duration::from_millis(75)); - tokio::select! { - // AGGREGATOR -> FRONTEND (buffer messages to the UI) - msgs = rx_from_aggregator_chunks.next() => { - // End the loop when connection from aggregator ends: - let msgs = match msgs { - Some(msgs) => msgs, - None => break - }; + tokio::select! {biased; - // There is only one message type at the mo; bytes to send - // to the websocket. collect them all up to dispatch in one shot. 
- let all_ws_msgs = msgs.into_iter().map(|msg| { - let bytes = match msg { - ToFeedWebsocket::Bytes(bytes) => bytes - }; - Ok(ws::Message::binary(&*bytes)) - }); - - if let Err(e) = websocket.send_all(&mut futures::stream::iter(all_ws_msgs)).await { - log::warn!("Closing feed websocket due to error: {}", e); - break; - } - } - - // FRONTEND -> AGGREGATOR (relay messages to the aggregator) + // FRONTEND -> AGGREGATOR (relay messages to the aggregator). Biased, so messages + // from the UI will have priority (especially important with our debounce delay). msg = websocket.next() => { // End the loop when connection from feed ends: let msg = match msg { @@ -318,6 +307,29 @@ where break; } } + + // AGGREGATOR -> FRONTEND (buffer messages to the UI) + msgs = rx_from_aggregator_chunks.next() => { + // End the loop when connection from aggregator ends: + let msgs = match msgs { + Some(msgs) => msgs, + None => break + }; + + // There is only one message type at the moment: bytes to send + // to the websocket. Collect them all up to dispatch in one shot. + let all_ws_msgs = msgs.into_iter().map(|msg| { + let bytes = match msg { + ToFeedWebsocket::Bytes(bytes) => bytes + }; + Ok(ws::Message::binary(&*bytes)) + }); + + if let Err(e) = websocket.send_all(&mut futures::stream::iter(all_ws_msgs)).await { + log::warn!("Closing feed websocket due to error: {}", e); + break; + } + } } debounce.await;