diff --git a/backend/telemetry_core/src/aggregator/aggregator_set.rs b/backend/telemetry_core/src/aggregator/aggregator_set.rs index 5fa991b..113eb15 100644 --- a/backend/telemetry_core/src/aggregator/aggregator_set.rs +++ b/backend/telemetry_core/src/aggregator/aggregator_set.rs @@ -52,8 +52,11 @@ impl AggregatorSet { .map(|a| a.subscribe_shard()) .collect(); + let (tx, rx) = flume::unbounded::(); + let mut rx = rx.into_stream(); + let tx = tx.into_sink(); + // Send every incoming message to all aggregators. - let (tx, mut rx) = futures::channel::mpsc::unbounded::(); tokio::spawn(async move { while let Some(msg) = rx.next().await { for conn in &mut conns {