mirror of
https://github.com/pezkuwichain/pezkuwi-telemetry.git
synced 2026-06-12 16:51:02 +00:00
Avoid using flume::Receiver::into_stream() to avoid memory leaks until the issue is resolved upstream (#394)
* Tweak rolling_total test to also confirm capacity doesn't go nuts * Use Jemalloc * Avoid flume's into_stream and use a workaround for now * cargo fmt * Improve comments now that there's an issue to point to
This commit is contained in:
Generated
+29
@@ -437,6 +437,12 @@ dependencies = [
|
||||
"percent-encoding",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "fs_extra"
|
||||
version = "1.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2022715d62ab30faffd124d40b76f4134a550a87792276512b18d63272333394"
|
||||
|
||||
[[package]]
|
||||
name = "funty"
|
||||
version = "1.1.0"
|
||||
@@ -755,6 +761,27 @@ version = "0.4.7"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "dd25036021b0de88a0aff6b850051563c6516d0bf53f8638938edbb9de732736"
|
||||
|
||||
[[package]]
|
||||
name = "jemalloc-sys"
|
||||
version = "0.3.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0d3b9f3f5c9b31aa0f5ed3260385ac205db665baa41d49bb8338008ae94ede45"
|
||||
dependencies = [
|
||||
"cc",
|
||||
"fs_extra",
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "jemallocator"
|
||||
version = "0.3.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "43ae63fcfc45e99ab3d1b29a46782ad679e98436c3169d15a167a1108a724b69"
|
||||
dependencies = [
|
||||
"jemalloc-sys",
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "js-sys"
|
||||
version = "0.3.51"
|
||||
@@ -1580,6 +1607,7 @@ dependencies = [
|
||||
"hex",
|
||||
"http",
|
||||
"hyper",
|
||||
"jemallocator",
|
||||
"log",
|
||||
"num_cpus",
|
||||
"once_cell",
|
||||
@@ -1613,6 +1641,7 @@ dependencies = [
|
||||
"hex",
|
||||
"http",
|
||||
"hyper",
|
||||
"jemallocator",
|
||||
"log",
|
||||
"num_cpus",
|
||||
"primitive-types",
|
||||
|
||||
@@ -0,0 +1,21 @@
|
||||
//! A sort-of drop-in replacement to create a Stream from a flume Receiver, because `flume::Receiver::into_stream()`
|
||||
//! leaks memory. See:
|
||||
//!
|
||||
//! https://github.com/zesterer/flume/issues/88
|
||||
//!
|
||||
//! Hopefully we won't need to use these for long; the issue will probably be resolved fairly prompty and we can
|
||||
//! revert back to using the built-in flume methods.
|
||||
//!
|
||||
use flume::Receiver;
|
||||
use futures::stream::poll_fn;
|
||||
use futures::{FutureExt, Stream};
|
||||
use std::pin::Pin;
|
||||
|
||||
/// A drop-in replacement which is similar to `flume::RecvStream`.
|
||||
pub type FlumeRecvStream<'a, T> = Pin<Box<dyn Stream<Item = T> + Send + 'a>>;
|
||||
|
||||
/// A drop-in replacement for `flume`'s `Receiver::into_stream()` method.
|
||||
pub fn flume_receiver_into_stream<'a, T: Send + 'a>(r: Receiver<T>) -> FlumeRecvStream<'a, T> {
|
||||
let stream = poll_fn(move |cx| r.recv_async().poll_unpin(cx).map(|r| r.ok()));
|
||||
Box::pin(stream)
|
||||
}
|
||||
@@ -28,6 +28,7 @@ pub mod ws_client;
|
||||
mod assign_id;
|
||||
mod dense_map;
|
||||
mod either_sink;
|
||||
mod flume_recv_stream;
|
||||
mod mean_list;
|
||||
mod most_seen;
|
||||
mod multi_map_unique;
|
||||
@@ -37,6 +38,7 @@ mod num_stats;
|
||||
pub use assign_id::AssignId;
|
||||
pub use dense_map::DenseMap;
|
||||
pub use either_sink::EitherSink;
|
||||
pub use flume_recv_stream::{flume_receiver_into_stream, FlumeRecvStream};
|
||||
pub use mean_list::MeanList;
|
||||
pub use most_seen::MostSeen;
|
||||
pub use multi_map_unique::MultiMapUnique;
|
||||
|
||||
@@ -215,6 +215,7 @@ mod test {
|
||||
}
|
||||
|
||||
assert_eq!(rolling_total.averages().len(), 3);
|
||||
assert!(rolling_total.averages().capacity() < 10); // Just to show that it's capacity is bounded.
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -205,7 +205,7 @@ impl Connection {
|
||||
closer: Arc::clone(&on_close),
|
||||
},
|
||||
Receiver {
|
||||
inner: rx_from_ws.into_stream(),
|
||||
inner: crate::flume_receiver_into_stream(rx_from_ws),
|
||||
closer: on_close,
|
||||
},
|
||||
)
|
||||
|
||||
@@ -20,7 +20,7 @@ use std::sync::Arc;
|
||||
|
||||
/// Receive messages out of a connection
|
||||
pub struct Receiver {
|
||||
pub(super) inner: flume::r#async::RecvStream<'static, Result<RecvMessage, RecvError>>,
|
||||
pub(super) inner: crate::FlumeRecvStream<'static, Result<RecvMessage, RecvError>>,
|
||||
pub(super) closer: Arc<OnClose>,
|
||||
}
|
||||
|
||||
|
||||
@@ -34,6 +34,9 @@ thiserror = "1.0.25"
|
||||
tokio = { version = "1.10.1", features = ["full"] }
|
||||
tokio-util = { version = "0.6", features = ["compat"] }
|
||||
|
||||
[target.'cfg(not(target_env = "msvc"))'.dependencies]
|
||||
jemallocator = "0.3.2"
|
||||
|
||||
[dev-dependencies]
|
||||
shellwords = "1.1.0"
|
||||
test_utils = { path = "../test_utils" }
|
||||
|
||||
@@ -34,6 +34,13 @@ use hyper::{Method, Response};
|
||||
use simple_logger::SimpleLogger;
|
||||
use structopt::StructOpt;
|
||||
|
||||
#[cfg(not(target_env = "msvc"))]
|
||||
use jemallocator::Jemalloc;
|
||||
|
||||
#[cfg(not(target_env = "msvc"))]
|
||||
#[global_allocator]
|
||||
static GLOBAL: Jemalloc = Jemalloc;
|
||||
|
||||
const VERSION: &str = env!("CARGO_PKG_VERSION");
|
||||
const AUTHORS: &str = env!("CARGO_PKG_AUTHORS");
|
||||
const NAME: &str = "Substrate Telemetry Backend Core";
|
||||
@@ -204,7 +211,6 @@ where
|
||||
S: futures::Sink<FromShardWebsocket, Error = anyhow::Error> + Unpin + Send + 'static,
|
||||
{
|
||||
let (tx_to_shard_conn, rx_from_aggregator) = flume::unbounded();
|
||||
let mut rx_from_aggregator = rx_from_aggregator.into_stream();
|
||||
|
||||
// Tell the aggregator about this new connection, and give it a way to send messages to us:
|
||||
let init_msg = FromShardWebsocket::Initialize {
|
||||
@@ -291,13 +297,13 @@ where
|
||||
let send_handle = tokio::spawn(async move {
|
||||
loop {
|
||||
let msg = tokio::select! {
|
||||
msg = rx_from_aggregator.next() => msg,
|
||||
msg = rx_from_aggregator.recv_async() => msg,
|
||||
_ = &mut send_closer_rx => { break }
|
||||
};
|
||||
|
||||
let msg = match msg {
|
||||
Some(msg) => msg,
|
||||
None => break,
|
||||
Ok(msg) => msg,
|
||||
Err(flume::RecvError::Disconnected) => break,
|
||||
};
|
||||
|
||||
let internal_msg = match msg {
|
||||
@@ -347,7 +353,8 @@ where
|
||||
{
|
||||
// unbounded channel so that slow feeds don't block aggregator progress:
|
||||
let (tx_to_feed_conn, rx_from_aggregator) = flume::unbounded();
|
||||
let mut rx_from_aggregator_chunks = ReadyChunksAll::new(rx_from_aggregator.into_stream());
|
||||
let mut rx_from_aggregator_chunks =
|
||||
ReadyChunksAll::new(common::flume_receiver_into_stream(rx_from_aggregator));
|
||||
|
||||
// Tell the aggregator about this new connection, and give it a way to send messages to us:
|
||||
let init_msg = FromFeedWebsocket::Initialize {
|
||||
|
||||
@@ -25,3 +25,6 @@ structopt = "0.3.21"
|
||||
thiserror = "1.0.25"
|
||||
tokio = { version = "1.10.1", features = ["full"] }
|
||||
tokio-util = { version = "0.6", features = ["compat"] }
|
||||
|
||||
[target.'cfg(not(target_env = "msvc"))'.dependencies]
|
||||
jemallocator = "0.3.2"
|
||||
@@ -35,6 +35,13 @@ use hyper::{Method, Response};
|
||||
use simple_logger::SimpleLogger;
|
||||
use structopt::StructOpt;
|
||||
|
||||
#[cfg(not(target_env = "msvc"))]
|
||||
use jemallocator::Jemalloc;
|
||||
|
||||
#[cfg(not(target_env = "msvc"))]
|
||||
#[global_allocator]
|
||||
static GLOBAL: Jemalloc = Jemalloc;
|
||||
|
||||
const VERSION: &str = env!("CARGO_PKG_VERSION");
|
||||
const AUTHORS: &str = env!("CARGO_PKG_AUTHORS");
|
||||
const NAME: &str = "Substrate Telemetry Backend Shard";
|
||||
|
||||
Reference in New Issue
Block a user