diff --git a/backend/Cargo.lock b/backend/Cargo.lock index 80cc188..4f00159 100644 --- a/backend/Cargo.lock +++ b/backend/Cargo.lock @@ -1259,29 +1259,6 @@ dependencies = [ "opaque-debug", ] -[[package]] -name = "shard" -version = "0.1.0" -dependencies = [ - "anyhow", - "bincode", - "common", - "futures", - "hex", - "http", - "log", - "primitive-types", - "serde", - "serde_json", - "simple_logger", - "soketto", - "structopt", - "thiserror", - "tokio", - "tokio-util", - "warp", -] - [[package]] name = "signal-hook-registry" version = "1.4.0" @@ -1395,7 +1372,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369" [[package]] -name = "telemetry" +name = "telemetry_core" version = "0.1.0" dependencies = [ "anyhow", @@ -1423,6 +1400,29 @@ dependencies = [ "warp", ] +[[package]] +name = "telemetry_shard" +version = "0.1.0" +dependencies = [ + "anyhow", + "bincode", + "common", + "futures", + "hex", + "http", + "log", + "primitive-types", + "serde", + "serde_json", + "simple_logger", + "soketto", + "structopt", + "thiserror", + "tokio", + "tokio-util", + "warp", +] + [[package]] name = "tempfile" version = "3.2.0" @@ -1441,6 +1441,7 @@ dependencies = [ name = "test_utils" version = "0.1.0" dependencies = [ + "anyhow", "futures", "http", "log", diff --git a/backend/Cargo.toml b/backend/Cargo.toml index 451cfff..7061f61 100644 --- a/backend/Cargo.toml +++ b/backend/Cargo.toml @@ -1,8 +1,8 @@ [workspace] members = [ "common", - "telemetry", - "shard", + "telemetry_core", + "telemetry_shard", "test_utils" ] diff --git a/backend/README.md b/backend/README.md index 90bb37b..6cbcbe5 100644 --- a/backend/README.md +++ b/backend/README.md @@ -2,8 +2,8 @@ This folder contains the rust crates and documentation specific to the telemetry backend. A description of the folders: -- [telemetry](./telemetry): The Telemetry Core. This aggregates data received from shards and allows UI feeds to connect and receive this information. -- [shard](./shard): A Shard. It's expected that multiple of these will run. Nodes will connect to Shard instances and send JSON telemetry to them, and Shard instances will each connect to the Telemetry Core and relay on relevant data to it. +- [telemetry_core](./telemetry_core): The Telemetry Core. This aggregates data received from shards and allows UI feeds to connect and receive this information. +- [telemetry_shard](./telemetry_shard): A Shard. It's expected that multiple of these will run. Nodes will connect to Shard instances and send JSON telemetry to them, and Shard instances will each connect to the Telemetry Core and relay on relevant data to it. - [common](./common): common code shared between the telemetry shard and core - [docs](./docs): Material supporting the documentation lives here diff --git a/backend/telemetry/Cargo.toml b/backend/telemetry_core/Cargo.toml similarity index 96% rename from backend/telemetry/Cargo.toml rename to backend/telemetry_core/Cargo.toml index 72d6aea..3ddca7c 100644 --- a/backend/telemetry/Cargo.toml +++ b/backend/telemetry_core/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "telemetry" +name = "telemetry_core" version = "0.1.0" authors = ["Parity Technologies Ltd. "] edition = "2018" diff --git a/backend/telemetry/src/aggregator/aggregator.rs b/backend/telemetry_core/src/aggregator/aggregator.rs similarity index 100% rename from backend/telemetry/src/aggregator/aggregator.rs rename to backend/telemetry_core/src/aggregator/aggregator.rs diff --git a/backend/telemetry/src/aggregator/inner_loop.rs b/backend/telemetry_core/src/aggregator/inner_loop.rs similarity index 100% rename from backend/telemetry/src/aggregator/inner_loop.rs rename to backend/telemetry_core/src/aggregator/inner_loop.rs diff --git a/backend/telemetry/src/aggregator/mod.rs b/backend/telemetry_core/src/aggregator/mod.rs similarity index 100% rename from backend/telemetry/src/aggregator/mod.rs rename to backend/telemetry_core/src/aggregator/mod.rs diff --git a/backend/telemetry/src/feed_message.rs b/backend/telemetry_core/src/feed_message.rs similarity index 100% rename from backend/telemetry/src/feed_message.rs rename to backend/telemetry_core/src/feed_message.rs diff --git a/backend/telemetry/src/find_location.rs b/backend/telemetry_core/src/find_location.rs similarity index 100% rename from backend/telemetry/src/find_location.rs rename to backend/telemetry_core/src/find_location.rs diff --git a/backend/telemetry/src/main.rs b/backend/telemetry_core/src/main.rs similarity index 100% rename from backend/telemetry/src/main.rs rename to backend/telemetry_core/src/main.rs diff --git a/backend/telemetry/src/state/chain.rs b/backend/telemetry_core/src/state/chain.rs similarity index 100% rename from backend/telemetry/src/state/chain.rs rename to backend/telemetry_core/src/state/chain.rs diff --git a/backend/telemetry/src/state/mod.rs b/backend/telemetry_core/src/state/mod.rs similarity index 100% rename from backend/telemetry/src/state/mod.rs rename to backend/telemetry_core/src/state/mod.rs diff --git a/backend/telemetry/src/state/node.rs b/backend/telemetry_core/src/state/node.rs similarity index 100% rename from backend/telemetry/src/state/node.rs rename to backend/telemetry_core/src/state/node.rs diff --git a/backend/telemetry/src/state/state.rs b/backend/telemetry_core/src/state/state.rs similarity index 100% rename from backend/telemetry/src/state/state.rs rename to backend/telemetry_core/src/state/state.rs diff --git a/backend/shard/Cargo.toml b/backend/telemetry_shard/Cargo.toml similarity index 96% rename from backend/shard/Cargo.toml rename to backend/telemetry_shard/Cargo.toml index e3a64db..ffc28f7 100644 --- a/backend/shard/Cargo.toml +++ b/backend/telemetry_shard/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "shard" +name = "telemetry_shard" version = "0.1.0" authors = ["Parity Technologies Ltd. "] edition = "2018" diff --git a/backend/shard/src/aggregator.rs b/backend/telemetry_shard/src/aggregator.rs similarity index 100% rename from backend/shard/src/aggregator.rs rename to backend/telemetry_shard/src/aggregator.rs diff --git a/backend/shard/src/connection.rs b/backend/telemetry_shard/src/connection.rs similarity index 100% rename from backend/shard/src/connection.rs rename to backend/telemetry_shard/src/connection.rs diff --git a/backend/shard/src/json_message/hash.rs b/backend/telemetry_shard/src/json_message/hash.rs similarity index 100% rename from backend/shard/src/json_message/hash.rs rename to backend/telemetry_shard/src/json_message/hash.rs diff --git a/backend/shard/src/json_message/mod.rs b/backend/telemetry_shard/src/json_message/mod.rs similarity index 100% rename from backend/shard/src/json_message/mod.rs rename to backend/telemetry_shard/src/json_message/mod.rs diff --git a/backend/shard/src/json_message/node_message.rs b/backend/telemetry_shard/src/json_message/node_message.rs similarity index 100% rename from backend/shard/src/json_message/node_message.rs rename to backend/telemetry_shard/src/json_message/node_message.rs diff --git a/backend/shard/src/main.rs b/backend/telemetry_shard/src/main.rs similarity index 100% rename from backend/shard/src/main.rs rename to backend/telemetry_shard/src/main.rs diff --git a/backend/shard/src/real_ip.rs b/backend/telemetry_shard/src/real_ip.rs similarity index 100% rename from backend/shard/src/real_ip.rs rename to backend/telemetry_shard/src/real_ip.rs diff --git a/backend/test_utils/Cargo.toml b/backend/test_utils/Cargo.toml index 72ac927..8236983 100644 --- a/backend/test_utils/Cargo.toml +++ b/backend/test_utils/Cargo.toml @@ -7,6 +7,7 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +anyhow = "1.0.41" futures = "0.3.15" http = "0.2.4" log = "0.4.14" diff --git a/backend/test_utils/src/connect_to_servers.rs b/backend/test_utils/src/connect_to_servers.rs index 158561e..c1b72ec 100644 --- a/backend/test_utils/src/connect_to_servers.rs +++ b/backend/test_utils/src/connect_to_servers.rs @@ -1,21 +1,183 @@ -use crate::ws_client::{ Sender, Receiver }; +use crate::ws_client; +use tokio::process::Command; +use tokio::io::BufReader; +use tokio::io::{ AsyncRead, AsyncBufReadExt }; +use tokio::time::Duration; +use anyhow::{ anyhow, Context }; -/// We either say where to conenct to, or we start the binaries -/// ourselves. Either way, we hand back a `Connection` object -/// which allows us to talk to the running instances. -pub enum Opts { - StartProcesses { - shard_command: Option, - num_shards: usize, - telemetry_command: Option - }, - ConnectToExisting { - shard_uris: Vec, - telemetry_uri: http::Uri - } +pub struct StartProcesses { + /// Optional command to run to start a shard (instead of `telemetry_shard`). + /// The `--listen` argument will be appended here and shouldn't be provided. + pub shard_command: Option, + /// Optional command to run to start a telemetry core process (instead of `telemetry_core`). + /// The `--listen` argument will be appended here and shouldn't be provided. + pub telemetry_command: Option, + /// How many connections should we establish to each shard? We'll start + /// up a shard for each entry here. + pub num_shard_connections: Vec, + /// How many connections should we establish to the telemetry_core feed? + pub num_feed_connections: usize, +} + +pub struct ConnectToExisting { + /// URI to shard /submit endpoint + pub shards: Vec, + /// URI to core /feed endpoint + pub feed: ConnectionDetails, +} + +pub struct ConnectionDetails { + pub uri: http::Uri, + pub num_connections: usize +} + +#[derive(thiserror::Error, Debug)] +pub enum Error { + #[error("Can't establsih connection: {0}")] + ConnectionError(#[from] ws_client::ConnectError), + #[error("Can't establsih connection: {0}")] + JoinError(#[from] tokio::task::JoinError), + #[error("Can't establsih connection: {0}")] + IoError(#[from] std::io::Error), + #[error("Could not obtain port for process: {0}")] + ErrorObtainingPort(anyhow::Error) } pub struct Connection { - shard_sockets: Vec<(Sender, Receiver)>, - telemetry_socket: Vec<(Sender, Receiver)> + /// Connections to each of the shard submit URIs + pub shard_connections: Vec>, + /// Connections to the telemetry feed URI + pub feed_connections: Vec<(ws_client::Sender, ws_client::Receiver)> +} + +impl Connection { + /// Start telemetry_core and telemetry_shard processes and establish connections to them. + pub async fn start_processes(opts: StartProcesses) -> Result { + + let mut core_cmd = opts.telemetry_command + .unwrap_or(Command::new("telemetry_core")) + .arg("--listen") + .arg("127.0.0.1:0") // 0 to have a port picked by the kernel + .arg("--log") + .arg("info") + .stdout(std::process::Stdio::piped()) + .stdin(std::process::Stdio::piped()) + .spawn()?; + + // Find out the port that this is running on + let core_port = get_port(core_cmd.stdout.take().expect("core stdout")) + .await + .map_err(|e| Error::ErrorObtainingPort(e))?; + + let mut shard_cmd = opts.shard_command.unwrap_or(Command::new("telemetry_shard")); + shard_cmd + .arg("--listen") + .arg("127.0.0.1:0") // 0 to have a port picked by the kernel + .arg("--log") + .arg("info") + .arg("--core") + .arg(format!("127.0.0.1:{}", core_port)) + .stdout(std::process::Stdio::piped()) + .stdin(std::process::Stdio::piped()); + + // Start shards and find out the ports that they are running on + let mut shard_ports: Vec = vec![]; + for _ in 0..opts.num_shard_connections.len() { + let mut shard_process = shard_cmd.spawn()?; + let shard_port = get_port(shard_process.stdout.take().expect("shard stdout")) + .await + .map_err(|e| Error::ErrorObtainingPort(e))?; + + shard_ports.push(shard_port); + } + + // now that we've started the processes, establish connections to them: + let shard_uris: Vec = shard_ports + .into_iter() + .map(|port| format!("http://127.0.0.1:{}/submit", port).parse().expect("valid submit URI")) + .collect(); + + let feed_uri = format!("http://127.0.0.1:{}/feed", core_port) + .parse() + .expect("valid feed URI"); + + ConnectToExisting { + feed: ConnectionDetails { + uri: feed_uri, + num_connections: opts.num_feed_connections + }, + shards: opts.num_shard_connections + .into_iter() + .zip(shard_uris) + .map(|(n, uri)| ConnectionDetails { + uri, + num_connections: n + }) + .collect() + }; + + todo!(); + } + + /// Establshes the requested connections to existing processes. + pub async fn connect_to_existing(opts: ConnectToExisting) -> Result { + let shard_details = opts.shards; + + // connect to shards in the background: + let shard_groups_fut = tokio::spawn(async move { + let mut shard_results = vec![]; + for details in &shard_details { + shard_results.push(connect_to_uri(&details.uri, details.num_connections).await); + } + let result_shard_groups: Result,_> = shard_results.into_iter().collect(); + result_shard_groups + }); + + // In the meantime, connect feeds: + let feed_connections = connect_to_uri(&opts.feed.uri, opts.feed.num_connections).await?; + + // Now feeds are done, wait until shards also connected (this will have been progressing anyway): + let shard_connections = shard_groups_fut.await??; + + Ok(Connection { + shard_connections, + feed_connections, + }) + } +} + +async fn get_port(reader: R) -> Result { + let reader = BufReader::new(reader); + let mut reader_lines = reader.lines(); + + loop { + let line = tokio::time::timeout( + Duration::from_secs(1), + reader_lines.next_line() + ).await; + + let line = match line { + // timeout expired; couldn't get port: + Err(_) => return Err(anyhow!("Timeout expired waiting to discover port")), + // Something went wrong reading line; bail: + Ok(Err(e)) => return Err(anyhow!("Could not read line from stdout: {}", e)), + // No more output; process ended? bail: + Ok(Ok(None)) => return Err(anyhow!("No more output from stdout; has the process ended?")), + // All OK, and a line is given back; phew! + Ok(Ok(Some(line))) => line + }; + + let (_, port_str) = match line.rsplit_once("listening on http://127.0.0.1:") { + Some(m) => m, + None => continue + }; + + return port_str.parse().with_context(|| "Could not parse output to port"); + } +} + +async fn connect_to_uri(uri: &http::Uri, num_connections: usize) -> Result, ws_client::ConnectError> { + let connect_futs = (0..num_connections).map(|_| ws_client::connect(uri)); + let sockets: Result,_> = futures::future::join_all(connect_futs).await.into_iter().collect(); + sockets } \ No newline at end of file diff --git a/backend/test_utils/src/lib.rs b/backend/test_utils/src/lib.rs index 6802eb8..9b2eeb2 100644 --- a/backend/test_utils/src/lib.rs +++ b/backend/test_utils/src/lib.rs @@ -1,3 +1,3 @@ /// A wrapper around soketto to simplify the process of establishing connections -mod ws_client; -mod connect_to_servers; \ No newline at end of file +pub mod ws_client; +pub mod connect_to_servers; \ No newline at end of file