use ws_client in shard, too, and make it better at handling core disconnecting

This commit is contained in:
James Wilson
2021-07-20 13:04:10 +01:00
parent 36c4e7b0ef
commit cca1df2e21
16 changed files with 138 additions and 190 deletions
+8 -2
View File
@@ -209,14 +209,19 @@ dependencies = [
"bincode",
"bytes",
"fnv",
"futures",
"hex",
"http",
"log",
"num-traits",
"primitive-types",
"rustc-hash",
"serde",
"serde_json",
"soketto",
"thiserror",
"tokio",
"tokio-util",
]
[[package]]
@@ -1677,6 +1682,7 @@ dependencies = [
"anyhow",
"bimap",
"bincode",
"bytes",
"common",
"criterion",
"futures",
@@ -1821,9 +1827,9 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c"
[[package]]
name = "tokio"
version = "1.8.0"
version = "1.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "570c2eb13b3ab38208130eccd41be92520388791207fde783bda7c1e8ace28d4"
checksum = "c2602b8af3767c285202012822834005f596c811042315fa7e9f5b12b2a43207"
dependencies = [
"autocfg",
"bytes",
+4 -1
View File
@@ -12,4 +12,7 @@ opt-level = 3
[profile.release]
lto = true
panic = "abort"
# debug = true
## Enabling these seems necessary to get
## good debug info in Instruments:
# debug = true
# codegen-units = 1
+5
View File
@@ -9,14 +9,19 @@ license = "GPL-3.0"
bimap = "0.6.1"
bytes = "1.0.1"
fnv = "1.0.7"
futures = "0.3.15"
hex = "0.4.3"
http = "0.2.4"
log = "0.4"
num-traits = "0.2"
primitive-types = { version = "0.9.0", features = ["serde"] }
rustc-hash = "1.1.0"
serde = { version = "1.0", features = ["derive"] }
serde_json = { version = "1.0", features = ["raw_value"] }
soketto = "0.6.0"
thiserror = "1.0.24"
tokio = { version = "1.8.2", features = ["full"] }
tokio-util = { version = "0.6", features = ["compat"] }
[dev-dependencies]
bincode = "1.3.3"
+1
View File
@@ -3,6 +3,7 @@ pub mod internal_messages;
pub mod node_message;
pub mod node_types;
pub mod time;
pub mod ws_client;
mod assign_id;
mod dense_map;
+1
View File
@@ -9,6 +9,7 @@ license = "GPL-3.0"
anyhow = "1.0.41"
bimap = "0.6.1"
bincode = "1.3.3"
bytes = "1.0.1"
common = { path = "../common" }
futures = "0.3.15"
hex = "0.4.3"
@@ -105,7 +105,7 @@ impl FromStr for FromFeedWebsocket {
/// The aggregator can send these messages back to a feed connection.
#[derive(Clone, Debug)]
pub enum ToFeedWebsocket {
Bytes(Vec<u8>),
Bytes(bytes::Bytes),
}
/// Instances of this are responsible for handling incoming and
+4 -4
View File
@@ -68,7 +68,7 @@ impl FeedMessageSerializer {
/// Return the bytes we've serialized so far and prepare a new buffer. If you're
/// finished serializing data, prefer [`FeedMessageSerializer::into_finalized`]
pub fn finalize(&mut self) -> Option<Vec<u8>> {
pub fn finalize(&mut self) -> Option<bytes::Bytes> {
if self.buffer.is_empty() {
return None;
}
@@ -77,17 +77,17 @@ impl FeedMessageSerializer {
let bytes = mem::replace(&mut self.buffer, Vec::with_capacity(BUFCAP));
Some(bytes)
Some(bytes.into())
}
/// Return the bytes that we've serialized so far, consuming the serializer.
pub fn into_finalized(mut self) -> Option<Vec<u8>> {
pub fn into_finalized(mut self) -> Option<bytes::Bytes> {
if self.buffer.is_empty() {
return None;
}
self.buffer.push(b']');
Some(self.buffer)
Some(self.buffer.into())
}
}
+1 -3
View File
@@ -254,9 +254,7 @@ where
ToFeedWebsocket::Bytes(bytes) => bytes
};
log::debug!("Message to feed: {}", std::str::from_utf8(&bytes).unwrap_or("INVALID UTF8"));
if let Err(e) = websocket.send(ws::Message::binary(bytes)).await {
if let Err(e) = websocket.send(ws::Message::binary(&*bytes)).await {
log::warn!("Closing feed websocket due to error: {}", e);
break;
}
+1 -1
View File
@@ -21,7 +21,7 @@ box; MacOS seems to hit limits quicker in general.
use futures::{ StreamExt };
use structopt::StructOpt;
use test_utils::workspace::start_server_release;
use test_utils::ws_client::{ SentMessage };
use common::ws_client::{ SentMessage };
use serde_json::json;
use std::time::Duration;
use std::sync::atomic::{ Ordering, AtomicUsize };
+21 -14
View File
@@ -5,7 +5,7 @@ use common::{
node_types::BlockHash,
AssignId,
};
use futures::{channel::mpsc, future};
use futures::{channel::mpsc};
use futures::{Sink, SinkExt, StreamExt};
use std::collections::{HashMap, HashSet};
use std::sync::atomic::AtomicU64;
@@ -86,26 +86,33 @@ impl Aggregator {
pub async fn spawn(telemetry_uri: http::Uri) -> anyhow::Result<Aggregator> {
let (tx_to_aggregator, rx_from_external) = mpsc::channel(10);
// Map responses from our connection into messages that will be sent to the aggregator:
let tx_from_connection = tx_to_aggregator.clone().with(|msg| {
future::ok::<_, mpsc::SendError>(match msg {
Message::Connected => ToAggregator::ConnectedToTelemetryCore,
Message::Disconnected => ToAggregator::DisconnectedFromTelemetryCore,
Message::Data(data) => ToAggregator::FromTelemetryCore(data),
})
// Establish a resilient connection to the core (this retries as needed):
let (tx_to_telemetry_core, mut rx_from_telemetry_core) =
create_ws_connection_to_core(telemetry_uri).await;
// Forward messages from the telemetry core into the aggregator:
let mut tx_to_aggregator2 = tx_to_aggregator.clone();
tokio::spawn(async move {
while let Some(msg) = rx_from_telemetry_core.next().await {
let msg_to_aggregator = match msg {
Message::Connected => ToAggregator::ConnectedToTelemetryCore,
Message::Disconnected => ToAggregator::DisconnectedFromTelemetryCore,
Message::Data(data) => ToAggregator::FromTelemetryCore(data),
};
if let Err(_) = tx_to_aggregator2.send(msg_to_aggregator).await {
// This will close the ws channels, which themselves log messages.
break
}
}
});
// Establish a resilient connection to the core (this retries as needed):
let tx_to_telemetry_core =
create_ws_connection_to_core(tx_from_connection, telemetry_uri).await;
// Handle any incoming messages in our handler loop:
// Start our aggregator loop, handling any incoming messages:
tokio::spawn(Aggregator::handle_messages(
rx_from_external,
tx_to_telemetry_core,
));
// Return a handle to our aggregator:
// Return a handle to our aggregator so that we can send in messages to it:
Ok(Aggregator(Arc::new(AggregatorInternal {
conn_id: AtomicU64::new(1),
tx_to_aggregator,
+88 -157
View File
@@ -1,7 +1,7 @@
use futures::channel::mpsc;
use futures::{Sink, SinkExt, StreamExt};
use tokio::net::TcpStream;
use tokio_util::compat::TokioAsyncReadCompatExt;
use futures::{SinkExt, StreamExt};
use common::ws_client;
use bincode::Options;
#[derive(Clone, Debug)]
pub enum Message<Out> {
@@ -18,181 +18,112 @@ pub enum Message<Out> {
///
/// Note: have a look at [`common::internal_messages`] to see the different message types exchanged
/// between aggregator and core.
pub async fn create_ws_connection_to_core<In, Out, S, E>(
mut tx_to_external: S,
pub async fn create_ws_connection_to_core<In, Out>(
telemetry_uri: http::Uri,
) -> mpsc::Sender<In>
) -> (mpsc::Sender<In>, mpsc::Receiver<Message<Out>>)
where
S: Sink<Message<Out>, Error = E> + Unpin + Send + Clone + 'static,
E: std::fmt::Debug + std::fmt::Display + Send + 'static,
In: serde::Serialize + Send + 'static,
Out: serde::de::DeserializeOwned + Send + 'static,
{
// Set up a proxy channel to relay messages to the telemetry core, and return one end of it.
// Once a connection to the backend is established, we pass messages along to it. If the connection
// fails, we
let (tx_to_connection_proxy, mut rx_from_external_proxy) = mpsc::channel(10);
tokio::spawn(async move {
let mut connected = false;
let (tx_in, mut rx_in) = mpsc::channel(10);
let (mut tx_out, rx_out) = mpsc::channel(10);
let mut is_connected = false;
tokio::spawn(async move {
loop {
// Throw away any pending messages from the incoming channel so that it
// doesn't get filled up and begin blocking while we're looping and waiting
// for a reconnection.
while let Ok(Some(_)) = rx_from_external_proxy.try_next() {}
while let Ok(Some(_)) = rx_in.try_next() {}
// The connection will pass messages back to this.
let tx_from_connection = tx_to_external.clone();
// Try to connect. If connection established, we serialize and forward messages
// to/from the core. If the external channels break, we end for good. If the internal
// channels break, we loop around and try connecting again.
match ws_client::connect(&telemetry_uri).await {
Ok((tx_to_core, mut rx_from_core)) => {
is_connected = true;
let mut tx_out = tx_out.clone();
// Attempt to reconnect.
match create_ws_connection_no_retry(tx_from_connection, telemetry_uri.clone()).await {
Ok(mut tx_to_connection) => {
connected = true;
// Inform the handler loop that we've reconnected.
tx_to_external
.send(Message::Connected)
.await
.expect("must be able to send reconnect msg");
// Start forwarding messages on to the backend.
while let Some(msg) = rx_from_external_proxy.next().await {
if let Err(e) = tx_to_connection.send(msg).await {
// Issue forwarding a message to the telemetry core?
// Give up and try to reconnect on the next outer loop iteration.
log::error!(
"Error sending message to websocket server (will reconnect): {}",
e
);
break;
}
if let Err(e) = tx_out.send(Message::Connected).await {
// If receiving end is closed, bail now.
log::warn!("Aggregator is no longer receiving messages from core; disconnecting (permanently): {}", e);
return
}
}
Err(e) => {
// Loop, forwarding messages to and from the core until something goes wrong.
loop {
tokio::select! {
msg = rx_from_core.next() => {
let msg = match msg {
Some(msg) => msg,
// No more messages from core? core WS is disconnected.
None => {
log::warn!("No more messages from core: shutting down connection (will reconnect)");
break
}
};
let bytes = match msg {
Ok(ws_client::RecvMessage::Binary(bytes)) => bytes,
Ok(ws_client::RecvMessage::Text(s)) => s.into_bytes(),
Err(e) => {
log::warn!("Unable to receive message from core: shutting down connection (will reconnect): {}", e);
break;
}
};
let msg = bincode::options()
.deserialize(&bytes)
.expect("internal messages must be deserializable");
if let Err(e) = tx_out.send(Message::Data(msg)).await {
log::error!("Aggregator is no longer receiving messages from core; disconnecting (permanently): {}", e);
return;
}
},
msg = rx_in.next() => {
let msg = match msg {
Some(msg) => msg,
None => {
log::error!("Aggregator is no longer sending messages to core; disconnecting (permanently)");
return
}
};
let bytes = bincode::options()
.serialize(&msg)
.expect("internal messages must be serializable");
let ws_msg = ws_client::SentMessage::Binary(bytes);
if let Err(e) = tx_to_core.unbounded_send(ws_msg) {
log::warn!("Unable to send message to core; shutting down connection (will reconnect): {}", e);
break;
}
}
};
}
},
Err(connect_err) => {
// Issue connecting? Wait and try again on the next loop iteration.
log::error!(
"Error connecting to websocket server (will reconnect): {}",
e
connect_err
);
}
};
// Tell the aggregator that we're disconnected so that, if we like, we can discard
// messages without doing any further processing on them.
if connected {
connected = false;
let _ = tx_to_external.send(Message::Disconnected).await;
}
// Wait a little before trying to reconnect.
if is_connected {
is_connected = false;
if let Err(e) = tx_out.send(Message::Disconnected).await {
log::error!("Aggregator is no longer receiving messages from core; disconnecting (permanently): {}", e);
return;
}
}
// Wait a little before we try to connect again.
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
});
tx_to_connection_proxy
}
/// This spawns a connection to a websocket server, serializing/deserializing
/// to/from bincode as messages are sent or received.
async fn create_ws_connection_no_retry<In, Out, S, E>(
mut tx_to_external: S,
telemetry_uri: http::Uri,
) -> anyhow::Result<mpsc::Sender<In>>
where
S: Sink<Message<Out>, Error = E> + Unpin + Send + 'static,
E: std::fmt::Debug + std::fmt::Display,
In: serde::Serialize + Send + 'static,
Out: serde::de::DeserializeOwned + Send + 'static,
{
use bincode::Options;
use soketto::handshake::{Client, ServerResponse};
let host = telemetry_uri.host().unwrap_or("127.0.0.1");
let port = telemetry_uri.port_u16().unwrap_or(8000);
let path = telemetry_uri.path();
let socket = TcpStream::connect((host, port)).await?;
socket.set_nodelay(true).expect("socket set_nodelay failed");
// Open a websocket connection with the telemetry core:
let mut client = Client::new(socket.compat(), host, &path);
let (mut ws_to_connection, mut ws_from_connection) = match client.handshake().await? {
ServerResponse::Accepted { .. } => client.into_builder().finish(),
ServerResponse::Redirect { status_code, .. } | ServerResponse::Rejected { status_code } => {
return Err(anyhow::anyhow!(
"Failed to connect to {}{}, status code: {}",
host,
path,
status_code
));
}
};
// This task reads data sent from the telemetry core and
// forwards it to our aggregator loop:
tokio::spawn(async move {
loop {
let mut data = Vec::new();
if let Err(e) = ws_from_connection.receive_data(&mut data).await {
// Couldn't receive data may mean all senders are gone, so log
// the error and shut this down:
log::error!(
"Shutting down websocket connection: Failed to receive data: {}",
e
);
return;
}
// Attempt to deserialize, and send to our handler loop:
match bincode::options().deserialize(&data) {
Ok(msg) => {
if let Err(e) = tx_to_external.send(Message::Data(msg)).await {
// Failure to send to our loop likely means it's hit an
// issue and shut down, so bail on this loop as well:
log::error!(
"Shutting down websocket connection: Failed to send data out: {}",
e
);
return;
}
}
Err(err) => {
// Log the error but otherwise ignore it and keep running:
log::warn!("Failed to decode message from Backend Core: {:?}", err);
}
}
}
});
// This task receives messages from the aggregator,
// encodes them and sends them to the telemetry core:
let (tx_to_connection, mut rx_from_aggregator) = mpsc::channel(10);
tokio::spawn(async move {
while let Some(msg) = rx_from_aggregator.next().await {
let bytes = bincode::options()
.serialize(&msg)
.expect("must be able to serialize msg");
// Any errors sending the message leads to this task ending, which should cascade to
// the entire connection being ended.
if let Err(e) = ws_to_connection.send_binary_mut(bytes).await {
log::error!(
"Shutting down websocket connection: Failed to send data in: {}",
e
);
return;
}
if let Err(e) = ws_to_connection.flush().await {
log::error!(
"Shutting down websocket connection: Failed to flush data: {}",
e
);
return;
}
}
});
// We return a channel that you can send messages down in order to have
// them sent to the telemetry core:
Ok(tx_to_connection)
}
(tx_in, rx_out)
}
-4
View File
@@ -5,10 +5,6 @@ pub mod server;
/// is the slightly-lossy inverse of the custom serialization we do to feed messages.
pub mod feed_message_de;
/// A wrapper around soketto to simplify the process of establishing connections
/// and sending messages. Provides cancel-safe message channels.
pub mod ws_client;
/// A couple of macros to make it easier to test for the presence of things (mainly, feed messages)
/// in an iterable container.
#[macro_use]
+1 -1
View File
@@ -1,7 +1,7 @@
use std::{ops::{Deref, DerefMut}, time::Duration};
use crate::feed_message_de::FeedMessage;
use crate::ws_client;
use common::ws_client;
use futures::{Sink, SinkExt, Stream, StreamExt};
/// Wrap a `ws_client::Sender` with convenient utility methods for shard connections
+1 -1
View File
@@ -1,5 +1,5 @@
use super::{channels, utils};
use crate::ws_client;
use common::ws_client;
use common::{id_type, DenseMap};
use std::ffi::OsString;
use std::marker::PhantomData;
+1 -1
View File
@@ -1,4 +1,4 @@
use crate::ws_client;
use common::ws_client;
use anyhow::{anyhow, Context};
use tokio::io::BufReader;
use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncWrite};