Improve comment and bias select loop

This commit is contained in:
James Wilson
2021-07-21 17:48:18 +01:00
parent cedc2f9dbe
commit 649fb966d2
+39 -27
View File
@@ -240,9 +240,19 @@ where
// Loop, handling new messages from the shard or from the aggregator:
loop {
// Without any special handling, if messages come in every ~10ms to each feed, the select! loop
// has to wake up 100 times a second to poll things. If we have 1000 feeds, that's 100,000 waksups
// per second. Even without any work in the loop, that uses a lot of CPU.
// Without any special handling, if messages come in every ~2.5ms to each feed, the select! loop
// has to wake up 400 times a second to poll things. If we have 1000 feeds, that's 400,000 wakeups
// per second. Even without any work in the loop, that uses a bunch of CPU. As an example, try
// replacing the loop with this:
//
// ```
// let s = tokio::time::sleep(tokio::time::Duration::from_micros(2500));
// tokio::select! {
// _ = s => {},
// _ = websocket.next() => {}
// }
// continue;
// ```
//
// To combat this, we add a small wait to reduce how often the select loop will be woken up under high load. We
// buffer messages to feeds so that we do as much work as possible during each wakeup, and if the
@@ -253,31 +263,10 @@ where
// Increasing the wait to 100ms or more doesn't seem to have much more of a positive impact anyway.
let debounce = tokio::time::sleep_until(tokio::time::Instant::now() + std::time::Duration::from_millis(75));
tokio::select! {
// AGGREGATOR -> FRONTEND (buffer messages to the UI)
msgs = rx_from_aggregator_chunks.next() => {
// End the loop when connection from aggregator ends:
let msgs = match msgs {
Some(msgs) => msgs,
None => break
};
tokio::select! {biased;
// There is only one message type at the mo; bytes to send
// to the websocket. collect them all up to dispatch in one shot.
let all_ws_msgs = msgs.into_iter().map(|msg| {
let bytes = match msg {
ToFeedWebsocket::Bytes(bytes) => bytes
};
Ok(ws::Message::binary(&*bytes))
});
if let Err(e) = websocket.send_all(&mut futures::stream::iter(all_ws_msgs)).await {
log::warn!("Closing feed websocket due to error: {}", e);
break;
}
}
// FRONTEND -> AGGREGATOR (relay messages to the aggregator)
// FRONTEND -> AGGREGATOR (relay messages to the aggregator). Biased, so messages
// from the UI will have priority (especially important with our debounce delay).
msg = websocket.next() => {
// End the loop when connection from feed ends:
let msg = match msg {
@@ -318,6 +307,29 @@ where
break;
}
}
// AGGREGATOR -> FRONTEND (buffer messages to the UI)
msgs = rx_from_aggregator_chunks.next() => {
// End the loop when connection from aggregator ends:
let msgs = match msgs {
Some(msgs) => msgs,
None => break
};
// There is only one message type at the mo; bytes to send
// to the websocket. collect them all up to dispatch in one shot.
let all_ws_msgs = msgs.into_iter().map(|msg| {
let bytes = match msg {
ToFeedWebsocket::Bytes(bytes) => bytes
};
Ok(ws::Message::binary(&*bytes))
});
if let Err(e) = websocket.send_all(&mut futures::stream::iter(all_ws_msgs)).await {
log::warn!("Closing feed websocket due to error: {}", e);
break;
}
}
}
debounce.await;