diff --git a/backend/telemetry_core/src/main.rs b/backend/telemetry_core/src/main.rs index ccec9a5..e223911 100644 --- a/backend/telemetry_core/src/main.rs +++ b/backend/telemetry_core/src/main.rs @@ -59,10 +59,11 @@ struct Opts { /// to a feed, the feed connection will be closed. #[structopt(long, default_value = "10")] feed_timeout: u64, - /// Number of worker threads to spawn. Defaults to the number of CPUs on the machine. - /// If "0" is given, use the number of CPUs available on the machine. - #[structopt(long)] - worker_threads: Option, + /// Number of worker threads to spawn. If "0" is given, use the number of CPUs available + /// on the machine. Note that the tokio runtime performance seems to degrade when this number + /// gets too high. + #[structopt(long, default_value = "8")] + worker_threads: usize, } fn main() { @@ -75,9 +76,10 @@ fn main() { log::info!("Starting Telemetry Core version: {}", VERSION); - let worker_threads = opts.worker_threads - .and_then(|n| if n == 0 { None } else { Some(n) }) - .unwrap_or_else(|| num_cpus::get()); + let worker_threads = match opts.worker_threads { + 0 => num_cpus::get(), + n => n + }; tokio::runtime::Builder::new_multi_thread() .enable_all() @@ -307,7 +309,7 @@ async fn handle_feed_websocket_connection( mut ws_recv: http_utils::WsReceiver, mut tx_to_aggregator: S, feed_timeout: u64, - feed_id: u64 + _feed_id: u64 // <- can be useful for debugging purposes. ) -> (S, http_utils::WsSender) where S: futures::Sink + Unpin + Send + 'static, @@ -381,7 +383,7 @@ where drop(send_closer_tx); // Kill the send task if this recv task ends tx_to_aggregator }); -let mut i: u64 = 0; + // Send messages to the feed: let send_handle = tokio::spawn(async move { 'outer: loop { @@ -398,16 +400,11 @@ let mut i: u64 = 0; None => break, }; -if feed_id == 1 { - i += 1; - println!("FEED #{}, msgs: {}", i, msgs.len()); -} - // End the loop when there are more than 10k messages queued up. - // This number is just picked as a fairly high limit that should account - // for many thousands of nodes on a chain. The higher this number is, the - // larger our channel storage and memory usage is liable to grow before the feed - // is dropped. - if msgs.len() > 100_000 { + // End the loop if we see more than 1k messages queued up to send out. + // This is either because the feed is being too slow to receive messages, + // or because the telemetry_core is under too much load and this task isn't + // getting enough time. + if msgs.len() > 1000 { log::warn!("Closing feed websocket that was too slow to keep up (too many messages buffered)"); break 'outer; } diff --git a/backend/telemetry_shard/src/main.rs b/backend/telemetry_shard/src/main.rs index 8158b24..357a83f 100644 --- a/backend/telemetry_shard/src/main.rs +++ b/backend/telemetry_shard/src/main.rs @@ -80,10 +80,11 @@ struct Opts { /// value prevented from reconnecting to this shard for, in seconds. #[structopt(long, default_value = "600")] node_block_seconds: u64, - /// Number of worker threads to spawn. Defaults to the number of CPUs on the machine. - /// If "0" is given, use the number of CPUs available on the machine. - #[structopt(long)] - worker_threads: Option, + /// Number of worker threads to spawn. If "0" is given, use the number of CPUs available + /// on the machine. Note that the tokio runtime performance seems to degrade when this number + /// gets too high. + #[structopt(long, default_value = "4")] + worker_threads: usize, } fn main() { @@ -96,9 +97,10 @@ fn main() { log::info!("Starting Telemetry Shard version: {}", VERSION); - let worker_threads = opts.worker_threads - .and_then(|n| if n == 0 { None } else { Some(n) }) - .unwrap_or_else(|| num_cpus::get()); + let worker_threads = match opts.worker_threads { + 0 => num_cpus::get(), + n => n + }; tokio::runtime::Builder::new_multi_thread() .enable_all()