mirror of
https://github.com/pezkuwichain/pezkuwi-telemetry.git
synced 2026-06-20 02:21:07 +00:00
debounce feed polling a little to reduce CPU load when lots of messages are being sent out
This commit is contained in:
Generated
+3
-2
@@ -214,6 +214,7 @@ dependencies = [
|
|||||||
"http",
|
"http",
|
||||||
"log",
|
"log",
|
||||||
"num-traits",
|
"num-traits",
|
||||||
|
"pin-project-lite",
|
||||||
"primitive-types",
|
"primitive-types",
|
||||||
"rustc-hash",
|
"rustc-hash",
|
||||||
"serde",
|
"serde",
|
||||||
@@ -1084,9 +1085,9 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "pin-project-lite"
|
name = "pin-project-lite"
|
||||||
version = "0.2.6"
|
version = "0.2.7"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "dc0e1f259c92177c30a4c9d177246edd0a3568b25756a977d0632cf8fa37e905"
|
checksum = "8d31d11c69a6b52a174b42bdc0c30e5e11670f90788b2c471c31c1d17d449443"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "pin-utils"
|
name = "pin-utils"
|
||||||
|
|||||||
@@ -14,6 +14,7 @@ hex = "0.4.3"
|
|||||||
http = "0.2.4"
|
http = "0.2.4"
|
||||||
log = "0.4"
|
log = "0.4"
|
||||||
num-traits = "0.2"
|
num-traits = "0.2"
|
||||||
|
pin-project-lite = "0.2.7"
|
||||||
primitive-types = { version = "0.9.0", features = ["serde"] }
|
primitive-types = { version = "0.9.0", features = ["serde"] }
|
||||||
rustc-hash = "1.1.0"
|
rustc-hash = "1.1.0"
|
||||||
serde = { version = "1.0", features = ["derive"] }
|
serde = { version = "1.0", features = ["derive"] }
|
||||||
|
|||||||
@@ -4,6 +4,7 @@ pub mod node_message;
|
|||||||
pub mod node_types;
|
pub mod node_types;
|
||||||
pub mod time;
|
pub mod time;
|
||||||
pub mod ws_client;
|
pub mod ws_client;
|
||||||
|
pub mod ready_chunks_all;
|
||||||
|
|
||||||
mod assign_id;
|
mod assign_id;
|
||||||
mod dense_map;
|
mod dense_map;
|
||||||
|
|||||||
@@ -0,0 +1,105 @@
|
|||||||
|
//! [`futures::StreamExt::ready_chunks()`] internally stores a vec with a certain capacity, and will buffer up
|
||||||
|
//! up to that many items that are ready from the underlying stream before returning either when we run out of
|
||||||
|
//! Poll::Ready items, or we hit the capacity.
|
||||||
|
//!
|
||||||
|
//! This variation has no fixed capacity, and will buffer everything it can up at each point to return. This is
|
||||||
|
//! better when the amount of items varies a bunch (and we don't want to allocate a fixed capacity every time),
|
||||||
|
//! and can help ensure that we process as many items as possible each time (rather than only up to capacity items).
|
||||||
|
//!
|
||||||
|
//! Code is adapted from the futures implementation
|
||||||
|
//! (see [ready_chunks.rs](https://docs.rs/futures-util/0.3.15/src/futures_util/stream/stream/ready_chunks.rs.html)).
|
||||||
|
|
||||||
|
use futures::stream::Fuse;
|
||||||
|
use futures::StreamExt;
|
||||||
|
use core::mem;
|
||||||
|
use core::pin::Pin;
|
||||||
|
use futures::stream::{FusedStream, Stream};
|
||||||
|
use futures::task::{Context, Poll};
|
||||||
|
use pin_project_lite::pin_project;
|
||||||
|
|
||||||
|
pin_project! {
|
||||||
|
/// Buffer up all Ready items in the underlying stream each time
|
||||||
|
/// we attempt to retrieve items from it, and return a Vec of those
|
||||||
|
/// items.
|
||||||
|
#[derive(Debug)]
|
||||||
|
#[must_use = "streams do nothing unless polled"]
|
||||||
|
pub struct ReadyChunksAll<St: Stream> {
|
||||||
|
#[pin]
|
||||||
|
stream: Fuse<St>,
|
||||||
|
items: Vec<St::Item>,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<St: Stream> ReadyChunksAll<St>
|
||||||
|
where
|
||||||
|
St: Stream,
|
||||||
|
{
|
||||||
|
pub fn new(stream: St) -> Self {
|
||||||
|
Self {
|
||||||
|
stream: stream.fuse(),
|
||||||
|
items: Vec::new()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<St: Stream> Stream for ReadyChunksAll<St> {
|
||||||
|
type Item = Vec<St::Item>;
|
||||||
|
|
||||||
|
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||||
|
let mut this = self.project();
|
||||||
|
|
||||||
|
loop {
|
||||||
|
match this.stream.as_mut().poll_next(cx) {
|
||||||
|
// Flush all collected data if underlying stream doesn't contain
|
||||||
|
// more ready values
|
||||||
|
Poll::Pending => {
|
||||||
|
return if this.items.is_empty() {
|
||||||
|
Poll::Pending
|
||||||
|
} else {
|
||||||
|
Poll::Ready(Some(mem::replace(this.items, Vec::new())))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Push the ready item into the buffer
|
||||||
|
Poll::Ready(Some(item)) => {
|
||||||
|
this.items.push(item);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Since the underlying stream ran out of values, return what we
|
||||||
|
// have buffered, if we have anything.
|
||||||
|
Poll::Ready(None) => {
|
||||||
|
let last = if this.items.is_empty() {
|
||||||
|
None
|
||||||
|
} else {
|
||||||
|
let full_buf = mem::replace(this.items, Vec::new());
|
||||||
|
Some(full_buf)
|
||||||
|
};
|
||||||
|
|
||||||
|
return Poll::Ready(last);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn size_hint(&self) -> (usize, Option<usize>) {
|
||||||
|
// Look at the underlying stream's size_hint. If we've
|
||||||
|
// buffered some items, we'll return at least that Vec,
|
||||||
|
// giving us a lower bound 1 greater than the underlying.
|
||||||
|
// The upper bound is, worst case, our vec + each individual
|
||||||
|
// item in the underlying stream.
|
||||||
|
let chunk_len = if self.items.is_empty() { 0 } else { 1 };
|
||||||
|
let (lower, upper) = self.stream.size_hint();
|
||||||
|
let lower = lower.saturating_add(chunk_len);
|
||||||
|
let upper = match upper {
|
||||||
|
Some(x) => x.checked_add(chunk_len),
|
||||||
|
None => None,
|
||||||
|
};
|
||||||
|
(lower, upper)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<St: FusedStream> FusedStream for ReadyChunksAll<St> {
|
||||||
|
fn is_terminated(&self) -> bool {
|
||||||
|
self.stream.is_terminated() && self.items.is_empty()
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -11,6 +11,7 @@ use aggregator::{
|
|||||||
};
|
};
|
||||||
use bincode::Options;
|
use bincode::Options;
|
||||||
use common::internal_messages;
|
use common::internal_messages;
|
||||||
|
use common::ready_chunks_all::ReadyChunksAll;
|
||||||
use futures::{channel::mpsc, SinkExt, StreamExt};
|
use futures::{channel::mpsc, SinkExt, StreamExt};
|
||||||
use simple_logger::SimpleLogger;
|
use simple_logger::SimpleLogger;
|
||||||
use structopt::StructOpt;
|
use structopt::StructOpt;
|
||||||
@@ -225,7 +226,8 @@ where
|
|||||||
S: futures::Sink<FromFeedWebsocket, Error = anyhow::Error> + Unpin,
|
S: futures::Sink<FromFeedWebsocket, Error = anyhow::Error> + Unpin,
|
||||||
{
|
{
|
||||||
// unbounded channel so that slow feeds don't block aggregator progress:
|
// unbounded channel so that slow feeds don't block aggregator progress:
|
||||||
let (tx_to_feed_conn, mut rx_from_aggregator) = mpsc::unbounded();
|
let (tx_to_feed_conn, rx_from_aggregator) = mpsc::unbounded();
|
||||||
|
let mut rx_from_aggregator_chunks = ReadyChunksAll::new(rx_from_aggregator);
|
||||||
|
|
||||||
// Tell the aggregator about this new connection, and give it a way to send messages to us:
|
// Tell the aggregator about this new connection, and give it a way to send messages to us:
|
||||||
let init_msg = FromFeedWebsocket::Initialize {
|
let init_msg = FromFeedWebsocket::Initialize {
|
||||||
@@ -238,23 +240,38 @@ where
|
|||||||
|
|
||||||
// Loop, handling new messages from the shard or from the aggregator:
|
// Loop, handling new messages from the shard or from the aggregator:
|
||||||
loop {
|
loop {
|
||||||
tokio::select! {
|
// Without any special handling, if messages come in every ~10ms to each feed, the select! loop
|
||||||
|
// has to wake up 100 times a second to poll things. If we have 1000 feeds, that's 100,000 waksups
|
||||||
|
// per second. Even without any work in the loop, that uses a lot of CPU.
|
||||||
|
//
|
||||||
|
// To combat this, we add a small wait to reduce how often the select loop can be woken up. We
|
||||||
|
// buffer messages to feeds so that we do as much work as possible during each wakeup, and if the
|
||||||
|
// wakeup lasts longer than 75ms we don't wait before polling again. This knocks ~80% CPU usage
|
||||||
|
// off on my machine running a soak test with 500 feeds, 4 shards and 100 nodes, doesn't seem to impact
|
||||||
|
// memory usage much, and still ensures that messages are delivered in a timely fashion.
|
||||||
|
//
|
||||||
|
// Increasing the wait to 100ms or more doesn't seem to have much more of a positive impact anyway.
|
||||||
|
let debounce = tokio::time::sleep_until(tokio::time::Instant::now() + std::time::Duration::from_millis(75));
|
||||||
|
|
||||||
|
tokio::select! {
|
||||||
// AGGREGATOR -> FRONTEND (buffer messages to the UI)
|
// AGGREGATOR -> FRONTEND (buffer messages to the UI)
|
||||||
msg = rx_from_aggregator.next() => {
|
msgs = rx_from_aggregator_chunks.next() => {
|
||||||
// End the loop when connection from aggregator ends:
|
// End the loop when connection from aggregator ends:
|
||||||
let msg = match msg {
|
let msgs = match msgs {
|
||||||
Some(msg) => msg,
|
Some(msgs) => msgs,
|
||||||
None => break
|
None => break
|
||||||
};
|
};
|
||||||
|
|
||||||
// Send messages to the client (currently the only message is
|
// There is only one message type at the mo; bytes to send
|
||||||
// pre-serialized bytes that we send as binary):
|
// to the websocket. collect them all up to dispatch in one shot.
|
||||||
let bytes = match msg {
|
let all_ws_msgs = msgs.into_iter().map(|msg| {
|
||||||
ToFeedWebsocket::Bytes(bytes) => bytes
|
let bytes = match msg {
|
||||||
};
|
ToFeedWebsocket::Bytes(bytes) => bytes
|
||||||
|
};
|
||||||
|
Ok(ws::Message::binary(&*bytes))
|
||||||
|
});
|
||||||
|
|
||||||
if let Err(e) = websocket.send(ws::Message::binary(&*bytes)).await {
|
if let Err(e) = websocket.send_all(&mut futures::stream::iter(all_ws_msgs)).await {
|
||||||
log::warn!("Closing feed websocket due to error: {}", e);
|
log::warn!("Closing feed websocket due to error: {}", e);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@@ -302,6 +319,8 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
debounce.await;
|
||||||
}
|
}
|
||||||
|
|
||||||
// loop ended; give socket back to parent:
|
// loop ended; give socket back to parent:
|
||||||
|
|||||||
Reference in New Issue
Block a user