Tweak logs and attempt to avoid races around removing nodes (#504)

* Tweak logs and attempt to avoid races around removing nodes

* wrapping_add in assign_id
This commit is contained in:
James Wilson
2022-10-10 13:12:07 +01:00
committed by GitHub
parent 41c93a8a19
commit e7d15d03b3
5 changed files with 53 additions and 52 deletions
+7 -2
View File
@@ -47,7 +47,9 @@ where
pub fn assign_id(&mut self, details: Details) -> Id { pub fn assign_id(&mut self, details: Details) -> Id {
let this_id = self.current_id; let this_id = self.current_id;
self.current_id += 1; // It's very unlikely we'll ever overflow the ID limit, but in case we do,
// a wrapping_add will almost certainly be fine:
self.current_id = self.current_id.wrapping_add(1);
self.mapping.insert(this_id, details); self.mapping.insert(this_id, details);
this_id.into() this_id.into()
} }
@@ -73,7 +75,10 @@ where
} }
pub fn clear(&mut self) { pub fn clear(&mut self) {
*self = AssignId::new(); // Leave the `current_id` as-is. Why? To avoid reusing IDs and risking
// race conditions where old messages can accidentally screw with new nodes
// that have been assigned the same ID.
self.mapping = BiMap::new();
} }
pub fn iter(&self) -> impl Iterator<Item = (Id, &Details)> { pub fn iter(&self) -> impl Iterator<Item = (Id, &Details)> {
@@ -248,7 +248,7 @@ impl InnerLoop {
} }
if let Err(e) = metered_tx.send(msg) { if let Err(e) = metered_tx.send(msg) {
log::error!("Cannot send message into aggregator: {}", e); log::error!("Cannot send message into aggregator: {e}");
break; break;
} }
} }
@@ -386,10 +386,11 @@ impl InnerLoop {
let node_id = match self.node_ids.remove_by_right(&(shard_conn_id, local_id)) { let node_id = match self.node_ids.remove_by_right(&(shard_conn_id, local_id)) {
Some((node_id, _)) => node_id, Some((node_id, _)) => node_id,
None => { None => {
log::error!( // It's possible that some race between removing and disconnecting shards might lead to
"Cannot find ID for node with shard/connectionId of {:?}/{:?}", // more than one remove message for the same node. This isn't really a problem, but we
shard_conn_id, // hope it won't happen so make a note if it does:
local_id log::debug!(
"Remove: Cannot find ID for node with shard/connectionId of {shard_conn_id:?}/{local_id:?}"
); );
return; return;
} }
@@ -401,9 +402,7 @@ impl InnerLoop {
Some(id) => *id, Some(id) => *id,
None => { None => {
log::error!( log::error!(
"Cannot find ID for node with shard/connectionId of {:?}/{:?}", "Update: Cannot find ID for node with shard/connectionId of {shard_conn_id:?}/{local_id:?}"
shard_conn_id,
local_id
); );
return; return;
} }
@@ -606,7 +605,7 @@ impl InnerLoop {
let removed_details = match self.node_state.remove_node(node_id) { let removed_details = match self.node_state.remove_node(node_id) {
Some(remove_details) => remove_details, Some(remove_details) => remove_details,
None => { None => {
log::error!("Could not find node {:?}", node_id); log::error!("Could not find node {node_id:?}");
return; return;
} }
}; };
+9 -25
View File
@@ -251,10 +251,7 @@ where
break; break;
} }
if let Err(e) = msg_info { if let Err(e) = msg_info {
log::error!( log::error!("Shutting down websocket connection: Failed to receive data: {e}");
"Shutting down websocket connection: Failed to receive data: {}",
e
);
break; break;
} }
@@ -262,10 +259,7 @@ where
match bincode::options().deserialize(&bytes) { match bincode::options().deserialize(&bytes) {
Ok(msg) => msg, Ok(msg) => msg,
Err(e) => { Err(e) => {
log::error!( log::error!("Failed to deserialize message from shard; booting it: {e}");
"Failed to deserialize message from shard; booting it: {}",
e
);
break; break;
} }
}; };
@@ -292,7 +286,7 @@ where
}; };
if let Err(e) = tx_to_aggregator.send(aggregator_msg).await { if let Err(e) = tx_to_aggregator.send(aggregator_msg).await {
log::error!("Failed to send message to aggregator; closing shard: {}", e); log::error!("Failed to send message to aggregator; closing shard: {e}");
break; break;
} }
} }
@@ -325,13 +319,10 @@ where
.expect("message to shard should serialize"); .expect("message to shard should serialize");
if let Err(e) = ws_send.send_binary(bytes).await { if let Err(e) = ws_send.send_binary(bytes).await {
log::error!("Failed to send message to aggregator; closing shard: {}", e) log::error!("Failed to send message to aggregator; closing shard: {e}")
} }
if let Err(e) = ws_send.flush().await { if let Err(e) = ws_send.flush().await {
log::error!( log::error!("Failed to flush message to aggregator; closing shard: {e}")
"Failed to flush message to aggregator; closing shard: {}",
e
)
} }
} }
@@ -374,7 +365,7 @@ where
channel: tx_to_feed_conn, channel: tx_to_feed_conn,
}; };
if let Err(e) = tx_to_aggregator.send(init_msg).await { if let Err(e) = tx_to_aggregator.send(init_msg).await {
log::error!("Error sending message to aggregator: {}", e); log::error!("Error sending message to aggregator: {e}");
return (tx_to_aggregator, ws_send); return (tx_to_aggregator, ws_send);
} }
@@ -399,10 +390,7 @@ where
break; break;
} }
if let Err(e) = msg_info { if let Err(e) = msg_info {
log::error!( log::error!("Shutting down websocket connection: Failed to receive data: {e}");
"Shutting down websocket connection: Failed to receive data: {}",
e
);
break; break;
} }
@@ -416,16 +404,12 @@ where
let cmd = match FromFeedWebsocket::from_str(&text) { let cmd = match FromFeedWebsocket::from_str(&text) {
Ok(cmd) => cmd, Ok(cmd) => cmd,
Err(e) => { Err(e) => {
log::warn!( log::warn!("Ignoring invalid command '{text}' from the frontend: {e}");
"Ignoring invalid command '{}' from the frontend: {}",
text,
e
);
continue; continue;
} }
}; };
if let Err(e) = tx_to_aggregator.send(cmd).await { if let Err(e) = tx_to_aggregator.send(cmd).await {
log::error!("Failed to send message to aggregator; closing feed: {}", e); log::error!("Failed to send message to aggregator; closing feed: {e}");
break; break;
} }
} }
+16 -6
View File
@@ -261,9 +261,14 @@ impl Aggregator {
// Remove references to this single node: // Remove references to this single node:
to_local_id.remove_by_id(local_id); to_local_id.remove_by_id(local_id);
muted.remove(&local_id); muted.remove(&local_id);
let _ = tx_to_telemetry_core
.send_async(FromShardAggregator::RemoveNode { local_id }) // If we're not connected to the core, don't buffer up remove messages. The core will remove
.await; // all nodes associated with this shard anyway, so the remove message would be redundant.
if connected_to_telemetry_core {
let _ = tx_to_telemetry_core
.send_async(FromShardAggregator::RemoveNode { local_id })
.await;
}
} }
ToAggregator::FromWebsocket(disconnected_conn_id, FromWebsocket::Disconnected) => { ToAggregator::FromWebsocket(disconnected_conn_id, FromWebsocket::Disconnected) => {
// Find all of the local IDs corresponding to the disconnected connection ID and // Find all of the local IDs corresponding to the disconnected connection ID and
@@ -280,9 +285,14 @@ impl Aggregator {
for local_id in local_ids_disconnected { for local_id in local_ids_disconnected {
to_local_id.remove_by_id(local_id); to_local_id.remove_by_id(local_id);
muted.remove(&local_id); muted.remove(&local_id);
let _ = tx_to_telemetry_core
.send_async(FromShardAggregator::RemoveNode { local_id }) // If we're not connected to the core, don't buffer up remove messages. The core will remove
.await; // all nodes associated with this shard anyway, so the remove message would be redundant.
if connected_to_telemetry_core {
let _ = tx_to_telemetry_core
.send_async(FromShardAggregator::RemoveNode { local_id })
.await;
}
} }
} }
ToAggregator::FromTelemetryCore(FromTelemetryCore::Mute { ToAggregator::FromTelemetryCore(FromTelemetryCore::Mute {
+13 -10
View File
@@ -239,7 +239,7 @@ where
close_connection: close_connection_tx.clone(), close_connection: close_connection_tx.clone(),
}; };
if let Err(e) = tx_to_aggregator.send(init_msg).await { if let Err(e) = tx_to_aggregator.send(init_msg).await {
log::error!("Error sending message to aggregator: {}", e); log::error!("Shutting down websocket connection from {real_addr:?}: Error sending message to aggregator: {e}");
return (tx_to_aggregator, ws_send); return (tx_to_aggregator, ws_send);
} }
@@ -254,7 +254,7 @@ where
// The close channel has fired, so end the loop. `ws_recv.receive_data` is // The close channel has fired, so end the loop. `ws_recv.receive_data` is
// *not* cancel safe, but since we're closing the connection we don't care. // *not* cancel safe, but since we're closing the connection we don't care.
_ = close_connection_rx.recv_async() => { _ = close_connection_rx.recv_async() => {
log::info!("connection to {:?} being closed", real_addr); log::info!("connection to {real_addr:?} being closed");
break break
}, },
// Receive data and relay it on to our main select loop below. // Receive data and relay it on to our main select loop below.
@@ -263,7 +263,7 @@ where
break; break;
} }
if let Err(e) = msg_info { if let Err(e) = msg_info {
log::error!("Shutting down websocket connection: Failed to receive data: {}", e); log::error!("Shutting down websocket connection from {real_addr:?}: Failed to receive data: {e}");
break; break;
} }
if ws_tx_atomic.unbounded_send(bytes).is_err() { if ws_tx_atomic.unbounded_send(bytes).is_err() {
@@ -292,14 +292,14 @@ where
.collect(); .collect();
for &message_id in &stale_ids { for &message_id in &stale_ids {
log::info!("Removing stale node with message ID {} from {:?}", message_id, real_addr); log::info!("Removing stale node with message ID {message_id} from {real_addr:?}");
allowed_message_ids.remove(&message_id); allowed_message_ids.remove(&message_id);
let _ = tx_to_aggregator.send(FromWebsocket::Remove { message_id } ).await; let _ = tx_to_aggregator.send(FromWebsocket::Remove { message_id } ).await;
} }
if !stale_ids.is_empty() && allowed_message_ids.is_empty() { if !stale_ids.is_empty() && allowed_message_ids.is_empty() {
// End the entire connection if no recent messages came in for any ID. // End the entire connection if no recent messages came in for any ID.
log::info!("Closing stale connection from {:?}", real_addr); log::info!("Closing stale connection from {real_addr:?}");
break; break;
} }
}, },
@@ -316,7 +316,7 @@ where
let this_bytes_per_second = rolling_total_bytes.total() / 10; let this_bytes_per_second = rolling_total_bytes.total() / 10;
if this_bytes_per_second > bytes_per_second { if this_bytes_per_second > bytes_per_second {
block_list.block_addr(real_addr, "Too much traffic"); block_list.block_addr(real_addr, "Too much traffic");
log::error!("Shutting down websocket connection: Too much traffic ({}bps averaged over last 10s)", this_bytes_per_second); log::error!("Shutting down websocket connection: Too much traffic ({this_bytes_per_second}bps averaged over last 10s)");
break; break;
} }
@@ -327,7 +327,7 @@ where
Err(e) => { Err(e) => {
let bytes: &[u8] = bytes.get(..512).unwrap_or_else(|| &bytes); let bytes: &[u8] = bytes.get(..512).unwrap_or_else(|| &bytes);
let msg_start = std::str::from_utf8(bytes).unwrap_or_else(|_| "INVALID UTF8"); let msg_start = std::str::from_utf8(bytes).unwrap_or_else(|_| "INVALID UTF8");
log::warn!("Failed to parse node message ({}): {}", msg_start, e); log::warn!("Failed to parse node message ({msg_start}): {e}");
continue; continue;
}, },
#[cfg(not(debug))] #[cfg(not(debug))]
@@ -347,7 +347,7 @@ where
if let node_message::Payload::SystemConnected(info) = payload { if let node_message::Payload::SystemConnected(info) = payload {
// Too many nodes seen on this connection? Ignore this one. // Too many nodes seen on this connection? Ignore this one.
if allowed_message_ids.len() >= max_nodes_per_connection { if allowed_message_ids.len() >= max_nodes_per_connection {
log::info!("Ignoring new node from {:?} (we've hit the max of {} nodes per connection)", real_addr, max_nodes_per_connection); log::info!("Ignoring new node with ID {message_id} from {real_addr:?} (we've hit the max of {max_nodes_per_connection} nodes per connection)");
continue; continue;
} }
@@ -355,7 +355,7 @@ where
allowed_message_ids.insert(message_id, Instant::now()); allowed_message_ids.insert(message_id, Instant::now());
// Tell the aggregator loop about the new node. // Tell the aggregator loop about the new node.
log::info!("Adding node with message ID {} from {:?}", message_id, real_addr); log::info!("Adding node with message ID {message_id} from {real_addr:?}");
let _ = tx_to_aggregator.send(FromWebsocket::Add { let _ = tx_to_aggregator.send(FromWebsocket::Add {
message_id, message_id,
ip: real_addr, ip: real_addr,
@@ -369,9 +369,12 @@ where
if let Some(last_seen) = allowed_message_ids.get_mut(&message_id) { if let Some(last_seen) = allowed_message_ids.get_mut(&message_id) {
*last_seen = Instant::now(); *last_seen = Instant::now();
if let Err(e) = tx_to_aggregator.send(FromWebsocket::Update { message_id, payload } ).await { if let Err(e) = tx_to_aggregator.send(FromWebsocket::Update { message_id, payload } ).await {
log::error!("Failed to send node message to aggregator: {}", e); log::error!("Failed to send node message to aggregator: {e}");
continue; continue;
} }
} else {
log::info!("Ignoring message with ID {message_id} from {real_addr:?} (we've hit the max of {max_nodes_per_connection} nodes per connection)");
continue;
} }
} }
} }