Add more tests, fix (and test for) a deadlock re overquota messages, more unbounded channels and less .awaiting

This commit is contained in:
James Wilson
2021-07-15 10:53:02 +01:00
parent 86a3edf053
commit db8ea9a8f3
8 changed files with 285 additions and 73 deletions
@@ -28,13 +28,13 @@ struct AggregatorInternal {
/// Send messages in to the aggregator from the outside via this. This is
/// stored here so that anybody holding an `Aggregator` handle can
/// make use of it.
tx_to_aggregator: mpsc::Sender<inner_loop::ToAggregator>,
tx_to_aggregator: mpsc::UnboundedSender<inner_loop::ToAggregator>,
}
impl Aggregator {
/// Spawn a new Aggregator. This connects to the telemetry backend
pub async fn spawn(denylist: Vec<String>) -> anyhow::Result<Aggregator> {
let (tx_to_aggregator, rx_from_external) = mpsc::channel(10);
let (tx_to_aggregator, rx_from_external) = mpsc::unbounded();
// Kick off a locator task to locate nodes, which hands back a channel to make location requests
let tx_to_locator = find_location(tx_to_aggregator.clone().with(|(node_id, msg)| {
@@ -62,7 +62,7 @@ impl Aggregator {
// in to the aggregator. If nobody is tolding the tx side of the channel
// any more, this task will gracefully end.
async fn handle_messages(
rx_from_external: mpsc::Receiver<inner_loop::ToAggregator>,
rx_from_external: mpsc::UnboundedReceiver<inner_loop::ToAggregator>,
tx_to_aggregator: mpsc::UnboundedSender<(NodeId, Ipv4Addr)>,
denylist: Vec<String>,
) {
@@ -10,7 +10,7 @@ use common::{
time,
};
use futures::channel::mpsc;
use futures::{SinkExt, StreamExt};
use futures::StreamExt;
use std::collections::{HashMap, HashSet};
use std::{
net::{IpAddr, Ipv4Addr},
@@ -31,7 +31,7 @@ pub enum FromShardWebsocket {
/// When the socket is opened, it'll send this first
/// so that we have a way to communicate back to it.
Initialize {
channel: mpsc::Sender<ToShardWebsocket>,
channel: mpsc::UnboundedSender<ToShardWebsocket>,
},
/// Tell the aggregator about a new node.
Add {
@@ -112,7 +112,7 @@ pub enum ToFeedWebsocket {
/// outgoing messages in the main aggregator loop.
pub struct InnerLoop {
/// Messages from the outside world come into this:
rx_from_external: mpsc::Receiver<ToAggregator>,
rx_from_external: mpsc::UnboundedReceiver<ToAggregator>,
/// The state of our chains and nodes lives here:
node_state: State,
@@ -123,7 +123,7 @@ pub struct InnerLoop {
/// Keep track of how to send messages out to feeds.
feed_channels: HashMap<ConnId, mpsc::UnboundedSender<ToFeedWebsocket>>,
/// Keep track of how to send messages out to shards.
shard_channels: HashMap<ConnId, mpsc::Sender<ToShardWebsocket>>,
shard_channels: HashMap<ConnId, mpsc::UnboundedSender<ToShardWebsocket>>,
/// Which chain is a feed subscribed to?
/// Feed Connection ID -> Chain Genesis Hash
@@ -142,7 +142,7 @@ pub struct InnerLoop {
impl InnerLoop {
/// Create a new inner loop handler with the various state it needs.
pub fn new(
rx_from_external: mpsc::Receiver<ToAggregator>,
rx_from_external: mpsc::UnboundedReceiver<ToAggregator>,
tx_to_locator: mpsc::UnboundedSender<(NodeId, Ipv4Addr)>,
denylist: Vec<String>,
) -> Self {
@@ -159,25 +159,27 @@ impl InnerLoop {
}
}
/// Start handling and responding to incoming messages.
/// Start handling and responding to incoming messages. Owing to unbounded channels, we actually
/// only have a single `.await` (in this function). This helps to make it clear that the aggregator loop
/// will be able to make progress quickly without any potential yield points.
pub async fn handle(mut self) {
while let Some(msg) = self.rx_from_external.next().await {
match msg {
ToAggregator::FromFeedWebsocket(feed_conn_id, msg) => {
self.handle_from_feed(feed_conn_id, msg).await
self.handle_from_feed(feed_conn_id, msg)
}
ToAggregator::FromShardWebsocket(shard_conn_id, msg) => {
self.handle_from_shard(shard_conn_id, msg).await
self.handle_from_shard(shard_conn_id, msg)
}
ToAggregator::FromFindLocation(node_id, location) => {
self.handle_from_find_location(node_id, location).await
self.handle_from_find_location(node_id, location)
}
}
}
}
/// Handle messages that come from the node geographical locator.
async fn handle_from_find_location(
fn handle_from_find_location(
&mut self,
node_id: NodeId,
location: find_location::Location,
@@ -209,7 +211,7 @@ impl InnerLoop {
}
/// Handle messages coming from shards.
async fn handle_from_shard(&mut self, shard_conn_id: ConnId, msg: FromShardWebsocket) {
fn handle_from_shard(&mut self, shard_conn_id: ConnId, msg: FromShardWebsocket) {
log::debug!("Message from shard ({:?}): {:?}", shard_conn_id, msg);
match msg {
@@ -226,21 +228,19 @@ impl InnerLoop {
state::AddNodeResult::ChainOnDenyList => {
if let Some(shard_conn) = self.shard_channels.get_mut(&shard_conn_id) {
let _ = shard_conn
.send(ToShardWebsocket::Mute {
.unbounded_send(ToShardWebsocket::Mute {
local_id,
reason: MuteReason::ChainNotAllowed,
})
.await;
});
}
}
state::AddNodeResult::ChainOverQuota => {
if let Some(shard_conn) = self.shard_channels.get_mut(&shard_conn_id) {
let _ = shard_conn
.send(ToShardWebsocket::Mute {
.unbounded_send(ToShardWebsocket::Mute {
local_id,
reason: MuteReason::Overquota,
})
.await;
});
}
}
state::AddNodeResult::NodeAddedToChain(details) => {
@@ -295,7 +295,7 @@ impl InnerLoop {
return;
}
};
self.remove_nodes_and_broadcast_result(Some(node_id)).await;
self.remove_nodes_and_broadcast_result(Some(node_id));
}
FromShardWebsocket::Update { local_id, payload } => {
let node_id = match self.node_ids.get_by_right(&(shard_conn_id, local_id)) {
@@ -340,17 +340,16 @@ impl InnerLoop {
.collect();
// ... and remove them:
self.remove_nodes_and_broadcast_result(node_ids_to_remove)
.await;
self.remove_nodes_and_broadcast_result(node_ids_to_remove);
}
}
}
/// Handle messages coming from feeds.
async fn handle_from_feed(&mut self, feed_conn_id: ConnId, msg: FromFeedWebsocket) {
fn handle_from_feed(&mut self, feed_conn_id: ConnId, msg: FromFeedWebsocket) {
log::debug!("Message from feed ({:?}): {:?}", feed_conn_id, msg);
match msg {
FromFeedWebsocket::Initialize { mut channel } => {
FromFeedWebsocket::Initialize { channel } => {
self.feed_channels.insert(feed_conn_id, channel.clone());
// Tell the new feed subscription some basic things to get it going:
@@ -363,7 +362,7 @@ impl InnerLoop {
// Send this to the channel that subscribed:
if let Some(bytes) = feed_serializer.into_finalized() {
let _ = channel.send(ToFeedWebsocket::Bytes(bytes)).await;
let _ = channel.unbounded_send(ToFeedWebsocket::Bytes(bytes));
}
}
FromFeedWebsocket::Ping { value } => {
@@ -474,7 +473,7 @@ impl InnerLoop {
}
/// Remove all of the node IDs provided and broadcast messages to feeds as needed.
async fn remove_nodes_and_broadcast_result(
fn remove_nodes_and_broadcast_result(
&mut self,
node_ids: impl IntoIterator<Item = NodeId>,
) {
+1 -1
View File
@@ -119,7 +119,7 @@ async fn handle_shard_websocket_connection<S>(
where
S: futures::Sink<FromShardWebsocket, Error = anyhow::Error> + Unpin,
{
let (tx_to_shard_conn, mut rx_from_aggregator) = mpsc::channel(10);
let (tx_to_shard_conn, mut rx_from_aggregator) = mpsc::unbounded();
// Tell the aggregator about this new connection, and give it a way to send messages to us:
let init_msg = FromShardWebsocket::Initialize {