diff --git a/backend/Cargo.lock b/backend/Cargo.lock index c22dcac..aec4623 100644 --- a/backend/Cargo.lock +++ b/backend/Cargo.lock @@ -437,6 +437,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "fs_extra" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2022715d62ab30faffd124d40b76f4134a550a87792276512b18d63272333394" + [[package]] name = "funty" version = "1.1.0" @@ -755,6 +761,27 @@ version = "0.4.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dd25036021b0de88a0aff6b850051563c6516d0bf53f8638938edbb9de732736" +[[package]] +name = "jemalloc-sys" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d3b9f3f5c9b31aa0f5ed3260385ac205db665baa41d49bb8338008ae94ede45" +dependencies = [ + "cc", + "fs_extra", + "libc", +] + +[[package]] +name = "jemallocator" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43ae63fcfc45e99ab3d1b29a46782ad679e98436c3169d15a167a1108a724b69" +dependencies = [ + "jemalloc-sys", + "libc", +] + [[package]] name = "js-sys" version = "0.3.51" @@ -1580,6 +1607,7 @@ dependencies = [ "hex", "http", "hyper", + "jemallocator", "log", "num_cpus", "once_cell", @@ -1613,6 +1641,7 @@ dependencies = [ "hex", "http", "hyper", + "jemallocator", "log", "num_cpus", "primitive-types", diff --git a/backend/common/src/flume_recv_stream.rs b/backend/common/src/flume_recv_stream.rs new file mode 100644 index 0000000..e0dd73a --- /dev/null +++ b/backend/common/src/flume_recv_stream.rs @@ -0,0 +1,21 @@ +//! A sort-of drop-in replacement to create a Stream from a flume Receiver, because `flume::Receiver::into_stream()` +//! leaks memory. See: +//! +//! https://github.com/zesterer/flume/issues/88 +//! +//! Hopefully we won't need to use these for long; the issue will probably be resolved fairly prompty and we can +//! revert back to using the built-in flume methods. +//! +use flume::Receiver; +use futures::stream::poll_fn; +use futures::{FutureExt, Stream}; +use std::pin::Pin; + +/// A drop-in replacement which is similar to `flume::RecvStream`. +pub type FlumeRecvStream<'a, T> = Pin + Send + 'a>>; + +/// A drop-in replacement for `flume`'s `Receiver::into_stream()` method. +pub fn flume_receiver_into_stream<'a, T: Send + 'a>(r: Receiver) -> FlumeRecvStream<'a, T> { + let stream = poll_fn(move |cx| r.recv_async().poll_unpin(cx).map(|r| r.ok())); + Box::pin(stream) +} diff --git a/backend/common/src/lib.rs b/backend/common/src/lib.rs index c0df1f6..cc8f42a 100644 --- a/backend/common/src/lib.rs +++ b/backend/common/src/lib.rs @@ -28,6 +28,7 @@ pub mod ws_client; mod assign_id; mod dense_map; mod either_sink; +mod flume_recv_stream; mod mean_list; mod most_seen; mod multi_map_unique; @@ -37,6 +38,7 @@ mod num_stats; pub use assign_id::AssignId; pub use dense_map::DenseMap; pub use either_sink::EitherSink; +pub use flume_recv_stream::{flume_receiver_into_stream, FlumeRecvStream}; pub use mean_list::MeanList; pub use most_seen::MostSeen; pub use multi_map_unique::MultiMapUnique; diff --git a/backend/common/src/rolling_total.rs b/backend/common/src/rolling_total.rs index e22384f..b9caf9f 100644 --- a/backend/common/src/rolling_total.rs +++ b/backend/common/src/rolling_total.rs @@ -215,6 +215,7 @@ mod test { } assert_eq!(rolling_total.averages().len(), 3); + assert!(rolling_total.averages().capacity() < 10); // Just to show that it's capacity is bounded. } #[test] diff --git a/backend/common/src/ws_client/connect.rs b/backend/common/src/ws_client/connect.rs index 787a88b..1509c1c 100644 --- a/backend/common/src/ws_client/connect.rs +++ b/backend/common/src/ws_client/connect.rs @@ -205,7 +205,7 @@ impl Connection { closer: Arc::clone(&on_close), }, Receiver { - inner: rx_from_ws.into_stream(), + inner: crate::flume_receiver_into_stream(rx_from_ws), closer: on_close, }, ) diff --git a/backend/common/src/ws_client/receiver.rs b/backend/common/src/ws_client/receiver.rs index e1e2397..38a7e1c 100644 --- a/backend/common/src/ws_client/receiver.rs +++ b/backend/common/src/ws_client/receiver.rs @@ -20,7 +20,7 @@ use std::sync::Arc; /// Receive messages out of a connection pub struct Receiver { - pub(super) inner: flume::r#async::RecvStream<'static, Result>, + pub(super) inner: crate::FlumeRecvStream<'static, Result>, pub(super) closer: Arc, } diff --git a/backend/telemetry_core/Cargo.toml b/backend/telemetry_core/Cargo.toml index c85e613..2df969e 100644 --- a/backend/telemetry_core/Cargo.toml +++ b/backend/telemetry_core/Cargo.toml @@ -34,6 +34,9 @@ thiserror = "1.0.25" tokio = { version = "1.10.1", features = ["full"] } tokio-util = { version = "0.6", features = ["compat"] } +[target.'cfg(not(target_env = "msvc"))'.dependencies] +jemallocator = "0.3.2" + [dev-dependencies] shellwords = "1.1.0" test_utils = { path = "../test_utils" } diff --git a/backend/telemetry_core/src/main.rs b/backend/telemetry_core/src/main.rs index 1508daa..872a37a 100644 --- a/backend/telemetry_core/src/main.rs +++ b/backend/telemetry_core/src/main.rs @@ -34,6 +34,13 @@ use hyper::{Method, Response}; use simple_logger::SimpleLogger; use structopt::StructOpt; +#[cfg(not(target_env = "msvc"))] +use jemallocator::Jemalloc; + +#[cfg(not(target_env = "msvc"))] +#[global_allocator] +static GLOBAL: Jemalloc = Jemalloc; + const VERSION: &str = env!("CARGO_PKG_VERSION"); const AUTHORS: &str = env!("CARGO_PKG_AUTHORS"); const NAME: &str = "Substrate Telemetry Backend Core"; @@ -204,7 +211,6 @@ where S: futures::Sink + Unpin + Send + 'static, { let (tx_to_shard_conn, rx_from_aggregator) = flume::unbounded(); - let mut rx_from_aggregator = rx_from_aggregator.into_stream(); // Tell the aggregator about this new connection, and give it a way to send messages to us: let init_msg = FromShardWebsocket::Initialize { @@ -291,13 +297,13 @@ where let send_handle = tokio::spawn(async move { loop { let msg = tokio::select! { - msg = rx_from_aggregator.next() => msg, + msg = rx_from_aggregator.recv_async() => msg, _ = &mut send_closer_rx => { break } }; let msg = match msg { - Some(msg) => msg, - None => break, + Ok(msg) => msg, + Err(flume::RecvError::Disconnected) => break, }; let internal_msg = match msg { @@ -347,7 +353,8 @@ where { // unbounded channel so that slow feeds don't block aggregator progress: let (tx_to_feed_conn, rx_from_aggregator) = flume::unbounded(); - let mut rx_from_aggregator_chunks = ReadyChunksAll::new(rx_from_aggregator.into_stream()); + let mut rx_from_aggregator_chunks = + ReadyChunksAll::new(common::flume_receiver_into_stream(rx_from_aggregator)); // Tell the aggregator about this new connection, and give it a way to send messages to us: let init_msg = FromFeedWebsocket::Initialize { diff --git a/backend/telemetry_shard/Cargo.toml b/backend/telemetry_shard/Cargo.toml index 97f0a4b..dd554ed 100644 --- a/backend/telemetry_shard/Cargo.toml +++ b/backend/telemetry_shard/Cargo.toml @@ -25,3 +25,6 @@ structopt = "0.3.21" thiserror = "1.0.25" tokio = { version = "1.10.1", features = ["full"] } tokio-util = { version = "0.6", features = ["compat"] } + +[target.'cfg(not(target_env = "msvc"))'.dependencies] +jemallocator = "0.3.2" \ No newline at end of file diff --git a/backend/telemetry_shard/src/main.rs b/backend/telemetry_shard/src/main.rs index 78e0162..eba6f7d 100644 --- a/backend/telemetry_shard/src/main.rs +++ b/backend/telemetry_shard/src/main.rs @@ -35,6 +35,13 @@ use hyper::{Method, Response}; use simple_logger::SimpleLogger; use structopt::StructOpt; +#[cfg(not(target_env = "msvc"))] +use jemallocator::Jemalloc; + +#[cfg(not(target_env = "msvc"))] +#[global_allocator] +static GLOBAL: Jemalloc = Jemalloc; + const VERSION: &str = env!("CARGO_PKG_VERSION"); const AUTHORS: &str = env!("CARGO_PKG_AUTHORS"); const NAME: &str = "Substrate Telemetry Backend Shard";