diff --git a/backend/shard/src/main.rs b/backend/shard/src/main.rs index 16d01a6..f32c4be 100644 --- a/backend/shard/src/main.rs +++ b/backend/shard/src/main.rs @@ -131,12 +131,37 @@ where None => { log::warn!("Websocket connection from {:?} closed", addr); break } }; - let node_message = match deserialize_ws_message(msg) { - Ok(Some(msg)) => msg, - Ok(None) => continue, - Err(e) => { log::error!("{}", e); break } + // If we see any errors, log them and end our loop: + let msg = match msg { + Err(e) => { log::error!("Error in node websocket connection: {}", e); break }, + Ok(msg) => msg, }; + // Close message? Break to close connection. + if msg.is_close() { + break; + } + + // If the message isn't something we want to handle, just ignore it. + // This includes system messages like "pings" and such, so don't log anything. + if !msg.is_binary() && !msg.is_text() { + continue; + } + + // Deserialize from JSON, warning if deserialization fails: + let bytes = msg.as_bytes(); + let node_message: json_message::NodeMessage = match serde_json::from_slice(bytes) { + Ok(node_message) => node_message, + Err(_e) => { + // let bytes: &[u8] = bytes.get(..512).unwrap_or_else(|| &bytes); + // let msg_start = std::str::from_utf8(bytes).unwrap_or_else(|_| "INVALID UTF8"); + // log::warn!("Failed to parse node message ({}): {}", msg_start, e); + continue; + } + }; + + // Pull relevant details from the message: + let node_message: node_message::NodeMessage = node_message.into(); let message_id = node_message.id(); let payload = node_message.into_payload(); @@ -164,40 +189,3 @@ where // Return what we need to close the connection gracefully: (tx_to_aggregator, websocket) } - -/// Deserialize an incoming websocket message, returning an error if something -/// fatal went wrong, [`Some`] message if all went well, and [`None`] if a non-fatal -/// issue was encountered and the message should simply be ignored. -fn deserialize_ws_message( - msg: Result, -) -> anyhow::Result> { - // If we see any errors, log them and end our loop: - let msg = match msg { - Err(e) => { - return Err(anyhow::anyhow!("Error in node websocket connection: {}", e)); - } - Ok(msg) => msg, - }; - - // If the message isn't something we want to handle, just ignore it. - // This includes system messages like "pings" and such, so don't log anything. - if !msg.is_binary() && !msg.is_text() { - return Ok(None); - } - - // Deserialize from JSON, warning if deserialization fails: - let bytes = msg.as_bytes(); - let node_message: json_message::NodeMessage = match serde_json::from_slice(bytes) { - Ok(node_message) => node_message, - Err(_e) => { - // let bytes: &[u8] = bytes.get(..512).unwrap_or_else(|| &bytes); - // let msg_start = std::str::from_utf8(bytes).unwrap_or_else(|_| "INVALID UTF8"); - // log::warn!("Failed to parse node message ({}): {}", msg_start, e); - return Ok(None); - } - }; - - // Pull relevant details from the message: - let node_message: node_message::NodeMessage = node_message.into(); - Ok(Some(node_message)) -} diff --git a/backend/telemetry/src/main.rs b/backend/telemetry/src/main.rs index e9e5926..508c8b5 100644 --- a/backend/telemetry/src/main.rs +++ b/backend/telemetry/src/main.rs @@ -91,7 +91,10 @@ async fn start_server(opts: Opts) -> anyhow::Result<()> { .map(move |ws: ws::Ws, addr: Option| { let tx_to_aggregator = feed_aggregator.subscribe_feed(); log::info!("Opening /feed connection from {:?}", addr); - ws.on_upgrade(move |websocket| async move { + + // We can decide how many messages can be buffered to be sent, but not specifically how + // large those messages are cumulatively allowed to be: + ws.max_send_queue(1_000 ).on_upgrade(move |websocket| async move { let (mut tx_to_aggregator, websocket) = handle_feed_websocket_connection(websocket, tx_to_aggregator).await; log::info!("Closing /feed connection from {:?}", addr); @@ -168,6 +171,11 @@ where Ok(msg) => msg }; + // Close message? Break and allow connection to be dropped. + if msg.is_close() { + break; + } + // If the message isn't something we want to handle, just ignore it. // This includes system messages like "pings" and such, so don't log anything. if !msg.is_binary() && !msg.is_text() { @@ -230,7 +238,8 @@ where // Loop, handling new messages from the shard or from the aggregator: loop { tokio::select! { - // AGGREGATOR -> FRONTEND + + // AGGREGATOR -> FRONTEND (buffer messages to the UI) msg = rx_from_aggregator.next() => { // End the loop when connection from aggregator ends: let msg = match msg { @@ -240,14 +249,19 @@ where // Send messages to the client (currently the only message is // pre-serialized bytes that we send as binary): - match msg { - ToFeedWebsocket::Bytes(bytes) => { - log::debug!("Message to feed: {}", std::str::from_utf8(&bytes).unwrap_or("INVALID UTF8")); - let _ = websocket.send(ws::Message::binary(bytes)).await; - } + let bytes = match msg { + ToFeedWebsocket::Bytes(bytes) => bytes + }; + + log::debug!("Message to feed: {}", std::str::from_utf8(&bytes).unwrap_or("INVALID UTF8")); + + if let Err(e) = websocket.send(ws::Message::binary(bytes)).await { + log::warn!("Closing feed websocket due to error: {}", e); + break; } } - // FRONTEND -> AGGREGATOR + + // FRONTEND -> AGGREGATOR (relay messages to the aggregator) msg = websocket.next() => { // End the loop when connection from feed ends: let msg = match msg { @@ -264,6 +278,11 @@ where Ok(msg) => msg }; + // Close message? Break and allow connection to be dropped. + if msg.is_close() { + break; + } + // We ignore all but text messages from the frontend: let text = match msg.to_str() { Ok(s) => s,