diff --git a/backend/Cargo.lock b/backend/Cargo.lock index 2dbddc6..c6b6fc1 100644 --- a/backend/Cargo.lock +++ b/backend/Cargo.lock @@ -1271,6 +1271,7 @@ dependencies = [ "http", "hyper", "log", + "num_cpus", "once_cell", "parking_lot", "primitive-types", @@ -1301,6 +1302,7 @@ dependencies = [ "http", "hyper", "log", + "num_cpus", "primitive-types", "serde", "serde_json", diff --git a/backend/telemetry_core/Cargo.toml b/backend/telemetry_core/Cargo.toml index 9066e40..e41f93c 100644 --- a/backend/telemetry_core/Cargo.toml +++ b/backend/telemetry_core/Cargo.toml @@ -16,6 +16,7 @@ hex = "0.4.3" http = "0.2.4" hyper = "0.14.11" log = "0.4.14" +num_cpus = "1.13.0" once_cell = "1.8.0" parking_lot = "0.11.1" primitive-types = { version = "0.9.0", features = ["serde"] } diff --git a/backend/telemetry_core/src/aggregator/aggregator.rs b/backend/telemetry_core/src/aggregator/aggregator.rs index 6ad9153..c8c21af 100644 --- a/backend/telemetry_core/src/aggregator/aggregator.rs +++ b/backend/telemetry_core/src/aggregator/aggregator.rs @@ -113,7 +113,7 @@ impl Aggregator { /// Return a sink that a feed can send messages into to be handled by the aggregator. pub fn subscribe_feed( &self, - ) -> impl Sink + Send + Sync + Unpin + 'static + ) -> (u64, impl Sink + Send + Sync + Unpin + 'static) { // Assign a unique aggregator-local ID to each connection that subscribes, and pass // that along with every message to the aggregator loop: @@ -125,11 +125,11 @@ impl Aggregator { // Calling `send` on this Sink requires Unpin. 
There may be a nicer way than this, // but pinning by boxing is the easy solution for now: - Box::pin(tx_to_aggregator.with(move |msg| async move { + (feed_conn_id, Box::pin(tx_to_aggregator.with(move |msg| async move { Ok(inner_loop::ToAggregator::FromFeedWebsocket( feed_conn_id.into(), msg, )) - })) + }))) } } diff --git a/backend/telemetry_core/src/main.rs b/backend/telemetry_core/src/main.rs index 1088068..03320f6 100644 --- a/backend/telemetry_core/src/main.rs +++ b/backend/telemetry_core/src/main.rs @@ -19,7 +19,6 @@ mod feed_message; mod find_location; mod state; use std::str::FromStr; -use std::sync::atomic::AtomicUsize; use tokio::time::{Duration, Instant}; use aggregator::{ @@ -60,10 +59,13 @@ struct Opts { /// to a feed, the feed connection will be closed. #[structopt(long, default_value = "10")] feed_timeout: u64, + /// Number of worker threads to spawn. Defaults to the number of CPUs on the machine. + /// If "0" is given, use the number of CPUs available on the machine. + #[structopt(long)] + num_cpus: Option<usize>, } -#[tokio::main] -async fn main() { +fn main() { let opts = Opts::from_args(); SimpleLogger::new() @@ -73,9 +75,20 @@ log::info!("Starting Telemetry Core version: {}", VERSION); - if let Err(e) = start_server(opts).await { - log::error!("Error starting server: {}", e); - } + let num_cpus_to_use = opts.num_cpus + .and_then(|n| if n == 0 { None } else { Some(n) }) + .unwrap_or_else(|| num_cpus::get()); + + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .worker_threads(num_cpus_to_use) + .build() + .unwrap() + .block_on(async { + if let Err(e) = start_server(opts).await { + log::error!("Error starting server: {}", e); + } + }); } /// Declare our routes and start the server. 
@@ -95,13 +108,14 @@ async fn start_server(opts: Opts) -> anyhow::Result<()> { Ok(http_utils::upgrade_to_websocket( req, move |ws_send, ws_recv| async move { - let tx_to_aggregator = aggregator.subscribe_feed(); + let (feed_id, tx_to_aggregator) = aggregator.subscribe_feed(); let (mut tx_to_aggregator, mut ws_send) = handle_feed_websocket_connection( ws_send, ws_recv, tx_to_aggregator, feed_timeout, + feed_id, ) .await; log::info!("Closing /feed connection from {:?}", addr); @@ -291,6 +305,7 @@ async fn handle_feed_websocket_connection( mut ws_recv: http_utils::WsReceiver, mut tx_to_aggregator: S, feed_timeout: u64, + feed_id: u64 ) -> (S, http_utils::WsSender) where S: futures::Sink + Unpin + Send + 'static, @@ -364,34 +379,48 @@ where drop(send_closer_tx); // Kill the send task if this recv task ends tx_to_aggregator }); - +let mut i: u64 = 0; // Send messages to the feed: let send_handle = tokio::spawn(async move { 'outer: loop { + let debounce = tokio::time::sleep_until(Instant::now() + Duration::from_millis(75)); + let msgs = tokio::select! { msgs = rx_from_aggregator_chunks.next() => msgs, _ = &mut send_closer_rx => { break } }; + // End the loop when connection from aggregator ends: let msgs = match msgs { Some(msgs) => msgs, None => break, }; -let total_val = unsafe { total.load(std::sync::atomic::Ordering::Relaxed) }; -if msgs.len() > total_val { - unsafe { total.compare_exchange(total_val, msgs.len(), std::sync::atomic::Ordering::Relaxed, std::sync::atomic::Ordering::Relaxed); }; - println!("Max msgs: {}", msgs.len()); +if feed_id == 1 { + i += 1; + println!("FEED #{}, msgs: {}", i, msgs.len()); + if i > 1000 { + log::error!("TESTING: close feed"); + break + } } + // End the loop when there are more than 100k messages queued up. + // This number is just picked as a fairly high limit that should account + // for many thousands of nodes on a chain. 
The higher this number is, the + // larger our channel storage and memory usage is liable to grow before the feed + // is dropped. + if msgs.len() > 100_000 { + log::warn!("Closing feed websocket that was too slow to keep up (too many messages buffered)"); + break 'outer; + } + // There is only one message type at the mo; bytes to send // to the websocket. collect them all up to dispatch in one shot. let all_msg_bytes = msgs.into_iter().map(|msg| match msg { ToFeedWebsocket::Bytes(bytes) => bytes, }); - // We have a deadline to send and flush messages. If the client isn't keeping up with our - // messages, the number we obtain from `ReadyChunksAll` will gradually increase and eventually - // we'll hit this deadline and the client will be booted. + // If the feed is too slow to receive the current batch of messages, we'll drop it. let message_send_deadline = Instant::now() + Duration::from_secs(feed_timeout); for bytes in all_msg_bytes { @@ -399,7 +428,7 @@ if msgs.len() > total_val { .await { Err(_) => { - log::warn!("Closing feed websocket that was too slow to keep up (1)"); + log::warn!("Closing feed websocket that was too slow to keep up (too slow to send messages)"); break 'outer; } Ok(Err(e)) => { @@ -411,7 +440,7 @@ if msgs.len() > total_val { } match tokio::time::timeout_at(message_send_deadline, ws_send.flush()).await { Err(_) => { - log::warn!("Closing feed websocket that was too slow to keep up (2)"); + log::warn!("Closing feed websocket that was too slow to keep up (too slow to flush messages)"); break; } Ok(Err(e)) => { @@ -420,6 +449,8 @@ if msgs.len() > total_val { } Ok(_) => {} } + + debounce.await; } drop(recv_closer_tx); // Kill the recv task if this send task ends @@ -434,5 +465,3 @@ if msgs.len() > total_val { // loop ended; give socket back to parent: (tx_to_aggregator, ws_send) } - -static mut total: std::sync::atomic::AtomicUsize = AtomicUsize::new(0); diff --git a/backend/telemetry_core/tests/e2e_tests.rs 
b/backend/telemetry_core/tests/e2e_tests.rs index 9c0751a..7319da7 100644 --- a/backend/telemetry_core/tests/e2e_tests.rs +++ b/backend/telemetry_core/tests/e2e_tests.rs @@ -22,10 +22,10 @@ able to open a large number of connections and run some of the tests. Try running these: ```sh -sudo sysctl -w kern.maxfiles=50000 -sudo sysctl -w kern.maxfilesperproc=50000 -ulimit -n 50000 -sudo sysctl -w kern.ipc.somaxconn=50000 +sudo sysctl -w kern.maxfiles=100000 +sudo sysctl -w kern.maxfilesperproc=100000 +ulimit -n 100000 +sudo sysctl -w kern.ipc.somaxconn=100000 sudo sysctl -w kern.ipc.maxsockbuf=16777216 ``` */ @@ -580,6 +580,7 @@ async fn slow_feeds_are_disconnected() { // Timeout faster so the test can be quicker: CoreOpts { feed_timeout: Some(1), + ..Default::default() }, // Allow us to send more messages in more easily: ShardOpts { diff --git a/backend/telemetry_shard/Cargo.toml b/backend/telemetry_shard/Cargo.toml index 182568a..2f9d505 100644 --- a/backend/telemetry_shard/Cargo.toml +++ b/backend/telemetry_shard/Cargo.toml @@ -14,6 +14,7 @@ hex = "0.4.3" http = "0.2.4" hyper = "0.14.11" log = "0.4.14" +num_cpus = "1.13.0" primitive-types = { version = "0.9.0", features = ["serde"] } serde = { version = "1.0.126", features = ["derive"] } serde_json = "1.0.64" diff --git a/backend/telemetry_shard/src/main.rs b/backend/telemetry_shard/src/main.rs index 9e54265..43159bd 100644 --- a/backend/telemetry_shard/src/main.rs +++ b/backend/telemetry_shard/src/main.rs @@ -80,10 +80,13 @@ struct Opts { /// value prevented from reconnecting to this shard for, in seconds. #[structopt(long, default_value = "600")] node_block_seconds: u64, + /// Number of worker threads to spawn. Defaults to the number of CPUs on the machine. + /// If "0" is given, use the number of CPUs available on the machine. 
+ #[structopt(long)] + num_cpus: Option<usize>, } -#[tokio::main] -async fn main() { +fn main() { let opts = Opts::from_args(); SimpleLogger::new() @@ -93,9 +96,20 @@ log::info!("Starting Telemetry Shard version: {}", VERSION); - if let Err(e) = start_server(opts).await { - log::error!("Error starting server: {}", e); - } + let num_cpus_to_use = opts.num_cpus + .and_then(|n| if n == 0 { None } else { Some(n) }) + .unwrap_or_else(|| num_cpus::get()); + + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .worker_threads(num_cpus_to_use) + .build() + .unwrap() + .block_on(async { + if let Err(e) = start_server(opts).await { + log::error!("Error starting server: {}", e); + } + }); } /// Declare our routes and start the server. diff --git a/backend/test_utils/src/workspace/start_server.rs b/backend/test_utils/src/workspace/start_server.rs index 4e95261..a51d0ed 100644 --- a/backend/test_utils/src/workspace/start_server.rs +++ b/backend/test_utils/src/workspace/start_server.rs @@ -20,11 +20,15 @@ use crate::server::{self, Command, Server}; /// Additional options to pass to the core command. 
pub struct CoreOpts { pub feed_timeout: Option<u64>, + pub num_cpus: Option<usize>, } impl Default for CoreOpts { fn default() -> Self { - Self { feed_timeout: None } + Self { + feed_timeout: None, + num_cpus: None + } } } @@ -33,6 +37,7 @@ pub struct ShardOpts { pub max_nodes_per_connection: Option<usize>, pub max_node_data_per_second: Option<usize>, pub node_block_seconds: Option<u64>, + pub num_cpus: Option<usize>, } impl Default for ShardOpts { @@ -41,6 +46,7 @@ max_nodes_per_connection: None, max_node_data_per_second: None, node_block_seconds: None, + num_cpus: None } } } @@ -114,6 +120,11 @@ .arg("--node-block-seconds") .arg(val.to_string()); } + if let Some(val) = shard_opts.num_cpus { + shard_command = shard_command + .arg("--num-cpus") + .arg(val.to_string()); + } // Build the core command let mut core_command = std::env::var("TELEMETRY_CORE_BIN") @@ -127,6 +138,9 @@ if let Some(val) = core_opts.feed_timeout { core_command = core_command.arg("--feed-timeout").arg(val.to_string()); } + if let Some(val) = core_opts.num_cpus { + core_command = core_command.arg("--num-cpus").arg(val.to_string()); + } // Start the server Server::start(server::StartOpts::ShardAndCore {