diff --git a/backend/telemetry_core/tests/e2e_tests.rs b/backend/telemetry_core/tests/e2e_tests.rs index 16abbff..22e66dc 100644 --- a/backend/telemetry_core/tests/e2e_tests.rs +++ b/backend/telemetry_core/tests/e2e_tests.rs @@ -16,7 +16,7 @@ async fn feed_sent_version_on_connect() { let server = start_server_debug().await; // Connect a feed: - let (_feed_tx, mut feed_rx) = server.get_core().connect().await.unwrap(); + let (_feed_tx, mut feed_rx) = server.get_core().connect_feed().await.unwrap(); // Expect a version response of 31: let feed_messages = feed_rx.recv_feed_messages().await.unwrap(); @@ -37,7 +37,7 @@ async fn feed_ping_responded_to_with_pong() { let server = start_server_debug().await; // Connect a feed: - let (mut feed_tx, mut feed_rx) = server.get_core().connect().await.unwrap(); + let (mut feed_tx, mut feed_rx) = server.get_core().connect_feed().await.unwrap(); // Ping it: feed_tx.send_command("ping", "hello!").unwrap(); @@ -65,7 +65,7 @@ async fn multiple_feeds_sent_version_on_connect() { // Connect a bunch of feeds: let mut feeds = server .get_core() - .connect_multiple(1000) + .connect_multiple_feeds(1000) .await .unwrap(); @@ -107,7 +107,7 @@ async fn lots_of_mute_messages_dont_cause_a_deadlock() { let mut nodes = server .get_shard(shard_id) .unwrap() - .connect_multiple(2000) // 1500 of these will be overquota. + .connect_multiple_nodes(2000) // 1500 of these will be overquota. .await .expect("nodes can connect"); @@ -139,7 +139,7 @@ async fn lots_of_mute_messages_dont_cause_a_deadlock() { // receive any messages back. 
let mut feeds = server .get_core() - .connect_multiple(1) + .connect_multiple_feeds(1) .await .expect("feeds can connect"); @@ -168,7 +168,7 @@ async fn feed_add_and_remove_node() { let (mut node_tx, _node_rx) = server .get_shard(shard_id) .unwrap() - .connect() + .connect_node() .await .expect("can connect to shard"); @@ -199,7 +199,7 @@ async fn feed_add_and_remove_node() { tokio::time::sleep(Duration::from_millis(500)).await; // Connect a feed. - let (_feed_tx, mut feed_rx) = server.get_core().connect().await.unwrap(); + let (_feed_tx, mut feed_rx) = server.get_core().connect_feed().await.unwrap(); let feed_messages = feed_rx.recv_feed_messages().await.unwrap(); assert!(feed_messages.contains(&FeedMessage::AddedChain { @@ -230,7 +230,7 @@ async fn feeds_told_about_chain_rename_and_stay_subscribed() { let (mut node_tx, _node_rx) = server .get_shard(shard_id) .unwrap() - .connect() + .connect_node() .await .expect("can connect to shard"); @@ -255,7 +255,7 @@ async fn feeds_told_about_chain_rename_and_stay_subscribed() { node_tx.send_json_text(node_init_msg(1, "Initial chain name", "Node 1")).unwrap(); // Connect a feed and subscribe to the above chain: - let (mut feed_tx, mut feed_rx) = server.get_core().connect().await.unwrap(); + let (mut feed_tx, mut feed_rx) = server.get_core().connect_feed().await.unwrap(); feed_tx.send_command("subscribe", "Initial chain name").unwrap(); // Feed is told about the chain, and the node on this chain: @@ -316,7 +316,7 @@ async fn feed_add_and_remove_shard() { let (mut node_tx, _node_rx) = server .get_shard(shard_id) .unwrap() - .connect() + .connect_node() .await .expect("can connect to shard"); @@ -345,7 +345,7 @@ async fn feed_add_and_remove_shard() { } // Connect a feed. 
- let (_feed_tx, mut feed_rx) = server.get_core().connect().await.unwrap(); + let (_feed_tx, mut feed_rx) = server.get_core().connect_feed().await.unwrap(); // The feed should be told about both of the chains that we've sent info about: let feed_messages = feed_rx.recv_feed_messages().await.unwrap(); @@ -386,7 +386,7 @@ async fn feed_can_subscribe_and_unsubscribe_from_chain() { // Start server, add shard, connect node: let mut server = start_server_debug().await; let shard_id = server.add_shard().await.unwrap(); - let (mut node_tx, _node_rx) = server.get_shard(shard_id).unwrap().connect().await.unwrap(); + let (mut node_tx, _node_rx) = server.get_shard(shard_id).unwrap().connect_node().await.unwrap(); // Send a "system connected" message for a few nodes/chains: for id in 1..=3 { @@ -413,7 +413,7 @@ async fn feed_can_subscribe_and_unsubscribe_from_chain() { } // Connect a feed - let (mut feed_tx, mut feed_rx) = server.get_core().connect().await.unwrap(); + let (mut feed_tx, mut feed_rx) = server.get_core().connect_feed().await.unwrap(); let feed_messages = feed_rx.recv_feed_messages().await.unwrap(); assert_contains_matches!(feed_messages, AddedChain { name, node_count: 1 } if name == "Local Testnet 1"); diff --git a/backend/telemetry_core/tests/soak_tests.rs b/backend/telemetry_core/tests/soak_tests.rs index 4bd6a2c..c87b9cb 100644 --- a/backend/telemetry_core/tests/soak_tests.rs +++ b/backend/telemetry_core/tests/soak_tests.rs @@ -11,7 +11,11 @@ sudo sysctl -w kern.maxfiles=50000 sudo sysctl -w kern.maxfilesperproc=50000 ulimit -n 50000 sudo sysctl -w kern.ipc.somaxconn=50000 +sudo sysctl -w kern.ipc.maxsockbuf=16777216 ``` + +In general, if you run into issues, it may be better to run this on a linux +box; MacOS seems to hit limits quicker in general. 
*/ use futures::{ StreamExt }; @@ -30,8 +34,15 @@ use common::node_types::BlockHash; /// ```sh /// SOAK_TEST_ARGS='--feeds 10 --nodes 100 --shards 4' cargo test -- soak_test --ignored --nocapture /// ``` +/// +/// You can also run this test against the pre-sharding actix binary like so: +/// ```sh +/// TELEMETRY_BIN=~/old_telemetry_binary SOAK_TEST_ARGS='--feeds 100 --nodes 100 --shards 4' cargo test -- soak_test --ignored --nocapture +/// ``` +/// +/// Both will establish the same total number of connections and send the same messages. #[ignore] -#[tokio::test] +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] pub async fn soak_test() { let opts = get_soak_test_opts(); run_soak_test(opts).await; @@ -54,7 +65,7 @@ async fn run_soak_test(opts: SoakTestOpts) { let mut conns = server .get_shard(shard_id) .unwrap() - .connect_multiple(opts.nodes) + .connect_multiple_nodes(opts.nodes) .await .expect("node connections failed"); nodes.append(&mut conns); @@ -83,7 +94,7 @@ async fn run_soak_test(opts: SoakTestOpts) { // Connect feeds to the core: let mut feeds = server .get_core() - .connect_multiple(opts.feeds) + .connect_multiple_feeds(opts.feeds) .await .expect("feed connections failed"); @@ -93,33 +104,42 @@ async fn run_soak_test(opts: SoakTestOpts) { // Start sending "update" messages from nodes at time intervals. 
+ let bytes_in = Arc::new(AtomicUsize::new(0)); + let bytes_in2 = Arc::clone(&bytes_in); let send_handle = tokio::task::spawn(async move { + let msg = json!({ + "id":1, + "payload":{ + "bandwidth_download":576, + "bandwidth_upload":576, + "msg":"system.interval", + "peers":1 + }, + "ts":"2021-07-12T10:37:48.330433+01:00" + }); + let msg_bytes: &'static [u8] = Box::new(serde_json::to_vec(&msg).unwrap()).leak(); + loop { - let msg = json!({ - "id":1, - "payload":{ - "bandwidth_download":576, - "bandwidth_upload":576, - "msg":"system.interval", - "peers":1 - }, - "ts":"2021-07-12T10:37:48.330433+01:00" - }); - let msg_bytes = serde_json::to_vec(&msg).unwrap(); - for (node_tx, _) in &mut nodes { - node_tx.unbounded_send(SentMessage::Binary(msg_bytes.clone())).unwrap(); + // every ~1second we aim to have sent messages from all of the nodes. So we cycle through + // the node IDs and send a message from each at roughly 1s / number_of_nodes. + let mut interval = tokio::time::interval(Duration::from_secs_f64(1.0 / nodes.len() as f64)); + + for node_id in (0..nodes.len()).cycle() { + interval.tick().await; + let node_tx = &mut nodes[node_id].0; + node_tx.unbounded_send(SentMessage::StaticBinary(msg_bytes)).unwrap(); + bytes_in2.fetch_add(msg_bytes.len(), Ordering::Relaxed); } - tokio::time::sleep(Duration::from_millis(500)).await; } }); // Also start receiving messages, counting the bytes received so far. 
let bytes_out = Arc::new(AtomicUsize::new(0)); for (_, mut feed_rx) in feeds { - let bytes_out = bytes_out.clone(); + let bytes_out = Arc::clone(&bytes_out); tokio::task::spawn(async move { while let Some(msg) = feed_rx.next().await { - let msg = msg.expect("message coule be received"); + let msg = msg.expect("message could be received"); let num_bytes = msg.len(); bytes_out.fetch_add(num_bytes, Ordering::Relaxed); } @@ -128,20 +148,26 @@ async fn run_soak_test(opts: SoakTestOpts) { // Periodically report on bytes out tokio::task::spawn(async move { - let mut last_bytes = 0; - let mut last_now = std::time::Instant::now(); + let one_mb = 1024.0 * 1024.0; + let mut last_bytes_in = 0; + let mut last_bytes_out = 0; + let mut n = 1; loop { - tokio::time::sleep(Duration::from_secs(5)).await; + tokio::time::sleep(Duration::from_secs(1)).await; + let bytes_in_val = bytes_in.load(Ordering::Relaxed); + let bytes_out_val = bytes_out.load(Ordering::Relaxed); - let curr_now = std::time::Instant::now(); - let curr_bytes_out = bytes_out.load(Ordering::Relaxed); - let secs_elapsed = (curr_now - last_now).as_secs_f64(); - let kbps: f64 = (curr_bytes_out - last_bytes) as f64 / 1024.0 / secs_elapsed; + println!("#{}: MB in/out per measurement: {:.4} / {:.4}, total bytes in/out: {} / {})", + n, + (bytes_in_val - last_bytes_in) as f64 / one_mb, + (bytes_out_val - last_bytes_out) as f64 / one_mb, + bytes_in_val, + bytes_out_val + ); - println!("output kbps: ~{}", kbps); - - last_bytes = curr_bytes_out; - last_now = curr_now; + n += 1; + last_bytes_in = bytes_in_val; + last_bytes_out = bytes_out_val; } }); diff --git a/backend/test_utils/src/server/server.rs b/backend/test_utils/src/server/server.rs index d70a12e..113f1fa 100644 --- a/backend/test_utils/src/server/server.rs +++ b/backend/test_utils/src/server/server.rs @@ -10,22 +10,62 @@ id_type! { pub struct ProcessId(usize); } -pub struct StartOpts { - /// Command to run to start a shard. 
- /// The `--listen` and `--log` arguments will be appended within and shouldn't be provided. - pub shard_command: Command, - /// Command to run to start a telemetry core process. - /// The `--listen` and `--log` arguments will be appended within and shouldn't be provided. - pub core_command: Command, +pub enum StartOpts { + /// Start a single core process that is expected + /// to have both `/feed` and `/submit` endpoints + SingleProcess { + /// Command to run to start the process. + /// The `--listen` and `--log` arguments will be appended within and shouldn't be provided. + command: Command, + }, + /// Start a core process with a `/feed` endpoint as well as (optionally) + /// multiple shard processes with `/submit` endpoints. + ShardAndCore { + /// Command to run to start a shard. + /// The `--listen` and `--log` arguments will be appended within and shouldn't be provided. + shard_command: Command, + /// Command to run to start a telemetry core process. + /// The `--listen` and `--log` arguments will be appended within and shouldn't be provided. + core_command: Command, + }, + /// Connect to existing process(es). + ConnectToExisting { + /// Where are the processes that we can `/submit` things to? + /// Eg: `vec![127.0.0.1:12345, 127.0.0.1:9091]` + submit_hosts: Vec, + /// Where is the process that we can subscribe to the `/feed` of? + /// Eg: `127.0.0.1:3000` + feed_host: String, + } } -pub struct ConnectToExistingOpts { - /// Details for connections to `telemetry_shard` /submit endpoints - pub shard_uris: Vec, - /// Details for connections to `telemetry_core` /feed endpoints - pub feed_uri: http::Uri, +/// This represents a telemetry server. It can be in different modes +/// depending on how it was started, but the interface is similar in every case +/// so that tests are somewhat compatible with multiple configurations. +pub enum Server { + SingleProcessMode { + /// A virtual shard that we can hand out. 
+ virtual_shard: ShardProcess, + /// Core process that we can connect to. + core: CoreProcess + }, + ShardAndCoreMode { + /// Command to run to start a new shard. + shard_command: Command, + /// Shard processes that we can connect to. + shards: DenseMap, + /// Core process that we can connect to. + core: CoreProcess, + }, + ConnectToExistingMode { + /// Shard processes that we can connect to. + shards: DenseMap, + /// Core process that we can connect to. + core: CoreProcess, + } } + #[derive(thiserror::Error, Debug)] pub enum Error { #[error("Can't establsih connection: {0}")] @@ -34,47 +74,44 @@ pub enum Error { JoinError(#[from] tokio::task::JoinError), #[error("Can't establsih connection: {0}")] IoError(#[from] std::io::Error), - #[error("Could not obtain port for process: {0}")] + #[error("Could not obtain port for process as the line we waited for in log output didn't show up: {0}")] ErrorObtainingPort(anyhow::Error), #[error("Whoops; attempt to kill a process we didn't start (and so have no handle to)")] CannotKillNoHandle, #[error( - "Whoops; attempt to add a shard to a server we didn't start (and so have no handle to)" + "Can't add a shard: command not provided, or we are not in charge of spawning processes" )] - CannotAddShardNoHandle, -} - -/// This represents a telemetry core process and zero or more connected shards. -/// From this, you can add/remove shards, establish node/feed connections, and -/// send/receive relevant messages from each. -pub struct Server { - /// URI to connect a shard to core: - core_shard_submit_uri: Option, - /// Command to run to start a new shard. Optional - /// because if we connect to running instances it'll - /// be unset. 
- shard_command: Option, - /// Shard processes that we can connect to - shards: DenseMap, - /// Core process that we can connect to - core: CoreProcess, + CannotAddShard, + #[error("The URI provided was invalid: {0}")] + InvalidUri(#[from] http::uri::InvalidUri) } impl Server { pub fn get_core(&self) -> &CoreProcess { - &self.core + match self { + Server::SingleProcessMode { core, .. } => core, + Server::ShardAndCoreMode { core, ..} => core, + Server::ConnectToExistingMode { core, .. } => core + } } pub fn get_shard(&self, id: ProcessId) -> Option<&ShardProcess> { - self.shards.get(id) - } - - pub fn iter_shards(&self) -> impl Iterator { - self.shards.iter().map(|(_, v)| v) + match self { + Server::SingleProcessMode { virtual_shard, .. } => Some(virtual_shard), + Server::ShardAndCoreMode { shards, ..} => shards.get(id), + Server::ConnectToExistingMode { shards, .. } => shards.get(id) + } } pub async fn kill_shard(&mut self, id: ProcessId) -> bool { - let shard = match self.shards.remove(id) { + let shard = match self { + // Can't remove the pretend shard: + Server::SingleProcessMode { .. } => return false, + Server::ShardAndCoreMode { shards, ..} => shards.remove(id), + Server::ConnectToExistingMode { shards, .. } => shards.remove(id) + }; + + let shard = match shard { Some(shard) => shard, None => return false, }; @@ -94,9 +131,17 @@ impl Server { // Spawn so we don't need to await cleanup if we don't care. // Run all kill futs simultaneously. let handle = tokio::spawn(async move { - let shard_kill_futs = self.shards.into_iter().map(|(_, s)| s.kill()); + let (core, shards) = match self { + Server::SingleProcessMode { core, .. } + => (core, DenseMap::new()), + Server::ShardAndCoreMode { core, shards, ..} + => (core, shards), + Server::ConnectToExistingMode { core, shards, .. 
} + => (core, shards) + }; - let _ = tokio::join!(futures::future::join_all(shard_kill_futs), self.core.kill()); + let shard_kill_futs = shards.into_iter().map(|(_, s)| s.kill()); + let _ = tokio::join!(futures::future::join_all(shard_kill_futs), core.kill()); }); // You can wait for cleanup but aren't obliged to: @@ -105,68 +150,119 @@ impl Server { /// Connect a new shard and return a process that you can interact with: pub async fn add_shard(&mut self) -> Result { - let core_uri = match &self.core_shard_submit_uri { - Some(uri) => uri, - None => return Err(Error::CannotAddShardNoHandle), - }; + match self { + // Always get back the same "shard" in virtual mode; it's just the core anyway. + Server::SingleProcessMode { virtual_shard, .. } => { + Ok(virtual_shard.id) + }, + // We're connecting to existing things; nothing sane to hand back. + Server::ConnectToExistingMode { .. } => { + Err(Error::CannotAddShard) + }, + // Start a new process and return that. + Server::ShardAndCoreMode { shard_command, shards, core } => { + // Where is the URI we'll want to submit things to? + let core_shard_submit_uri = format!("http://{}/shard_submit", core.host); - let mut shard_cmd: TokioCommand = self - .shard_command - .clone() - .ok_or_else(|| Error::CannotAddShardNoHandle)? 
- .into(); + let mut shard_cmd: TokioCommand = shard_command.clone().into(); + shard_cmd + .arg("--listen") + .arg("127.0.0.1:0") // 0 to have a port picked by the kernel + .arg("--log") + .arg("info") + .arg("--core") + .arg(core_shard_submit_uri) + .kill_on_drop(true) + .stdout(std::process::Stdio::piped()) + .stdin(std::process::Stdio::piped()); - shard_cmd - .arg("--listen") - .arg("127.0.0.1:0") // 0 to have a port picked by the kernel - .arg("--log") - .arg("info") - .arg("--core") - .arg(core_uri.to_string()) - .kill_on_drop(true) - .stdout(std::process::Stdio::piped()) - .stdin(std::process::Stdio::piped()); + let mut shard_process = shard_cmd.spawn()?; + let mut child_stdout = shard_process.stdout.take().expect("shard stdout"); + let shard_port = utils::get_port(&mut child_stdout) + .await + .map_err(|e| Error::ErrorObtainingPort(e))?; - let mut shard_process = shard_cmd.spawn()?; - let mut child_stdout = shard_process.stdout.take().expect("shard stdout"); - let shard_port = utils::get_port(&mut child_stdout) - .await - .map_err(|e| Error::ErrorObtainingPort(e))?; + // Attempt to wait until we've received word that the shard is connected to the + // core before continuing. If we don't wait for this, the connection may happen + // after we've attempted to connect node sockets, and they would be booted and + // made to reconnect, which we don't want to deal with in general. + let _ = utils::wait_for_line_containing( + &mut child_stdout, + |s| s.contains("Connected to telemetry core"), + std::time::Duration::from_secs(5), + ) + .await; - // Attempt to wait until we've received word that the shard is connected to the - // core before continuing. If we don't wait for this, the connection may happen - // after we've attempted to connect node sockets, and they would be booted and - // made to reconnect, which we don't want to deal with in general. 
- let _ = utils::wait_for_line_containing( - &mut child_stdout, - "Connected to telemetry core", - std::time::Duration::from_secs(5), - ) - .await; + // Since we're piping stdout from the child process, we need somewhere for it to go + // else the process will get stuck when it tries to produce output: + utils::drain(child_stdout, tokio::io::stderr()); - // Since we're piping stdout from the child process, we need somewhere for it to go - // else the process will get stuck when it tries to produce output: - utils::drain(child_stdout, tokio::io::stderr()); + let pid = shards.add_with(|id| Process { + id, + host: format!("127.0.0.1:{}", shard_port), + handle: Some(shard_process), + _channel_type: PhantomData, + }); - let shard_uri = format!("http://127.0.0.1:{}/submit", shard_port) - .parse() - .expect("valid submit URI"); - - let pid = self.shards.add_with(|id| Process { - id, - handle: Some(shard_process), - uri: shard_uri, - _channel_type: PhantomData, - }); - - Ok(pid) + Ok(pid) + }, + } } - /// Start a telemetry_core process. From here, we can add/remove shards as needed. + /// Start a server. 
pub async fn start(opts: StartOpts) -> Result { - let mut core_cmd: TokioCommand = opts.core_command.into(); + let server = match opts { + StartOpts::SingleProcess { command } => { + let core_process = Server::start_core(command).await?; + let virtual_shard_host = core_process.host.clone(); + Server::SingleProcessMode { + core: core_process, + virtual_shard: Process { + id: ProcessId(0), + host: virtual_shard_host, + handle: None, + _channel_type: PhantomData + } + } + }, + StartOpts::ShardAndCore { core_command, shard_command } => { + let core_process = Server::start_core(core_command).await?; + Server::ShardAndCoreMode { + core: core_process, + shard_command, + shards: DenseMap::new() + } + }, + StartOpts::ConnectToExisting { feed_host, submit_hosts } => { + let mut shards = DenseMap::new(); + for host in submit_hosts { + shards.add_with(|id| Process { + id, + host, + handle: None, + _channel_type: PhantomData, + }); + } - let mut child = core_cmd + Server::ConnectToExistingMode { + shards, + core: Process { + id: ProcessId(0), + host: feed_host, + handle: None, + _channel_type: PhantomData, + }, + } + } + }; + + Ok(server) + } + + /// Start up a core process and return it. 
+ async fn start_core(command: Command) -> Result { + let mut tokio_core_cmd: TokioCommand = command.into(); + let mut child = tokio_core_cmd .arg("--listen") .arg("127.0.0.1:0") // 0 to have a port picked by the kernel .arg("--log") @@ -186,52 +282,14 @@ impl Server { // else the process will get stuck when it tries to produce output: utils::drain(child_stdout, tokio::io::stderr()); - // URI for feeds to connect to the core: - let feed_uri = format!("http://127.0.0.1:{}/feed", core_port) - .parse() - .expect("valid feed URI"); + let core_process = Process { + id: ProcessId(0), + host: format!("127.0.0.1:{}", core_port), + handle: Some(child), + _channel_type: PhantomData, + }; - Ok(Server { - shard_command: Some(opts.shard_command), - core_shard_submit_uri: Some( - format!("http://127.0.0.1:{}/shard_submit", core_port) - .parse() - .expect("valid shard_submit URI"), - ), - shards: DenseMap::new(), - core: Process { - id: ProcessId(0), - handle: Some(child), - uri: feed_uri, - _channel_type: PhantomData, - }, - }) - } - - /// Establshes the requested connections to existing processes. - pub fn connect_to_existing(opts: ConnectToExistingOpts) -> Server { - let mut shards = DenseMap::new(); - for shard_uri in opts.shard_uris { - shards.add_with(|id| Process { - id, - uri: shard_uri, - handle: None, - _channel_type: PhantomData, - }); - } - - Server { - shard_command: None, - // We can't add shards if starting in this mode: - core_shard_submit_uri: None, - shards, - core: Process { - id: ProcessId(0), - uri: opts.feed_uri, - handle: None, - _channel_type: PhantomData, - }, - } + Ok(core_process) } } @@ -239,11 +297,11 @@ impl Server { /// may be either a `telemetry_shard` or `telemetry_core`. pub struct Process { id: ProcessId, + /// Host that the process is running on (eg 127.0.0.1:8080). + host: String, /// If we started the processes ourselves, we'll have a handle to /// them which we can use to kill them. Else, we may not. 
handle: Option, - /// The URI that we can use to connect to the process socket. - uri: http::Uri, /// The kind of the process (lets us add methods specific to shard/core). _channel_type: PhantomData, } @@ -272,25 +330,54 @@ impl Process { impl, Recv: From> Process<(Send, Recv)> { /// Establish a connection to the process - pub async fn connect(&self) -> Result<(Send, Recv), Error> { - ws_client::connect(&self.uri) + async fn connect_to_uri(&self, uri: &http::Uri) -> Result<(Send, Recv), Error> { + ws_client::connect(uri) .await .map(|(s, r)| (s.into(), r.into())) .map_err(|e| e.into()) } /// Establish multiple connections to the process - pub async fn connect_multiple( + async fn connect_multiple_to_uri( &self, + uri: &http::Uri, num_connections: usize, ) -> Result, Error> { - utils::connect_multiple_to_uri(&self.uri, num_connections) + utils::connect_multiple_to_uri(uri, num_connections) .await .map(|v| v.into_iter().map(|(s, r)| (s.into(), r.into())).collect()) .map_err(|e| e.into()) } } +impl ShardProcess { + /// Establish a connection to the process + pub async fn connect_node(&self) -> Result<(channels::ShardSender, channels::ShardReceiver), Error> { + let uri = format!("http://{}/submit", self.host).parse()?; + self.connect_to_uri(&uri).await + } + + /// Establish multiple connections to the process + pub async fn connect_multiple_nodes(&self, num_connections: usize) -> Result, Error> { + let uri = format!("http://{}/submit", self.host).parse()?; + self.connect_multiple_to_uri(&uri, num_connections).await + } +} + +impl CoreProcess { + /// Establish a connection to the process + pub async fn connect_feed(&self) -> Result<(channels::FeedSender, channels::FeedReceiver), Error> { + let uri = format!("http://{}/feed", self.host).parse()?; + self.connect_to_uri(&uri).await + } + + /// Establish multiple connections to the process + pub async fn connect_multiple_feeds(&self, num_connections: usize) -> Result, Error> { + let uri = format!("http://{}/feed", 
self.host).parse()?; + self.connect_multiple_to_uri(&uri, num_connections).await + } +} + /// This defines a command to run. This exists because [`tokio::process::Command`] /// cannot be cloned, but we need to be able to clone our command to spawn multiple /// processes with it. diff --git a/backend/test_utils/src/server/utils.rs b/backend/test_utils/src/server/utils.rs index 354c570..8de17d9 100644 --- a/backend/test_utils/src/server/utils.rs +++ b/backend/test_utils/src/server/utils.rs @@ -8,11 +8,20 @@ use tokio::time::Duration; /// with the side benefit that we'll wait for it to start listening before returning. We do this /// because we want to allow the kernel to assign ports and so don't specify a port as an arg. pub async fn get_port(reader: R) -> Result { - let expected_text = "listening on http://127.0.0.1:"; - wait_for_line_containing(reader, expected_text, Duration::from_secs(240)) + // For the new service: + let new_expected_text = "listening on http://127.0.0.1:"; + // For the older non-sharded actix based service: + let old_expected_text = "service on 127.0.0.1:"; + + let is_text = |s: &str| s.contains(new_expected_text) || s.contains(old_expected_text); + wait_for_line_containing(reader, is_text, Duration::from_secs(240)) .await .and_then(|line| { - let (_, port_str) = line.rsplit_once(expected_text).unwrap(); + // The line must match one of our expected strings: + let (_, port_str) = line + .rsplit_once(new_expected_text) + .unwrap_or_else(|| line.rsplit_once(old_expected_text).unwrap()); + // Grab the port after the string: port_str .trim() .parse() @@ -23,9 +32,9 @@ pub async fn get_port(reader: R) -> Result( +pub async fn wait_for_line_containing bool>( reader: R, - text: &str, + is_match: F, max_wait_between_lines: Duration, ) -> Result { let reader = BufReader::new(reader); @@ -37,13 +46,12 @@ pub async fn wait_for_line_containing( let line = match line { // timeout expired; couldn't get port: Err(_) => { - return Err(anyhow!( - "Timeout 
expired waiting for output containing: {}", - text - )) + return Err(anyhow!("Timeout elapsed waiting for text match")) } // Something went wrong reading line; bail: - Ok(Err(e)) => return Err(anyhow!("Could not read line from stdout: {}", e)), + Ok(Err(e)) => { + return Err(anyhow!("Could not read line from stdout: {}", e)) + }, // No more output; process ended? bail: Ok(Ok(None)) => { return Err(anyhow!( @@ -54,7 +62,7 @@ pub async fn wait_for_line_containing( Ok(Ok(Some(line))) => line, }; - if line.contains(text) { + if is_match(&line) { return Ok(line); } } diff --git a/backend/test_utils/src/workspace/start_server.rs b/backend/test_utils/src/workspace/start_server.rs index 1a11f76..8229d4c 100644 --- a/backend/test_utils/src/workspace/start_server.rs +++ b/backend/test_utils/src/workspace/start_server.rs @@ -1,30 +1,45 @@ use super::commands; use crate::server::{self, Server, Command}; -/// Start a telemetry core server. We'll use `cargo run` by default, to ensure that -/// the code we run is uptodate, but you can also provide env vars to configure the binary -/// that runs for the shard and core process: +/// Start a telemetry server. We'll use `cargo run` by default, but you can also provide +/// env vars to configure the binary that runs for the shard and core process. Either: /// -/// TELEMETRY_SHARD_BIN - path to telemetry_shard binary -/// TELEMETRY_CORE_BIN - path to telemetry_core binary -async fn start_server(release_mode: bool) -> Server { +/// - `TELEMETRY_BIN` - path to the telemetry binary (which can function as shard _and_ core) +/// +/// Or alternately neither/one/both of: +/// +/// - `TELEMETRY_SHARD_BIN` - path to telemetry_shard binary +/// - `TELEMETRY_CORE_BIN` - path to telemetry_core binary +/// +/// Whatever is not provided will be substituted with a `cargo run` variant instead. 
+pub async fn start_server(release_mode: bool) -> Server { + + if let Ok(bin) = std::env::var("TELEMETRY_BIN") { + return Server::start(server::StartOpts::SingleProcess { + command: Command::new(bin) + }).await.unwrap(); + } + let shard_command = std::env::var("TELEMETRY_SHARD_BIN") .map(|val| Command::new(val)) - .unwrap_or_else(|_| commands::cargo_run_telemetry_shard(release_mode).expect("valid shard command")); + .unwrap_or_else(|_| commands::cargo_run_telemetry_shard(release_mode).expect("must be in rust workspace to run shard command")); let core_command = std::env::var("TELEMETRY_CORE_BIN") .map(|val| Command::new(val)) - .unwrap_or_else(|_| commands::cargo_run_telemetry_core(release_mode).expect("valid core command")); + .unwrap_or_else(|_| commands::cargo_run_telemetry_core(release_mode).expect("must be in rust workspace to run core command")); - Server::start(server::StartOpts { shard_command, core_command }).await.unwrap() + Server::start(server::StartOpts::ShardAndCore { + shard_command, + core_command + }).await.unwrap() } -/// Start a telemetry server using debug builds for compile speed +/// Start a telemetry core server in debug mode. see [`start_server`] for details. pub async fn start_server_debug() -> Server { start_server(false).await } -/// Start a telemetry server using release builds for performance accuracy +/// Start a telemetry core server in release mode. see [`start_server`] for details. pub async fn start_server_release() -> Server { start_server(true).await } \ No newline at end of file