diff --git a/backend/Cargo.lock b/backend/Cargo.lock index a048be6..636f2e9 100644 --- a/backend/Cargo.lock +++ b/backend/Cargo.lock @@ -209,14 +209,19 @@ dependencies = [ "bincode", "bytes", "fnv", + "futures", "hex", + "http", "log", "num-traits", "primitive-types", "rustc-hash", "serde", "serde_json", + "soketto", "thiserror", + "tokio", + "tokio-util", ] [[package]] @@ -1677,6 +1682,7 @@ dependencies = [ "anyhow", "bimap", "bincode", + "bytes", "common", "criterion", "futures", @@ -1821,9 +1827,9 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c" [[package]] name = "tokio" -version = "1.8.0" +version = "1.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "570c2eb13b3ab38208130eccd41be92520388791207fde783bda7c1e8ace28d4" +checksum = "c2602b8af3767c285202012822834005f596c811042315fa7e9f5b12b2a43207" dependencies = [ "autocfg", "bytes", diff --git a/backend/Cargo.toml b/backend/Cargo.toml index c197214..518633a 100644 --- a/backend/Cargo.toml +++ b/backend/Cargo.toml @@ -12,4 +12,7 @@ opt-level = 3 [profile.release] lto = true panic = "abort" -# debug = true \ No newline at end of file +## Enabling these seems necessary to get +## good debug info in Instruments: +# debug = true +# codegen-units = 1 \ No newline at end of file diff --git a/backend/common/Cargo.toml b/backend/common/Cargo.toml index 4647890..635483f 100644 --- a/backend/common/Cargo.toml +++ b/backend/common/Cargo.toml @@ -9,14 +9,19 @@ license = "GPL-3.0" bimap = "0.6.1" bytes = "1.0.1" fnv = "1.0.7" +futures = "0.3.15" hex = "0.4.3" +http = "0.2.4" log = "0.4" num-traits = "0.2" primitive-types = { version = "0.9.0", features = ["serde"] } rustc-hash = "1.1.0" serde = { version = "1.0", features = ["derive"] } serde_json = { version = "1.0", features = ["raw_value"] } +soketto = "0.6.0" thiserror = "1.0.24" +tokio = { version = "1.8.2", features = ["full"] } +tokio-util = { version = "0.6", features = ["compat"] } [dev-dependencies] 
bincode = "1.3.3" diff --git a/backend/common/src/lib.rs b/backend/common/src/lib.rs index 94f2f24..97d597e 100644 --- a/backend/common/src/lib.rs +++ b/backend/common/src/lib.rs @@ -3,6 +3,7 @@ pub mod internal_messages; pub mod node_message; pub mod node_types; pub mod time; +pub mod ws_client; mod assign_id; mod dense_map; diff --git a/backend/test_utils/src/ws_client.rs b/backend/common/src/ws_client.rs similarity index 100% rename from backend/test_utils/src/ws_client.rs rename to backend/common/src/ws_client.rs diff --git a/backend/telemetry_core/Cargo.toml b/backend/telemetry_core/Cargo.toml index a8102ce..c751052 100644 --- a/backend/telemetry_core/Cargo.toml +++ b/backend/telemetry_core/Cargo.toml @@ -9,6 +9,7 @@ license = "GPL-3.0" anyhow = "1.0.41" bimap = "0.6.1" bincode = "1.3.3" +bytes = "1.0.1" common = { path = "../common" } futures = "0.3.15" hex = "0.4.3" diff --git a/backend/telemetry_core/src/aggregator/inner_loop.rs b/backend/telemetry_core/src/aggregator/inner_loop.rs index b3b7786..da20e2a 100644 --- a/backend/telemetry_core/src/aggregator/inner_loop.rs +++ b/backend/telemetry_core/src/aggregator/inner_loop.rs @@ -105,7 +105,7 @@ impl FromStr for FromFeedWebsocket { /// The aggregator can these messages back to a feed connection. #[derive(Clone, Debug)] pub enum ToFeedWebsocket { - Bytes(Vec), + Bytes(bytes::Bytes), } /// Instances of this are responsible for handling incoming and diff --git a/backend/telemetry_core/src/feed_message.rs b/backend/telemetry_core/src/feed_message.rs index d6986ca..e0e108f 100644 --- a/backend/telemetry_core/src/feed_message.rs +++ b/backend/telemetry_core/src/feed_message.rs @@ -68,7 +68,7 @@ impl FeedMessageSerializer { /// Return the bytes we've serialized so far and prepare a new buffer. 
If you're /// finished serializing data, prefer [`FeedMessageSerializer::into_finalized`] - pub fn finalize(&mut self) -> Option> { + pub fn finalize(&mut self) -> Option { if self.buffer.is_empty() { return None; } @@ -77,17 +77,17 @@ impl FeedMessageSerializer { let bytes = mem::replace(&mut self.buffer, Vec::with_capacity(BUFCAP)); - Some(bytes) + Some(bytes.into()) } /// Return the bytes that we've serialized so far, consuming the serializer. - pub fn into_finalized(mut self) -> Option> { + pub fn into_finalized(mut self) -> Option { if self.buffer.is_empty() { return None; } self.buffer.push(b']'); - Some(self.buffer) + Some(self.buffer.into()) } } diff --git a/backend/telemetry_core/src/main.rs b/backend/telemetry_core/src/main.rs index bf45160..15c1b50 100644 --- a/backend/telemetry_core/src/main.rs +++ b/backend/telemetry_core/src/main.rs @@ -254,9 +254,7 @@ where ToFeedWebsocket::Bytes(bytes) => bytes }; - log::debug!("Message to feed: {}", std::str::from_utf8(&bytes).unwrap_or("INVALID UTF8")); - - if let Err(e) = websocket.send(ws::Message::binary(bytes)).await { + if let Err(e) = websocket.send(ws::Message::binary(&*bytes)).await { log::warn!("Closing feed websocket due to error: {}", e); break; } diff --git a/backend/telemetry_core/tests/soak_tests.rs b/backend/telemetry_core/tests/soak_tests.rs index 6b05248..e832bec 100644 --- a/backend/telemetry_core/tests/soak_tests.rs +++ b/backend/telemetry_core/tests/soak_tests.rs @@ -21,7 +21,7 @@ box; MacOS seems to hit limits quicker in general. 
use futures::{ StreamExt }; use structopt::StructOpt; use test_utils::workspace::start_server_release; -use test_utils::ws_client::{ SentMessage }; +use common::ws_client::{ SentMessage }; use serde_json::json; use std::time::Duration; use std::sync::atomic::{ Ordering, AtomicUsize }; diff --git a/backend/telemetry_shard/src/aggregator.rs b/backend/telemetry_shard/src/aggregator.rs index e6731f9..0f0e04c 100644 --- a/backend/telemetry_shard/src/aggregator.rs +++ b/backend/telemetry_shard/src/aggregator.rs @@ -5,7 +5,7 @@ use common::{ node_types::BlockHash, AssignId, }; -use futures::{channel::mpsc, future}; +use futures::{channel::mpsc}; use futures::{Sink, SinkExt, StreamExt}; use std::collections::{HashMap, HashSet}; use std::sync::atomic::AtomicU64; @@ -86,26 +86,33 @@ impl Aggregator { pub async fn spawn(telemetry_uri: http::Uri) -> anyhow::Result { let (tx_to_aggregator, rx_from_external) = mpsc::channel(10); - // Map responses from our connection into messages that will be sent to the aggregator: - let tx_from_connection = tx_to_aggregator.clone().with(|msg| { - future::ok::<_, mpsc::SendError>(match msg { - Message::Connected => ToAggregator::ConnectedToTelemetryCore, - Message::Disconnected => ToAggregator::DisconnectedFromTelemetryCore, - Message::Data(data) => ToAggregator::FromTelemetryCore(data), - }) + // Establish a resilient connection to the core (this retries as needed): + let (tx_to_telemetry_core, mut rx_from_telemetry_core) = + create_ws_connection_to_core(telemetry_uri).await; + + // Forward messages from the telemetry core into the aggregator: + let mut tx_to_aggregator2 = tx_to_aggregator.clone(); + tokio::spawn(async move { + while let Some(msg) = rx_from_telemetry_core.next().await { + let msg_to_aggregator = match msg { + Message::Connected => ToAggregator::ConnectedToTelemetryCore, + Message::Disconnected => ToAggregator::DisconnectedFromTelemetryCore, + Message::Data(data) => ToAggregator::FromTelemetryCore(data), + }; + if let Err(_) =
tx_to_aggregator2.send(msg_to_aggregator).await { + // This will close the ws channels, which themselves log messages. + break + } + } }); - // Establish a resiliant connection to the core (this retries as needed): - let tx_to_telemetry_core = - create_ws_connection_to_core(tx_from_connection, telemetry_uri).await; - - // Handle any incoming messages in our handler loop: + // Start our aggregator loop, handling any incoming messages: tokio::spawn(Aggregator::handle_messages( rx_from_external, tx_to_telemetry_core, )); - // Return a handle to our aggregator: + // Return a handle to our aggregator so that we can send in messages to it: Ok(Aggregator(Arc::new(AggregatorInternal { conn_id: AtomicU64::new(1), tx_to_aggregator, diff --git a/backend/telemetry_shard/src/connection.rs b/backend/telemetry_shard/src/connection.rs index 62e7aa6..edece48 100644 --- a/backend/telemetry_shard/src/connection.rs +++ b/backend/telemetry_shard/src/connection.rs @@ -1,7 +1,7 @@ use futures::channel::mpsc; -use futures::{Sink, SinkExt, StreamExt}; -use tokio::net::TcpStream; -use tokio_util::compat::TokioAsyncReadCompatExt; +use futures::{SinkExt, StreamExt}; +use common::ws_client; +use bincode::Options; #[derive(Clone, Debug)] pub enum Message { @@ -18,181 +18,112 @@ pub enum Message { /// /// Note: have a look at [`common::internal_messages`] to see the different message types exchanged /// between aggregator and core. -pub async fn create_ws_connection_to_core( - mut tx_to_external: S, +pub async fn create_ws_connection_to_core( telemetry_uri: http::Uri, -) -> mpsc::Sender +) -> (mpsc::Sender, mpsc::Receiver>) where - S: Sink, Error = E> + Unpin + Send + Clone + 'static, - E: std::fmt::Debug + std::fmt::Display + Send + 'static, In: serde::Serialize + Send + 'static, Out: serde::de::DeserializeOwned + Send + 'static, { - // Set up a proxy channel to relay messages to the telemetry core, and return one end of it. 
- // Once a connection to the backend is established, we pass messages along to it. If the connection - // fails, we - let (tx_to_connection_proxy, mut rx_from_external_proxy) = mpsc::channel(10); - tokio::spawn(async move { - let mut connected = false; + let (tx_in, mut rx_in) = mpsc::channel(10); + let (mut tx_out, rx_out) = mpsc::channel(10); + let mut is_connected = false; + + tokio::spawn(async move { loop { // Throw away any pending messages from the incoming channel so that it // doesn't get filled up and begin blocking while we're looping and waiting // for a reconnection. - while let Ok(Some(_)) = rx_from_external_proxy.try_next() {} + while let Ok(Some(_)) = rx_in.try_next() {} - // The connection will pass messages back to this. - let tx_from_connection = tx_to_external.clone(); + // Try to connect. If connection established, we serialize and forward messages + // to/from the core. If the external channels break, we end for good. If the internal + // channels break, we loop around and try connecting again. + match ws_client::connect(&telemetry_uri).await { + Ok((tx_to_core, mut rx_from_core)) => { + is_connected = true; + let mut tx_out = tx_out.clone(); - // Attempt to reconnect. - match create_ws_connection_no_retry(tx_from_connection, telemetry_uri.clone()).await { - Ok(mut tx_to_connection) => { - connected = true; - - // Inform the handler loop that we've reconnected. - tx_to_external - .send(Message::Connected) - .await - .expect("must be able to send reconnect msg"); - - // Start forwarding messages on to the backend. - while let Some(msg) = rx_from_external_proxy.next().await { - if let Err(e) = tx_to_connection.send(msg).await { - // Issue forwarding a message to the telemetry core? - // Give up and try to reconnect on the next outer loop iteration. 
- log::error!( - "Error sending message to websocker server (will reconnect): {}", - e - ); - break; - } + if let Err(e) = tx_out.send(Message::Connected).await { + // If receiving end is closed, bail now. + log::warn!("Aggregator is no longer receiving messages from core; disconnecting (permanently): {}", e); + return } - } - Err(e) => { + + // Loop, forwarding messages to and from the core until something goes wrong. + loop { + tokio::select! { + msg = rx_from_core.next() => { + let msg = match msg { + Some(msg) => msg, + // No more messages from core? core WS is disconnected. + None => { + log::warn!("No more messages from core: shutting down connection (will reconnect)"); + break + } + }; + + let bytes = match msg { + Ok(ws_client::RecvMessage::Binary(bytes)) => bytes, + Ok(ws_client::RecvMessage::Text(s)) => s.into_bytes(), + Err(e) => { + log::warn!("Unable to receive message from core: shutting down connection (will reconnect): {}", e); + break; + } + }; + let msg = bincode::options() + .deserialize(&bytes) + .expect("internal messages must be deserializable"); + + if let Err(e) = tx_out.send(Message::Data(msg)).await { + log::error!("Aggregator is no longer receiving messages from core; disconnecting (permanently): {}", e); + return; + } + }, + msg = rx_in.next() => { + let msg = match msg { + Some(msg) => msg, + None => { + log::error!("Aggregator is no longer sending messages to core; disconnecting (permanently)"); + return + } + }; + + let bytes = bincode::options() + .serialize(&msg) + .expect("internal messages must be serializable"); + let ws_msg = ws_client::SentMessage::Binary(bytes); + + if let Err(e) = tx_to_core.unbounded_send(ws_msg) { + log::warn!("Unable to send message to core; shutting down connection (will reconnect): {}", e); + break; + } + } + }; + } + }, + Err(connect_err) => { // Issue connecting? Wait and try again on the next loop iteration. 
log::error!( "Error connecting to websocker server (will reconnect): {}", - e + connect_err ); } - }; - - // Tell the aggregator that we're disconnected so that, if we like, we can discard - // messages without doing any futher processing on them. - if connected { - connected = false; - let _ = tx_to_external.send(Message::Disconnected).await; } - // Wait a little before trying to reconnect. + if is_connected { + is_connected = false; + if let Err(e) = tx_out.send(Message::Disconnected).await { + log::error!("Aggregator is no longer receiving messages from core; disconnecting (permanently): {}", e); + return; + } + } + + // Wait a little before we try to connect again. tokio::time::sleep(std::time::Duration::from_secs(1)).await; } }); - tx_to_connection_proxy -} - -/// This spawns a connection to a websocket server, serializing/deserialziing -/// to/from bincode as messages are sent or received. -async fn create_ws_connection_no_retry( - mut tx_to_external: S, - telemetry_uri: http::Uri, -) -> anyhow::Result> -where - S: Sink, Error = E> + Unpin + Send + 'static, - E: std::fmt::Debug + std::fmt::Display, - In: serde::Serialize + Send + 'static, - Out: serde::de::DeserializeOwned + Send + 'static, -{ - use bincode::Options; - use soketto::handshake::{Client, ServerResponse}; - - let host = telemetry_uri.host().unwrap_or("127.0.0.1"); - let port = telemetry_uri.port_u16().unwrap_or(8000); - let path = telemetry_uri.path(); - - let socket = TcpStream::connect((host, port)).await?; - socket.set_nodelay(true).expect("socket set_nodelay failed"); - - // Open a websocket connection with the telemetry core: - let mut client = Client::new(socket.compat(), host, &path); - let (mut ws_to_connection, mut ws_from_connection) = match client.handshake().await? { - ServerResponse::Accepted { .. } => client.into_builder().finish(), - ServerResponse::Redirect { status_code, .. 
} | ServerResponse::Rejected { status_code } => { - return Err(anyhow::anyhow!( - "Failed to connect to {}{}, status code: {}", - host, - path, - status_code - )); - } - }; - - // This task reads data sent from the telemetry core and - // forwards it to our aggregator loop: - tokio::spawn(async move { - loop { - let mut data = Vec::new(); - if let Err(e) = ws_from_connection.receive_data(&mut data).await { - // Couldn't receive data may mean all senders are gone, so log - // the error and shut this down: - log::error!( - "Shutting down websocket connection: Failed to receive data: {}", - e - ); - return; - } - - // Attempt to deserialize, and send to our handler loop: - match bincode::options().deserialize(&data) { - Ok(msg) => { - if let Err(e) = tx_to_external.send(Message::Data(msg)).await { - // Failure to send to our loop likely means it's hit an - // issue and shut down, so bail on this loop as well: - log::error!( - "Shutting down websocket connection: Failed to send data out: {}", - e - ); - return; - } - } - Err(err) => { - // Log the error but otherwise ignore it and keep running: - log::warn!("Failed to decode message from Backend Core: {:?}", err); - } - } - } - }); - - // This task receives messages from the aggregator, - // encodes them and sends them to the telemetry core: - let (tx_to_connection, mut rx_from_aggregator) = mpsc::channel(10); - tokio::spawn(async move { - while let Some(msg) = rx_from_aggregator.next().await { - let bytes = bincode::options() - .serialize(&msg) - .expect("must be able to serialize msg"); - - // Any errors sending the message leads to this task ending, which should cascade to - // the entire connection being ended. 
- if let Err(e) = ws_to_connection.send_binary_mut(bytes).await { - log::error!( - "Shutting down websocket connection: Failed to send data in: {}", - e - ); - return; - } - if let Err(e) = ws_to_connection.flush().await { - log::error!( - "Shutting down websocket connection: Failed to flush data: {}", - e - ); - return; - } - } - }); - - // We return a channel that you can send messages down in order to have - // them sent to the telemetry core: - Ok(tx_to_connection) -} + (tx_in, rx_out) +} \ No newline at end of file diff --git a/backend/test_utils/src/lib.rs b/backend/test_utils/src/lib.rs index 5004ef1..445406e 100644 --- a/backend/test_utils/src/lib.rs +++ b/backend/test_utils/src/lib.rs @@ -5,10 +5,6 @@ pub mod server; /// is the slightly-lossy inverse of the custom serialization we do to feed messages. pub mod feed_message_de; -/// A wrapper around soketto to simplify the process of establishing connections -/// and sending messages. Provides cancel-safe message channels. -pub mod ws_client; - /// A couple of macros to make it easier to test for the presense of things (mainly, feed messages) /// in an iterable container. 
#[macro_use] diff --git a/backend/test_utils/src/server/channels.rs b/backend/test_utils/src/server/channels.rs index 2490544..2480987 100644 --- a/backend/test_utils/src/server/channels.rs +++ b/backend/test_utils/src/server/channels.rs @@ -1,7 +1,7 @@ use std::{ops::{Deref, DerefMut}, time::Duration}; use crate::feed_message_de::FeedMessage; -use crate::ws_client; +use common::ws_client; use futures::{Sink, SinkExt, Stream, StreamExt}; /// Wrap a `ws_client::Sender` with convenient utility methods for shard connections diff --git a/backend/test_utils/src/server/server.rs b/backend/test_utils/src/server/server.rs index 0ef9d4a..97d94eb 100644 --- a/backend/test_utils/src/server/server.rs +++ b/backend/test_utils/src/server/server.rs @@ -1,5 +1,5 @@ use super::{channels, utils}; -use crate::ws_client; +use common::ws_client; use common::{id_type, DenseMap}; use std::ffi::OsString; use std::marker::PhantomData; diff --git a/backend/test_utils/src/server/utils.rs b/backend/test_utils/src/server/utils.rs index 8de17d9..522d490 100644 --- a/backend/test_utils/src/server/utils.rs +++ b/backend/test_utils/src/server/utils.rs @@ -1,4 +1,4 @@ -use crate::ws_client; +use common::ws_client; use anyhow::{anyhow, Context}; use tokio::io::BufReader; use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncWrite};