From 999d8ff0bd7213a1a4e8dd960b6501628f59b0e1 Mon Sep 17 00:00:00 2001 From: James Wilson Date: Wed, 21 Jul 2021 12:06:22 +0100 Subject: [PATCH] debounce feed polling a little to reduce CPU load when lots of messages are being sent out --- backend/Cargo.lock | 5 +- backend/common/Cargo.toml | 1 + backend/common/src/lib.rs | 1 + backend/common/src/ready_chunks_all.rs | 105 +++++++++++++++++++++++++ backend/telemetry_core/src/main.rs | 41 +++++++--- 5 files changed, 140 insertions(+), 13 deletions(-) create mode 100644 backend/common/src/ready_chunks_all.rs diff --git a/backend/Cargo.lock b/backend/Cargo.lock index 636f2e9..d8e0ca9 100644 --- a/backend/Cargo.lock +++ b/backend/Cargo.lock @@ -214,6 +214,7 @@ dependencies = [ "http", "log", "num-traits", + "pin-project-lite", "primitive-types", "rustc-hash", "serde", @@ -1084,9 +1085,9 @@ dependencies = [ [[package]] name = "pin-project-lite" -version = "0.2.6" +version = "0.2.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dc0e1f259c92177c30a4c9d177246edd0a3568b25756a977d0632cf8fa37e905" +checksum = "8d31d11c69a6b52a174b42bdc0c30e5e11670f90788b2c471c31c1d17d449443" [[package]] name = "pin-utils" diff --git a/backend/common/Cargo.toml b/backend/common/Cargo.toml index 635483f..1b38b09 100644 --- a/backend/common/Cargo.toml +++ b/backend/common/Cargo.toml @@ -14,6 +14,7 @@ hex = "0.4.3" http = "0.2.4" log = "0.4" num-traits = "0.2" +pin-project-lite = "0.2.7" primitive-types = { version = "0.9.0", features = ["serde"] } rustc-hash = "1.1.0" serde = { version = "1.0", features = ["derive"] } diff --git a/backend/common/src/lib.rs b/backend/common/src/lib.rs index 97d597e..4eaab74 100644 --- a/backend/common/src/lib.rs +++ b/backend/common/src/lib.rs @@ -4,6 +4,7 @@ pub mod node_message; pub mod node_types; pub mod time; pub mod ws_client; +pub mod ready_chunks_all; mod assign_id; mod dense_map; diff --git a/backend/common/src/ready_chunks_all.rs b/backend/common/src/ready_chunks_all.rs 
new file mode 100644 index 0000000..f4cc367 --- /dev/null +++ b/backend/common/src/ready_chunks_all.rs @@ -0,0 +1,105 @@ +//! [`futures::StreamExt::ready_chunks()`] internally stores a vec with a certain capacity, and will buffer up +//! up to that many items that are ready from the underlying stream before returning either when we run out of +//! Poll::Ready items, or we hit the capacity. +//! +//! This variation has no fixed capacity, and will buffer everything it can up at each point to return. This is +//! better when the amount of items varies a bunch (and we don't want to allocate a fixed capacity every time), +//! and can help ensure that we process as many items as possible each time (rather than only up to capacity items). +//! +//! Code is adapted from the futures implementation +//! (see [ready_chunks.rs](https://docs.rs/futures-util/0.3.15/src/futures_util/stream/stream/ready_chunks.rs.html)). + +use futures::stream::Fuse; +use futures::StreamExt; +use core::mem; +use core::pin::Pin; +use futures::stream::{FusedStream, Stream}; +use futures::task::{Context, Poll}; +use pin_project_lite::pin_project; + +pin_project! { + /// Buffer up all Ready items in the underlying stream each time + /// we attempt to retrieve items from it, and return a Vec of those + /// items. 
+ #[derive(Debug)] + #[must_use = "streams do nothing unless polled"] + pub struct ReadyChunksAll { + #[pin] + stream: Fuse, + items: Vec, + } +} + +impl ReadyChunksAll +where + St: Stream, +{ + pub fn new(stream: St) -> Self { + Self { + stream: stream.fuse(), + items: Vec::new() + } + } +} + +impl Stream for ReadyChunksAll { + type Item = Vec; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = self.project(); + + loop { + match this.stream.as_mut().poll_next(cx) { + // Flush all collected data if underlying stream doesn't contain + // more ready values + Poll::Pending => { + return if this.items.is_empty() { + Poll::Pending + } else { + Poll::Ready(Some(mem::replace(this.items, Vec::new()))) + } + } + + // Push the ready item into the buffer + Poll::Ready(Some(item)) => { + this.items.push(item); + } + + // Since the underlying stream ran out of values, return what we + // have buffered, if we have anything. + Poll::Ready(None) => { + let last = if this.items.is_empty() { + None + } else { + let full_buf = mem::replace(this.items, Vec::new()); + Some(full_buf) + }; + + return Poll::Ready(last); + } + } + } + } + + fn size_hint(&self) -> (usize, Option) { + // Look at the underlying stream's size_hint. If we've + // buffered some items, we'll return at least that Vec, + // giving us a lower bound 1 greater than the underlying. + // The upper bound is, worst case, our vec + each individual + // item in the underlying stream. 
+ let chunk_len = if self.items.is_empty() { 0 } else { 1 }; + let (lower, upper) = self.stream.size_hint(); + let lower = lower.saturating_add(chunk_len); + let upper = match upper { + Some(x) => x.checked_add(chunk_len), + None => None, + }; + (lower, upper) + } +} + +impl FusedStream for ReadyChunksAll { + fn is_terminated(&self) -> bool { + self.stream.is_terminated() && self.items.is_empty() + } +} diff --git a/backend/telemetry_core/src/main.rs b/backend/telemetry_core/src/main.rs index 15c1b50..74ccb7b 100644 --- a/backend/telemetry_core/src/main.rs +++ b/backend/telemetry_core/src/main.rs @@ -11,6 +11,7 @@ use aggregator::{ }; use bincode::Options; use common::internal_messages; +use common::ready_chunks_all::ReadyChunksAll; use futures::{channel::mpsc, SinkExt, StreamExt}; use simple_logger::SimpleLogger; use structopt::StructOpt; @@ -225,7 +226,8 @@ where S: futures::Sink + Unpin, { // unbounded channel so that slow feeds don't block aggregator progress: - let (tx_to_feed_conn, mut rx_from_aggregator) = mpsc::unbounded(); + let (tx_to_feed_conn, rx_from_aggregator) = mpsc::unbounded(); + let mut rx_from_aggregator_chunks = ReadyChunksAll::new(rx_from_aggregator); // Tell the aggregator about this new connection, and give it a way to send messages to us: let init_msg = FromFeedWebsocket::Initialize { @@ -238,23 +240,38 @@ where // Loop, handling new messages from the shard or from the aggregator: loop { - tokio::select! { + // Without any special handling, if messages come in every ~10ms to each feed, the select! loop + // has to wake up 100 times a second to poll things. If we have 1000 feeds, that's 100,000 wakeups + // per second. Even without any work in the loop, that uses a lot of CPU. + // + // To combat this, we add a small wait to reduce how often the select loop can be woken up.
We + // buffer messages to feeds so that we do as much work as possible during each wakeup, and if the + // wakeup lasts longer than 75ms we don't wait before polling again. This knocks ~80% CPU usage + // off on my machine running a soak test with 500 feeds, 4 shards and 100 nodes, doesn't seem to impact + // memory usage much, and still ensures that messages are delivered in a timely fashion. + // + // Increasing the wait to 100ms or more doesn't seem to have much more of a positive impact anyway. + let debounce = tokio::time::sleep_until(tokio::time::Instant::now() + std::time::Duration::from_millis(75)); + tokio::select! { // AGGREGATOR -> FRONTEND (buffer messages to the UI) - msg = rx_from_aggregator.next() => { + msgs = rx_from_aggregator_chunks.next() => { // End the loop when connection from aggregator ends: - let msg = match msg { - Some(msg) => msg, + let msgs = match msgs { + Some(msgs) => msgs, None => break }; - // Send messages to the client (currently the only message is - // pre-serialized bytes that we send as binary): - let bytes = match msg { - ToFeedWebsocket::Bytes(bytes) => bytes - }; + // There is only one message type at the mo; bytes to send + // to the websocket. collect them all up to dispatch in one shot. + let all_ws_msgs = msgs.into_iter().map(|msg| { + let bytes = match msg { + ToFeedWebsocket::Bytes(bytes) => bytes + }; + Ok(ws::Message::binary(&*bytes)) + }); - if let Err(e) = websocket.send(ws::Message::binary(&*bytes)).await { + if let Err(e) = websocket.send_all(&mut futures::stream::iter(all_ws_msgs)).await { log::warn!("Closing feed websocket due to error: {}", e); break; } @@ -302,6 +319,8 @@ where } } } + + debounce.await; } // loop ended; give socket back to parent: