From 078827075614ea4e3fb77e1ec4e737fec108a96f Mon Sep 17 00:00:00 2001 From: James Wilson Date: Fri, 6 Aug 2021 14:22:01 +0100 Subject: [PATCH] Add 'subscribe' benchmark --- backend/Cargo.lock | 285 ++++++++++++++++++ backend/telemetry_core/Cargo.toml | 5 + backend/telemetry_core/benches/subscribe.rs | 121 ++++++++ backend/telemetry_core/src/main.rs | 5 +- backend/telemetry_core/tests/e2e_tests.rs | 16 +- backend/telemetry_core/tests/soak_tests.rs | 12 +- backend/telemetry_shard/src/main.rs | 5 +- backend/test_utils/src/server/channels.rs | 2 +- backend/test_utils/src/server/server.rs | 123 +++++--- .../test_utils/src/workspace/start_server.rs | 30 +- 10 files changed, 530 insertions(+), 74 deletions(-) create mode 100644 backend/telemetry_core/benches/subscribe.rs diff --git a/backend/Cargo.lock b/backend/Cargo.lock index c6b6fc1..78e230a 100644 --- a/backend/Cargo.lock +++ b/backend/Cargo.lock @@ -97,6 +97,18 @@ dependencies = [ "generic-array", ] +[[package]] +name = "bstr" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "90682c8d613ad3373e66de8c6411e0ae2ab2571e879d2efbf73558cc66f21279" +dependencies = [ + "lazy_static", + "memchr", + "regex-automata", + "serde", +] + [[package]] name = "bumpalo" version = "3.7.0" @@ -121,6 +133,15 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b700ce4376041dcd0a327fd0097c41095743c4c8af8887265942faf1100bd040" +[[package]] +name = "cast" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c24dab4283a142afa2fdca129b80ad2c6284e073930f964c3a1293c225ee39a" +dependencies = [ + "rustc_version", +] + [[package]] name = "cc" version = "1.0.68" @@ -225,12 +246,116 @@ dependencies = [ "libc", ] +[[package]] +name = "criterion" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1604dafd25fba2fe2d5895a9da139f8dc9b319a5fe5354ca137cbbce4e178d10" +dependencies = [ + "atty", + "cast", + "clap", + "criterion-plot", + "csv", + "futures", + "itertools", + "lazy_static", + "num-traits", + "oorandom", + "plotters", + "rayon", + "regex", + "serde", + "serde_cbor", + "serde_derive", + "serde_json", + "tinytemplate", + "tokio", + "walkdir", +] + +[[package]] +name = "criterion-plot" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d00996de9f2f7559f7f4dc286073197f83e92256a59ed395f9aac01fe717da57" +dependencies = [ + "cast", + "itertools", +] + +[[package]] +name = "crossbeam-channel" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06ed27e177f16d65f0f0c22a213e17c696ace5dd64b14258b52f9417ccb52db4" +dependencies = [ + "cfg-if", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-deque" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6455c0ca19f0d2fbf751b908d5c55c1f5cbc65e03c4225427254b46890bdde1e" +dependencies = [ + "cfg-if", + "crossbeam-epoch", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-epoch" +version = "0.9.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ec02e091aa634e2c3ada4a392989e7c3116673ef0ac5b72232439094d73b7fd" +dependencies = [ + "cfg-if", + "crossbeam-utils", + "lazy_static", + "memoffset", + "scopeguard", +] + +[[package]] +name = "crossbeam-utils" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d82cfc11ce7f2c3faef78d8a684447b40d503d9681acebed6cb728d45940c4db" +dependencies = [ + "cfg-if", + "lazy_static", +] + [[package]] name = "crunchy" version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7a81dae078cea95a014a339291cec439d2f232ebe854a9d672b796c6afafa9b7" +[[package]] +name = "csv" +version = "1.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22813a6dc45b335f9bade10bf7271dc477e81113e89eb251a0bc2a8a81c536e1" +dependencies = [ + "bstr", + "csv-core", + "itoa", + "ryu", + "serde", +] + +[[package]] +name = "csv-core" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b2466559f260f48ad25fe6317b3c8dac77b5bdb5763ac7d9d6103530663bc90" +dependencies = [ + "memchr", +] + [[package]] name = "digest" version = "0.9.0" @@ -240,6 +365,12 @@ dependencies = [ "generic-array", ] +[[package]] +name = "either" +version = "1.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e78d4f1cc4ae33bbfc157ed5d5a5ef3bc29227303d595861deb238fcec4e9457" + [[package]] name = "encoding_rs" version = "0.8.28" @@ -432,6 +563,12 @@ dependencies = [ "tracing", ] +[[package]] +name = "half" +version = "1.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62aca2aba2d62b4a7f5b33f3712cb1b0692779a56fb510499d5c0aa594daeaf3" + [[package]] name = "hashbrown" version = "0.9.1" @@ -587,6 +724,15 @@ version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "68f2d64f2edebec4ce84ad108148e67e1064789bee435edc5b60ad398714a3a9" +[[package]] +name = "itertools" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69ddb889f9d0d08a67338271fa9b62996bc788c7796a5c18cf057420aaed5eaf" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "0.4.7" @@ -644,6 +790,15 @@ version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b16bd47d9e329435e309c58469fe0791c2d0d1ba96ec0954152a5ae2b04387dc" +[[package]] +name = "memoffset" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59accc507f1338036a0477ef61afdae33cde60840f4dfe481319ce3ad116ddf9" +dependencies = [ + "autocfg", +] + [[package]] name = "mime" version = "0.3.16" @@ -734,6 +889,12 @@ version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "692fcb63b64b1758029e0a96ee63e049ce8c5948587f2f7208df04625e5f6b56" +[[package]] +name = "oorandom" +version = "11.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575" + [[package]] name = "opaque-debug" version = "0.3.0" @@ -834,6 +995,34 @@ version = "0.3.19" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3831453b3449ceb48b6d9c7ad7c96d5ea673e9b470a1dc578c2ce6521230884c" +[[package]] +name = "plotters" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32a3fd9ec30b9749ce28cd91f255d569591cdf937fe280c312143e3c4bad6f2a" +dependencies = [ + "num-traits", + "plotters-backend", + "plotters-svg", + "wasm-bindgen", + "web-sys", +] + +[[package]] +name = "plotters-backend" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d88417318da0eaf0fdcdb51a0ee6c3bed624333bff8f946733049380be67ac1c" + +[[package]] +name = "plotters-svg" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "521fa9638fa597e1dc53e9412a4f9cefb01187ee1f7413076f9e6749e2885ba9" +dependencies = [ + "plotters-backend", +] + [[package]] name = "ppv-lite86" version = "0.2.10" @@ -952,6 +1141,31 @@ dependencies = [ "rand_core", ] +[[package]] +name = "rayon" +version = "1.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c06aca804d41dbc8ba42dfd964f0d01334eceb64314b9ecf7c5fad5188a06d90" +dependencies = [ + "autocfg", + "crossbeam-deque", + "either", + "rayon-core", +] + +[[package]] +name = "rayon-core" +version = "1.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d78120e2c850279833f1dd3582f730c4ab53ed95aeaaaa862a2a5c71b1656d8e" +dependencies = [ + "crossbeam-channel", + "crossbeam-deque", + "crossbeam-utils", + "lazy_static", + "num_cpus", +] + [[package]] name = "redox_syscall" version = "0.2.9" @@ -972,6 +1186,12 @@ dependencies = [ "regex-syntax", ] +[[package]] +name = "regex-automata" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" + [[package]] name = "regex-syntax" version = "0.6.25" @@ -1034,12 +1254,30 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3e75f6a532d0fd9f7f13144f392b6ad56a32696bfcd9c78f797f16bbb6f072d6" +[[package]] +name = "rustc_version" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366" +dependencies = [ + "semver", +] + [[package]] name = "ryu" version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "71d301d4193d031abdd79ff7e3dd721168a9572ef3fe51a1517aba235bd8f86e" +[[package]] +name = "same-file" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502" +dependencies = [ + "winapi-util", +] + [[package]] name = "schannel" version = "0.1.19" @@ -1079,6 +1317,12 @@ dependencies = [ "libc", ] +[[package]] +name = "semver" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "568a8e6258aa33c13358f81fd834adb854c6f7c9468520910a9b1e8fac068012" + [[package]] name = "serde" version = "1.0.126" @@ -1088,6 +1332,16 @@ dependencies = [ "serde_derive", ] +[[package]] +name = "serde_cbor" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e18acfa2f90e8b735b2836ab8d538de304cbb6729a7360729ea5a895d15a622" +dependencies = [ + "half", + "serde", +] + [[package]] name = "serde_derive" version = "1.0.126" @@ -1266,6 +1520,7 @@ dependencies = [ "bincode", "bytes", "common", + "criterion", "futures", "hex", "http", @@ -1394,6 +1649,16 @@ dependencies = [ "libc", ] +[[package]] +name = "tinytemplate" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be4d6b5f19ff7664e8c98d03e2139cb510db9b0a60b55f8e8709b689d939b6bc" +dependencies = [ + "serde", + "serde_json", +] + [[package]] name = "tinyvec" version = "1.2.0" @@ -1582,6 +1847,17 @@ version = "0.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5fecdca9a5291cc2b8dcf7dc02453fee791a280f3743cb0905f8822ae463b3fe" +[[package]] +name = "walkdir" +version = "2.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "808cf2735cd4b6866113f648b791c6adc5714537bc222d9347bb203386ffda56" +dependencies = [ + "same-file", + "winapi", + "winapi-util", +] + [[package]] name = "want" version = "0.3.0" @@ -1692,6 +1968,15 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" +[[package]] +name = "winapi-util" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178" +dependencies = [ + "winapi", +] + [[package]] name = "winapi-x86_64-pc-windows-gnu" version = "0.4.0" diff --git a/backend/telemetry_core/Cargo.toml b/backend/telemetry_core/Cargo.toml index e41f93c..373c280 100644 --- a/backend/telemetry_core/Cargo.toml +++ b/backend/telemetry_core/Cargo.toml @@ -35,3 +35,8 @@ tokio-util = { version = "0.6", features = ["compat"] } [dev-dependencies] shellwords = "1.1.0" test_utils = { path = "../test_utils" } +criterion = { version = "0.3.4", features = ["async", "async_tokio"] } + +[[bench]] +name = "subscribe" +harness = false \ No newline at end of file diff --git a/backend/telemetry_core/benches/subscribe.rs b/backend/telemetry_core/benches/subscribe.rs new file mode 100644 index 0000000..275901e --- /dev/null +++ b/backend/telemetry_core/benches/subscribe.rs @@ -0,0 +1,121 @@ +use common::node_types::BlockHash; +use criterion::{criterion_group, criterion_main, Criterion}; +use serde_json::json; +use test_utils::workspace::{ start_server, ServerOpts, CoreOpts, ShardOpts }; +use test_utils::feed_message_de::FeedMessage; +use tokio::runtime::Runtime; +use std::time::{Duration, Instant}; + +/// This benchmark roughly times the subscribe function. Note that there's a lot of +/// overhead in other areas, so even with the entire subscribe function commented out +/// By benchmark timings are ~50ms (whereas they are ~320ms with the version of the +/// subscribe handler at the time of writing). +/// +/// If you want to use this benchmark, it's therefore worth commenting out the subscribe +/// logic entirely and running this to give yourself a "baseline". +pub fn benchmark_subscribe_speed(c: &mut Criterion) { + const NUMBER_OF_FEEDS: usize = 100; + const NUMBER_OF_NODES: usize = 10_000; + + let rt = Runtime::new().expect("tokio runtime should start"); + + c.bench_function( + "subscribe speed: time till pong", + move |b| b.to_async(&rt).iter_custom(|iters| async move { + + // Start a server: + let mut server = start_server( + ServerOpts { + release_mode: true, + log_output: false, + }, + CoreOpts { + ..Default::default() + }, + ShardOpts { + max_nodes_per_connection: Some(usize::MAX), + max_node_data_per_second: Some(usize::MAX), + worker_threads: Some(2), + ..Default::default() + } + ).await; + let shard_id = server.add_shard().await.unwrap(); + + // Connect a shard: + let (mut node_tx, _) = server + .get_shard(shard_id) + .unwrap() + .connect_node() + .await + .expect("node can connect"); + + // Add a bunch of actual nodes on the same chain: + for n in 0..NUMBER_OF_NODES { + node_tx.send_json_text(json!({ + "id":n, + "ts":"2021-07-12T10:37:47.714666+01:00", + "payload": { + "authority":true, + "chain":"Polkadot", // No limit to #nodes on this network. + "config":"", + "genesis_hash": BlockHash::from_low_u64_ne(1), + "implementation":"Substrate Node", + "msg":"system.connected", + "name": format!("Node {}", n), + "network_id":"12D3KooWEyoppNCUx8Yx66oV9fJnriXwCcXwDDUA2kj6vnc6iDEp", + "startup_time":"1625565542717", + "version":"2.0.0-07a1af348-aarch64-macos" + } + })).unwrap(); + } + + // Give those messages a chance to be handled. This, of course, + // assumes that those messages _can_ be handled this quickly. If not, + // we'll start to skew benchmark results with the "time taklen to add node". + tokio::time::sleep(Duration::from_millis(250)).await; + + // Now, see how quickly a feed is subscribed. Criterion controls the number of + // iters performed here, but a lot of the time that number is "1". + let mut total_time = Duration::ZERO; + for _n in 0..iters { + + // Start a bunch of feeds: + let mut feeds = server + .get_core() + .connect_multiple_feeds(NUMBER_OF_FEEDS) + .await + .expect("feeds can connect"); + + // Subscribe every feed to the chain: + for (feed_tx, _) in feeds.iter() { + feed_tx.send_command("subscribe", "Polkadot").unwrap(); + } + + // Then, Ping a feed: + feeds[0].0.send_command("ping", "Finished!").unwrap(); + let finished = FeedMessage::Pong { msg: "Finished!".to_owned() }; + + // Wait and see how long it takes to get a pong back: + let start = Instant::now(); + loop { + let msgs = feeds[0].1.recv_feed_messages_once().await.unwrap(); + if msgs.iter().find(|&m| m == &finished).is_some() { + break; + } + } + total_time += start.elapsed(); + + // shut down the feeds + for (mut feed_tx, _) in feeds { + feed_tx.close().await.unwrap(); + } + } + + // The total time spent waiting for subscribes: + total_time + }) + ); +} + +criterion_group!(benches, benchmark_subscribe_speed); +criterion_main!(benches); diff --git a/backend/telemetry_core/src/main.rs b/backend/telemetry_core/src/main.rs index 90621a1..0ac46ab 100644 --- a/backend/telemetry_core/src/main.rs +++ b/backend/telemetry_core/src/main.rs @@ -60,9 +60,8 @@ struct Opts { #[structopt(long, default_value = "10")] feed_timeout: u64, /// Number of worker threads to spawn. If "0" is given, use the number of CPUs available - /// on the machine. Note that the tokio runtime performance seems to degrade when this number - /// gets too high. - #[structopt(long, default_value = "8")] + /// on the machine. + #[structopt(long, default_value = "0")] worker_threads: usize, } diff --git a/backend/telemetry_core/tests/e2e_tests.rs b/backend/telemetry_core/tests/e2e_tests.rs index 7319da7..ee426a5 100644 --- a/backend/telemetry_core/tests/e2e_tests.rs +++ b/backend/telemetry_core/tests/e2e_tests.rs @@ -37,7 +37,7 @@ use std::time::Duration; use test_utils::{ assert_contains_matches, feed_message_de::{FeedMessage, NodeDetails}, - workspace::{start_server, start_server_debug, CoreOpts, ShardOpts}, + workspace::{start_server, start_server_debug, ServerOpts, CoreOpts, ShardOpts}, }; /// The simplest test we can run; the main benefit of this test (since we check similar) @@ -68,7 +68,7 @@ async fn feed_ping_responded_to_with_pong() { let server = start_server_debug().await; // Connect a feed: - let (mut feed_tx, mut feed_rx) = server.get_core().connect_feed().await.unwrap(); + let (feed_tx, mut feed_rx) = server.get_core().connect_feed().await.unwrap(); // Ping it: feed_tx.send_command("ping", "hello!").unwrap(); @@ -283,7 +283,7 @@ async fn feeds_told_about_chain_rename_and_stay_subscribed() { .unwrap(); // Connect a feed and subscribe to the above chain: - let (mut feed_tx, mut feed_rx) = server.get_core().connect_feed().await.unwrap(); + let (feed_tx, mut feed_rx) = server.get_core().connect_feed().await.unwrap(); feed_tx .send_command("subscribe", "Initial chain name") .unwrap(); @@ -453,7 +453,7 @@ async fn feed_can_subscribe_and_unsubscribe_from_chain() { } // Connect a feed - let (mut feed_tx, mut feed_rx) = server.get_core().connect_feed().await.unwrap(); + let (feed_tx, mut feed_rx) = server.get_core().connect_feed().await.unwrap(); let feed_messages = feed_rx.recv_feed_messages().await.unwrap(); assert_contains_matches!(feed_messages, AddedChain { name, node_count: 1 } if name == "Local Testnet 1"); @@ -526,7 +526,7 @@ async fn feed_can_subscribe_and_unsubscribe_from_chain() { async fn node_banned_if_it_sends_too_much_data() { async fn try_send_data(max_bytes: usize, send_msgs: usize, bytes_per_msg: usize) -> bool { let mut server = start_server( - false, + ServerOpts::default(), CoreOpts::default(), ShardOpts { // Remember, this is (currently) averaged over the last 10 seconds, @@ -576,7 +576,7 @@ async fn node_banned_if_it_sends_too_much_data() { #[tokio::test] async fn slow_feeds_are_disconnected() { let mut server = start_server( - false, + ServerOpts::default(), // Timeout faster so the test can be quicker: CoreOpts { feed_timeout: Some(1), @@ -669,7 +669,7 @@ async fn slow_feeds_are_disconnected() { #[tokio::test] async fn max_nodes_per_connection_is_enforced() { let mut server = start_server( - false, + ServerOpts::default(), CoreOpts::default(), // Limit max nodes per connection to 2; any other msgs should be ignored. ShardOpts { @@ -689,7 +689,7 @@ async fn max_nodes_per_connection_is_enforced() { .unwrap(); // Connect a feed. - let (mut feed_tx, mut feed_rx) = server.get_core().connect_feed().await.unwrap(); + let (feed_tx, mut feed_rx) = server.get_core().connect_feed().await.unwrap(); // We'll send these messages from the node: let json_msg = |n| { diff --git a/backend/telemetry_core/tests/soak_tests.rs b/backend/telemetry_core/tests/soak_tests.rs index 8f0ed0a..34e8378 100644 --- a/backend/telemetry_core/tests/soak_tests.rs +++ b/backend/telemetry_core/tests/soak_tests.rs @@ -42,7 +42,7 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use std::time::Duration; use structopt::StructOpt; -use test_utils::workspace::{start_server, CoreOpts, ShardOpts}; +use test_utils::workspace::{start_server, ServerOpts, CoreOpts, ShardOpts}; /// A configurable soak_test runner. Configure by providing the expected args as /// an environment variable. One example to run this test is: @@ -74,7 +74,10 @@ pub async fn soak_test() { /// the results should be pretty reproducible. async fn run_soak_test(opts: SoakTestOpts) { let mut server = start_server( - true, + ServerOpts { + release_mode: true, + ..Default::default() + }, CoreOpts { worker_threads: opts.core_worker_threads, ..Default::default() @@ -257,7 +260,10 @@ pub async fn realistic_soak_test() { /// circumstances async fn run_realistic_soak_test(opts: SoakTestOpts) { let mut server = start_server( - true, + ServerOpts { + release_mode: true, + ..Default::default() + }, CoreOpts { worker_threads: opts.core_worker_threads, ..Default::default() diff --git a/backend/telemetry_shard/src/main.rs b/backend/telemetry_shard/src/main.rs index 357a83f..27b464b 100644 --- a/backend/telemetry_shard/src/main.rs +++ b/backend/telemetry_shard/src/main.rs @@ -81,9 +81,8 @@ struct Opts { #[structopt(long, default_value = "600")] node_block_seconds: u64, /// Number of worker threads to spawn. If "0" is given, use the number of CPUs available - /// on the machine. Note that the tokio runtime performance seems to degrade when this number - /// gets too high. - #[structopt(long, default_value = "4")] + /// on the machine. + #[structopt(long, default_value = "0")] worker_threads: usize, } diff --git a/backend/test_utils/src/server/channels.rs b/backend/test_utils/src/server/channels.rs index ef932b2..4f7abbd 100644 --- a/backend/test_utils/src/server/channels.rs +++ b/backend/test_utils/src/server/channels.rs @@ -173,7 +173,7 @@ impl FeedSender { /// Send a command into the feed. A command consists of a string /// "command" part, and another string "parameter" part. pub fn send_command>( - &mut self, + &self, command: S, param: S, ) -> Result<(), ws_client::SendError> { diff --git a/backend/test_utils/src/server/server.rs b/backend/test_utils/src/server/server.rs index 70d0cb7..ac00af5 100644 --- a/backend/test_utils/src/server/server.rs +++ b/backend/test_utils/src/server/server.rs @@ -33,6 +33,8 @@ pub enum StartOpts { /// Command to run to start the process. /// The `--listen` and `--log` arguments will be appended within and shouldn't be provided. command: Command, + /// Log output from started processes to stderr? + log_output: bool, }, /// Start a core process with a `/feed` andpoint as well as (optionally) /// multiple shard processes with `/submit` endpoints. @@ -43,6 +45,8 @@ pub enum StartOpts { /// Command to run to start a telemetry core process. /// The `--listen` and `--log` arguments will be appended within and shouldn't be provided. core_command: Command, + /// Log output from started processes to stderr? + log_output: bool, }, /// Connect to existing process(es). ConnectToExisting { @@ -52,26 +56,32 @@ pub enum StartOpts { /// Where is the process that we can subscribe to the `/feed` of? /// Eg: `127.0.0.1:3000` feed_host: String, + /// Log output from started processes to stderr? + log_output: bool, }, } /// This represents a telemetry server. It can be in different modes /// depending on how it was started, but the interface is similar in every case /// so that tests are somewhat compatible with multiple configurations. -pub enum Server { +pub struct Server { + /// Should we log output from the processes we start? + log_output: bool, + /// Core process that we can connect to. + core: CoreProcess, + /// Things that vary based on the mode we are in. + mode: ServerMode +} +pub enum ServerMode { SingleProcessMode { /// A virtual shard that we can hand out. virtual_shard: ShardProcess, - /// Core process that we can connect to. - core: CoreProcess, }, ShardAndCoreMode { /// Command to run to start a new shard. shard_command: Command, /// Shard processes that we can connect to. shards: DenseMap, - /// Core process that we can connect to. - core: CoreProcess, }, ConnectToExistingMode { /// The hosts that we can connect to to submit things. @@ -81,8 +91,6 @@ pub enum Server { next_submit_host_idx: usize, /// Shard processes that we can connect to. shards: DenseMap, - /// Core process that we can connect to. - core: CoreProcess, }, } @@ -108,27 +116,23 @@ pub enum Error { impl Server { pub fn get_core(&self) -> &CoreProcess { - match self { - Server::SingleProcessMode { core, .. } => core, - Server::ShardAndCoreMode { core, .. } => core, - Server::ConnectToExistingMode { core, .. } => core, - } + &self.core } pub fn get_shard(&self, id: ProcessId) -> Option<&ShardProcess> { - match self { - Server::SingleProcessMode { virtual_shard, .. } => Some(virtual_shard), - Server::ShardAndCoreMode { shards, .. } => shards.get(id), - Server::ConnectToExistingMode { shards, .. } => shards.get(id), + match &self.mode { + ServerMode::SingleProcessMode { virtual_shard, .. } => Some(virtual_shard), + ServerMode::ShardAndCoreMode { shards, .. } => shards.get(id), + ServerMode::ConnectToExistingMode { shards, .. } => shards.get(id), } } pub async fn kill_shard(&mut self, id: ProcessId) -> bool { - let shard = match self { + let shard = match &mut self.mode { // Can't remove the pretend shard: - Server::SingleProcessMode { .. } => return false, - Server::ShardAndCoreMode { shards, .. } => shards.remove(id), - Server::ConnectToExistingMode { shards, .. } => shards.remove(id), + ServerMode::SingleProcessMode { .. } => return false, + ServerMode::ShardAndCoreMode { shards, .. } => shards.remove(id), + ServerMode::ConnectToExistingMode { shards, .. } => shards.remove(id), }; let shard = match shard { @@ -151,10 +155,11 @@ impl Server { // Spawn so we don't need to await cleanup if we don't care. // Run all kill futs simultaneously. let handle = tokio::spawn(async move { - let (core, shards) = match self { - Server::SingleProcessMode { core, .. } => (core, DenseMap::new()), - Server::ShardAndCoreMode { core, shards, .. } => (core, shards), - Server::ConnectToExistingMode { core, shards, .. } => (core, shards), + let core = self.core; + let shards = match self.mode { + ServerMode::SingleProcessMode { .. } => DenseMap::new(), + ServerMode::ShardAndCoreMode { shards, .. } => shards, + ServerMode::ConnectToExistingMode { shards, .. } => shards, }; let shard_kill_futs = shards.into_iter().map(|(_, s)| s.kill()); @@ -167,12 +172,12 @@ impl Server { /// Connect a new shard and return a process that you can interact with: pub async fn add_shard(&mut self) -> Result { - match self { + match &mut self.mode { // Always get back the same "virtual" shard; we're always just talking to the core anyway. - Server::SingleProcessMode { virtual_shard, .. } => Ok(virtual_shard.id), + ServerMode::SingleProcessMode { virtual_shard, .. } => Ok(virtual_shard.id), // We're connecting to an existing process. Find the next host we've been told about // round-robin style and use that as our new virtual shard. - Server::ConnectToExistingMode { + ServerMode::ConnectToExistingMode { submit_hosts, next_submit_host_idx, shards, @@ -194,13 +199,12 @@ impl Server { Ok(pid) } // Start a new process and return that. - Server::ShardAndCoreMode { + ServerMode::ShardAndCoreMode { shard_command, shards, - core, } => { // Where is the URI we'll want to submit things to? - let core_shard_submit_uri = format!("http://{}/shard_submit", core.host); + let core_shard_submit_uri = format!("http://{}/shard_submit", self.core.host); let mut shard_cmd: TokioCommand = shard_command.clone().into(); shard_cmd @@ -233,7 +237,11 @@ impl Server { // Since we're piping stdout from the child process, we need somewhere for it to go // else the process will get stuck when it tries to produce output: - utils::drain(child_stdout, tokio::io::stderr()); + if self.log_output { + utils::drain(child_stdout, tokio::io::stderr()); + } else { + utils::drain(child_stdout, tokio::io::sink()); + } let pid = shards.add_with(|id| Process { id, @@ -250,43 +258,54 @@ impl Server { /// Start a server. pub async fn start(opts: StartOpts) -> Result { let server = match opts { - StartOpts::SingleProcess { command } => { - let core_process = Server::start_core(command).await?; + StartOpts::SingleProcess { command, log_output } => { + let core_process = Server::start_core(log_output, command).await?; let virtual_shard_host = core_process.host.clone(); - Server::SingleProcessMode { + Server { + log_output, core: core_process, - virtual_shard: Process { - id: ProcessId(0), - host: virtual_shard_host, - handle: None, - _channel_type: PhantomData, - }, + mode: ServerMode::SingleProcessMode { + virtual_shard: Process { + id: ProcessId(0), + host: virtual_shard_host, + handle: None, + _channel_type: PhantomData, + }, + } } } StartOpts::ShardAndCore { core_command, shard_command, + log_output } => { - let core_process = Server::start_core(core_command).await?; - Server::ShardAndCoreMode { + let core_process = Server::start_core(log_output, core_command).await?; + Server { + log_output, core: core_process, - shard_command, - shards: DenseMap::new(), + mode: ServerMode::ShardAndCoreMode { + shard_command, + shards: DenseMap::new(), + } } } StartOpts::ConnectToExisting { feed_host, submit_hosts, - } => Server::ConnectToExistingMode { - submit_hosts, - next_submit_host_idx: 0, - shards: DenseMap::new(), + log_output + } => Server { + log_output, core: Process { id: ProcessId(0), host: feed_host, handle: None, _channel_type: PhantomData, }, + mode: ServerMode::ConnectToExistingMode { + submit_hosts, + next_submit_host_idx: 0, + shards: DenseMap::new(), + } }, }; @@ -294,7 +313,7 @@ impl Server { } /// Start up a core process and return it. - async fn start_core(command: Command) -> Result { + async fn start_core(log_output: bool, command: Command) -> Result { let mut tokio_core_cmd: TokioCommand = command.into(); let mut child = tokio_core_cmd .arg("--listen") @@ -314,7 +333,11 @@ impl Server { // Since we're piping stdout from the child process, we need somewhere for it to go // else the process will get stuck when it tries to produce output: - utils::drain(child_stdout, tokio::io::stderr()); + if log_output { + utils::drain(child_stdout, tokio::io::stderr()); + } else { + utils::drain(child_stdout, tokio::io::sink()); + } let core_process = Process { id: ProcessId(0), diff --git a/backend/test_utils/src/workspace/start_server.rs b/backend/test_utils/src/workspace/start_server.rs index 7630bda..d34c7dc 100644 --- a/backend/test_utils/src/workspace/start_server.rs +++ b/backend/test_utils/src/workspace/start_server.rs @@ -17,6 +17,21 @@ use super::commands; use crate::server::{self, Command, Server}; +/// Options for the server +pub struct ServerOpts { + pub release_mode: bool, + pub log_output: bool +} + +impl Default for ServerOpts { + fn default() -> Self { + Self { + release_mode: false, + log_output: true, + } + } +} + /// Additional options to pass to the core command. pub struct CoreOpts { pub feed_timeout: Option, @@ -69,7 +84,7 @@ impl Default for ShardOpts { /// - `TELEMETRY_FEED_HOST` - host to connect to for feeds (eg 127.0.0.1:3000) /// pub async fn start_server( - release_mode: bool, + server_opts: ServerOpts, core_opts: CoreOpts, shard_opts: ShardOpts, ) -> Server { @@ -77,6 +92,7 @@ pub async fn start_server( if let Ok(bin) = std::env::var("TELEMETRY_BIN") { return Server::start(server::StartOpts::SingleProcess { command: Command::new(bin), + log_output: server_opts.log_output }) .await .unwrap(); @@ -91,6 +107,7 @@ pub async fn start_server( return Server::start(server::StartOpts::ConnectToExisting { feed_host, submit_hosts, + log_output: server_opts.log_output }) .await .unwrap(); @@ -100,7 +117,7 @@ pub async fn start_server( let mut shard_command = std::env::var("TELEMETRY_SHARD_BIN") .map(|val| Command::new(val)) .unwrap_or_else(|_| { - commands::cargo_run_telemetry_shard(release_mode) + commands::cargo_run_telemetry_shard(server_opts.release_mode) .expect("must be in rust workspace to run shard command") }); @@ -130,7 +147,7 @@ pub async fn start_server( let mut core_command = std::env::var("TELEMETRY_CORE_BIN") .map(|val| Command::new(val)) .unwrap_or_else(|_| { - commands::cargo_run_telemetry_core(release_mode) + commands::cargo_run_telemetry_core(server_opts.release_mode) .expect("must be in rust workspace to run core command") }); @@ -142,10 +159,11 @@ pub async fn start_server( core_command = core_command.arg("--worker-threads").arg(val.to_string()); } - // Star the server + // Start the server Server::start(server::StartOpts::ShardAndCore { shard_command, core_command, + log_output: server_opts.log_output }) .await .unwrap() @@ -153,10 +171,10 @@ pub async fn start_server( /// Start a telemetry core server in debug mode. see [`start_server`] for details. pub async fn start_server_debug() -> Server { - start_server(false, CoreOpts::default(), ShardOpts::default()).await + start_server(ServerOpts::default(), CoreOpts::default(), ShardOpts::default()).await } /// Start a telemetry core server in release mode. see [`start_server`] for details. pub async fn start_server_release() -> Server { - start_server(true, CoreOpts::default(), ShardOpts::default()).await + start_server(ServerOpts::default(), CoreOpts::default(), ShardOpts::default()).await }