Flumify everything

This commit is contained in:
James Wilson
2021-08-10 11:19:26 +01:00
parent 11b0b3a3c7
commit bd7a21ec39
14 changed files with 73 additions and 165 deletions
+18 -18
View File
@@ -21,8 +21,7 @@ use common::{
node_types::BlockHash,
AssignId,
};
use futures::channel::mpsc;
use futures::{Sink, SinkExt, StreamExt};
use futures::{Sink, SinkExt};
use std::collections::{HashMap, HashSet};
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
@@ -60,7 +59,7 @@ pub enum FromWebsocket {
/// the websocket connection and force the node to reconnect
/// so that it sends its system info again incase the telemetry
/// core has restarted.
close_connection: mpsc::Sender<()>,
close_connection: flume::Sender<()>,
},
/// Tell the aggregator about a new node.
Add {
@@ -94,28 +93,28 @@ struct AggregatorInternal {
/// Send messages to the aggregator from websockets via this. This is
/// stored here so that anybody holding an `Aggregator` handle can
/// make use of it.
tx_to_aggregator: mpsc::Sender<ToAggregator>,
tx_to_aggregator: flume::Sender<ToAggregator>,
}
impl Aggregator {
/// Spawn a new Aggregator. This connects to the telemetry backend
pub async fn spawn(telemetry_uri: http::Uri) -> anyhow::Result<Aggregator> {
let (tx_to_aggregator, rx_from_external) = mpsc::channel(10);
let (tx_to_aggregator, rx_from_external) = flume::bounded(10);
// Establish a resiliant connection to the core (this retries as needed):
let (tx_to_telemetry_core, mut rx_from_telemetry_core) =
let (tx_to_telemetry_core, rx_from_telemetry_core) =
create_ws_connection_to_core(telemetry_uri).await;
// Forward messages from the telemetry core into the aggregator:
let mut tx_to_aggregator2 = tx_to_aggregator.clone();
let tx_to_aggregator2 = tx_to_aggregator.clone();
tokio::spawn(async move {
while let Some(msg) = rx_from_telemetry_core.next().await {
while let Ok(msg) = rx_from_telemetry_core.recv_async().await {
let msg_to_aggregator = match msg {
Message::Connected => ToAggregator::ConnectedToTelemetryCore,
Message::Disconnected => ToAggregator::DisconnectedFromTelemetryCore,
Message::Data(data) => ToAggregator::FromTelemetryCore(data),
};
if let Err(_) = tx_to_aggregator2.send(msg_to_aggregator).await {
if let Err(_) = tx_to_aggregator2.send_async(msg_to_aggregator).await {
// This will close the ws channels, which themselves log messages.
break;
}
@@ -139,8 +138,8 @@ impl Aggregator {
// in to the aggregator. If nobody is holding the tx side of the channel
// any more, this task will gracefully end.
async fn handle_messages(
mut rx_from_external: mpsc::Receiver<ToAggregator>,
mut tx_to_telemetry_core: mpsc::Sender<FromAggregator>,
rx_from_external: flume::Receiver<ToAggregator>,
tx_to_telemetry_core: flume::Sender<FromAggregator>,
) {
use internal_messages::{FromShardAggregator, FromTelemetryCore};
@@ -150,7 +149,7 @@ impl Aggregator {
// A list of close channels for the currently connected substrate nodes. Send an empty
// tuple to these to ask the connections to be closed.
let mut close_connections: HashMap<ConnId, mpsc::Sender<()>> = HashMap::new();
let mut close_connections: HashMap<ConnId, flume::Sender<()>> = HashMap::new();
// Maintain mappings from the connection ID and node message ID to the "local ID" which we
// broadcast to the telemetry core.
@@ -160,15 +159,15 @@ impl Aggregator {
let mut muted: HashSet<ShardNodeId> = HashSet::new();
// Now, loop and receive messages to handle.
while let Some(msg) = rx_from_external.next().await {
while let Ok(msg) = rx_from_external.recv_async().await {
match msg {
ToAggregator::ConnectedToTelemetryCore => {
// Take hold of the connection closers and run them all.
let closers = close_connections;
for (_, mut closer) in closers {
for (_, closer) in closers {
// if this fails, it probably means the connection has died already anyway.
let _ = closer.send(()).await;
let _ = closer.send_async(()).await;
}
// We've told everything to disconnect. Now, reset our state:
@@ -212,7 +211,7 @@ impl Aggregator {
// Send the message to the telemetry core with this local ID:
let _ = tx_to_telemetry_core
.send(FromShardAggregator::AddNode {
.send_async(FromShardAggregator::AddNode {
ip,
node,
genesis_hash,
@@ -245,7 +244,7 @@ impl Aggregator {
// Send the message to the telemetry core with this local ID:
let _ = tx_to_telemetry_core
.send(FromShardAggregator::UpdateNode { local_id, payload })
.send_async(FromShardAggregator::UpdateNode { local_id, payload })
.await;
}
ToAggregator::FromWebsocket(disconnected_conn_id, FromWebsocket::Disconnected) => {
@@ -264,7 +263,7 @@ impl Aggregator {
to_local_id.remove_by_id(local_id);
muted.remove(&local_id);
let _ = tx_to_telemetry_core
.send(FromShardAggregator::RemoveNode { local_id })
.send_async(FromShardAggregator::RemoveNode { local_id })
.await;
}
}
@@ -293,6 +292,7 @@ impl Aggregator {
// but pinning by boxing is the easy solution for now:
Box::pin(
tx_to_aggregator
.into_sink()
.with(move |msg| async move { Ok(ToAggregator::FromWebsocket(conn_id, msg)) }),
)
}
+16 -21
View File
@@ -16,8 +16,7 @@
use bincode::Options;
use common::ws_client;
use futures::channel::mpsc;
use futures::{SinkExt, StreamExt};
use futures::StreamExt;
#[derive(Clone, Debug)]
pub enum Message<Out> {
@@ -36,13 +35,13 @@ pub enum Message<Out> {
/// between aggregator and core.
pub async fn create_ws_connection_to_core<In, Out>(
telemetry_uri: http::Uri,
) -> (mpsc::Sender<In>, mpsc::Receiver<Message<Out>>)
) -> (flume::Sender<In>, flume::Receiver<Message<Out>>)
where
In: serde::Serialize + Send + 'static,
Out: serde::de::DeserializeOwned + Send + 'static,
{
let (tx_in, mut rx_in) = mpsc::channel(10);
let (mut tx_out, rx_out) = mpsc::channel(10);
let (tx_in, rx_in) = flume::bounded::<In>(10);
let (tx_out, rx_out) = flume::bounded(10);
let mut is_connected = false;
@@ -51,7 +50,7 @@ where
// Throw away any pending messages from the incoming channel so that it
// doesn't get filled up and begin blocking while we're looping and waiting
// for a reconnection.
while let Ok(Some(_)) = rx_in.try_next() {}
while let Ok(_) = rx_in.try_recv() {}
// Try to connect. If connection established, we serialize and forward messages
// to/from the core. If the external channels break, we end for good. If the internal
@@ -60,9 +59,9 @@ where
Ok(connection) => {
let (tx_to_core, mut rx_from_core) = connection.into_channels();
is_connected = true;
let mut tx_out = tx_out.clone();
let tx_out = tx_out.clone();
if let Err(e) = tx_out.send(Message::Connected).await {
if let Err(e) = tx_out.send_async(Message::Connected).await {
// If receiving end is closed, bail now.
log::warn!("Aggregator is no longer receiving messages from core; disconnecting (permanently): {}", e);
return;
@@ -73,35 +72,31 @@ where
tokio::select! {
msg = rx_from_core.next() => {
let msg = match msg {
Some(msg) => msg,
Some(Ok(msg)) => msg,
// No more messages from core? core WS is disconnected.
None => {
_ => {
log::warn!("No more messages from core: shutting down connection (will reconnect)");
break
}
};
let bytes = match msg {
Ok(ws_client::RecvMessage::Binary(bytes)) => bytes,
Ok(ws_client::RecvMessage::Text(s)) => s.into_bytes(),
Err(e) => {
log::warn!("Unable to receive message from core: shutting down connection (will reconnect): {}", e);
break;
}
ws_client::RecvMessage::Binary(bytes) => bytes,
ws_client::RecvMessage::Text(s) => s.into_bytes()
};
let msg = bincode::options()
.deserialize(&bytes)
.expect("internal messages must be deserializable");
if let Err(e) = tx_out.send(Message::Data(msg)).await {
if let Err(e) = tx_out.send_async(Message::Data(msg)).await {
log::error!("Aggregator is no longer receiving messages from core; disconnecting (permanently): {}", e);
return;
}
},
msg = rx_in.next() => {
msg = rx_in.recv_async() => {
let msg = match msg {
Some(msg) => msg,
None => {
Ok(msg) => msg,
Err(flume::RecvError::Disconnected) => {
log::error!("Aggregator is no longer sending messages to core; disconnecting (permanently)");
return
}
@@ -131,7 +126,7 @@ where
if is_connected {
is_connected = false;
if let Err(e) = tx_out.send(Message::Disconnected).await {
if let Err(e) = tx_out.send_async(Message::Disconnected).await {
log::error!("Aggregator is no longer receiving messages from core; disconnecting (permanently): {}", e);
return;
}
+3 -3
View File
@@ -29,7 +29,7 @@ use common::byte_size::ByteSize;
use common::http_utils;
use common::node_message;
use common::rolling_total::RollingTotalBuilder;
use futures::{channel::mpsc, SinkExt, StreamExt};
use futures::SinkExt;
use http::Uri;
use hyper::{Method, Response};
use simple_logger::SimpleLogger;
@@ -203,7 +203,7 @@ where
// This could be a oneshot channel, but it's useful to be able to clone
// messages, and we can't clone oneshot channel senders.
let (close_connection_tx, mut close_connection_rx) = mpsc::channel(0);
let (close_connection_tx, close_connection_rx) = flume::bounded(1);
// Tell the aggregator about this new connection, and give it a way to close this connection:
let init_msg = FromWebsocket::Initialize {
@@ -223,7 +223,7 @@ where
tokio::select! {
// The close channel has fired, so end the loop. `ws_recv.receive_data` is
// *not* cancel safe, but since we're closing the connection we don't care.
_ = close_connection_rx.next() => {
_ = close_connection_rx.recv_async() => {
log::info!("connection to {:?} being closed by aggregator", real_addr);
break
},