make sure to gracefully handle 'close' messages

This commit is contained in:
James Wilson
2021-07-05 15:11:20 +01:00
parent 6910b4dca4
commit ea52d36999
2 changed files with 56 additions and 49 deletions
+29 -41
View File
@@ -131,12 +131,37 @@ where
None => { log::warn!("Websocket connection from {:?} closed", addr); break }
};
let node_message = match deserialize_ws_message(msg) {
Ok(Some(msg)) => msg,
Ok(None) => continue,
Err(e) => { log::error!("{}", e); break }
// If we see any errors, log them and end our loop:
let msg = match msg {
Err(e) => { log::error!("Error in node websocket connection: {}", e); break },
Ok(msg) => msg,
};
// Close message? Break to close connection.
if msg.is_close() {
break;
}
// If the message isn't something we want to handle, just ignore it.
// This includes system messages like "pings" and such, so don't log anything.
if !msg.is_binary() && !msg.is_text() {
continue;
}
// Deserialize from JSON, warning if deserialization fails:
let bytes = msg.as_bytes();
let node_message: json_message::NodeMessage = match serde_json::from_slice(bytes) {
Ok(node_message) => node_message,
Err(_e) => {
// let bytes: &[u8] = bytes.get(..512).unwrap_or_else(|| &bytes);
// let msg_start = std::str::from_utf8(bytes).unwrap_or_else(|_| "INVALID UTF8");
// log::warn!("Failed to parse node message ({}): {}", msg_start, e);
continue;
}
};
// Pull relevant details from the message:
let node_message: node_message::NodeMessage = node_message.into();
let message_id = node_message.id();
let payload = node_message.into_payload();
@@ -164,40 +189,3 @@ where
// Return what we need to close the connection gracefully:
(tx_to_aggregator, websocket)
}
/// Deserialize an incoming websocket message, returning an error if something
/// fatal went wrong, [`Some`] message if all went well, and [`None`] if a non-fatal
/// issue was encountered and the message should simply be ignored.
fn deserialize_ws_message(
msg: Result<ws::Message, warp::Error>,
) -> anyhow::Result<Option<node_message::NodeMessage>> {
// If we see any errors, log them and end our loop:
let msg = match msg {
Err(e) => {
return Err(anyhow::anyhow!("Error in node websocket connection: {}", e));
}
Ok(msg) => msg,
};
// If the message isn't something we want to handle, just ignore it.
// This includes system messages like "pings" and such, so don't log anything.
if !msg.is_binary() && !msg.is_text() {
return Ok(None);
}
// Deserialize from JSON, warning if deserialization fails:
let bytes = msg.as_bytes();
let node_message: json_message::NodeMessage = match serde_json::from_slice(bytes) {
Ok(node_message) => node_message,
Err(_e) => {
// let bytes: &[u8] = bytes.get(..512).unwrap_or_else(|| &bytes);
// let msg_start = std::str::from_utf8(bytes).unwrap_or_else(|_| "INVALID UTF8");
// log::warn!("Failed to parse node message ({}): {}", msg_start, e);
return Ok(None);
}
};
// Pull relevant details from the message:
let node_message: node_message::NodeMessage = node_message.into();
Ok(Some(node_message))
}
+27 -8
View File
@@ -91,7 +91,10 @@ async fn start_server(opts: Opts) -> anyhow::Result<()> {
.map(move |ws: ws::Ws, addr: Option<SocketAddr>| {
let tx_to_aggregator = feed_aggregator.subscribe_feed();
log::info!("Opening /feed connection from {:?}", addr);
ws.on_upgrade(move |websocket| async move {
// We can decide how many messages can be buffered to be sent, but not specifically how
// large those messages are cumulatively allowed to be:
ws.max_send_queue(1_000 ).on_upgrade(move |websocket| async move {
let (mut tx_to_aggregator, websocket) =
handle_feed_websocket_connection(websocket, tx_to_aggregator).await;
log::info!("Closing /feed connection from {:?}", addr);
@@ -168,6 +171,11 @@ where
Ok(msg) => msg
};
// Close message? Break and allow connection to be dropped.
if msg.is_close() {
break;
}
// If the message isn't something we want to handle, just ignore it.
// This includes system messages like "pings" and such, so don't log anything.
if !msg.is_binary() && !msg.is_text() {
@@ -230,7 +238,8 @@ where
// Loop, handling new messages from the shard or from the aggregator:
loop {
tokio::select! {
// AGGREGATOR -> FRONTEND
// AGGREGATOR -> FRONTEND (buffer messages to the UI)
msg = rx_from_aggregator.next() => {
// End the loop when connection from aggregator ends:
let msg = match msg {
@@ -240,14 +249,19 @@ where
// Send messages to the client (currently the only message is
// pre-serialized bytes that we send as binary):
match msg {
ToFeedWebsocket::Bytes(bytes) => {
log::debug!("Message to feed: {}", std::str::from_utf8(&bytes).unwrap_or("INVALID UTF8"));
let _ = websocket.send(ws::Message::binary(bytes)).await;
}
let bytes = match msg {
ToFeedWebsocket::Bytes(bytes) => bytes
};
log::debug!("Message to feed: {}", std::str::from_utf8(&bytes).unwrap_or("INVALID UTF8"));
if let Err(e) = websocket.send(ws::Message::binary(bytes)).await {
log::warn!("Closing feed websocket due to error: {}", e);
break;
}
}
// FRONTEND -> AGGREGATOR
// FRONTEND -> AGGREGATOR (relay messages to the aggregator)
msg = websocket.next() => {
// End the loop when connection from feed ends:
let msg = match msg {
@@ -264,6 +278,11 @@ where
Ok(msg) => msg
};
// Close message? Break and allow connection to be dropped.
if msg.is_close() {
break;
}
// We ignore all but text messages from the frontend:
let text = match msg.to_str() {
Ok(s) => s,