mirror of
https://github.com/pezkuwichain/pezkuwi-telemetry.git
synced 2026-04-30 14:18:01 +00:00
Allow tests to hook up to running process to help with profiling
This commit is contained in:
@@ -12,3 +12,4 @@ opt-level = 3
|
||||
[profile.release]
|
||||
lto = true
|
||||
panic = "abort"
|
||||
debug = true
|
||||
@@ -16,6 +16,38 @@ sudo sysctl -w kern.ipc.maxsockbuf=16777216
|
||||
|
||||
In general, if you run into issues, it may be better to run this on a linux
|
||||
box; MacOS seems to hit limits quicker in general.
|
||||
|
||||
We can profile execution using cargo-flamegraph to learn more about how long
|
||||
we spend in each function.
|
||||
|
||||
The main thing to do is install it:
|
||||
|
||||
```sh
|
||||
cargo install flamegraph
|
||||
```
|
||||
|
||||
Now, start processes running ourselves (so that we can use flamegraph on them):
|
||||
|
||||
In one terminal (in the `backend` folder of this repo):
|
||||
|
||||
```sh
|
||||
sudo cargo flamegraph -o telemetry_core.svg --bin telemetry_core
|
||||
```
|
||||
|
||||
And in another:
|
||||
|
||||
```sh
|
||||
sudo cargo flamegraph -o telemetry_shard.svg --bin telemetry_shard
|
||||
```
|
||||
|
||||
And then, in a third terminal, we can run our general soak test against these:
|
||||
|
||||
```sh
|
||||
SOAK_TEST_ARGS='--feeds 100 --nodes 100 --shards 4' \
|
||||
TELEMETRY_SUBMIT_HOSTS='127.0.0.1:8001' \
|
||||
TELEMETRY_FEED_HOST='127.0.0.1:8000' \
|
||||
cargo test -- soak_test --ignored --nocapture
|
||||
```
|
||||
*/
|
||||
|
||||
use futures::{ StreamExt };
|
||||
|
||||
@@ -58,6 +58,11 @@ pub enum Server {
|
||||
core: CoreProcess,
|
||||
},
|
||||
ConnectToExistingMode {
|
||||
/// The hosts that we can connect to to submit things.
|
||||
submit_hosts: Vec<String>,
|
||||
/// Which host do we use next (we'll cycle around them
|
||||
/// as shards are "added").
|
||||
next_submit_host_idx: usize,
|
||||
/// Shard processes that we can connect to.
|
||||
shards: DenseMap<ProcessId, ShardProcess>,
|
||||
/// Core process that we can connect to.
|
||||
@@ -151,13 +156,27 @@ impl Server {
|
||||
/// Connect a new shard and return a process that you can interact with:
|
||||
pub async fn add_shard(&mut self) -> Result<ProcessId, Error> {
|
||||
match self {
|
||||
// Always get back the same "shard" in virtual mode; it's just the core anyway.
|
||||
// Always get back the same "virtual" shard; we're always just talking to the core anyway.
|
||||
Server::SingleProcessMode { virtual_shard, .. } => {
|
||||
Ok(virtual_shard.id)
|
||||
},
|
||||
// We're connecting to existing things; nothing sane to hand back.
|
||||
Server::ConnectToExistingMode { .. } => {
|
||||
Err(Error::CannotAddShard)
|
||||
// We're connecting to an existing process. Find the next host we've been told about
|
||||
// round-robin style and use that as our new virtual shard.
|
||||
Server::ConnectToExistingMode { submit_hosts, next_submit_host_idx, shards, .. } => {
|
||||
let host = match submit_hosts.get(*next_submit_host_idx % submit_hosts.len()) {
|
||||
Some(host) => host,
|
||||
None => return Err(Error::CannotAddShard)
|
||||
};
|
||||
*next_submit_host_idx += 1;
|
||||
|
||||
let pid = shards.add_with(|id| Process {
|
||||
id,
|
||||
host: format!("{}", host),
|
||||
handle: None,
|
||||
_channel_type: PhantomData,
|
||||
});
|
||||
|
||||
Ok(pid)
|
||||
},
|
||||
// Start a new process and return that.
|
||||
Server::ShardAndCoreMode { shard_command, shards, core } => {
|
||||
@@ -234,18 +253,10 @@ impl Server {
|
||||
}
|
||||
},
|
||||
StartOpts::ConnectToExisting { feed_host, submit_hosts } => {
|
||||
let mut shards = DenseMap::new();
|
||||
for host in submit_hosts {
|
||||
shards.add_with(|id| Process {
|
||||
id,
|
||||
host,
|
||||
handle: None,
|
||||
_channel_type: PhantomData,
|
||||
});
|
||||
}
|
||||
|
||||
Server::ConnectToExistingMode {
|
||||
shards,
|
||||
submit_hosts,
|
||||
next_submit_host_idx: 0,
|
||||
shards: DenseMap::new(),
|
||||
core: Process {
|
||||
id: ProcessId(0),
|
||||
host: feed_host,
|
||||
|
||||
@@ -21,7 +21,7 @@ fn telemetry_command(bin: &'static str, release_mode: bool) -> Result<Command, s
|
||||
let mut workspace_dir = try_find_workspace_dir()?;
|
||||
workspace_dir.push("Cargo.toml");
|
||||
|
||||
let mut cmd = Command::new("cargo").arg("run");
|
||||
let mut cmd = Command::new("cargo").arg("flamegraph");
|
||||
|
||||
// Release mode?
|
||||
if release_mode {
|
||||
|
||||
@@ -11,23 +11,40 @@ use crate::server::{self, Server, Command};
|
||||
/// - `TELEMETRY_SHARD_BIN` - path to telemetry_shard binary
|
||||
/// - `TELEMETRY_CORE_BIN` - path to telemetry_core binary
|
||||
///
|
||||
/// Whatever is not provided will be substituted with a `cargo run` variant instead.
|
||||
/// (Whatever is not provided will be substituted with a `cargo run` variant instead)
|
||||
///
|
||||
/// Or alternately alternately, we can connect to a running instance by providing:
|
||||
///
|
||||
/// - `TELEMETRY_SUBMIT_HOSTS` - hosts (comma separated) to connect to for telemetry `/submit`s.
|
||||
/// - `TELEMETRY_FEED_HOST` - host to connect to for feeds (eg 127.0.0.1:3000)
|
||||
///
|
||||
pub async fn start_server(release_mode: bool) -> Server {
|
||||
|
||||
// Start to a single process:
|
||||
if let Ok(bin) = std::env::var("TELEMETRY_BIN") {
|
||||
return Server::start(server::StartOpts::SingleProcess {
|
||||
command: Command::new(bin)
|
||||
}).await.unwrap();
|
||||
}
|
||||
|
||||
// Connect to a running instance:
|
||||
if let Ok(feed_host) = std::env::var("TELEMETRY_FEED_HOST") {
|
||||
let feed_host = feed_host.trim().into();
|
||||
let submit_hosts: Vec<_> = std::env::var("TELEMETRY_SUBMIT_HOSTS")
|
||||
.map(|var| var.split(",").map(|var| var.trim().into()).collect())
|
||||
.unwrap_or(Vec::new());
|
||||
return Server::start(server::StartOpts::ConnectToExisting {
|
||||
feed_host,
|
||||
submit_hosts,
|
||||
}).await.unwrap();
|
||||
}
|
||||
|
||||
// Start a shard and core process:
|
||||
let shard_command = std::env::var("TELEMETRY_SHARD_BIN")
|
||||
.map(|val| Command::new(val))
|
||||
.unwrap_or_else(|_| commands::cargo_run_telemetry_shard(release_mode).expect("must be in rust workspace to run shard command"));
|
||||
|
||||
let core_command = std::env::var("TELEMETRY_CORE_BIN")
|
||||
.map(|val| Command::new(val))
|
||||
.unwrap_or_else(|_| commands::cargo_run_telemetry_core(release_mode).expect("must be in rust workspace to run core command"));
|
||||
|
||||
Server::start(server::StartOpts::ShardAndCore {
|
||||
shard_command,
|
||||
core_command
|
||||
|
||||
Reference in New Issue
Block a user