diff --git a/backend/Cargo.toml b/backend/Cargo.toml index 7061f61..21279c2 100644 --- a/backend/Cargo.toml +++ b/backend/Cargo.toml @@ -12,3 +12,4 @@ opt-level = 3 [profile.release] lto = true panic = "abort" +debug = true \ No newline at end of file diff --git a/backend/telemetry_core/tests/soak_tests.rs b/backend/telemetry_core/tests/soak_tests.rs index 4201bff..6b91a08 100644 --- a/backend/telemetry_core/tests/soak_tests.rs +++ b/backend/telemetry_core/tests/soak_tests.rs @@ -16,6 +16,38 @@ sudo sysctl -w kern.ipc.maxsockbuf=16777216 In general, if you run into issues, it may be better to run this on a linux box; MacOS seems to hit limits quicker in general. + +We can profile execution using cargo-flamegraph to learn more about how long +we spend in each function. + +The main thing to do is install it: + +```sh +cargo install flamegraph +``` + +Now, start processes running ourselves (so that we can use flamegraph on them): + +In one terminal (in the `backend` folder of this repo): + +```sh +sudo cargo flamegraph -o telemetry_core.svg --bin telemetry_core +``` + +And in another: + +```sh +sudo cargo flamegraph -o telemetry_shard.svg --bin telemetry_shard +``` + +And then, in a third terminal, we can run our general soak test against these: + +```sh +SOAK_TEST_ARGS='--feeds 100 --nodes 100 --shards 4' \ +TELEMETRY_SUBMIT_HOSTS='127.0.0.1:8001' \ +TELEMETRY_FEED_HOST='127.0.0.1:8000' \ +cargo test -- soak_test --ignored --nocapture +``` */ use futures::{ StreamExt }; diff --git a/backend/test_utils/src/server/server.rs b/backend/test_utils/src/server/server.rs index 113f1fa..0ef9d4a 100644 --- a/backend/test_utils/src/server/server.rs +++ b/backend/test_utils/src/server/server.rs @@ -58,6 +58,11 @@ pub enum Server { core: CoreProcess, }, ConnectToExistingMode { + /// The hosts that we can connect to to submit things. + submit_hosts: Vec<String>, + /// Which host do we use next (we'll cycle around them + /// as shards are "added"). 
+ next_submit_host_idx: usize, /// Shard processes that we can connect to. shards: DenseMap, /// Core process that we can connect to. @@ -151,13 +156,27 @@ impl Server { /// Connect a new shard and return a process that you can interact with: pub async fn add_shard(&mut self) -> Result<ProcessId, Error> { match self { - // Always get back the same "shard" in virtual mode; it's just the core anyway. + // Always get back the same "virtual" shard; we're always just talking to the core anyway. Server::SingleProcessMode { virtual_shard, .. } => { Ok(virtual_shard.id) }, - // We're connecting to existing things; nothing sane to hand back. - Server::ConnectToExistingMode { .. } => { - Err(Error::CannotAddShard) + // We're connecting to an existing process. Find the next host we've been told about + // round-robin style and use that as our new virtual shard. + Server::ConnectToExistingMode { submit_hosts, next_submit_host_idx, shards, .. } => { + let host = match submit_hosts.get(*next_submit_host_idx % submit_hosts.len()) { + Some(host) => host, + None => return Err(Error::CannotAddShard) + }; + *next_submit_host_idx += 1; + + let pid = shards.add_with(|id| Process { + id, + host: format!("{}", host), + handle: None, + _channel_type: PhantomData, + }); + + Ok(pid) }, // Start a new process and return that. 
Server::ShardAndCoreMode { shard_command, shards, core } => { @@ -234,18 +253,10 @@ impl Server { } }, StartOpts::ConnectToExisting { feed_host, submit_hosts } => { - let mut shards = DenseMap::new(); - for host in submit_hosts { - shards.add_with(|id| Process { - id, - host, - handle: None, - _channel_type: PhantomData, - }); - } - Server::ConnectToExistingMode { - shards, + submit_hosts, + next_submit_host_idx: 0, + shards: DenseMap::new(), core: Process { id: ProcessId(0), host: feed_host, diff --git a/backend/test_utils/src/workspace/commands.rs b/backend/test_utils/src/workspace/commands.rs index df7f97a..595b2b2 100644 --- a/backend/test_utils/src/workspace/commands.rs +++ b/backend/test_utils/src/workspace/commands.rs @@ -21,7 +21,7 @@ fn telemetry_command(bin: &'static str, release_mode: bool) -> Result Server { - + // Start to a single process: if let Ok(bin) = std::env::var("TELEMETRY_BIN") { return Server::start(server::StartOpts::SingleProcess { command: Command::new(bin) }).await.unwrap(); } + // Connect to a running instance: + if let Ok(feed_host) = std::env::var("TELEMETRY_FEED_HOST") { + let feed_host = feed_host.trim().into(); + let submit_hosts: Vec<_> = std::env::var("TELEMETRY_SUBMIT_HOSTS") + .map(|var| var.split(",").map(|var| var.trim().into()).collect()) + .unwrap_or(Vec::new()); + return Server::start(server::StartOpts::ConnectToExisting { + feed_host, + submit_hosts, + }).await.unwrap(); + } + + // Start a shard and core process: let shard_command = std::env::var("TELEMETRY_SHARD_BIN") .map(|val| Command::new(val)) .unwrap_or_else(|_| commands::cargo_run_telemetry_shard(release_mode).expect("must be in rust workspace to run shard command")); - let core_command = std::env::var("TELEMETRY_CORE_BIN") .map(|val| Command::new(val)) .unwrap_or_else(|_| commands::cargo_run_telemetry_core(release_mode).expect("must be in rust workspace to run core command")); - Server::start(server::StartOpts::ShardAndCore { shard_command, core_command