Drain all the pending messages in the channel when TracingUnboundedReceiver is dropped (#13917)

Signed-off-by: linning <linningde25@gmail.com>
This commit is contained in:
NingLin-P
2023-04-14 22:15:31 +08:00
committed by GitHub
parent 890a65b87d
commit 8d06402e2e
+24
View File
@@ -141,6 +141,7 @@ impl<T> TracingUnboundedReceiver<T> {
impl<T> Drop for TracingUnboundedReceiver<T> {
fn drop(&mut self) {
// Close the channel to prevent any further messages to be sent into the channel
self.close();
// the number of messages about to be dropped
let count = self.inner.len();
@@ -150,6 +151,10 @@ impl<T> Drop for TracingUnboundedReceiver<T> {
.with_label_values(&[self.name, "dropped"])
.inc_by(count.saturated_into());
}
// Drain all the pending messages in the channel since they can never be accessed,
// this can be removed once https://github.com/smol-rs/async-channel/issues/23 is
// resolved
while let Ok(_) = self.inner.try_recv() {}
}
}
@@ -177,3 +182,22 @@ impl<T> FusedStream for TracingUnboundedReceiver<T> {
self.inner.is_terminated()
}
}
#[cfg(test)]
mod tests {
use super::tracing_unbounded;
use async_channel::{self, RecvError, TryRecvError};
#[test]
fn test_tracing_unbounded_receiver_drop() {
let (tracing_unbounded_sender, tracing_unbounded_receiver) =
tracing_unbounded("test-receiver-drop", 10);
let (tx, rx) = async_channel::unbounded::<usize>();
tracing_unbounded_sender.unbounded_send(tx).unwrap();
drop(tracing_unbounded_receiver);
assert_eq!(rx.try_recv(), Err(TryRecvError::Closed));
assert_eq!(rx.recv_blocking(), Err(RecvError));
}
}