default core/shard worker threads to 8/4 respectively

This commit is contained in:
James Wilson
2021-08-05 12:18:44 +01:00
parent 4da299bf76
commit bc75ebb068
2 changed files with 25 additions and 26 deletions
+16 -19
View File
@@ -59,10 +59,11 @@ struct Opts {
/// to a feed, the feed connection will be closed.
#[structopt(long, default_value = "10")]
feed_timeout: u64,
/// Number of worker threads to spawn. Defaults to the number of CPUs on the machine.
/// If "0" is given, use the number of CPUs available on the machine.
#[structopt(long)]
worker_threads: Option<usize>,
/// Number of worker threads to spawn. If "0" is given, use the number of CPUs available
/// on the machine. Note that the tokio runtime performance seems to degrade when this number
/// gets too high.
#[structopt(long, default_value = "8")]
worker_threads: usize,
}
fn main() {
@@ -75,9 +76,10 @@ fn main() {
log::info!("Starting Telemetry Core version: {}", VERSION);
let worker_threads = opts.worker_threads
.and_then(|n| if n == 0 { None } else { Some(n) })
.unwrap_or_else(|| num_cpus::get());
let worker_threads = match opts.worker_threads {
0 => num_cpus::get(),
n => n
};
tokio::runtime::Builder::new_multi_thread()
.enable_all()
@@ -307,7 +309,7 @@ async fn handle_feed_websocket_connection<S>(
mut ws_recv: http_utils::WsReceiver,
mut tx_to_aggregator: S,
feed_timeout: u64,
feed_id: u64
_feed_id: u64 // <- can be useful for debugging purposes.
) -> (S, http_utils::WsSender)
where
S: futures::Sink<FromFeedWebsocket, Error = anyhow::Error> + Unpin + Send + 'static,
@@ -381,7 +383,7 @@ where
drop(send_closer_tx); // Kill the send task if this recv task ends
tx_to_aggregator
});
let mut i: u64 = 0;
// Send messages to the feed:
let send_handle = tokio::spawn(async move {
'outer: loop {
@@ -398,16 +400,11 @@ let mut i: u64 = 0;
None => break,
};
if feed_id == 1 {
i += 1;
println!("FEED #{}, msgs: {}", i, msgs.len());
}
// End the loop when there are more than 10k messages queued up.
// This number is just picked as a fairly high limit that should account
// for many thousands of nodes on a chain. The higher this number is, the
// larger our channel storage and memory usage is liable to grow before the feed
// is dropped.
if msgs.len() > 100_000 {
// End the loop if we see more than 1k messages queued up to send out.
// This is either because the feed is being too slow to receive messages,
// or because the telemetry_core is under too much load and this task isn't
// getting enough time.
if msgs.len() > 1000 {
log::warn!("Closing feed websocket that was too slow to keep up (too many messages buffered)");
break 'outer;
}
+9 -7
View File
@@ -80,10 +80,11 @@ struct Opts {
/// value prevented from reconnecting to this shard for, in seconds.
#[structopt(long, default_value = "600")]
node_block_seconds: u64,
/// Number of worker threads to spawn. Defaults to the number of CPUs on the machine.
/// If "0" is given, use the number of CPUs available on the machine.
#[structopt(long)]
worker_threads: Option<usize>,
/// Number of worker threads to spawn. If "0" is given, use the number of CPUs available
/// on the machine. Note that the tokio runtime performance seems to degrade when this number
/// gets too high.
#[structopt(long, default_value = "4")]
worker_threads: usize,
}
fn main() {
@@ -96,9 +97,10 @@ fn main() {
log::info!("Starting Telemetry Shard version: {}", VERSION);
let worker_threads = opts.worker_threads
.and_then(|n| if n == 0 { None } else { Some(n) })
.unwrap_or_else(|| num_cpus::get());
let worker_threads = match opts.worker_threads {
0 => num_cpus::get(),
n => n
};
tokio::runtime::Builder::new_multi_thread()
.enable_all()