diff --git a/backend/telemetry/src/aggregator/aggregator.rs b/backend/telemetry/src/aggregator/aggregator.rs index 90e3b1f..a2deddb 100644 --- a/backend/telemetry/src/aggregator/aggregator.rs +++ b/backend/telemetry/src/aggregator/aggregator.rs @@ -1,8 +1,11 @@ use std::sync::Arc; use std::sync::atomic::AtomicU64; use futures::channel::mpsc; -use futures::{ Sink, SinkExt }; +use futures::{ future, Sink, SinkExt }; use super::inner_loop; +use super::find_location::{ self, find_location }; +use crate::state::NodeId; +use std::net::Ipv4Addr; /// A unique Id is assigned per websocket connection (or more accurately, /// per feed socket and per shard socket). This can be combined with the @@ -30,8 +33,13 @@ impl Aggregator { pub async fn spawn(denylist: Vec) -> anyhow::Result { let (tx_to_aggregator, rx_from_external) = mpsc::channel(10); + // Kick off a locator task to locate nodes, which hands back a channel to make location requests + let tx_to_locator = find_location(tx_to_aggregator.clone().with(|(node_id, msg)| { + future::ok::<_,mpsc::SendError>(inner_loop::ToAggregator::FromFindLocation(node_id, msg)) + })); + // Handle any incoming messages in our handler loop: - tokio::spawn(Aggregator::handle_messages(rx_from_external, tx_to_aggregator.clone(), denylist)); + tokio::spawn(Aggregator::handle_messages(rx_from_external, tx_to_locator, denylist)); // Return a handle to our aggregator: Ok(Aggregator(Arc::new(AggregatorInternal { @@ -46,7 +54,7 @@ impl Aggregator { // any more, this task will gracefully end. async fn handle_messages( rx_from_external: mpsc::Receiver, - tx_to_aggregator: mpsc::Sender, + tx_to_aggregator: mpsc::UnboundedSender<(NodeId, Ipv4Addr)>, denylist: Vec ) { inner_loop::InnerLoop::new(rx_from_external, tx_to_aggregator, denylist).handle().await; diff --git a/backend/telemetry/src/aggregator/inner_loop.rs b/backend/telemetry/src/aggregator/inner_loop.rs index cc77daf..b5ea7df 100644 --- a/backend/telemetry/src/aggregator/inner_loop.rs +++ b/backend/telemetry/src/aggregator/inner_loop.rs @@ -10,11 +10,11 @@ use common::{ use bimap::BiMap; use std::{iter::FromIterator, net::Ipv4Addr, str::FromStr}; use futures::channel::{ mpsc }; -use futures::{ future, SinkExt, StreamExt }; +use futures::{ SinkExt, StreamExt }; use std::collections::{ HashMap, HashSet }; use crate::state::{ self, State, NodeId }; use crate::feed_message::{ self, FeedMessageSerializer }; -use super::find_location::{ self, find_location }; +use super::find_location; /// A unique Id is assigned per websocket connection (or more accurately, /// per feed socket and per shard socket). This can be combined with the @@ -151,14 +151,9 @@ impl InnerLoop { /// Create a new inner loop handler with the various state it needs. pub fn new( rx_from_external: mpsc::Receiver, - tx_to_aggregator: mpsc::Sender, + tx_to_locator: mpsc::UnboundedSender<(NodeId, Ipv4Addr)>, denylist: Vec ) -> Self { - - let tx_to_locator = find_location(tx_to_aggregator.with(|(node_id, msg)| { - future::ok::<_,mpsc::SendError>(ToAggregator::FromFindLocation(node_id, msg)) - })); - InnerLoop { rx_from_external, node_state: State::new(denylist),