Write some basic test utils to get going; time to start trying them

This commit is contained in:
James Wilson
2021-07-07 17:25:59 +01:00
parent f2adead2e9
commit c043393e28
7 changed files with 182 additions and 74 deletions
+94 -73
View File
@@ -1,34 +1,26 @@
use crate::ws_client;
use tokio::process::Command;
use tokio::process::{ self, Command };
use tokio::io::BufReader;
use tokio::io::{ AsyncRead, AsyncBufReadExt };
use tokio::time::Duration;
use anyhow::{ anyhow, Context };
pub struct StartProcesses {
pub struct StartProcessOpts {
/// Optional command to run to start a shard (instead of `telemetry_shard`).
/// The `--listen` argument will be appended here and shouldn't be provided.
/// The `--listen` and `--log` arguments will be appended within and shouldn't be provided.
pub shard_command: Option<Command>,
/// How many shards should we start?
pub num_shards: usize,
/// Optional command to run to start a telemetry core process (instead of `telemetry_core`).
/// The `--listen` argument will be appended here and shouldn't be provided.
pub telemetry_command: Option<Command>,
/// How many connections should we establish to each shard? We'll start
/// up a shard for each entry here.
pub num_shard_connections: Vec<usize>,
/// How many connections should we establish to the telemetry_core feed?
pub num_feed_connections: usize,
/// The `--listen` and `--log` arguments will be appended within and shouldn't be provided.
pub core_command: Option<Command>
}
pub struct ConnectToExisting {
/// URI to shard /submit endpoint
pub shards: Vec<ConnectionDetails>,
/// URI to core /feed endpoint
pub feed: ConnectionDetails,
}
pub struct ConnectionDetails {
pub uri: http::Uri,
pub num_connections: usize
pub struct ConnectToExistingOpts {
/// Details for connections to `telemetry_shard` /submit endpoints
pub shard_uris: Vec<http::Uri>,
/// Details for connections to `telemetry_core` /feed endpoints
pub feed_uri: http::Uri,
}
#[derive(thiserror::Error, Debug)]
@@ -40,26 +32,31 @@ pub enum Error {
#[error("Can't establsih connection: {0}")]
IoError(#[from] std::io::Error),
#[error("Could not obtain port for process: {0}")]
ErrorObtainingPort(anyhow::Error)
ErrorObtainingPort(anyhow::Error),
#[error("Whoops; attempt to kill a process we didn't start (and so have no handle to)")]
CannotKillNoHandle
}
pub struct Connection {
/// Connections to each of the shard submit URIs
pub shard_connections: Vec<Vec<(ws_client::Sender, ws_client::Receiver)>>,
/// Connections to the telemetry feed URI
pub feed_connections: Vec<(ws_client::Sender, ws_client::Receiver)>
/// This provides back connections (or groups of connections) that are
/// hooked up to the running processes and ready to send/receive messages.
pub struct Server {
/// Shard processes that we can connect to
pub shards: Vec<Process>,
/// Core process that we can connect to
pub core: Process,
}
impl Connection {
impl Server {
/// Start telemetry_core and telemetry_shard processes and establish connections to them.
pub async fn start_processes(opts: StartProcesses) -> Result<Connection, Error> {
pub async fn start_processes(opts: StartProcessOpts) -> Result<Server, Error> {
let mut core_cmd = opts.telemetry_command
let mut core_cmd = opts.core_command
.unwrap_or(Command::new("telemetry_core"))
.arg("--listen")
.arg("127.0.0.1:0") // 0 to have a port picked by the kernel
.arg("--log")
.arg("info")
.kill_on_drop(true)
.stdout(std::process::Stdio::piped())
.stdin(std::process::Stdio::piped())
.spawn()?;
@@ -77,75 +74,95 @@ impl Connection {
.arg("info")
.arg("--core")
.arg(format!("127.0.0.1:{}", core_port))
.kill_on_drop(true)
.stdout(std::process::Stdio::piped())
.stdin(std::process::Stdio::piped());
// Start shards and find out the ports that they are running on
let mut shard_ports: Vec<u16> = vec![];
for _ in 0..opts.num_shard_connections.len() {
let mut shard_handle_and_ports: Vec<(process::Child, u16)> = vec![];
for _ in 0..opts.num_shards {
let mut shard_process = shard_cmd.spawn()?;
let shard_port = get_port(shard_process.stdout.take().expect("shard stdout"))
.await
.map_err(|e| Error::ErrorObtainingPort(e))?;
shard_ports.push(shard_port);
shard_handle_and_ports.push((shard_process, shard_port));
}
// now that we've started the processes, establish connections to them:
let shard_uris: Vec<http::Uri> = shard_ports
let shard_handle_and_uris: Vec<(process::Child, http::Uri)> = shard_handle_and_ports
.into_iter()
.map(|port| format!("http://127.0.0.1:{}/submit", port).parse().expect("valid submit URI"))
.map(|(h,port)| (h,format!("http://127.0.0.1:{}/submit", port).parse().expect("valid submit URI")))
.collect();
let feed_uri = format!("http://127.0.0.1:{}/feed", core_port)
.parse()
.expect("valid feed URI");
ConnectToExisting {
feed: ConnectionDetails {
uri: feed_uri,
num_connections: opts.num_feed_connections
},
shards: opts.num_shard_connections
Ok(Server {
shards: shard_handle_and_uris
.into_iter()
.zip(shard_uris)
.map(|(n, uri)| ConnectionDetails {
.map(|(handle, uri)| Process {
handle: Some(handle),
uri,
num_connections: n
})
.collect()
};
todo!();
.collect(),
core: Process {
handle: Some(core_cmd),
uri: feed_uri,
}
})
}
/// Establshes the requested connections to existing processes.
pub async fn connect_to_existing(opts: ConnectToExisting) -> Result<Connection, Error> {
let shard_details = opts.shards;
// connect to shards in the background:
let shard_groups_fut = tokio::spawn(async move {
let mut shard_results = vec![];
for details in &shard_details {
shard_results.push(connect_to_uri(&details.uri, details.num_connections).await);
}
let result_shard_groups: Result<Vec<_>,_> = shard_results.into_iter().collect();
result_shard_groups
});
// In the meantime, connect feeds:
let feed_connections = connect_to_uri(&opts.feed.uri, opts.feed.num_connections).await?;
// Now feeds are done, wait until shards also connected (this will have been progressing anyway):
let shard_connections = shard_groups_fut.await??;
Ok(Connection {
shard_connections,
feed_connections,
})
pub fn connect_to_existing(opts: ConnectToExistingOpts) -> Server {
Server {
shards: opts.shard_uris
.into_iter()
.map(|uri| Process { uri, handle: None })
.collect(),
core: Process { uri: opts.feed_uri, handle: None }
}
}
}
/// This represents a running process that we can connect to, which
/// may be either a `telemetry_shard` or `telemetry_core`.
pub struct Process {
/// If we started the processes ourselves, we'll have a handle to
/// them which we can use to kill them. Else, we may not.
handle: Option<process::Child>,
/// The URI that we can use to connect to the process socket.
uri: http::Uri
}
impl Process {
/// Establish a connection to the process
pub async fn connect(&self) -> Result<(ws_client::Sender, ws_client::Receiver), Error> {
ws_client::connect(&self.uri)
.await
.map_err(|e| e.into())
}
/// Establish multiple connections to the process
pub async fn connect_multiple(&self, num_connections: usize) -> Result<Vec<(ws_client::Sender, ws_client::Receiver)>, Error> {
connect_multiple_to_uri(&self.uri, num_connections)
.await
.map_err(|e| e.into())
}
/// Kill the process and wait for this to complete
pub async fn kill(self) -> Result<(), Error> {
match self.handle {
Some(mut handle) => Ok(handle.kill().await?),
None => Err(Error::CannotKillNoHandle)
}
}
}
/// Reads from the stdout of the shard/core process to extract the port that was assigned to it,
/// with the side benefit that we'll wait for it to start listening before returning. We do this
/// because we want to allow the kernel to assign ports and so don't specify a port as an arg.
async fn get_port<R: AsyncRead + Unpin>(reader: R) -> Result<u16, anyhow::Error> {
let reader = BufReader::new(reader);
let mut reader_lines = reader.lines();
@@ -176,8 +193,12 @@ async fn get_port<R: AsyncRead + Unpin>(reader: R) -> Result<u16, anyhow::Error>
}
}
async fn connect_to_uri(uri: &http::Uri, num_connections: usize) -> Result<Vec<(ws_client::Sender, ws_client::Receiver)>, ws_client::ConnectError> {
let connect_futs = (0..num_connections).map(|_| ws_client::connect(uri));
let sockets: Result<Vec<_>,_> = futures::future::join_all(connect_futs).await.into_iter().collect();
async fn connect_multiple_to_uri(uri: &http::Uri, num_connections: usize) -> Result<Vec<(ws_client::Sender, ws_client::Receiver)>, ws_client::ConnectError> {
let connect_futs = (0..num_connections)
.map(|_| ws_client::connect(uri));
let sockets: Result<Vec<_>,_> = futures::future::join_all(connect_futs)
.await
.into_iter()
.collect();
sockets
}
+6 -1
View File
@@ -1,3 +1,8 @@
// A helper to spawn or connect to shard/core processes and hand back connections to them
pub mod connect_to_servers;
/// A wrapper around soketto to simplify the process of establishing connections
pub mod ws_client;
pub mod connect_to_servers;
/// A helper to construct simple test cases involving a single shard and feed.
pub mod test_simple;
+62
View File
@@ -0,0 +1,62 @@
use tokio::process::Command;
use crate::connect_to_servers::{ Server, StartProcessOpts, Process };
pub struct Runner {
shard_command: Option<Command>,
core_command: Option<Command>
}
impl Runner {
pub fn new() -> Runner {
Runner {
shard_command: None,
core_command: None
}
}
pub fn shard_command(mut self, cmd: Command) -> Self {
self.shard_command = Some(cmd);
self
}
pub fn core_command(mut self, cmd: Command) -> Self {
self.core_command = Some(cmd);
self
}
pub async fn build(self) -> Result<Processes, anyhow::Error> {
let mut server = Server::start_processes(StartProcessOpts {
shard_command: self.shard_command,
num_shards: 1,
core_command: self.core_command,
}).await?;
let core_process = server.core;
let shard_process = server.shards.remove(0);
Ok(Processes {
core_process,
shard_process,
})
}
}
/// A representation of the running processes that we can connect and send messages to.
pub struct Processes {
shard_process: Process,
core_process: Process,
}
impl Processes {
pub async fn cleanup(self) {
let handle = tokio::spawn(async move {
let _ = tokio::join!(
self.shard_process.kill(),
self.core_process.kill()
);
});
// You can wait for cleanup but aren't obliged to:
let _ = handle.await;
}
}