mirror of
https://github.com/pezkuwichain/pezkuwi-telemetry.git
synced 2026-04-22 18:28:06 +00:00
Get a basic first test working, and lots of changes to supporting code to facilitate this
This commit is contained in:
@@ -1,204 +0,0 @@
|
||||
use crate::ws_client;
|
||||
use tokio::process::{ self, Command };
|
||||
use tokio::io::BufReader;
|
||||
use tokio::io::{ AsyncRead, AsyncBufReadExt };
|
||||
use tokio::time::Duration;
|
||||
use anyhow::{ anyhow, Context };
|
||||
|
||||
pub struct StartProcessOpts {
|
||||
/// Optional command to run to start a shard (instead of `telemetry_shard`).
|
||||
/// The `--listen` and `--log` arguments will be appended within and shouldn't be provided.
|
||||
pub shard_command: Option<Command>,
|
||||
/// How many shards should we start?
|
||||
pub num_shards: usize,
|
||||
/// Optional command to run to start a telemetry core process (instead of `telemetry_core`).
|
||||
/// The `--listen` and `--log` arguments will be appended within and shouldn't be provided.
|
||||
pub core_command: Option<Command>
|
||||
}
|
||||
|
||||
pub struct ConnectToExistingOpts {
|
||||
/// Details for connections to `telemetry_shard` /submit endpoints
|
||||
pub shard_uris: Vec<http::Uri>,
|
||||
/// Details for connections to `telemetry_core` /feed endpoints
|
||||
pub feed_uri: http::Uri,
|
||||
}
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub enum Error {
|
||||
#[error("Can't establsih connection: {0}")]
|
||||
ConnectionError(#[from] ws_client::ConnectError),
|
||||
#[error("Can't establsih connection: {0}")]
|
||||
JoinError(#[from] tokio::task::JoinError),
|
||||
#[error("Can't establsih connection: {0}")]
|
||||
IoError(#[from] std::io::Error),
|
||||
#[error("Could not obtain port for process: {0}")]
|
||||
ErrorObtainingPort(anyhow::Error),
|
||||
#[error("Whoops; attempt to kill a process we didn't start (and so have no handle to)")]
|
||||
CannotKillNoHandle
|
||||
}
|
||||
|
||||
/// This provides back connections (or groups of connections) that are
|
||||
/// hooked up to the running processes and ready to send/receive messages.
|
||||
pub struct Server {
|
||||
/// Shard processes that we can connect to
|
||||
pub shards: Vec<Process>,
|
||||
/// Core process that we can connect to
|
||||
pub core: Process,
|
||||
}
|
||||
|
||||
impl Server {
|
||||
/// Start telemetry_core and telemetry_shard processes and establish connections to them.
|
||||
pub async fn start_processes(opts: StartProcessOpts) -> Result<Server, Error> {
|
||||
|
||||
let mut core_cmd = opts.core_command
|
||||
.unwrap_or(Command::new("telemetry_core"))
|
||||
.arg("--listen")
|
||||
.arg("127.0.0.1:0") // 0 to have a port picked by the kernel
|
||||
.arg("--log")
|
||||
.arg("info")
|
||||
.kill_on_drop(true)
|
||||
.stdout(std::process::Stdio::piped())
|
||||
.stdin(std::process::Stdio::piped())
|
||||
.spawn()?;
|
||||
|
||||
// Find out the port that this is running on
|
||||
let core_port = get_port(core_cmd.stdout.take().expect("core stdout"))
|
||||
.await
|
||||
.map_err(|e| Error::ErrorObtainingPort(e))?;
|
||||
|
||||
let mut shard_cmd = opts.shard_command.unwrap_or(Command::new("telemetry_shard"));
|
||||
shard_cmd
|
||||
.arg("--listen")
|
||||
.arg("127.0.0.1:0") // 0 to have a port picked by the kernel
|
||||
.arg("--log")
|
||||
.arg("info")
|
||||
.arg("--core")
|
||||
.arg(format!("127.0.0.1:{}", core_port))
|
||||
.kill_on_drop(true)
|
||||
.stdout(std::process::Stdio::piped())
|
||||
.stdin(std::process::Stdio::piped());
|
||||
|
||||
// Start shards and find out the ports that they are running on
|
||||
let mut shard_handle_and_ports: Vec<(process::Child, u16)> = vec![];
|
||||
for _ in 0..opts.num_shards {
|
||||
let mut shard_process = shard_cmd.spawn()?;
|
||||
let shard_port = get_port(shard_process.stdout.take().expect("shard stdout"))
|
||||
.await
|
||||
.map_err(|e| Error::ErrorObtainingPort(e))?;
|
||||
|
||||
shard_handle_and_ports.push((shard_process, shard_port));
|
||||
}
|
||||
|
||||
// now that we've started the processes, establish connections to them:
|
||||
let shard_handle_and_uris: Vec<(process::Child, http::Uri)> = shard_handle_and_ports
|
||||
.into_iter()
|
||||
.map(|(h,port)| (h,format!("http://127.0.0.1:{}/submit", port).parse().expect("valid submit URI")))
|
||||
.collect();
|
||||
|
||||
let feed_uri = format!("http://127.0.0.1:{}/feed", core_port)
|
||||
.parse()
|
||||
.expect("valid feed URI");
|
||||
|
||||
Ok(Server {
|
||||
shards: shard_handle_and_uris
|
||||
.into_iter()
|
||||
.map(|(handle, uri)| Process {
|
||||
handle: Some(handle),
|
||||
uri,
|
||||
})
|
||||
.collect(),
|
||||
core: Process {
|
||||
handle: Some(core_cmd),
|
||||
uri: feed_uri,
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
/// Establshes the requested connections to existing processes.
|
||||
pub fn connect_to_existing(opts: ConnectToExistingOpts) -> Server {
|
||||
Server {
|
||||
shards: opts.shard_uris
|
||||
.into_iter()
|
||||
.map(|uri| Process { uri, handle: None })
|
||||
.collect(),
|
||||
core: Process { uri: opts.feed_uri, handle: None }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// This represents a running process that we can connect to, which
|
||||
/// may be either a `telemetry_shard` or `telemetry_core`.
|
||||
pub struct Process {
|
||||
/// If we started the processes ourselves, we'll have a handle to
|
||||
/// them which we can use to kill them. Else, we may not.
|
||||
handle: Option<process::Child>,
|
||||
/// The URI that we can use to connect to the process socket.
|
||||
uri: http::Uri
|
||||
}
|
||||
|
||||
impl Process {
|
||||
/// Establish a connection to the process
|
||||
pub async fn connect(&self) -> Result<(ws_client::Sender, ws_client::Receiver), Error> {
|
||||
ws_client::connect(&self.uri)
|
||||
.await
|
||||
.map_err(|e| e.into())
|
||||
}
|
||||
|
||||
/// Establish multiple connections to the process
|
||||
pub async fn connect_multiple(&self, num_connections: usize) -> Result<Vec<(ws_client::Sender, ws_client::Receiver)>, Error> {
|
||||
connect_multiple_to_uri(&self.uri, num_connections)
|
||||
.await
|
||||
.map_err(|e| e.into())
|
||||
}
|
||||
|
||||
/// Kill the process and wait for this to complete
|
||||
pub async fn kill(self) -> Result<(), Error> {
|
||||
match self.handle {
|
||||
Some(mut handle) => Ok(handle.kill().await?),
|
||||
None => Err(Error::CannotKillNoHandle)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Reads from the stdout of the shard/core process to extract the port that was assigned to it,
|
||||
/// with the side benefit that we'll wait for it to start listening before returning. We do this
|
||||
/// because we want to allow the kernel to assign ports and so don't specify a port as an arg.
|
||||
async fn get_port<R: AsyncRead + Unpin>(reader: R) -> Result<u16, anyhow::Error> {
|
||||
let reader = BufReader::new(reader);
|
||||
let mut reader_lines = reader.lines();
|
||||
|
||||
loop {
|
||||
let line = tokio::time::timeout(
|
||||
Duration::from_secs(1),
|
||||
reader_lines.next_line()
|
||||
).await;
|
||||
|
||||
let line = match line {
|
||||
// timeout expired; couldn't get port:
|
||||
Err(_) => return Err(anyhow!("Timeout expired waiting to discover port")),
|
||||
// Something went wrong reading line; bail:
|
||||
Ok(Err(e)) => return Err(anyhow!("Could not read line from stdout: {}", e)),
|
||||
// No more output; process ended? bail:
|
||||
Ok(Ok(None)) => return Err(anyhow!("No more output from stdout; has the process ended?")),
|
||||
// All OK, and a line is given back; phew!
|
||||
Ok(Ok(Some(line))) => line
|
||||
};
|
||||
|
||||
let (_, port_str) = match line.rsplit_once("listening on http://127.0.0.1:") {
|
||||
Some(m) => m,
|
||||
None => continue
|
||||
};
|
||||
|
||||
return port_str.parse().with_context(|| "Could not parse output to port");
|
||||
}
|
||||
}
|
||||
|
||||
async fn connect_multiple_to_uri(uri: &http::Uri, num_connections: usize) -> Result<Vec<(ws_client::Sender, ws_client::Receiver)>, ws_client::ConnectError> {
|
||||
let connect_futs = (0..num_connections)
|
||||
.map(|_| ws_client::connect(uri));
|
||||
let sockets: Result<Vec<_>,_> = futures::future::join_all(connect_futs)
|
||||
.await
|
||||
.into_iter()
|
||||
.collect();
|
||||
sockets
|
||||
}
|
||||
@@ -0,0 +1,277 @@
|
||||
use common::node_types::{BlockDetails, BlockHash, BlockNumber, NodeLocation, NodeStats, Timestamp};
|
||||
use serde_json::value::RawValue;
|
||||
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub enum FeedMessage {
|
||||
Version(usize),
|
||||
BestBlock {
|
||||
block_number: BlockNumber,
|
||||
timestamp: Timestamp,
|
||||
avg_block_time: Option<u64>,
|
||||
},
|
||||
BestFinalized {
|
||||
block_number: BlockNumber,
|
||||
block_hash: BlockHash,
|
||||
},
|
||||
AddedNode {
|
||||
node_id: usize,
|
||||
node: NodeDetails,
|
||||
stats: NodeStats,
|
||||
// io: NodeIO, // can't losslessly deserialize
|
||||
// hardware: NodeHardware, // can't losslessly deserialize
|
||||
block_details: BlockDetails,
|
||||
location: NodeLocation,
|
||||
startup_time: Option<Timestamp>,
|
||||
},
|
||||
RemovedNode {
|
||||
node_id: usize,
|
||||
},
|
||||
LocatedNode {
|
||||
node_id: usize,
|
||||
lat: f32,
|
||||
long: f32,
|
||||
city: String,
|
||||
},
|
||||
ImportedBlock {
|
||||
node_id: usize,
|
||||
block_details: BlockDetails,
|
||||
},
|
||||
FinalizedBlock {
|
||||
node_id: usize,
|
||||
block_number: BlockNumber,
|
||||
block_hash: BlockHash,
|
||||
},
|
||||
NodeStatsUpdate {
|
||||
node_id: usize,
|
||||
stats: NodeStats,
|
||||
},
|
||||
Hardware {
|
||||
node_id: usize,
|
||||
// hardware: NodeHardware, // Can't losslessly deserialize
|
||||
},
|
||||
TimeSync {
|
||||
time: Timestamp,
|
||||
},
|
||||
AddedChain {
|
||||
name: String,
|
||||
node_count: usize,
|
||||
},
|
||||
RemovedChain {
|
||||
name: String,
|
||||
},
|
||||
SubscribedTo {
|
||||
name: String,
|
||||
},
|
||||
UnsubscribedFrom {
|
||||
name: String,
|
||||
},
|
||||
Pong {
|
||||
msg: String,
|
||||
},
|
||||
AfgFinalized {
|
||||
address: String,
|
||||
block_number: BlockNumber,
|
||||
block_hash: BlockHash,
|
||||
},
|
||||
AfgReceivedPrevote {
|
||||
address: String,
|
||||
block_number: BlockNumber,
|
||||
block_hash: BlockHash,
|
||||
voter: Option<String>
|
||||
},
|
||||
AfgReceivedPrecommit {
|
||||
address: String,
|
||||
block_number: BlockNumber,
|
||||
block_hash: BlockHash,
|
||||
voter: Option<String>,
|
||||
},
|
||||
AfgAuthoritySet { // Not used currently; not sure what "address" params are:
|
||||
a1: String,
|
||||
a2: String,
|
||||
a3: String,
|
||||
block_number: BlockNumber,
|
||||
block_hash: BlockHash,
|
||||
},
|
||||
StaleNode {
|
||||
node_id: usize,
|
||||
},
|
||||
NodeIOUpdate {
|
||||
node_id: usize,
|
||||
// details: NodeIO, // can't losslessly deserialize
|
||||
},
|
||||
/// A "special" case when we don't know how to decode an action:
|
||||
UnknownValue {
|
||||
action: u8,
|
||||
value: String
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub struct NodeDetails {
|
||||
pub name: String,
|
||||
pub implementation: String,
|
||||
pub version: String,
|
||||
pub validator: Option<String>,
|
||||
pub network_id: Option<String>,
|
||||
}
|
||||
|
||||
impl FeedMessage {
|
||||
/// Decode a slice of bytes into a vector of feed messages
|
||||
pub fn from_bytes(bytes: &[u8]) -> Result<Vec<FeedMessage>, anyhow::Error> {
|
||||
let v: Vec<&RawValue> = serde_json::from_slice(bytes)?;
|
||||
|
||||
let mut feed_messages = vec![];
|
||||
for raw_keyval in v.windows(2) {
|
||||
let raw_key = raw_keyval[0];
|
||||
let raw_val = raw_keyval[1];
|
||||
feed_messages.push(FeedMessage::decode(raw_key, raw_val)?);
|
||||
}
|
||||
|
||||
Ok(feed_messages)
|
||||
}
|
||||
|
||||
// Deserialize the feed message to a value based on the "action" key
|
||||
fn decode(raw_key: &RawValue, raw_val: &RawValue) -> Result<FeedMessage, anyhow::Error> {
|
||||
let action: u8 = serde_json::from_str(raw_key.get())?;
|
||||
let feed_message = match action {
|
||||
// Version:
|
||||
0 => {
|
||||
let version = serde_json::from_str(raw_val.get())?;
|
||||
FeedMessage::Version(version)
|
||||
},
|
||||
// BestBlock
|
||||
1 => {
|
||||
let (block_number, timestamp, avg_block_time) = serde_json::from_str(raw_val.get())?;
|
||||
FeedMessage::BestBlock { block_number, timestamp, avg_block_time }
|
||||
},
|
||||
// BestFinalized
|
||||
2 => {
|
||||
let (block_number, block_hash) = serde_json::from_str(raw_val.get())?;
|
||||
FeedMessage::BestFinalized { block_number, block_hash }
|
||||
}
|
||||
// AddNode
|
||||
3 => {
|
||||
let (
|
||||
node_id,
|
||||
( name, implementation, version, validator, network_id ),
|
||||
stats,
|
||||
io,
|
||||
hardware,
|
||||
block_details,
|
||||
location,
|
||||
startup_time,
|
||||
) = serde_json::from_str(raw_val.get())?;
|
||||
|
||||
// Give these two types but don't use the results:
|
||||
let (_,_): (&RawValue, &RawValue) = (io, hardware);
|
||||
|
||||
FeedMessage::AddedNode {
|
||||
node_id,
|
||||
node: NodeDetails { name, implementation, version, validator, network_id },
|
||||
stats,
|
||||
block_details,
|
||||
location,
|
||||
startup_time,
|
||||
}
|
||||
},
|
||||
// RemoveNode
|
||||
4 => {
|
||||
let node_id = serde_json::from_str(raw_val.get())?;
|
||||
FeedMessage::RemovedNode { node_id }
|
||||
},
|
||||
// LocatedNode
|
||||
5 => {
|
||||
let (node_id, lat, long, city) = serde_json::from_str(raw_val.get())?;
|
||||
FeedMessage::LocatedNode { node_id, lat, long, city }
|
||||
},
|
||||
// ImportedBlock
|
||||
6 => {
|
||||
let (node_id, block_details) = serde_json::from_str(raw_val.get())?;
|
||||
FeedMessage::ImportedBlock { node_id, block_details }
|
||||
},
|
||||
// FinalizedBlock
|
||||
7 => {
|
||||
let (node_id, block_number, block_hash) = serde_json::from_str(raw_val.get())?;
|
||||
FeedMessage::FinalizedBlock { node_id, block_number, block_hash }
|
||||
},
|
||||
// NodeStatsUpdate
|
||||
8 => {
|
||||
let (node_id, stats) = serde_json::from_str(raw_val.get())?;
|
||||
FeedMessage::NodeStatsUpdate { node_id, stats }
|
||||
},
|
||||
// Hardware
|
||||
9 => {
|
||||
let (node_id, _hardware): (_, &RawValue) = serde_json::from_str(raw_val.get())?;
|
||||
FeedMessage::Hardware { node_id }
|
||||
},
|
||||
// TimeSync
|
||||
10 => {
|
||||
let time = serde_json::from_str(raw_val.get())?;
|
||||
FeedMessage::TimeSync { time }
|
||||
},
|
||||
// AddedChain
|
||||
11 => {
|
||||
let (name, node_count) = serde_json::from_str(raw_val.get())?;
|
||||
FeedMessage::AddedChain { name, node_count }
|
||||
},
|
||||
// RemovedChain
|
||||
12 => {
|
||||
let name = serde_json::from_str(raw_val.get())?;
|
||||
FeedMessage::RemovedChain { name }
|
||||
},
|
||||
// SubscribedTo
|
||||
13 => {
|
||||
let name = serde_json::from_str(raw_val.get())?;
|
||||
FeedMessage::SubscribedTo { name }
|
||||
},
|
||||
// UnsubscribedFrom
|
||||
14 => {
|
||||
let name = serde_json::from_str(raw_val.get())?;
|
||||
FeedMessage::UnsubscribedFrom { name }
|
||||
},
|
||||
// Pong
|
||||
15 => {
|
||||
let msg = serde_json::from_str(raw_val.get())?;
|
||||
FeedMessage::Pong { msg }
|
||||
},
|
||||
// AfgFinalized
|
||||
16 => {
|
||||
let (address, block_number, block_hash) = serde_json::from_str(raw_val.get())?;
|
||||
FeedMessage::AfgFinalized { address, block_number, block_hash }
|
||||
},
|
||||
// AfgReceivedPrevote
|
||||
17 => {
|
||||
let (address, block_number, block_hash, voter) = serde_json::from_str(raw_val.get())?;
|
||||
FeedMessage::AfgReceivedPrevote { address, block_number, block_hash, voter }
|
||||
},
|
||||
// AfgReceivedPrecommit
|
||||
18 => {
|
||||
let (address, block_number, block_hash, voter) = serde_json::from_str(raw_val.get())?;
|
||||
FeedMessage::AfgReceivedPrecommit { address, block_number, block_hash, voter }
|
||||
},
|
||||
// AfgAuthoritySet
|
||||
19 => {
|
||||
let (a1, a2, a3, block_number, block_hash) = serde_json::from_str(raw_val.get())?;
|
||||
FeedMessage::AfgAuthoritySet { a1, a2, a3, block_number, block_hash }
|
||||
},
|
||||
// StaleNode
|
||||
20 => {
|
||||
let node_id = serde_json::from_str(raw_val.get())?;
|
||||
FeedMessage::StaleNode { node_id }
|
||||
},
|
||||
// NodeIOUpdate
|
||||
21 => {
|
||||
// ignore NodeIO for now:
|
||||
let (node_id, _node_io): (_, &RawValue) = serde_json::from_str(raw_val.get())?;
|
||||
FeedMessage::NodeIOUpdate { node_id }
|
||||
},
|
||||
// A catchall for messages we don't know/care about yet:
|
||||
_ => {
|
||||
let value = raw_val.to_string();
|
||||
FeedMessage::UnknownValue { action, value }
|
||||
},
|
||||
};
|
||||
|
||||
Ok(feed_message)
|
||||
}
|
||||
}
|
||||
@@ -1,8 +1,10 @@
|
||||
// A helper to spawn or connect to shard/core processes and hand back connections to them
|
||||
pub mod connect_to_servers;
|
||||
/// Create/connect to a server consisting of shards and a core process that we can interact with.
|
||||
pub mod server;
|
||||
|
||||
/// Test support for deserializing feed messages from the feed processes. This basically
|
||||
/// is the slightly-lossy inverse of the custom serialization we do to feed messages.
|
||||
pub mod feed_message_de;
|
||||
|
||||
/// A wrapper around soketto to simplify the process of establishing connections
|
||||
/// and sending messages. Provides cancel-safe message channels.
|
||||
pub mod ws_client;
|
||||
|
||||
/// A helper to construct simple test cases involving a single shard and feed.
|
||||
pub mod test_simple;
|
||||
@@ -0,0 +1,117 @@
|
||||
use crate::ws_client;
|
||||
use futures::{Sink, SinkExt, Stream, StreamExt};
|
||||
use crate::feed_message_de::FeedMessage;
|
||||
|
||||
/// Wrap a `ws_client::Sender` with convenient utility methods for shard connections
|
||||
pub struct ShardSender(ws_client::Sender);
|
||||
|
||||
impl From<ws_client::Sender> for ShardSender {
|
||||
fn from(c: ws_client::Sender) -> Self { ShardSender(c) }
|
||||
}
|
||||
|
||||
impl Sink<ws_client::Message> for ShardSender {
|
||||
type Error = ws_client::SendError;
|
||||
fn poll_ready(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Result<(), Self::Error>> {
|
||||
self.0.poll_ready_unpin(cx)
|
||||
}
|
||||
fn start_send(mut self: std::pin::Pin<&mut Self>, item: ws_client::Message) -> Result<(), Self::Error> {
|
||||
self.0.start_send_unpin(item)
|
||||
}
|
||||
fn poll_flush(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Result<(), Self::Error>> {
|
||||
self.0.poll_flush_unpin(cx)
|
||||
}
|
||||
fn poll_close(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Result<(), Self::Error>> {
|
||||
self.0.poll_close_unpin(cx)
|
||||
}
|
||||
}
|
||||
|
||||
impl ShardSender {
|
||||
pub async fn send_json_binary(&mut self, json: serde_json::Value) -> Result<(), ws_client::SendError> {
|
||||
let bytes = serde_json::to_vec(&json).expect("valid bytes");
|
||||
self.send(ws_client::Message::Binary(bytes)).await
|
||||
}
|
||||
pub async fn send_json_text(&mut self, json: serde_json::Value) -> Result<(), ws_client::SendError> {
|
||||
let s = serde_json::to_string(&json).expect("valid string");
|
||||
self.send(ws_client::Message::Text(s)).await
|
||||
}
|
||||
}
|
||||
|
||||
/// Wrap a `ws_client::Receiver` with convenient utility methods for shard connections
|
||||
pub struct ShardReceiver(ws_client::Receiver);
|
||||
|
||||
impl From<ws_client::Receiver> for ShardReceiver {
|
||||
fn from(c: ws_client::Receiver) -> Self { ShardReceiver(c) }
|
||||
}
|
||||
|
||||
impl Stream for ShardReceiver {
|
||||
type Item = Result<ws_client::Message, ws_client::RecvError>;
|
||||
fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Option<Self::Item>> {
|
||||
self.0.poll_next_unpin(cx)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/// Wrap a `ws_client::Sender` with convenient utility methods for feed connections
|
||||
pub struct FeedSender(ws_client::Sender);
|
||||
|
||||
impl From<ws_client::Sender> for FeedSender {
|
||||
fn from(c: ws_client::Sender) -> Self { FeedSender(c) }
|
||||
}
|
||||
|
||||
impl Sink<ws_client::Message> for FeedSender {
|
||||
type Error = ws_client::SendError;
|
||||
fn poll_ready(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Result<(), Self::Error>> {
|
||||
self.0.poll_ready_unpin(cx)
|
||||
}
|
||||
fn start_send(mut self: std::pin::Pin<&mut Self>, item: ws_client::Message) -> Result<(), Self::Error> {
|
||||
self.0.start_send_unpin(item)
|
||||
}
|
||||
fn poll_flush(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Result<(), Self::Error>> {
|
||||
self.0.poll_flush_unpin(cx)
|
||||
}
|
||||
fn poll_close(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Result<(), Self::Error>> {
|
||||
self.0.poll_close_unpin(cx)
|
||||
}
|
||||
}
|
||||
|
||||
impl FeedSender {
|
||||
pub async fn send_command<S: AsRef<str>>(&mut self, command: S, param: S) -> Result<(), ws_client::SendError> {
|
||||
self.send(ws_client::Message::Text(format!("{}:{}", command.as_ref(), param.as_ref()))).await
|
||||
}
|
||||
}
|
||||
|
||||
/// Wrap a `ws_client::Receiver` with convenient utility methods for feed connections
|
||||
pub struct FeedReceiver(ws_client::Receiver);
|
||||
|
||||
impl From<ws_client::Receiver> for FeedReceiver {
|
||||
fn from(c: ws_client::Receiver) -> Self { FeedReceiver(c) }
|
||||
}
|
||||
|
||||
impl Stream for FeedReceiver {
|
||||
type Item = Result<ws_client::Message, ws_client::RecvError>;
|
||||
fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Option<Self::Item>> {
|
||||
self.0.poll_next_unpin(cx).map_err(|e| e.into())
|
||||
}
|
||||
}
|
||||
|
||||
impl FeedReceiver {
|
||||
/// Wait for the next set of feed messages to arrive. Returns an error if the connection
|
||||
/// is closed, or the messages that come back cannot be properly decoded.
|
||||
pub async fn recv_feed_messages(&mut self) -> Result<Vec<FeedMessage>, anyhow::Error> {
|
||||
let msg = self.0
|
||||
.next()
|
||||
.await
|
||||
.ok_or_else(|| anyhow::anyhow!("Stream closed: no more messages"))??;
|
||||
|
||||
match msg {
|
||||
ws_client::Message::Binary(data) => {
|
||||
let messages = FeedMessage::from_bytes(&data)?;
|
||||
Ok(messages)
|
||||
},
|
||||
ws_client::Message::Text(text) => {
|
||||
let messages = FeedMessage::from_bytes(text.as_bytes())?;
|
||||
Ok(messages)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,30 @@
|
||||
use super::Command;
|
||||
use std::path::PathBuf;
|
||||
|
||||
pub fn default_telemetry_shard_command() -> Result<Command, std::io::Error> {
|
||||
default_telemetry_command("telemetry_shard")
|
||||
}
|
||||
|
||||
pub fn default_telemetry_core_command() -> Result<Command, std::io::Error> {
|
||||
default_telemetry_command("telemetry_core")
|
||||
}
|
||||
|
||||
fn default_telemetry_command(bin: &'static str) -> Result<Command, std::io::Error> {
|
||||
let mut workspace_dir = try_find_workspace_dir()?;
|
||||
workspace_dir.push("Cargo.toml");
|
||||
Ok(Command::new("cargo")
|
||||
.arg("run")
|
||||
.arg("--bin")
|
||||
.arg(bin)
|
||||
.arg("--manifest-path")
|
||||
.arg(workspace_dir)
|
||||
.arg("--"))
|
||||
}
|
||||
|
||||
/// A _very_ naive way to find the workspace ("backend") directory
|
||||
/// from the current path (which is assumed to be inside it).
|
||||
fn try_find_workspace_dir() -> Result<PathBuf, std::io::Error> {
|
||||
let mut dir = std::env::current_dir()?;
|
||||
while !dir.ends_with("backend") && dir.pop() {}
|
||||
Ok(dir)
|
||||
}
|
||||
@@ -0,0 +1,6 @@
|
||||
mod utils;
|
||||
mod server;
|
||||
mod default_commands;
|
||||
|
||||
pub mod channels;
|
||||
pub use server::*;
|
||||
@@ -0,0 +1,324 @@
|
||||
use std::ffi::OsString;
|
||||
use std::marker::PhantomData;
|
||||
use crate::ws_client;
|
||||
use tokio::process::{ self, Command as TokioCommand };
|
||||
use super::{ channels, utils };
|
||||
use common::{ id_type, DenseMap };
|
||||
|
||||
id_type! {
|
||||
/// The ID of a running process. Cannot be constructed externally.
|
||||
pub struct ProcessId(usize);
|
||||
}
|
||||
|
||||
pub struct StartOpts {
|
||||
/// Optional command to run to start a shard (instead of `telemetry_shard`).
|
||||
/// The `--listen` and `--log` arguments will be appended within and shouldn't be provided.
|
||||
pub shard_command: Option<Command>,
|
||||
/// Optional command to run to start a telemetry core process (instead of `telemetry_core`).
|
||||
/// The `--listen` and `--log` arguments will be appended within and shouldn't be provided.
|
||||
pub core_command: Option<Command>
|
||||
}
|
||||
|
||||
impl Default for StartOpts {
|
||||
fn default() -> Self {
|
||||
StartOpts {
|
||||
shard_command: None,
|
||||
core_command: None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ConnectToExistingOpts {
|
||||
/// Details for connections to `telemetry_shard` /submit endpoints
|
||||
pub shard_uris: Vec<http::Uri>,
|
||||
/// Details for connections to `telemetry_core` /feed endpoints
|
||||
pub feed_uri: http::Uri,
|
||||
}
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub enum Error {
|
||||
#[error("Can't establsih connection: {0}")]
|
||||
ConnectionError(#[from] ws_client::ConnectError),
|
||||
#[error("Can't establsih connection: {0}")]
|
||||
JoinError(#[from] tokio::task::JoinError),
|
||||
#[error("Can't establsih connection: {0}")]
|
||||
IoError(#[from] std::io::Error),
|
||||
#[error("Could not obtain port for process: {0}")]
|
||||
ErrorObtainingPort(anyhow::Error),
|
||||
#[error("Whoops; attempt to kill a process we didn't start (and so have no handle to)")]
|
||||
CannotKillNoHandle,
|
||||
#[error("Whoops; attempt to add a shard to a server we didn't start (and so have no handle to)")]
|
||||
CannotAddShardNoHandle,
|
||||
}
|
||||
|
||||
/// This provides back connections (or groups of connections) that are
|
||||
/// hooked up to the running processes and ready to send/receive messages.
|
||||
pub struct Server {
|
||||
/// URI to connect a shard to core:
|
||||
core_shard_submit_uri: Option<http::Uri>,
|
||||
/// Command to run to start a new shard:
|
||||
shard_command: Option<Command>,
|
||||
/// Shard processes that we can connect to
|
||||
shards: DenseMap<ProcessId, ShardProcess>,
|
||||
/// Core process that we can connect to
|
||||
core: CoreProcess,
|
||||
}
|
||||
|
||||
impl Server {
|
||||
pub fn get_core(&self) -> &CoreProcess {
|
||||
&self.core
|
||||
}
|
||||
|
||||
pub fn get_shard(&self, id: ProcessId) -> Option<&ShardProcess> {
|
||||
self.shards.get(id)
|
||||
}
|
||||
|
||||
pub fn iter_shards(&self) -> impl Iterator<Item = &ShardProcess> {
|
||||
self.shards.iter().map(|(_,v)| v)
|
||||
}
|
||||
|
||||
pub async fn kill_shard(&mut self, id: ProcessId) -> bool {
|
||||
let shard = match self.shards.remove(id) {
|
||||
Some(shard) => shard,
|
||||
None => return false
|
||||
};
|
||||
|
||||
// With this, killing will complete even if the promise returned is cancelled
|
||||
// (it should regardless, but just to play it safe..)
|
||||
let _ = tokio::spawn(async move {
|
||||
let _ = shard.kill().await;
|
||||
}).await;
|
||||
|
||||
true
|
||||
}
|
||||
|
||||
/// Kill everything and tidy up
|
||||
pub async fn shutdown(self) {
|
||||
// Spawn so we don't need to await cleanup if we don't care.
|
||||
// Run all kill futs simultaneously.
|
||||
let handle = tokio::spawn(async move {
|
||||
let shard_kill_futs = self.shards
|
||||
.into_iter()
|
||||
.map(|(_,s)| s.kill());
|
||||
|
||||
let _ = tokio::join!(
|
||||
futures::future::join_all(shard_kill_futs),
|
||||
self.core.kill()
|
||||
);
|
||||
});
|
||||
|
||||
// You can wait for cleanup but aren't obliged to:
|
||||
let _ = handle.await;
|
||||
}
|
||||
|
||||
/// Connect a new shard and return a process that you can interact with:
|
||||
pub async fn add_shard(&mut self) -> Result<ProcessId, Error> {
|
||||
let core_uri = match &self.core_shard_submit_uri {
|
||||
Some(uri) => uri,
|
||||
None => return Err(Error::CannotAddShardNoHandle)
|
||||
};
|
||||
|
||||
let mut shard_cmd: TokioCommand = match &self.shard_command {
|
||||
Some(cmd) => cmd.clone(),
|
||||
None => super::default_commands::default_telemetry_shard_command()?
|
||||
}.into();
|
||||
|
||||
shard_cmd
|
||||
.arg("--listen")
|
||||
.arg("127.0.0.1:0") // 0 to have a port picked by the kernel
|
||||
.arg("--log")
|
||||
.arg("info")
|
||||
.arg("--core")
|
||||
.arg(core_uri.to_string())
|
||||
.kill_on_drop(true)
|
||||
.stdout(std::process::Stdio::piped())
|
||||
.stdin(std::process::Stdio::piped());
|
||||
|
||||
let mut shard_process = shard_cmd.spawn()?;
|
||||
let mut child_stdout = shard_process.stdout.take().expect("shard stdout");
|
||||
let shard_port = utils::get_port(&mut child_stdout)
|
||||
.await
|
||||
.map_err(|e| Error::ErrorObtainingPort(e))?;
|
||||
|
||||
// Since we're piping stdout from the child process, we need somewhere for it to go
|
||||
// else the process will get stuck when it tries to produce output:
|
||||
utils::drain(child_stdout, tokio::io::stdout());
|
||||
|
||||
let shard_uri = format!("http://127.0.0.1:{}/submit", shard_port)
|
||||
.parse()
|
||||
.expect("valid submit URI");
|
||||
|
||||
let pid = self.shards.add_with(|id| Process {
|
||||
id,
|
||||
handle: Some(shard_process),
|
||||
uri: shard_uri,
|
||||
_channel_type: PhantomData
|
||||
});
|
||||
|
||||
Ok(pid)
|
||||
}
|
||||
|
||||
/// Start a telemetry_core process with default opts. From here, we can add/remove shards as needed.
|
||||
pub async fn start_default() -> Result<Server, Error> {
|
||||
Server::start(StartOpts::default()).await
|
||||
}
|
||||
|
||||
/// Start a telemetry_core process. From here, we can add/remove shards as needed.
|
||||
pub async fn start(opts: StartOpts) -> Result<Server, Error> {
|
||||
|
||||
let mut core_cmd: TokioCommand = match opts.core_command {
|
||||
Some(cmd) => cmd,
|
||||
None => super::default_commands::default_telemetry_core_command()?
|
||||
}.into();
|
||||
|
||||
let mut child = core_cmd
|
||||
.arg("--listen")
|
||||
.arg("127.0.0.1:0") // 0 to have a port picked by the kernel
|
||||
.arg("--log")
|
||||
.arg("info")
|
||||
.kill_on_drop(true)
|
||||
.stdout(std::process::Stdio::piped())
|
||||
.stdin(std::process::Stdio::piped())
|
||||
.spawn()?;
|
||||
|
||||
// Find out the port that this is running on
|
||||
let mut child_stdout = child.stdout.take().expect("core stdout");
|
||||
let core_port = utils::get_port(&mut child_stdout)
|
||||
.await
|
||||
.map_err(|e| Error::ErrorObtainingPort(e))?;
|
||||
|
||||
// Since we're piping stdout from the child process, we need somewhere for it to go
|
||||
// else the process will get stuck when it tries to produce output:
|
||||
utils::drain(child_stdout, tokio::io::stdout());
|
||||
|
||||
// URI for feeds to connect to the core:
|
||||
let feed_uri = format!("http://127.0.0.1:{}/feed", core_port)
|
||||
.parse()
|
||||
.expect("valid feed URI");
|
||||
|
||||
Ok(Server {
|
||||
shard_command: opts.shard_command,
|
||||
core_shard_submit_uri: Some(format!("http://127.0.0.1:{}/shard_submit", core_port)
|
||||
.parse()
|
||||
.expect("valid shard_submit URI")),
|
||||
shards: DenseMap::new(),
|
||||
core: Process {
|
||||
id: ProcessId(0),
|
||||
handle: Some(child),
|
||||
uri: feed_uri,
|
||||
_channel_type: PhantomData,
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
/// Establshes the requested connections to existing processes.
|
||||
pub fn connect_to_existing(opts: ConnectToExistingOpts) -> Server {
|
||||
let mut shards = DenseMap::new();
|
||||
for shard_uri in opts.shard_uris {
|
||||
shards.add_with(|id| Process {
|
||||
id,
|
||||
uri: shard_uri,
|
||||
handle: None,
|
||||
_channel_type: PhantomData,
|
||||
});
|
||||
}
|
||||
|
||||
Server {
|
||||
shard_command: None,
|
||||
// We can't add shards if starting in this mode:
|
||||
core_shard_submit_uri: None,
|
||||
shards,
|
||||
core: Process {
|
||||
id: ProcessId(0),
|
||||
uri: opts.feed_uri,
|
||||
handle: None,
|
||||
_channel_type: PhantomData,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/// This represents a running process that we can connect to, which
/// may be either a `telemetry_shard` or `telemetry_core`.
pub struct Process<Channel> {
    /// Unique identifier assigned when the process was registered.
    id: ProcessId,
    /// If we started the processes ourselves, we'll have a handle to
    /// them which we can use to kill them. Else, we may not.
    handle: Option<process::Child>,
    /// The URI that we can use to connect to the process socket.
    uri: http::Uri,
    /// The kind of the process (lets us add methods specific to shard/core).
    /// PhantomData carries the channel type without storing a value of it.
    _channel_type: PhantomData<Channel>
}
|
||||
|
||||
/// A shard process with shard-specific methods.
/// (The channel type parameter selects the shard sender/receiver wrappers
/// produced by `connect`/`connect_multiple`.)
pub type ShardProcess = Process<(channels::ShardSender, channels::ShardReceiver)>;

/// A core process with core-specific methods.
/// (The channel type parameter selects the feed sender/receiver wrappers
/// produced by `connect`/`connect_multiple`.)
pub type CoreProcess = Process<(channels::FeedSender, channels::FeedReceiver)>;
|
||||
|
||||
impl <Channel> Process<Channel> {
|
||||
/// Get the ID of this process
|
||||
pub fn id(&self) -> ProcessId {
|
||||
self.id
|
||||
}
|
||||
|
||||
/// Kill the process and wait for this to complete
|
||||
/// Not public: Klling done via Server.
|
||||
async fn kill(self) -> Result<(), Error> {
|
||||
match self.handle {
|
||||
Some(mut handle) => Ok(handle.kill().await?),
|
||||
None => Err(Error::CannotKillNoHandle)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl <Send: From<ws_client::Sender>, Recv: From<ws_client::Receiver>> Process<(Send, Recv)> {
|
||||
/// Establish a connection to the process
|
||||
pub async fn connect(&self) -> Result<(Send, Recv), Error> {
|
||||
ws_client::connect(&self.uri)
|
||||
.await
|
||||
.map(|(s,r)| (s.into(), r.into()))
|
||||
.map_err(|e| e.into())
|
||||
}
|
||||
|
||||
/// Establish multiple connections to the process
|
||||
pub async fn connect_multiple(&self, num_connections: usize) -> Result<Vec<(Send, Recv)>, Error> {
|
||||
utils::connect_multiple_to_uri(&self.uri, num_connections)
|
||||
.await
|
||||
.map(|v| v.into_iter().map(|(s,r)| (s.into(), r.into())).collect())
|
||||
.map_err(|e| e.into())
|
||||
}
|
||||
}
|
||||
|
||||
/// This defines a command to run. It exists because [`tokio::process::Command`]
/// cannot be cloned, but we need to clone our command definition in order to
/// spawn multiple processes from it — so we keep the program name and its
/// arguments ourselves and convert to a real command at spawn time.
#[derive(Clone, Debug)]
pub struct Command {
    /// The program to execute.
    command: OsString,
    /// Arguments passed to the program, in order.
    args: Vec<OsString>
}

impl Command {
    /// Create a command that runs the given program with no arguments.
    pub fn new<S: Into<OsString>>(command: S) -> Command {
        Command { command: command.into(), args: Vec::new() }
    }

    /// Append a single argument, returning the updated command (builder style).
    pub fn arg<S: Into<OsString>>(mut self, arg: S) -> Command {
        self.args.push(arg.into());
        self
    }
}
|
||||
|
||||
impl Into<TokioCommand> for Command {
|
||||
fn into(self) -> TokioCommand {
|
||||
let mut cmd = TokioCommand::new(self.command);
|
||||
cmd.args(self.args);
|
||||
cmd
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,65 @@
|
||||
use crate::ws_client;
|
||||
use tokio::io::BufReader;
|
||||
use tokio::io::{ AsyncRead, AsyncWrite, AsyncBufReadExt };
|
||||
use tokio::time::Duration;
|
||||
use anyhow::{ anyhow, Context };
|
||||
|
||||
/// Reads from the stdout of the shard/core process to extract the port that was assigned to it,
|
||||
/// with the side benefit that we'll wait for it to start listening before returning. We do this
|
||||
/// because we want to allow the kernel to assign ports and so don't specify a port as an arg.
|
||||
pub async fn get_port<R: AsyncRead + Unpin>(reader: R) -> Result<u16, anyhow::Error> {
|
||||
let reader = BufReader::new(reader);
|
||||
let mut reader_lines = reader.lines();
|
||||
|
||||
loop {
|
||||
let line = tokio::time::timeout(
|
||||
// This has to accomodate pauses during compilation if the cmd is "cargo run --":
|
||||
Duration::from_secs(30),
|
||||
reader_lines.next_line()
|
||||
).await;
|
||||
|
||||
let line = match line {
|
||||
// timeout expired; couldn't get port:
|
||||
Err(e) => return Err(anyhow!("Timeout expired waiting to discover port: {}", e)),
|
||||
// Something went wrong reading line; bail:
|
||||
Ok(Err(e)) => return Err(anyhow!("Could not read line from stdout: {}", e)),
|
||||
// No more output; process ended? bail:
|
||||
Ok(Ok(None)) => return Err(anyhow!("No more output from stdout; has the process ended?")),
|
||||
// All OK, and a line is given back; phew!
|
||||
Ok(Ok(Some(line))) => line
|
||||
};
|
||||
|
||||
let (_, port_str) = match line.rsplit_once("listening on http://127.0.0.1:") {
|
||||
Some(m) => m,
|
||||
None => continue
|
||||
};
|
||||
|
||||
return port_str
|
||||
.trim()
|
||||
.parse()
|
||||
.with_context(|| format!("Could not parse output to port: {}", port_str));
|
||||
}
|
||||
}
|
||||
|
||||
/// Establish multiple connections to a URI and return them all.
|
||||
pub async fn connect_multiple_to_uri(uri: &http::Uri, num_connections: usize) -> Result<Vec<(ws_client::Sender, ws_client::Receiver)>, ws_client::ConnectError> {
|
||||
let connect_futs = (0..num_connections)
|
||||
.map(|_| ws_client::connect(uri));
|
||||
let sockets: Result<Vec<_>,_> = futures::future::join_all(connect_futs)
|
||||
.await
|
||||
.into_iter()
|
||||
.collect();
|
||||
sockets
|
||||
}
|
||||
|
||||
/// Drain output from a reader to a writer in a background task. After acquiring port
/// details from spawned processes, their piped stdout must continue to be consumed
/// (else the child blocks once the pipe buffer fills), and so we do this here.
pub fn drain<R, W>(mut reader: R, mut writer: W)
where
    R: AsyncRead + Unpin + Send + 'static,
    W: AsyncWrite + Unpin + Send + 'static
{
    // Fire-and-forget: the handle is dropped, copy errors are deliberately
    // ignored, and the task ends when the reader hits EOF (e.g. child exits).
    tokio::spawn(async move {
        let _ = tokio::io::copy(&mut reader, &mut writer).await;
    });
}
|
||||
@@ -1,62 +0,0 @@
|
||||
use tokio::process::Command;
|
||||
use crate::connect_to_servers::{ Server, StartProcessOpts, Process };
|
||||
|
||||
/// Builder that spawns one `telemetry_core` and one `telemetry_shard`
/// process (via `Server::start_processes`) and hands back [`Processes`].
pub struct Runner {
    /// Optional custom command used to start the shard process;
    /// `None` means the default shard command is used.
    shard_command: Option<Command>,
    /// Optional custom command used to start the core process;
    /// `None` means the default core command is used.
    core_command: Option<Command>
}
|
||||
|
||||
impl Runner {
|
||||
pub fn new() -> Runner {
|
||||
Runner {
|
||||
shard_command: None,
|
||||
core_command: None
|
||||
}
|
||||
}
|
||||
|
||||
pub fn shard_command(mut self, cmd: Command) -> Self {
|
||||
self.shard_command = Some(cmd);
|
||||
self
|
||||
}
|
||||
|
||||
pub fn core_command(mut self, cmd: Command) -> Self {
|
||||
self.core_command = Some(cmd);
|
||||
self
|
||||
}
|
||||
|
||||
pub async fn build(self) -> Result<Processes, anyhow::Error> {
|
||||
let mut server = Server::start_processes(StartProcessOpts {
|
||||
shard_command: self.shard_command,
|
||||
num_shards: 1,
|
||||
core_command: self.core_command,
|
||||
}).await?;
|
||||
|
||||
let core_process = server.core;
|
||||
let shard_process = server.shards.remove(0);
|
||||
|
||||
Ok(Processes {
|
||||
core_process,
|
||||
shard_process,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// A representation of the running processes that we can connect and send messages to.
pub struct Processes {
    /// Handle to the spawned `telemetry_shard` process.
    shard_process: Process,
    /// Handle to the spawned `telemetry_core` process.
    core_process: Process,
}
|
||||
|
||||
impl Processes {
    /// Kill both processes and wait for the kills to complete.
    ///
    /// The kills run inside a spawned task so that — presumably — they proceed
    /// to completion even if the caller drops this future early; awaiting the
    /// handle below just lets callers that *do* await observe when cleanup has
    /// finished. NOTE(review): confirm this drop-resilience is intended.
    pub async fn cleanup(self) {
        let handle = tokio::spawn(async move {
            // Kill shard and core concurrently; failures (e.g. no handle for a
            // process we didn't start) are deliberately ignored.
            let _ = tokio::join!(
                self.shard_process.kill(),
                self.core_process.kill()
            );
        });

        // You can wait for cleanup but aren't obliged to:
        let _ = handle.await;
    }
}
|
||||
@@ -47,7 +47,9 @@ pub struct Receiver {
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub enum RecvError {
|
||||
#[error("Text message contains invalid UTF8: {0}")]
|
||||
InvalidUtf8(#[from] std::string::FromUtf8Error)
|
||||
InvalidUtf8(#[from] std::string::FromUtf8Error),
|
||||
#[error("Stream finished")]
|
||||
StreamFinished
|
||||
}
|
||||
|
||||
impl Stream for Receiver {
|
||||
|
||||
Reference in New Issue
Block a user