From 7dfc582a201601c1061a70459f7258f0a5a0460d Mon Sep 17 00:00:00 2001 From: James Wilson Date: Wed, 23 Jun 2021 09:55:28 +0100 Subject: [PATCH] feed/shard disconnects can be handled, and unbounded output to feeds --- backend/telemetry/src/aggregator.rs | 27 ++++++++++++++++++++++++--- backend/telemetry/src/main.rs | 23 ++++++++++++++--------- 2 files changed, 38 insertions(+), 12 deletions(-) diff --git a/backend/telemetry/src/aggregator.rs b/backend/telemetry/src/aggregator.rs index d4c1ee3..a9b1828 100644 --- a/backend/telemetry/src/aggregator.rs +++ b/backend/telemetry/src/aggregator.rs @@ -49,7 +49,9 @@ pub enum FromShardWebsocket { /// Tell the aggregator that a node has been removed when it disconnects. Remove { local_id: LocalId, - } + }, + /// The shard is disconnected. + Disconnected } /// The aggregator can these messages back to a shard connection. @@ -66,8 +68,10 @@ pub enum ToShardWebsocket { pub enum FromFeedWebsocket { /// When the socket is opened, it'll send this first /// so that we have a way to communicate back to it. + /// Unbounded so that slow feeds don't block aggregato + /// progress. Initialize { - channel: mpsc::Sender, + channel: mpsc::UnboundedSender, }, /// The feed can subscribe to a chain to receive /// messages relating to it. @@ -81,7 +85,9 @@ pub enum FromFeedWebsocket { /// An explicit ping message. Ping { chain: Box - } + }, + /// The feed is disconnected. + Disconnected } // The frontend sends text based commands; parse them into these messages: @@ -166,6 +172,7 @@ impl Aggregator { // Now, loop and receive messages to handle. while let Some(msg) = rx_from_external.next().await { match msg { + // FROM FEED ToAggregator::FromFeedWebsocket(feed_conn_id, FromFeedWebsocket::Initialize { mut channel }) => { feed_channels.insert(feed_conn_id, channel.clone()); @@ -268,6 +275,16 @@ impl Aggregator { ToAggregator::FromFeedWebsocket(feed_conn_id, FromFeedWebsocket::NoMoreFinality) => { feed_conn_id_finality.remove(&feed_conn_id); }, + ToAggregator::FromFeedWebsocket(feed_conn_id, FromFeedWebsocket::Disconnected) => { + // The feed has disconnected; clean up references to it: + if let Some(chain) = feed_conn_id_to_chain.remove(&feed_conn_id) { + chain_to_feed_conn_ids.remove(&chain); + } + feed_channels.remove(&feed_conn_id); + feed_conn_id_finality.remove(&feed_conn_id); + }, + + // FROM SHARD ToAggregator::FromShardWebsocket(shard_conn_id, FromShardWebsocket::Initialize { channel }) => { shard_channels.insert(shard_conn_id, channel); }, @@ -325,6 +342,10 @@ impl Aggregator { // TODO: node_state.update_node, then handle returned diffs }, + ToAggregator::FromShardWebsocket(shard_conn_id, FromShardWebsocket::Disconnected) => { + // The shard has disconnected; remove the shard channel, but also + // remove any nodes associated with the shard, firing the relevant feed messages. + } } } } diff --git a/backend/telemetry/src/main.rs b/backend/telemetry/src/main.rs index 9ce32c1..125b634 100644 --- a/backend/telemetry/src/main.rs +++ b/backend/telemetry/src/main.rs @@ -91,8 +91,10 @@ async fn start_server(opts: Opts) -> anyhow::Result<()> { let tx_to_aggregator = shard_aggregator.subscribe_shard(); log::info!("Opening /shard_submit connection from {:?}", addr); ws.on_upgrade(move |websocket| async move { - let websocket = handle_shard_websocket_connection(websocket, tx_to_aggregator).await; + let (mut tx_to_aggregator, websocket) = handle_shard_websocket_connection(websocket, tx_to_aggregator).await; log::info!("Closing /shard_submit connection from {:?}", addr); + // Tell the aggregator that this connection has closed, so it can tidy up. + let _ = tx_to_aggregator.send(FromShardWebsocket::Disconnected).await; let _ = websocket.close().await; }) }); @@ -106,8 +108,10 @@ async fn start_server(opts: Opts) -> anyhow::Result<()> { let tx_to_aggregator = feed_aggregator.subscribe_feed(); log::info!("Opening /feed connection from {:?}", addr); ws.on_upgrade(move |websocket| async move { - let websocket = handle_feed_websocket_connection(websocket, tx_to_aggregator).await; + let (mut tx_to_aggregator, websocket) = handle_feed_websocket_connection(websocket, tx_to_aggregator).await; log::info!("Closing /feed connection from {:?}", addr); + // Tell the aggregator that this connection has closed, so it can tidy up. + let _ = tx_to_aggregator.send(FromFeedWebsocket::Disconnected).await; let _ = websocket.close().await; }) }); @@ -121,7 +125,7 @@ async fn start_server(opts: Opts) -> anyhow::Result<()> { } /// This handles messages coming to/from a shard connection -async fn handle_shard_websocket_connection(mut websocket: ws::WebSocket, mut tx_to_aggregator: S) -> ws::WebSocket +async fn handle_shard_websocket_connection(mut websocket: ws::WebSocket, mut tx_to_aggregator: S) -> (S, ws::WebSocket) where S: futures::Sink + Unpin { let (tx_to_shard_conn, mut rx_from_aggregator) = mpsc::channel(10); @@ -132,7 +136,7 @@ async fn handle_shard_websocket_connection(mut websocket: ws::WebSocket, mut }; if let Err(e) = tx_to_aggregator.send(init_msg).await { log::error!("Error sending message to aggregator: {}", e); - return websocket; + return (tx_to_aggregator, websocket); } // Loop, handling new messages from the shard or from the aggregator: @@ -213,14 +217,15 @@ async fn handle_shard_websocket_connection(mut websocket: ws::WebSocket, mut } // loop ended; give socket back to parent: - websocket + (tx_to_aggregator, websocket) } /// This handles messages coming from a feed connection -async fn handle_feed_websocket_connection(mut websocket: ws::WebSocket, mut tx_to_aggregator: S) -> ws::WebSocket +async fn handle_feed_websocket_connection(mut websocket: ws::WebSocket, mut tx_to_aggregator: S) -> (S, ws::WebSocket) where S: futures::Sink + Unpin { - let (tx_to_feed_conn, mut rx_from_aggregator) = mpsc::channel(10); + // unbounded channel so that slow feeds don't block aggregator progress: + let (tx_to_feed_conn, mut rx_from_aggregator) = mpsc::unbounded(); // Tell the aggregator about this new connection, and give it a way to send messages to us: let init_msg = FromFeedWebsocket::Initialize { @@ -228,7 +233,7 @@ async fn handle_feed_websocket_connection(mut websocket: ws::WebSocket, mut t }; if let Err(e) = tx_to_aggregator.send(init_msg).await { log::error!("Error sending message to aggregator: {}", e); - return websocket; + return (tx_to_aggregator, websocket); } // Loop, handling new messages from the shard or from the aggregator: @@ -290,5 +295,5 @@ async fn handle_feed_websocket_connection(mut websocket: ws::WebSocket, mut t } // loop ended; give socket back to parent: - websocket + (tx_to_aggregator, websocket) }