diff --git a/backend/common/src/flume_recv_stream.rs b/backend/common/src/flume_recv_stream.rs deleted file mode 100644 index e0dd73a..0000000 --- a/backend/common/src/flume_recv_stream.rs +++ /dev/null @@ -1,21 +0,0 @@ -//! A sort-of drop-in replacement to create a Stream from a flume Receiver, because `flume::Receiver::into_stream()` -//! leaks memory. See: -//! -//! https://github.com/zesterer/flume/issues/88 -//! -//! Hopefully we won't need to use these for long; the issue will probably be resolved fairly prompty and we can -//! revert back to using the built-in flume methods. -//! -use flume::Receiver; -use futures::stream::poll_fn; -use futures::{FutureExt, Stream}; -use std::pin::Pin; - -/// A drop-in replacement which is similar to `flume::RecvStream`. -pub type FlumeRecvStream<'a, T> = Pin + Send + 'a>>; - -/// A drop-in replacement for `flume`'s `Receiver::into_stream()` method. -pub fn flume_receiver_into_stream<'a, T: Send + 'a>(r: Receiver) -> FlumeRecvStream<'a, T> { - let stream = poll_fn(move |cx| r.recv_async().poll_unpin(cx).map(|r| r.ok())); - Box::pin(stream) -} diff --git a/backend/common/src/lib.rs b/backend/common/src/lib.rs index cc8f42a..c0df1f6 100644 --- a/backend/common/src/lib.rs +++ b/backend/common/src/lib.rs @@ -28,7 +28,6 @@ pub mod ws_client; mod assign_id; mod dense_map; mod either_sink; -mod flume_recv_stream; mod mean_list; mod most_seen; mod multi_map_unique; @@ -38,7 +37,6 @@ mod num_stats; pub use assign_id::AssignId; pub use dense_map::DenseMap; pub use either_sink::EitherSink; -pub use flume_recv_stream::{flume_receiver_into_stream, FlumeRecvStream}; pub use mean_list::MeanList; pub use most_seen::MostSeen; pub use multi_map_unique::MultiMapUnique; diff --git a/backend/common/src/ws_client/connect.rs b/backend/common/src/ws_client/connect.rs index 1509c1c..193b3ee 100644 --- a/backend/common/src/ws_client/connect.rs +++ b/backend/common/src/ws_client/connect.rs @@ -14,6 +14,7 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . use super::on_close::OnClose; +use futures::{channel, StreamExt}; use soketto::handshake::{Client, ServerResponse}; use std::sync::Arc; use tokio::net::TcpStream; @@ -71,7 +72,7 @@ impl Connection { let mut rx_closed2 = tx_closed1.subscribe(); // Receive messages from the socket: - let (tx_to_external, rx_from_ws) = flume::unbounded(); + let (tx_to_external, rx_from_ws) = channel::mpsc::unbounded(); tokio::spawn(async move { let mut send_to_external = true; loop { @@ -110,7 +111,7 @@ impl Connection { .map_err(|e| e.into()), }; - if let Err(e) = tx_to_external.send_async(msg).await { + if let Err(e) = tx_to_external.unbounded_send(msg) { // Our external channel may have closed or errored, but the socket hasn't // been closed, so keep receiving in order to allow the socket to continue to // function properly (we may be happy just sending messages to it), but stop @@ -122,12 +123,12 @@ impl Connection { }); // Send messages to the socket: - let (tx_to_ws, rx_from_external) = flume::unbounded::(); + let (tx_to_ws, mut rx_from_external) = channel::mpsc::unbounded::(); tokio::spawn(async move { loop { // Wait for messages, or bail entirely if asked to close. let msg = tokio::select! { - msg = rx_from_external.recv_async() => { msg }, + msg = rx_from_external.next() => { msg }, _ = rx_closed2.recv() => { // attempt to gracefully end the connection. let _ = ws_to_connection.close().await; @@ -139,8 +140,8 @@ impl Connection { // needs to keep receiving data for the WS connection to stay open, there's no // reason to keep this side of the loop open if our channel is closed. let msg = match msg { - Ok(msg) => msg, - _ => break, + Some(msg) => msg, + None => break, }; // We don't explicitly shut down the channel if we hit send errors. Why? Because the @@ -205,7 +206,7 @@ impl Connection { closer: Arc::clone(&on_close), }, Receiver { - inner: crate::flume_receiver_into_stream(rx_from_ws), + inner: rx_from_ws, closer: on_close, }, ) diff --git a/backend/common/src/ws_client/receiver.rs b/backend/common/src/ws_client/receiver.rs index 38a7e1c..2937fdd 100644 --- a/backend/common/src/ws_client/receiver.rs +++ b/backend/common/src/ws_client/receiver.rs @@ -15,12 +15,12 @@ // along with this program. If not, see . use super::on_close::OnClose; -use futures::{Stream, StreamExt}; +use futures::{channel, Stream, StreamExt}; use std::sync::Arc; /// Receive messages out of a connection pub struct Receiver { - pub(super) inner: crate::FlumeRecvStream<'static, Result>, + pub(super) inner: channel::mpsc::UnboundedReceiver>, pub(super) closer: Arc, } diff --git a/backend/common/src/ws_client/sender.rs b/backend/common/src/ws_client/sender.rs index b81529c..2d6f3e3 100644 --- a/backend/common/src/ws_client/sender.rs +++ b/backend/common/src/ws_client/sender.rs @@ -15,6 +15,7 @@ // along with this program. If not, see . use super::on_close::OnClose; +use futures::channel; use std::sync::Arc; /// A message that can be sent into the channel interface @@ -39,7 +40,7 @@ pub enum SentMessage { /// Send messages into the connection #[derive(Clone)] pub struct Sender { - pub(super) inner: flume::Sender, + pub(super) inner: channel::mpsc::UnboundedSender, pub(super) closer: Arc, } @@ -51,19 +52,21 @@ impl Sender { } /// Returns whether this channel is closed. pub fn is_closed(&self) -> bool { - self.inner.is_disconnected() + self.inner.is_closed() } /// Unbounded send will always queue the message and doesn't /// need to be awaited. - pub fn unbounded_send(&self, msg: SentMessage) -> Result<(), flume::SendError> { - self.inner.send(msg)?; + pub fn unbounded_send(&self, msg: SentMessage) -> Result<(), channel::mpsc::SendError> { + self.inner + .unbounded_send(msg) + .map_err(|e| e.into_send_error())?; Ok(()) } /// Convert this sender into a Sink pub fn into_sink( self, ) -> impl futures::Sink + std::marker::Unpin + Clone + 'static { - self.inner.into_sink() + self.inner } } diff --git a/backend/telemetry_core/src/main.rs b/backend/telemetry_core/src/main.rs index 872a37a..fedc0f0 100644 --- a/backend/telemetry_core/src/main.rs +++ b/backend/telemetry_core/src/main.rs @@ -353,8 +353,13 @@ where { // unbounded channel so that slow feeds don't block aggregator progress: let (tx_to_feed_conn, rx_from_aggregator) = flume::unbounded(); - let mut rx_from_aggregator_chunks = - ReadyChunksAll::new(common::flume_receiver_into_stream(rx_from_aggregator)); + + // `Receiver::into_stream()` is currently problematic at the time of writing + // (see https://github.com/zesterer/flume/issues/88). If this stream is polled lots + // and isn't ready, it'll leak memory. In this case, since we only select from it or + // a close channel, we shouldn't poll the thing more than once before it's ready (and + // when it's ready, it cleans up after itself properly). So, I hope it won't leak! + let mut rx_from_aggregator_chunks = ReadyChunksAll::new(rx_from_aggregator.into_stream()); // Tell the aggregator about this new connection, and give it a way to send messages to us: let init_msg = FromFeedWebsocket::Initialize { diff --git a/backend/test_utils/src/server/channels.rs b/backend/test_utils/src/server/channels.rs index 047de88..ff68b0f 100644 --- a/backend/test_utils/src/server/channels.rs +++ b/backend/test_utils/src/server/channels.rs @@ -21,7 +21,7 @@ use std::{ use crate::feed_message_de::FeedMessage; use common::ws_client; -use futures::{Stream, StreamExt}; +use futures::{channel, Stream, StreamExt}; /// Wrap a `ws_client::Sender` with convenient utility methods for shard connections pub struct ShardSender(ws_client::Sender); @@ -37,7 +37,7 @@ impl ShardSender { pub fn send_json_binary( &mut self, json: serde_json::Value, - ) -> Result<(), flume::SendError> { + ) -> Result<(), channel::mpsc::SendError> { let bytes = serde_json::to_vec(&json).expect("valid bytes"); self.unbounded_send(ws_client::SentMessage::Binary(bytes)) } @@ -45,7 +45,7 @@ impl ShardSender { pub fn send_json_text( &mut self, json: serde_json::Value, - ) -> Result<(), flume::SendError> { + ) -> Result<(), channel::mpsc::SendError> { let s = serde_json::to_string(&json).expect("valid string"); self.unbounded_send(ws_client::SentMessage::Text(s)) } @@ -123,7 +123,7 @@ impl FeedSender { &self, command: S, param: S, - ) -> Result<(), flume::SendError> { + ) -> Result<(), channel::mpsc::SendError> { self.unbounded_send(ws_client::SentMessage::Text(format!( "{}:{}", command.as_ref(),