locator shuffling around

This commit is contained in:
James Wilson
2021-06-24 09:54:00 +01:00
parent 47c12ce210
commit e383866322
2 changed files with 14 additions and 11 deletions
+11 -3
View File
@@ -1,8 +1,11 @@
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use futures::channel::mpsc;
use futures::{ Sink, SinkExt };
use futures::{ future, Sink, SinkExt };
use super::inner_loop;
use super::find_location::{ self, find_location };
use crate::state::NodeId;
use std::net::Ipv4Addr;
/// A unique Id is assigned per websocket connection (or more accurately,
/// per feed socket and per shard socket). This can be combined with the
@@ -30,8 +33,13 @@ impl Aggregator {
pub async fn spawn(denylist: Vec<String>) -> anyhow::Result<Aggregator> {
let (tx_to_aggregator, rx_from_external) = mpsc::channel(10);
// Kick off a locator task to locate nodes, which hands back a channel to make location requests
let tx_to_locator = find_location(tx_to_aggregator.clone().with(|(node_id, msg)| {
future::ok::<_,mpsc::SendError>(inner_loop::ToAggregator::FromFindLocation(node_id, msg))
}));
// Handle any incoming messages in our handler loop:
tokio::spawn(Aggregator::handle_messages(rx_from_external, tx_to_aggregator.clone(), denylist));
tokio::spawn(Aggregator::handle_messages(rx_from_external, tx_to_locator, denylist));
// Return a handle to our aggregator:
Ok(Aggregator(Arc::new(AggregatorInternal {
@@ -46,7 +54,7 @@ impl Aggregator {
// any more, this task will gracefully end.
async fn handle_messages(
rx_from_external: mpsc::Receiver<inner_loop::ToAggregator>,
tx_to_aggregator: mpsc::Sender<inner_loop::ToAggregator>,
tx_to_aggregator: mpsc::UnboundedSender<(NodeId, Ipv4Addr)>,
denylist: Vec<String>
) {
inner_loop::InnerLoop::new(rx_from_external, tx_to_aggregator, denylist).handle().await;
@@ -10,11 +10,11 @@ use common::{
use bimap::BiMap;
use std::{iter::FromIterator, net::Ipv4Addr, str::FromStr};
use futures::channel::{ mpsc };
use futures::{ future, SinkExt, StreamExt };
use futures::{ SinkExt, StreamExt };
use std::collections::{ HashMap, HashSet };
use crate::state::{ self, State, NodeId };
use crate::feed_message::{ self, FeedMessageSerializer };
use super::find_location::{ self, find_location };
use super::find_location;
/// A unique Id is assigned per websocket connection (or more accurately,
/// per feed socket and per shard socket). This can be combined with the
@@ -151,14 +151,9 @@ impl InnerLoop {
/// Create a new inner loop handler with the various state it needs.
pub fn new(
rx_from_external: mpsc::Receiver<ToAggregator>,
tx_to_aggregator: mpsc::Sender<ToAggregator>,
tx_to_locator: mpsc::UnboundedSender<(NodeId, Ipv4Addr)>,
denylist: Vec<String>
) -> Self {
let tx_to_locator = find_location(tx_to_aggregator.with(|(node_id, msg)| {
future::ok::<_,mpsc::SendError>(ToAggregator::FromFindLocation(node_id, msg))
}));
InnerLoop {
rx_from_external,
node_state: State::new(denylist),