mirror of
https://github.com/pezkuwichain/pezkuwi-telemetry.git
synced 2026-06-13 01:01:02 +00:00
Begin writing the core utils that we'll use for testing things
This commit is contained in:
Generated
+15
-2
@@ -1437,6 +1437,19 @@ dependencies = [
|
||||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "test_utils"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"futures",
|
||||
"http",
|
||||
"log",
|
||||
"soketto",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "textwrap"
|
||||
version = "0.11.0"
|
||||
@@ -1493,9 +1506,9 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c"
|
||||
|
||||
[[package]]
|
||||
name = "tokio"
|
||||
version = "1.7.0"
|
||||
version = "1.8.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c79ba603c337335df6ba6dd6afc38c38a7d5e1b0c871678439ea973cd62a118e"
|
||||
checksum = "570c2eb13b3ab38208130eccd41be92520388791207fde783bda7c1e8ace28d4"
|
||||
dependencies = [
|
||||
"autocfg",
|
||||
"bytes",
|
||||
|
||||
+2
-1
@@ -2,7 +2,8 @@
|
||||
members = [
|
||||
"common",
|
||||
"telemetry",
|
||||
"shard"
|
||||
"shard",
|
||||
"test_utils"
|
||||
]
|
||||
|
||||
[profile.dev]
|
||||
|
||||
@@ -0,0 +1,16 @@
|
||||
[package]
|
||||
name = "test_utils"
|
||||
version = "0.1.0"
|
||||
authors = ["James Wilson <james@jsdw.me>"]
|
||||
edition = "2018"
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
futures = "0.3.15"
|
||||
http = "0.2.4"
|
||||
log = "0.4.14"
|
||||
soketto = "0.6.0"
|
||||
thiserror = "1.0.25"
|
||||
tokio = { version = "1.7.1", features = ["full"] }
|
||||
tokio-util = "0.6.7"
|
||||
@@ -0,0 +1,21 @@
|
||||
use crate::ws_client::{ Sender, Receiver };
|
||||
|
||||
/// We either say where to conenct to, or we start the binaries
|
||||
/// ourselves. Either way, we hand back a `Connection` object
|
||||
/// which allows us to talk to the running instances.
|
||||
pub enum Opts {
|
||||
StartProcesses {
|
||||
shard_command: Option<String>,
|
||||
num_shards: usize,
|
||||
telemetry_command: Option<String>
|
||||
},
|
||||
ConnectToExisting {
|
||||
shard_uris: Vec<http::Uri>,
|
||||
telemetry_uri: http::Uri
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Connection {
|
||||
shard_sockets: Vec<(Sender, Receiver)>,
|
||||
telemetry_socket: Vec<(Sender, Receiver)>
|
||||
}
|
||||
@@ -0,0 +1,3 @@
|
||||
/// A wrapper around soketto to simplify the process of establishing connections
|
||||
mod ws_client;
|
||||
mod connect_to_servers;
|
||||
@@ -0,0 +1,191 @@
|
||||
use futures::channel::{ mpsc };
|
||||
use soketto::handshake::{Client, ServerResponse};
|
||||
use tokio_util::compat::{ TokioAsyncReadCompatExt };
|
||||
use tokio::net::TcpStream;
|
||||
use futures::{Sink, SinkExt, Stream, StreamExt};
|
||||
|
||||
/// Send messages into the connection
|
||||
#[derive(Clone)]
|
||||
pub struct Sender {
|
||||
inner: mpsc::UnboundedSender<SentMessage>
|
||||
}
|
||||
|
||||
impl Sender {
|
||||
pub async fn close(&mut self) -> Result<(),SendError> {
|
||||
self.inner.send(SentMessage::Close).await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(thiserror::Error, Debug, Clone)]
|
||||
pub enum SendError {
|
||||
#[error("Failed to send message: {0}")]
|
||||
ChannelError(#[from] mpsc::SendError)
|
||||
}
|
||||
|
||||
impl Sink<Message> for Sender {
|
||||
type Error = SendError;
|
||||
fn poll_ready(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Result<(), Self::Error>> {
|
||||
self.inner.poll_ready_unpin(cx).map_err(|e| e.into())
|
||||
}
|
||||
fn start_send(mut self: std::pin::Pin<&mut Self>, item: Message) -> Result<(), Self::Error> {
|
||||
self.inner.start_send_unpin(SentMessage::Message(item)).map_err(|e| e.into())
|
||||
}
|
||||
fn poll_flush(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Result<(), Self::Error>> {
|
||||
self.inner.poll_flush_unpin(cx).map_err(|e| e.into())
|
||||
}
|
||||
fn poll_close(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Result<(), Self::Error>> {
|
||||
self.inner.poll_close_unpin(cx).map_err(|e| e.into())
|
||||
}
|
||||
}
|
||||
|
||||
/// Receive messages out of a connection
|
||||
pub struct Receiver {
|
||||
inner: mpsc::UnboundedReceiver<Result<Message,RecvError>>
|
||||
}
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub enum RecvError {
|
||||
#[error("Text message contains invalid UTF8: {0}")]
|
||||
InvalidUtf8(#[from] std::string::FromUtf8Error)
|
||||
}
|
||||
|
||||
impl Stream for Receiver {
|
||||
type Item = Result<Message, RecvError>;
|
||||
fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Option<Self::Item>> {
|
||||
self.inner.poll_next_unpin(cx).map_err(|e| e.into())
|
||||
}
|
||||
}
|
||||
|
||||
/// A message type that can be sent or received from the connection
|
||||
pub enum Message {
|
||||
Text(String),
|
||||
Binary(Vec<u8>)
|
||||
}
|
||||
|
||||
/// Sent messages can be anything publically visible, or a close message.
|
||||
enum SentMessage {
|
||||
Message(Message),
|
||||
Close
|
||||
}
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub enum ConnectError {
|
||||
#[error("IO error: {0}")]
|
||||
Io(#[from] std::io::Error),
|
||||
#[error("Handshake error: {0}")]
|
||||
Handshake(#[from] soketto::handshake::Error),
|
||||
#[error("Redirect not supported (status code: {status_code})")]
|
||||
ConnectionFailedRedirect { status_code: u16 },
|
||||
#[error("Connection rejected (status code: {status_code})")]
|
||||
ConnectionFailedRejected { status_code: u16 },
|
||||
}
|
||||
|
||||
/// Establish a websocket connection that you can send and receive messages from.
|
||||
/// A thin wrapper around Soketto that provides cancel-safe send/receive handles.
|
||||
///
|
||||
/// This must be called within the context of a tokio runtime.
|
||||
pub async fn connect(uri: &http::Uri) -> Result<(Sender, Receiver), ConnectError> {
|
||||
let host = uri.host().unwrap_or("127.0.0.1");
|
||||
let port = uri.port_u16().unwrap_or(80);
|
||||
let path = uri.path();
|
||||
|
||||
let socket = TcpStream::connect((host, port)).await?;
|
||||
socket.set_nodelay(true).unwrap();
|
||||
|
||||
// Establish a WS connection:
|
||||
let mut client = Client::new(socket.compat(), host, &path);
|
||||
let (mut ws_to_connection, mut ws_from_connection) = match client.handshake().await? {
|
||||
ServerResponse::Accepted { .. } => {
|
||||
client.into_builder().finish()
|
||||
},
|
||||
ServerResponse::Redirect { status_code, .. } => {
|
||||
return Err(ConnectError::ConnectionFailedRedirect { status_code })
|
||||
},
|
||||
ServerResponse::Rejected { status_code } => {
|
||||
return Err(ConnectError::ConnectionFailedRejected { status_code })
|
||||
}
|
||||
};
|
||||
|
||||
// Soketto sending/receiving isn't cancel safe, so we wrap the message stuff into spawned
|
||||
// tasks and use channels (which are cancel safe) to send/recv messages atomically..
|
||||
|
||||
// Receive messages from the socket and post them out:
|
||||
let (mut tx_to_external, rx_from_ws) = mpsc::unbounded();
|
||||
tokio::spawn(async move {
|
||||
let mut data = Vec::with_capacity(128);
|
||||
loop {
|
||||
// Clear the buffer and wait for the next message to arrive:
|
||||
data.clear();
|
||||
|
||||
let message_data = match ws_from_connection.receive_data(&mut data).await {
|
||||
Err(e) => {
|
||||
// Couldn't receive data may mean all senders are gone, so log
|
||||
// the error and shut this down:
|
||||
log::error!("Shutting down websocket connection: Failed to receive data: {}", e);
|
||||
break;
|
||||
},
|
||||
Ok(data) => {
|
||||
data
|
||||
}
|
||||
};
|
||||
|
||||
let msg = match message_data {
|
||||
soketto::Data::Text(_) => {
|
||||
Ok(Message::Binary(data))
|
||||
},
|
||||
soketto::Data::Binary(_) => {
|
||||
String::from_utf8(data)
|
||||
.map(|s| Message::Text(s))
|
||||
.map_err(|e| e.into())
|
||||
},
|
||||
};
|
||||
|
||||
data = Vec::with_capacity(128);
|
||||
|
||||
if let Err(e) = tx_to_external.send(msg).await {
|
||||
// Failure to send likely means that the recv has been dropped,
|
||||
// so let's drop this loop too.
|
||||
log::error!("Shutting down websocket connection: Failed to send data out: {}", e);
|
||||
break;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// Receive messages externally to send to the socket.
|
||||
let (tx_to_ws, mut rx_from_external) = mpsc::unbounded();
|
||||
tokio::spawn(async move {
|
||||
while let Some(msg) = rx_from_external.next().await {
|
||||
match msg {
|
||||
SentMessage::Message(Message::Text(s)) => {
|
||||
if let Err(e) = ws_to_connection.send_text_owned(s).await {
|
||||
log::error!("Shutting down websocket connection: Failed to send text data: {}", e);
|
||||
break;
|
||||
}
|
||||
},
|
||||
SentMessage::Message(Message::Binary(bytes)) => {
|
||||
if let Err(e) = ws_to_connection.send_binary_mut(bytes).await {
|
||||
log::error!("Shutting down websocket connection: Failed to send binary data: {}", e);
|
||||
break;
|
||||
}
|
||||
},
|
||||
SentMessage::Close => {
|
||||
if let Err(e) = ws_to_connection.close().await {
|
||||
log::error!("Error attempting to close connection: {}", e);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if let Err(e) = ws_to_connection.flush().await {
|
||||
log::error!("Shutting down websocket connection: Failed to flush data: {}", e);
|
||||
break;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
Ok((
|
||||
Sender { inner: tx_to_ws },
|
||||
Receiver { inner: rx_from_ws }
|
||||
))
|
||||
}
|
||||
Reference in New Issue
Block a user