diff --git a/backend/Cargo.lock b/backend/Cargo.lock index 4f00159..9b8f9ad 100644 --- a/backend/Cargo.lock +++ b/backend/Cargo.lock @@ -1394,6 +1394,7 @@ dependencies = [ "smallvec", "soketto", "structopt", + "test_utils", "thiserror", "tokio", "tokio-util", @@ -1445,6 +1446,7 @@ dependencies = [ "futures", "http", "log", + "serde_json", "soketto", "thiserror", "tokio", diff --git a/backend/telemetry_core/Cargo.toml b/backend/telemetry_core/Cargo.toml index 3ddca7c..c78a741 100644 --- a/backend/telemetry_core/Cargo.toml +++ b/backend/telemetry_core/Cargo.toml @@ -29,3 +29,6 @@ thiserror = "1.0.25" tokio = { version = "1.7.0", features = ["full"] } tokio-util = { version = "0.6", features = ["compat"] } warp = "0.3.1" + +[dev-dependencies] +test_utils = { path = "../test_utils" } \ No newline at end of file diff --git a/backend/telemetry_core/src/main.rs b/backend/telemetry_core/src/main.rs index 508c8b5..8126bab 100644 --- a/backend/telemetry_core/src/main.rs +++ b/backend/telemetry_core/src/main.rs @@ -308,3 +308,17 @@ where // loop ended; give socket back to parent: (tx_to_aggregator, websocket) } + +#[cfg(test)] +pub mod test { + + use test_utils::test_simple; + + #[test] + fn test_feed_connection () { + + + + } + +} \ No newline at end of file diff --git a/backend/test_utils/Cargo.toml b/backend/test_utils/Cargo.toml index 8236983..4003856 100644 --- a/backend/test_utils/Cargo.toml +++ b/backend/test_utils/Cargo.toml @@ -11,6 +11,7 @@ anyhow = "1.0.41" futures = "0.3.15" http = "0.2.4" log = "0.4.14" +serde_json = "1.0.64" soketto = "0.6.0" thiserror = "1.0.25" tokio = { version = "1.7.1", features = ["full"] } diff --git a/backend/test_utils/src/connect_to_servers.rs b/backend/test_utils/src/connect_to_servers.rs index c1b72ec..9539f36 100644 --- a/backend/test_utils/src/connect_to_servers.rs +++ b/backend/test_utils/src/connect_to_servers.rs @@ -1,34 +1,26 @@ use crate::ws_client; -use tokio::process::Command; +use tokio::process::{ self, Command }; use tokio::io::BufReader; use tokio::io::{ AsyncRead, AsyncBufReadExt }; use tokio::time::Duration; use anyhow::{ anyhow, Context }; -pub struct StartProcesses { +pub struct StartProcessOpts { /// Optional command to run to start a shard (instead of `telemetry_shard`). - /// The `--listen` argument will be appended here and shouldn't be provided. + /// The `--listen` and `--log` arguments will be appended within and shouldn't be provided. pub shard_command: Option, + /// How many shards should we start? + pub num_shards: usize, /// Optional command to run to start a telemetry core process (instead of `telemetry_core`). - /// The `--listen` argument will be appended here and shouldn't be provided. - pub telemetry_command: Option, - /// How many connections should we establish to each shard? We'll start - /// up a shard for each entry here. - pub num_shard_connections: Vec, - /// How many connections should we establish to the telemetry_core feed? - pub num_feed_connections: usize, + /// The `--listen` and `--log` arguments will be appended within and shouldn't be provided. + pub core_command: Option } -pub struct ConnectToExisting { - /// URI to shard /submit endpoint - pub shards: Vec, - /// URI to core /feed endpoint - pub feed: ConnectionDetails, -} - -pub struct ConnectionDetails { - pub uri: http::Uri, - pub num_connections: usize +pub struct ConnectToExistingOpts { + /// Details for connections to `telemetry_shard` /submit endpoints + pub shard_uris: Vec, + /// Details for connections to `telemetry_core` /feed endpoints + pub feed_uri: http::Uri, } #[derive(thiserror::Error, Debug)] @@ -40,26 +32,31 @@ pub enum Error { #[error("Can't establsih connection: {0}")] IoError(#[from] std::io::Error), #[error("Could not obtain port for process: {0}")] - ErrorObtainingPort(anyhow::Error) + ErrorObtainingPort(anyhow::Error), + #[error("Whoops; attempt to kill a process we didn't start (and so have no handle to)")] + CannotKillNoHandle } -pub struct Connection { - /// Connections to each of the shard submit URIs - pub shard_connections: Vec>, - /// Connections to the telemetry feed URI - pub feed_connections: Vec<(ws_client::Sender, ws_client::Receiver)> +/// This provides back connections (or groups of connections) that are +/// hooked up to the running processes and ready to send/receive messages. +pub struct Server { + /// Shard processes that we can connect to + pub shards: Vec, + /// Core process that we can connect to + pub core: Process, } -impl Connection { +impl Server { /// Start telemetry_core and telemetry_shard processes and establish connections to them. - pub async fn start_processes(opts: StartProcesses) -> Result { + pub async fn start_processes(opts: StartProcessOpts) -> Result { - let mut core_cmd = opts.telemetry_command + let mut core_cmd = opts.core_command .unwrap_or(Command::new("telemetry_core")) .arg("--listen") .arg("127.0.0.1:0") // 0 to have a port picked by the kernel .arg("--log") .arg("info") + .kill_on_drop(true) .stdout(std::process::Stdio::piped()) .stdin(std::process::Stdio::piped()) .spawn()?; @@ -77,75 +74,95 @@ impl Connection { .arg("info") .arg("--core") .arg(format!("127.0.0.1:{}", core_port)) + .kill_on_drop(true) .stdout(std::process::Stdio::piped()) .stdin(std::process::Stdio::piped()); // Start shards and find out the ports that they are running on - let mut shard_ports: Vec = vec![]; - for _ in 0..opts.num_shard_connections.len() { + let mut shard_handle_and_ports: Vec<(process::Child, u16)> = vec![]; + for _ in 0..opts.num_shards { let mut shard_process = shard_cmd.spawn()?; let shard_port = get_port(shard_process.stdout.take().expect("shard stdout")) .await .map_err(|e| Error::ErrorObtainingPort(e))?; - shard_ports.push(shard_port); + shard_handle_and_ports.push((shard_process, shard_port)); } // now that we've started the processes, establish connections to them: - let shard_uris: Vec = shard_ports + let shard_handle_and_uris: Vec<(process::Child, http::Uri)> = shard_handle_and_ports .into_iter() - .map(|port| format!("http://127.0.0.1:{}/submit", port).parse().expect("valid submit URI")) + .map(|(h,port)| (h,format!("http://127.0.0.1:{}/submit", port).parse().expect("valid submit URI"))) .collect(); let feed_uri = format!("http://127.0.0.1:{}/feed", core_port) .parse() .expect("valid feed URI"); - ConnectToExisting { - feed: ConnectionDetails { - uri: feed_uri, - num_connections: opts.num_feed_connections - }, - shards: opts.num_shard_connections + Ok(Server { + shards: shard_handle_and_uris .into_iter() - .zip(shard_uris) - .map(|(n, uri)| ConnectionDetails { + .map(|(handle, uri)| Process { + handle: Some(handle), uri, - num_connections: n }) - .collect() - }; - - todo!(); + .collect(), + core: Process { + handle: Some(core_cmd), + uri: feed_uri, + } + }) } /// Establshes the requested connections to existing processes. - pub async fn connect_to_existing(opts: ConnectToExisting) -> Result { - let shard_details = opts.shards; - - // connect to shards in the background: - let shard_groups_fut = tokio::spawn(async move { - let mut shard_results = vec![]; - for details in &shard_details { - shard_results.push(connect_to_uri(&details.uri, details.num_connections).await); - } - let result_shard_groups: Result,_> = shard_results.into_iter().collect(); - result_shard_groups - }); - - // In the meantime, connect feeds: - let feed_connections = connect_to_uri(&opts.feed.uri, opts.feed.num_connections).await?; - - // Now feeds are done, wait until shards also connected (this will have been progressing anyway): - let shard_connections = shard_groups_fut.await??; - - Ok(Connection { - shard_connections, - feed_connections, - }) + pub fn connect_to_existing(opts: ConnectToExistingOpts) -> Server { + Server { + shards: opts.shard_uris + .into_iter() + .map(|uri| Process { uri, handle: None }) + .collect(), + core: Process { uri: opts.feed_uri, handle: None } + } } } +/// This represents a running process that we can connect to, which +/// may be either a `telemetry_shard` or `telemetry_core`. +pub struct Process { + /// If we started the processes ourselves, we'll have a handle to + /// them which we can use to kill them. Else, we may not. + handle: Option, + /// The URI that we can use to connect to the process socket. + uri: http::Uri +} + +impl Process { + /// Establish a connection to the process + pub async fn connect(&self) -> Result<(ws_client::Sender, ws_client::Receiver), Error> { + ws_client::connect(&self.uri) + .await + .map_err(|e| e.into()) + } + + /// Establish multiple connections to the process + pub async fn connect_multiple(&self, num_connections: usize) -> Result, Error> { + connect_multiple_to_uri(&self.uri, num_connections) + .await + .map_err(|e| e.into()) + } + + /// Kill the process and wait for this to complete + pub async fn kill(self) -> Result<(), Error> { + match self.handle { + Some(mut handle) => Ok(handle.kill().await?), + None => Err(Error::CannotKillNoHandle) + } + } +} + +/// Reads from the stdout of the shard/core process to extract the port that was assigned to it, +/// with the side benefit that we'll wait for it to start listening before returning. We do this +/// because we want to allow the kernel to assign ports and so don't specify a port as an arg. async fn get_port(reader: R) -> Result { let reader = BufReader::new(reader); let mut reader_lines = reader.lines(); @@ -176,8 +193,12 @@ async fn get_port(reader: R) -> Result } } -async fn connect_to_uri(uri: &http::Uri, num_connections: usize) -> Result, ws_client::ConnectError> { - let connect_futs = (0..num_connections).map(|_| ws_client::connect(uri)); - let sockets: Result,_> = futures::future::join_all(connect_futs).await.into_iter().collect(); +async fn connect_multiple_to_uri(uri: &http::Uri, num_connections: usize) -> Result, ws_client::ConnectError> { + let connect_futs = (0..num_connections) + .map(|_| ws_client::connect(uri)); + let sockets: Result,_> = futures::future::join_all(connect_futs) + .await + .into_iter() + .collect(); sockets } \ No newline at end of file diff --git a/backend/test_utils/src/lib.rs b/backend/test_utils/src/lib.rs index 9b2eeb2..a44f4d7 100644 --- a/backend/test_utils/src/lib.rs +++ b/backend/test_utils/src/lib.rs @@ -1,3 +1,8 @@ +// A helper to spawn or connect to shard/core processes and hand back connections to them +pub mod connect_to_servers; + /// A wrapper around soketto to simplify the process of establishing connections pub mod ws_client; -pub mod connect_to_servers; \ No newline at end of file + +/// A helper to construct simple test cases involving a single shard and feed. +pub mod test_simple; \ No newline at end of file diff --git a/backend/test_utils/src/test_simple.rs b/backend/test_utils/src/test_simple.rs new file mode 100644 index 0000000..5f25ccb --- /dev/null +++ b/backend/test_utils/src/test_simple.rs @@ -0,0 +1,62 @@ +use tokio::process::Command; +use crate::connect_to_servers::{ Server, StartProcessOpts, Process }; + +pub struct Runner { + shard_command: Option, + core_command: Option +} + +impl Runner { + pub fn new() -> Runner { + Runner { + shard_command: None, + core_command: None + } + } + + pub fn shard_command(mut self, cmd: Command) -> Self { + self.shard_command = Some(cmd); + self + } + + pub fn core_command(mut self, cmd: Command) -> Self { + self.core_command = Some(cmd); + self + } + + pub async fn build(self) -> Result { + let mut server = Server::start_processes(StartProcessOpts { + shard_command: self.shard_command, + num_shards: 1, + core_command: self.core_command, + }).await?; + + let core_process = server.core; + let shard_process = server.shards.remove(0); + + Ok(Processes { + core_process, + shard_process, + }) + } +} + +/// A representation of the running processes that we can connect and send messages to. +pub struct Processes { + shard_process: Process, + core_process: Process, +} + +impl Processes { + pub async fn cleanup(self) { + let handle = tokio::spawn(async move { + let _ = tokio::join!( + self.shard_process.kill(), + self.core_process.kill() + ); + }); + + // You can wait for cleanup but aren't obliged to: + let _ = handle.await; + } +}