diff --git a/backend/common/src/rolling_total.rs b/backend/common/src/rolling_total.rs index 7411b52..a2af5e2 100644 --- a/backend/common/src/rolling_total.rs +++ b/backend/common/src/rolling_total.rs @@ -232,7 +232,11 @@ mod test { // Regardless of the exact time that's elapsed, we'll end up with buckets that // are exactly granularity spacing (or multiples of) apart. assert_eq!( - rolling_total.averages().into_iter().copied().collect::>(), + rolling_total + .averages() + .into_iter() + .copied() + .collect::>(), vec![ (start_time, 1), (start_time + granularity, 2), diff --git a/backend/common/src/ws_client/connect.rs b/backend/common/src/ws_client/connect.rs index 48979f1..5fe294c 100644 --- a/backend/common/src/ws_client/connect.rs +++ b/backend/common/src/ws_client/connect.rs @@ -63,20 +63,22 @@ impl Connection { // Receive messages from the socket and post them out: let (mut tx_to_external, rx_from_ws) = mpsc::unbounded(); + let (tx_has_closed, mut rx_has_closed) = futures::channel::oneshot::channel(); tokio::spawn(async move { let mut data = Vec::with_capacity(128); loop { // Clear the buffer and wait for the next message to arrive: data.clear(); - let message_data = match ws_from_connection.receive_data(&mut data).await { Err(e) => { - // Couldn't receive data may mean all senders are gone, so log - // the error and shut this down: + // Couldn't receive data means some issue with the connection. Log + // the error, and close the other half of the connection too, + // so the associated channels close gracefully. log::error!( "Shutting down websocket connection: Failed to receive data: {}", e ); + let _ = tx_has_closed.send(()); break; } Ok(data) => data, @@ -93,11 +95,11 @@ impl Connection { if let Err(e) = tx_to_external.send(msg).await { // Failure to send likely means that the recv has been dropped, - // so let's drop this loop too. 
- log::error!( - "Shutting down websocket connection: Failed to send data out: {}", - e - ); + // so let's drop this loop too. An issue with the channel doesn't + // mean that our socket connection has failed though, so we make no + // attempt to close the other half of our connection here (we may + // still be happily sending messages even if we dropped the receiver) + log::error!("Failed to send data out: {}", e); break; } } @@ -106,7 +108,18 @@ impl Connection { // Receive messages externally to send to the socket. let (tx_to_ws, mut rx_from_external) = mpsc::unbounded(); tokio::spawn(async move { - while let Some(msg) = rx_from_external.next().await { + loop { + let msg = tokio::select! { + msg = rx_from_external.next() => { msg }, + // Websocket connection closed? Don't wait for incoming message; break immediately. + _ = &mut rx_has_closed => { break }, + }; + + let msg = match msg { + None => break, + Some(msg) => msg, + }; + match msg { SentMessageInternal::Message(SentMessage::Text(s)) => { if let Err(e) = ws_to_connection.send_text_owned(s).await { diff --git a/backend/common/src/ws_client/sender.rs b/backend/common/src/ws_client/sender.rs index 108828d..b0f47a4 100644 --- a/backend/common/src/ws_client/sender.rs +++ b/backend/common/src/ws_client/sender.rs @@ -56,7 +56,7 @@ impl Sender { Ok(()) } /// Returns whether this channel is closed. - pub fn is_closed(&mut self) -> bool { + pub fn is_closed(&self) -> bool { self.inner.is_closed() } /// Unbounded send will always queue the message and doesn't diff --git a/backend/telemetry_core/tests/e2e_tests.rs b/backend/telemetry_core/tests/e2e_tests.rs index e0beceb..9c0751a 100644 --- a/backend/telemetry_core/tests/e2e_tests.rs +++ b/backend/telemetry_core/tests/e2e_tests.rs @@ -14,9 +14,24 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -//! General end-to-end tests +/*! 
+General end-to-end tests + +Note that on MacOS in particular, you may need to increase some limits to be +able to open a large number of connections and run some of the tests. +Try running these: + +```sh +sudo sysctl -w kern.maxfiles=50000 +sudo sysctl -w kern.maxfilesperproc=50000 +ulimit -n 50000 +sudo sysctl -w kern.ipc.somaxconn=50000 +sudo sysctl -w kern.ipc.maxsockbuf=16777216 +``` +*/ use common::node_types::BlockHash; +use common::ws_client::SentMessage; use serde_json::json; use std::time::Duration; use test_utils::{ @@ -506,6 +521,57 @@ async fn feed_can_subscribe_and_unsubscribe_from_chain() { server.shutdown().await; } +/// If a node sends more than some rolling average amount of data, it'll be booted. +#[tokio::test] +async fn node_banned_if_it_sends_too_much_data() { + async fn try_send_data(max_bytes: usize, send_msgs: usize, bytes_per_msg: usize) -> bool { + let mut server = start_server( + false, + CoreOpts::default(), + ShardOpts { + // Remember, this is (currently) averaged over the last 10 seconds, + // so we need to send 10x this amount of data for an immediate ban: + max_node_data_per_second: Some(max_bytes), + ..Default::default() + }, + ) + .await; + + // Give us a shard to talk to: + let shard_id = server.add_shard().await.unwrap(); + let (node_tx, _node_rx) = server + .get_shard(shard_id) + .unwrap() + .connect_node() + .await + .unwrap(); + + // Send the data requested to the shard: + for _ in 0..send_msgs { + node_tx + .unbounded_send(SentMessage::Binary(vec![1; bytes_per_msg])) + .unwrap(); + } + + // Wait a little for the shard to react and cut off the connection (or not): + tokio::time::sleep(Duration::from_millis(250)).await; + + // Has the connection been closed? 
+ node_tx.is_closed() + } + + assert_eq!( + try_send_data(1000, 10, 1000).await, + false, + "shouldn't be closed; we didn't exceed 10x threshold" + ); + assert_eq!( + try_send_data(999, 10, 1000).await, + true, + "should be closed; we sent just over 10x the block threshold" + ); +} + /// Feeds will be disconnected if they can't receive messages quickly enough. #[tokio::test] async fn slow_feeds_are_disconnected() { diff --git a/backend/telemetry_shard/src/blocked_addrs.rs b/backend/telemetry_shard/src/blocked_addrs.rs index 6cfa193..e384141 100644 --- a/backend/telemetry_shard/src/blocked_addrs.rs +++ b/backend/telemetry_shard/src/blocked_addrs.rs @@ -15,9 +15,9 @@ // along with this program. If not, see . use std::collections::HashMap; -use std::time::{Duration, Instant}; use std::net::IpAddr; -use std::sync::{ Mutex, Arc }; +use std::sync::{Arc, Mutex}; +use std::time::{Duration, Instant}; /// Keep track of nodes that have been blocked. #[derive(Debug, Clone)] @@ -26,7 +26,7 @@ pub struct BlockedAddrs(Arc); #[derive(Debug)] struct BlockAddrsInner { block_duration: Duration, - inner: Mutex> + inner: Mutex>, } impl BlockedAddrs { @@ -35,7 +35,7 @@ impl BlockedAddrs { pub fn new(block_duration: Duration) -> BlockedAddrs { BlockedAddrs(Arc::new(BlockAddrsInner { block_duration, - inner: Mutex::new(HashMap::new()) + inner: Mutex::new(HashMap::new()), })) } @@ -52,8 +52,8 @@ impl BlockedAddrs { let mut map = self.0.inner.lock().unwrap(); let (reason, time) = match map.get(addr) { - Some(&(reason,time)) => (reason, time), - None => return None + Some(&(reason, time)) => (reason, time), + None => return None, }; if time + self.0.block_duration < Instant::now() { @@ -63,4 +63,4 @@ impl BlockedAddrs { Some(reason) } } -} \ No newline at end of file +} diff --git a/backend/telemetry_shard/src/main.rs b/backend/telemetry_shard/src/main.rs index d7e0a9c..9e54265 100644 --- a/backend/telemetry_shard/src/main.rs +++ b/backend/telemetry_shard/src/main.rs @@ -16,14 +16,15 @@ 
#[warn(missing_docs)] mod aggregator; +mod blocked_addrs; mod connection; mod json_message; mod real_ip; -mod blocked_addrs; use std::{collections::HashSet, net::IpAddr, time::Duration}; use aggregator::{Aggregator, FromWebsocket}; +use blocked_addrs::BlockedAddrs; use common::byte_size::ByteSize; use common::http_utils; use common::node_message; @@ -33,7 +34,6 @@ use http::Uri; use hyper::{Method, Response}; use simple_logger::SimpleLogger; use structopt::StructOpt; -use blocked_addrs::BlockedAddrs; const VERSION: &str = env!("CARGO_PKG_VERSION"); const AUTHORS: &str = env!("CARGO_PKG_AUTHORS"); @@ -79,7 +79,7 @@ struct Opts { /// How many seconds is a "/feed" connection that violates the '--max-node-data-per-second' /// value prevented from reconnecting to this shard for, in seconds. #[structopt(long, default_value = "600")] - node_block_seconds: u64 + node_block_seconds: u64, } #[tokio::main] @@ -118,10 +118,7 @@ async fn start_server(opts: Opts) -> anyhow::Result<()> { let real_addr = real_ip::real_ip(addr, req.headers()); if let Some(reason) = block_list.blocked_reason(&real_addr) { - return Ok(Response::builder() - .status(403) - .body(reason.into()) - .unwrap()) + return Ok(Response::builder().status(403).body(reason.into()).unwrap()); } Ok(http_utils::upgrade_to_websocket( @@ -136,7 +133,7 @@ async fn start_server(opts: Opts) -> anyhow::Result<()> { tx_to_aggregator, max_nodes_per_connection, bytes_per_second, - block_list + block_list, ) .await; log::info!("Closing /submit connection from {:?}", addr); @@ -167,7 +164,7 @@ async fn handle_node_websocket_connection( mut tx_to_aggregator: S, max_nodes_per_connection: usize, bytes_per_second: ByteSize, - block_list: BlockedAddrs + block_list: BlockedAddrs, ) -> (S, http_utils::WsSender) where S: futures::Sink + Unpin + Send + 'static, diff --git a/backend/test_utils/src/workspace/start_server.rs b/backend/test_utils/src/workspace/start_server.rs index 1323fb0..4e95261 100644 --- 
a/backend/test_utils/src/workspace/start_server.rs +++ b/backend/test_utils/src/workspace/start_server.rs @@ -40,7 +40,7 @@ impl Default for ShardOpts { Self { max_nodes_per_connection: None, max_node_data_per_second: None, - node_block_seconds: None + node_block_seconds: None, } } } @@ -125,9 +125,7 @@ pub async fn start_server( // Append additional opts to the core command if let Some(val) = core_opts.feed_timeout { - core_command = core_command - .arg("--feed-timeout") - .arg(val.to_string()); + core_command = core_command.arg("--feed-timeout").arg(val.to_string()); } // Star the server