diff --git a/backend/Cargo.lock b/backend/Cargo.lock index aec4623..73057ee 100644 --- a/backend/Cargo.lock +++ b/backend/Cargo.lock @@ -31,6 +31,9 @@ name = "arrayvec" version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "be4dc07131ffa69b8072d35f5007352af944213cde02545e2103680baed38fcd" +dependencies = [ + "serde", +] [[package]] name = "atty" @@ -198,6 +201,7 @@ name = "common" version = "0.1.0" dependencies = [ "anyhow", + "arrayvec", "base64", "bimap", "bincode", diff --git a/backend/common/Cargo.toml b/backend/common/Cargo.toml index a0ccdfa..6fcb2c9 100644 --- a/backend/common/Cargo.toml +++ b/backend/common/Cargo.toml @@ -28,6 +28,7 @@ soketto = "0.6.0" thiserror = "1.0.24" tokio = { version = "1.8.2", features = ["full"] } tokio-util = { version = "0.6", features = ["compat"] } +arrayvec = { version = "0.7.1", features = ["serde"] } [dev-dependencies] bincode = "1.3.3" diff --git a/backend/common/src/node_message.rs b/backend/common/src/node_message.rs index ae491dc..a966936 100644 --- a/backend/common/src/node_message.rs +++ b/backend/common/src/node_message.rs @@ -141,6 +141,7 @@ impl Payload { #[cfg(test)] mod tests { use super::*; + use arrayvec::ArrayString; use bincode::Options; // Without adding a derive macro and marker trait (and enforcing their use), we don't really @@ -166,7 +167,7 @@ mod tests { implementation: "foo".into(), version: "foo".into(), validator: None, - network_id: None, + network_id: ArrayString::new(), startup_time: None, }, }), diff --git a/backend/common/src/node_types.rs b/backend/common/src/node_types.rs index d06131f..7f6d444 100644 --- a/backend/common/src/node_types.rs +++ b/backend/common/src/node_types.rs @@ -17,6 +17,7 @@ //! These types are partly used in [`crate::node_message`], but also stored and used //! more generally through the application. 
+use arrayvec::ArrayString; use serde::ser::{SerializeTuple, Serializer}; use serde::{Deserialize, Serialize}; @@ -25,6 +26,7 @@ use crate::{time, MeanList}; pub type BlockNumber = u64; pub type Timestamp = u64; pub use primitive_types::H256 as BlockHash; +pub type NetworkId = ArrayString<64>; /// Basic node details. #[derive(Serialize, Deserialize, Debug, Clone)] @@ -34,7 +36,7 @@ pub struct NodeDetails { pub implementation: Box, pub version: Box, pub validator: Option>, - pub network_id: Option>, + pub network_id: NetworkId, pub startup_time: Option>, } diff --git a/backend/telemetry_core/src/state/state.rs b/backend/telemetry_core/src/state/state.rs index 6bde12b..f507cd7 100644 --- a/backend/telemetry_core/src/state/state.rs +++ b/backend/telemetry_core/src/state/state.rs @@ -274,6 +274,7 @@ impl<'a> StateChain<'a> { #[cfg(test)] mod test { use super::*; + use common::node_types::NetworkId; fn node(name: &str, chain: &str) -> NodeDetails { NodeDetails { @@ -282,7 +283,7 @@ mod test { implementation: "Bar".into(), version: "0.1".into(), validator: None, - network_id: None, + network_id: NetworkId::new(), startup_time: None, } } diff --git a/backend/telemetry_core/tests/e2e_tests.rs b/backend/telemetry_core/tests/e2e_tests.rs index 3ebccdf..b6afd61 100644 --- a/backend/telemetry_core/tests/e2e_tests.rs +++ b/backend/telemetry_core/tests/e2e_tests.rs @@ -657,7 +657,10 @@ async fn e2e_slow_feeds_are_disconnected() { let (mut raw_feed_tx, mut raw_feed_rx) = server.get_core().connect_feed_raw().await.unwrap(); // Subscribe the feed: - raw_feed_tx.send_text("subscribe:Polkadot").await.unwrap(); + raw_feed_tx + .send_text("subscribe:0x0000000000000000000000000000000000000000000000000000000000000001") + .await + .unwrap(); // Wait a little.. the feed hasn't been receiving messages so it should // be booted after ~a second. 
diff --git a/backend/telemetry_shard/src/aggregator.rs b/backend/telemetry_shard/src/aggregator.rs index 50ded1d..7b7145d 100644 --- a/backend/telemetry_shard/src/aggregator.rs +++ b/backend/telemetry_shard/src/aggregator.rs @@ -73,6 +73,10 @@ pub enum FromWebsocket { message_id: node_message::NodeMessageId, payload: node_message::Payload, }, + /// remove a node with the given message ID + Remove { + message_id: node_message::NodeMessageId, + }, /// Make a note when the node disconnects. Disconnected, } @@ -247,6 +251,20 @@ impl Aggregator { .send_async(FromShardAggregator::UpdateNode { local_id, payload }) .await; } + ToAggregator::FromWebsocket(conn_id, FromWebsocket::Remove { message_id }) => { + // Get the local ID, ignoring the message if none match: + let local_id = match to_local_id.get_id(&(conn_id, message_id)) { + Some(id) => id, + None => continue, + }; + + // Remove references to this single node: + to_local_id.remove_by_id(local_id); + muted.remove(&local_id); + let _ = tx_to_telemetry_core + .send_async(FromShardAggregator::RemoveNode { local_id }) + .await; + } ToAggregator::FromWebsocket(disconnected_conn_id, FromWebsocket::Disconnected) => { // Find all of the local IDs corresponding to the disconnected connection ID and // remove them, telling Telemetry Core about them too. 
This could be more efficient, diff --git a/backend/telemetry_shard/src/json_message/node_message.rs b/backend/telemetry_shard/src/json_message/node_message.rs index fc40898..c45a90f 100644 --- a/backend/telemetry_shard/src/json_message/node_message.rs +++ b/backend/telemetry_shard/src/json_message/node_message.rs @@ -246,7 +246,7 @@ pub struct NodeDetails { pub implementation: Box, pub version: Box, pub validator: Option>, - pub network_id: Option>, + pub network_id: node_types::NetworkId, pub startup_time: Option>, } diff --git a/backend/telemetry_shard/src/main.rs b/backend/telemetry_shard/src/main.rs index eba6f7d..778b1d4 100644 --- a/backend/telemetry_shard/src/main.rs +++ b/backend/telemetry_shard/src/main.rs @@ -21,15 +21,20 @@ mod connection; mod json_message; mod real_ip; -use std::{collections::HashSet, net::IpAddr, time::Duration}; +use std::{ + collections::HashMap, + net::IpAddr, + time::{Duration, Instant}, +}; use aggregator::{Aggregator, FromWebsocket}; use blocked_addrs::BlockedAddrs; use common::byte_size::ByteSize; use common::http_utils; use common::node_message; +use common::node_message::NodeMessageId; use common::rolling_total::RollingTotalBuilder; -use futures::SinkExt; +use futures::{SinkExt, StreamExt}; use http::Uri; use hyper::{Method, Response}; use simple_logger::SimpleLogger; @@ -91,6 +96,12 @@ struct Opts { /// on the machine. If no value is given, use an internal default that we have deemed sane. #[structopt(long)] worker_threads: Option, + /// Roughly how long to wait in seconds for new telemetry data to arrive from a node. If + /// telemetry for a node does not arrive in this time frame, we remove the corresponding node + /// state, and if no messages are received on the connection at all in this time, it will be + /// dropped. 
+ #[structopt(long, default_value = "60")] + stale_node_timeout: u64, } fn main() { @@ -131,6 +142,7 @@ async fn start_server(opts: Opts) -> anyhow::Result<()> { let socket_addr = opts.socket; let max_nodes_per_connection = opts.max_nodes_per_connection; let bytes_per_second = opts.max_node_data_per_second; + let stale_node_timeout = Duration::from_secs(opts.stale_node_timeout); let server = http_utils::start_server(socket_addr, move |addr, req| { let aggregator = aggregator.clone(); @@ -165,6 +177,7 @@ async fn start_server(opts: Opts) -> anyhow::Result<()> { max_nodes_per_connection, bytes_per_second, block_list, + stale_node_timeout, ) .await; log::info!( @@ -200,10 +213,15 @@ async fn handle_node_websocket_connection( max_nodes_per_connection: usize, bytes_per_second: ByteSize, block_list: BlockedAddrs, + stale_node_timeout: Duration, ) -> (S, http_utils::WsSender) where S: futures::Sink + Unpin + Send + 'static, { + // Keep track of the message Ids that have been "granted access". We allow a maximum of + // `max_nodes_per_connection` before ignoring others. + let mut allowed_message_ids = HashMap::::new(); + // Limit the number of bytes based on a rolling total and the incoming bytes per second // that has been configured via the CLI opts. let bytes_per_second = bytes_per_second.num_bytes(); @@ -212,46 +230,86 @@ where .window_size_multiple(10) .start(); - // Track all of the message IDs that we've seen so far. If we exceed the - // max_nodes_per_connection limit we ignore subsequent message IDs. - let mut message_ids_seen = HashSet::new(); - // This could be a oneshot channel, but it's useful to be able to clone // messages, and we can't clone oneshot channel senders. 
let (close_connection_tx, close_connection_rx) = flume::bounded(1); // Tell the aggregator about this new connection, and give it a way to close this connection: let init_msg = FromWebsocket::Initialize { - close_connection: close_connection_tx, + close_connection: close_connection_tx.clone(), }; if let Err(e) = tx_to_aggregator.send(init_msg).await { log::error!("Error sending message to aggregator: {}", e); return (tx_to_aggregator, ws_send); } - // Now we've "initialized", wait for messages from the node. Messages will - // either be `SystemConnected` type messages that inform us that a new set - // of messages with some message ID will be sent (a node could have more - // than one of these), or updates linked to a specific message_id. + // Receiving data isn't cancel safe, so let it happen in a separate task. + // If this loop ends, the outer will receive a `None` message and end too. + // If the outer loop ends, it fires a msg on `close_connection_rx` to ensure this ends too. + let (ws_tx_atomic, mut ws_rx_atomic) = futures::channel::mpsc::unbounded(); + tokio::task::spawn(async move { + loop { + let mut bytes = Vec::new(); + tokio::select! { + // The close channel has fired, so end the loop. `ws_recv.receive_data` is + // *not* cancel safe, but since we're closing the connection we don't care. + _ = close_connection_rx.recv_async() => { + log::info!("connection to {:?} being closed", real_addr); + break + }, + // Receive data and relay it on to our main select loop below. + msg_info = ws_recv.receive_data(&mut bytes) => { + if let Err(soketto::connection::Error::Closed) = msg_info { + break; + } + if let Err(e) = msg_info { + log::error!("Shutting down websocket connection: Failed to receive data: {}", e); + break; + } + if ws_tx_atomic.unbounded_send(bytes).is_err() { + // The other end closed; end this loop. + break; + } + } + } + } + }); + + // A periodic interval to check for stale nodes. 
+ let mut stale_interval = tokio::time::interval(stale_node_timeout / 2); + + // Our main select loop atomically receives and handles telemetry messages from the node, + // and periodically checks for stale connections to keep our node state tidy. loop { - let mut bytes = Vec::new(); tokio::select! { - // The close channel has fired, so end the loop. `ws_recv.receive_data` is - // *not* cancel safe, but since we're closing the connection we don't care. - _ = close_connection_rx.recv_async() => { - log::info!("connection to {:?} being closed by aggregator", real_addr); - break + // We periodically check for stale message IDs and remove nodes associated with + // them, to prevent a buildup. We boot the whole connection if no interpretable + // messages have been sent at all in the time period. + _ = stale_interval.tick() => { + let stale_ids: Vec<NodeMessageId> = allowed_message_ids.iter() + .filter(|(_, last_seen)| last_seen.elapsed() > stale_node_timeout) + .map(|(&id, _)| id) + .collect(); + + for &message_id in &stale_ids { + log::info!("Removing stale node with message ID {} from {:?}", message_id, real_addr); + allowed_message_ids.remove(&message_id); + let _ = tx_to_aggregator.send(FromWebsocket::Remove { message_id } ).await; + } + + if !stale_ids.is_empty() && allowed_message_ids.is_empty() { + // End the entire connection if no recent messages came in for any ID. + log::info!("Closing stale connection from {:?}", real_addr); + break; + } }, - // A message was received; handle it: - msg_info = ws_recv.receive_data(&mut bytes) => { - // Handle the socket closing, or errors receiving the message. - if let Err(soketto::connection::Error::Closed) = msg_info { - break; - } - if let Err(e) = msg_info { - log::error!("Shutting down websocket connection: Failed to receive data: {}", e); - break; - } + // Handle messages received by the connected node. + msg = ws_rx_atomic.next() => { + // No more messages? break.
+ let bytes = match msg { + Some(bytes) => bytes, + None => { break; } + }; // Keep track of total bytes and bail if average over last 10 secs exceeds preference. rolling_total_bytes.push(bytes.len()); @@ -283,21 +341,21 @@ where let message_id = node_message.id(); let payload = node_message.into_payload(); - // Ignore messages from IDs that exceed our limit: - if message_ids_seen.contains(&message_id) { - // continue on; we're happy - } else if message_ids_seen.len() >= max_nodes_per_connection { - // ignore this message; it's not a "seen" ID and we've hit our limit. - continue; - } else { - // not seen ID, not hit limit; make note of new ID - message_ids_seen.insert(message_id); - } - // Until the aggregator receives an `Add` message, which we can create once // we see one of these SystemConnected ones, it will ignore messages with // the corresponding message_id. if let node_message::Payload::SystemConnected(info) = payload { + // Too many nodes seen on this connection? Ignore this one. + if allowed_message_ids.len() >= max_nodes_per_connection { + log::info!("Ignoring new node from {:?} (we've hit the max of {} nodes per connection)", real_addr, max_nodes_per_connection); + continue; + } + + // Make a note of the message ID, allowing telemetry for it. + allowed_message_ids.insert(message_id, Instant::now()); + + // Tell the aggregator loop about the new node. + log::info!("Adding node with message ID {} from {:?}", message_id, real_addr); let _ = tx_to_aggregator.send(FromWebsocket::Add { message_id, ip: real_addr, @@ -307,14 +365,22 @@ where } // Anything that's not an "Add" is an Update. The aggregator will ignore // updates against a message_id that hasn't first been Added, above.
- else if let Err(e) = tx_to_aggregator.send(FromWebsocket::Update { message_id, payload } ).await { - log::error!("Failed to send node message to aggregator: {}", e); - continue; + else { + if let Some(last_seen) = allowed_message_ids.get_mut(&message_id) { + *last_seen = Instant::now(); + if let Err(e) = tx_to_aggregator.send(FromWebsocket::Update { message_id, payload } ).await { + log::error!("Failed to send node message to aggregator: {}", e); + continue; + } + } } } } } + // Make sure to kill off the receive-messages task if the main select loop ends: + let _ = close_connection_tx.send(()); + // Return what we need to close the connection gracefully: (tx_to_aggregator, ws_send) }