Migrate telemetry_core to Hyper+Soketto

This commit is contained in:
James Wilson
2021-07-23 15:57:50 +01:00
parent 649fb966d2
commit 0eff32d10e
7 changed files with 389 additions and 224 deletions
@@ -74,7 +74,7 @@ impl Aggregator {
/// Return a sink that a shard can send messages into to be handled by the aggregator.
pub fn subscribe_shard(
&self,
) -> impl Sink<inner_loop::FromShardWebsocket, Error = anyhow::Error> + Unpin {
) -> impl Sink<inner_loop::FromShardWebsocket, Error = anyhow::Error> + Send + Sync + Unpin + 'static {
// Assign a unique aggregator-local ID to each connection that subscribes, and pass
// that along with every message to the aggregator loop:
let shard_conn_id = self
@@ -96,7 +96,7 @@ impl Aggregator {
/// Return a sink that a feed can send messages into to be handled by the aggregator.
pub fn subscribe_feed(
&self,
) -> impl Sink<inner_loop::FromFeedWebsocket, Error = anyhow::Error> + Unpin {
) -> impl Sink<inner_loop::FromFeedWebsocket, Error = anyhow::Error> + Send + Sync + Unpin + 'static {
// Assign a unique aggregator-local ID to each connection that subscribes, and pass
// that along with every message to the aggregator loop:
let feed_conn_id = self
+233 -218
View File
@@ -2,8 +2,6 @@ mod aggregator;
mod feed_message;
mod find_location;
mod state;
use std::net::SocketAddr;
use std::str::FromStr;
use aggregator::{
@@ -15,8 +13,8 @@ use common::ready_chunks_all::ReadyChunksAll;
use futures::{channel::mpsc, SinkExt, StreamExt};
use simple_logger::SimpleLogger;
use structopt::StructOpt;
use warp::filters::ws;
use warp::Filter;
use hyper::{ Response, Method };
use common::http_utils;
const VERSION: &str = env!("CARGO_PKG_VERSION");
const AUTHORS: &str = env!("CARGO_PKG_AUTHORS");
@@ -60,65 +58,63 @@ async fn main() {
/// Declare our routes and start the server.
async fn start_server(opts: Opts) -> anyhow::Result<()> {
let shard_aggregator = Aggregator::spawn(opts.denylist).await?;
let feed_aggregator = shard_aggregator.clone();
let aggregator = Aggregator::spawn(opts.denylist).await?;
let server = http_utils::start_server(opts.socket, move |addr, req| {
let aggregator = aggregator.clone();
println!("REQUEST: {:?}", (req.method(), req.uri().path()));
async move {
match (req.method(), req.uri().path().trim_end_matches('/')) {
// Check that the server is up and running:
(&Method::GET, "/health") => {
Ok(Response::new("OK".into()))
},
// Subscribe to feed messages:
(&Method::GET, "/feed") => {
Ok(http_utils::upgrade_to_websocket(req, move |ws_send, ws_recv| async move {
let tx_to_aggregator = aggregator.subscribe_feed();
let (mut tx_to_aggregator, mut ws_send)
= handle_feed_websocket_connection(ws_send, ws_recv, tx_to_aggregator).await;
log::info!("Closing /feed connection from {:?}", addr);
// Tell the aggregator that this connection has closed, so it can tidy up.
let _ = tx_to_aggregator.send(FromFeedWebsocket::Disconnected).await;
let _ = ws_send.close().await;
}))
},
// Subscribe to shard messages:
(&Method::GET, "/shard_submit") => {
Ok(http_utils::upgrade_to_websocket(req, move |ws_send, ws_recv| async move {
let tx_to_aggregator = aggregator.subscribe_shard();
let (mut tx_to_aggregator, mut ws_send)
= handle_shard_websocket_connection(ws_send, ws_recv, tx_to_aggregator).await;
log::info!("Closing /shard_submit connection from {:?}", addr);
// Tell the aggregator that this connection has closed, so it can tidy up.
let _ = tx_to_aggregator.send(FromShardWebsocket::Disconnected).await;
let _ = ws_send.close().await;
}))
},
// 404 for anything else:
_ => {
Ok(Response::builder()
.status(404)
.body("Not found".into())
.unwrap())
}
}
}
});
// Handle requests to /health by returning OK.
let health_route = warp::path("health").map(|| "OK");
// Handle websocket requests from shards.
let ws_shard_submit_route = warp::path("shard_submit")
.and(warp::ws())
.and(warp::filters::addr::remote())
.map(move |ws: ws::Ws, addr: Option<SocketAddr>| {
let tx_to_aggregator = shard_aggregator.subscribe_shard();
log::info!("Opening /shard_submit connection from {:?}", addr);
ws.on_upgrade(move |websocket| async move {
let (mut tx_to_aggregator, websocket) =
handle_shard_websocket_connection(websocket, tx_to_aggregator).await;
log::info!("Closing /shard_submit connection from {:?}", addr);
// Tell the aggregator that this connection has closed, so it can tidy up.
let _ = tx_to_aggregator
.send(FromShardWebsocket::Disconnected)
.await;
let _ = websocket.close().await;
})
});
// Handle websocket requests from frontends.
let ws_feed_route = warp::path("feed")
.and(warp::ws())
.and(warp::filters::addr::remote())
.map(move |ws: ws::Ws, addr: Option<SocketAddr>| {
let tx_to_aggregator = feed_aggregator.subscribe_feed();
log::info!("Opening /feed connection from {:?}", addr);
// We can decide how many messages can be buffered to be sent, but not specifically how
// large those messages are cumulatively allowed to be:
ws.max_send_queue(1_000)
.on_upgrade(move |websocket| async move {
let (mut tx_to_aggregator, websocket) =
handle_feed_websocket_connection(websocket, tx_to_aggregator).await;
log::info!("Closing /feed connection from {:?}", addr);
// Tell the aggregator that this connection has closed, so it can tidy up.
let _ = tx_to_aggregator.send(FromFeedWebsocket::Disconnected).await;
let _ = websocket.close().await;
})
});
// Merge the routes and start our server:
let routes = ws_shard_submit_route.or(ws_feed_route).or(health_route);
warp::serve(routes).run(opts.socket).await;
server.await?;
Ok(())
}
/// This handles messages coming to/from a shard connection
async fn handle_shard_websocket_connection<S>(
mut websocket: ws::WebSocket,
mut ws_send: http_utils::WsSender,
mut ws_recv: http_utils::WsReceiver,
mut tx_to_aggregator: S,
) -> (S, ws::WebSocket)
) -> (S, http_utils::WsSender)
where
S: futures::Sink<FromShardWebsocket, Error = anyhow::Error> + Unpin,
S: futures::Sink<FromShardWebsocket, Error = anyhow::Error> + Unpin + Send + 'static,
{
let (tx_to_shard_conn, mut rx_from_aggregator) = mpsc::unbounded();
@@ -128,102 +124,119 @@ where
};
if let Err(e) = tx_to_aggregator.send(init_msg).await {
log::error!("Error sending message to aggregator: {}", e);
return (tx_to_aggregator, websocket);
return (tx_to_aggregator, ws_send);
}
// Loop, handling new messages from the shard or from the aggregator:
loop {
tokio::select! {
// AGGREGATOR -> SHARD
msg = rx_from_aggregator.next() => {
// End the loop when connection from aggregator ends:
let msg = match msg {
Some(msg) => msg,
None => break
};
// Channels to notify each loop if the other closes:
let (recv_closer_tx, mut recv_closer_rx) = tokio::sync::oneshot::channel::<()>();
let (send_closer_tx, mut send_closer_rx) = tokio::sync::oneshot::channel::<()>();
let internal_msg = match msg {
ToShardWebsocket::Mute { local_id, reason } => {
internal_messages::FromTelemetryCore::Mute { local_id, reason }
}
};
// Receive messages from a shard:
let recv_handle = tokio::spawn(async move {
loop {
let mut bytes = Vec::new();
let bytes = bincode::options()
.serialize(&internal_msg)
.expect("message to shard should serialize");
// Receive a message, or bail if closer called. We don't care about cancel safety;
// if we're halfway through receiving a message, no biggie since we're closing the
// connection anyway.
let msg_info = tokio::select! {
msg_info = ws_recv.receive_data(&mut bytes) => msg_info,
_ = &mut recv_closer_rx => { break }
};
if let Err(e) = websocket.send(ws::Message::binary(bytes)).await {
log::error!("Error sending message to shard; booting it: {}", e);
break
}
// Handle the socket closing, or errors receiving the message.
if let Err(soketto::connection::Error::Closed) = msg_info {
break;
}
if let Err(e) = msg_info {
log::error!("Shutting down websocket connection: Failed to receive data: {}", e);
break;
}
// SHARD -> AGGREGATOR
msg = websocket.next() => {
// End the loop when connection from shard ends:
let msg = match msg {
Some(msg) => msg,
None => break
};
let msg = match msg {
Err(e) => {
log::error!("Error receiving message from shard; booting it: {}", e);
break;
},
Ok(msg) => msg
};
// Close message? Break and allow connection to be dropped.
if msg.is_close() {
let msg: internal_messages::FromShardAggregator = match bincode::options().deserialize(&bytes) {
Ok(msg) => msg,
Err(e) => {
log::error!("Failed to deserialize message from shard; booting it: {}", e);
break;
}
};
// If the message isn't something we want to handle, just ignore it.
// This includes system messages like "pings" and such, so don't log anything.
if !msg.is_binary() && !msg.is_text() {
continue;
}
// Convert and send to the aggregator:
let aggregator_msg = match msg {
internal_messages::FromShardAggregator::AddNode { ip, node, local_id, genesis_hash } => {
FromShardWebsocket::Add { ip, node, genesis_hash, local_id }
},
internal_messages::FromShardAggregator::UpdateNode { payload, local_id } => {
FromShardWebsocket::Update { local_id, payload }
},
internal_messages::FromShardAggregator::RemoveNode { local_id } => {
FromShardWebsocket::Remove { local_id }
},
};
let bytes = msg.as_bytes();
let msg: internal_messages::FromShardAggregator = match bincode::options().deserialize(bytes) {
Ok(msg) => msg,
Err(e) => {
log::error!("Failed to deserialize message from shard; booting it: {}", e);
break;
}
};
// Convert and send to the aggregator:
let aggregator_msg = match msg {
internal_messages::FromShardAggregator::AddNode { ip, node, local_id, genesis_hash } => {
FromShardWebsocket::Add { ip, node, genesis_hash, local_id }
},
internal_messages::FromShardAggregator::UpdateNode { payload, local_id } => {
FromShardWebsocket::Update { local_id, payload }
},
internal_messages::FromShardAggregator::RemoveNode { local_id } => {
FromShardWebsocket::Remove { local_id }
},
};
if let Err(e) = tx_to_aggregator.send(aggregator_msg).await {
log::error!("Failed to send message to aggregator; closing shard: {}", e);
break;
}
if let Err(e) = tx_to_aggregator.send(aggregator_msg).await {
log::error!("Failed to send message to aggregator; closing shard: {}", e);
break;
}
}
}
drop(send_closer_tx); // Kill the send task if this recv task ends
tx_to_aggregator
});
// Send messages to the shard:
let send_handle = tokio::spawn(async move {
loop {
let msg = tokio::select! {
msg = rx_from_aggregator.next() => msg,
_ = &mut send_closer_rx => { break }
};
let msg = match msg {
Some(msg) => msg,
None => break
};
let internal_msg = match msg {
ToShardWebsocket::Mute { local_id, reason } => {
internal_messages::FromTelemetryCore::Mute { local_id, reason }
}
};
let bytes = bincode::options()
.serialize(&internal_msg)
.expect("message to shard should serialize");
if let Err(e) = ws_send.send_binary(bytes).await {
log::error!("Failed to send message to aggregator; closing shard: {}", e)
}
if let Err(e) = ws_send.flush().await {
log::error!("Failed to flush message to aggregator; closing shard: {}", e)
}
}
drop(recv_closer_tx); // Kill the recv task if this send task ends
ws_send
});
// If our send/recv tasks are stopped (if one of them dies, they both will),
// collect the bits we need to hand back from them:
let ws_send = send_handle.await.unwrap();
let tx_to_aggregator = recv_handle.await.unwrap();
// loop ended; give socket back to parent:
(tx_to_aggregator, websocket)
(tx_to_aggregator, ws_send)
}
/// This handles messages coming from a feed connection
async fn handle_feed_websocket_connection<S>(
mut websocket: ws::WebSocket,
mut ws_send: http_utils::WsSender,
mut ws_recv: http_utils::WsReceiver,
mut tx_to_aggregator: S,
) -> (S, ws::WebSocket)
) -> (S, http_utils::WsSender)
where
S: futures::Sink<FromFeedWebsocket, Error = anyhow::Error> + Unpin,
S: futures::Sink<FromFeedWebsocket, Error = anyhow::Error> + Unpin + Send + 'static,
{
// unbounded channel so that slow feeds don't block aggregator progress:
let (tx_to_feed_conn, rx_from_aggregator) = mpsc::unbounded();
@@ -235,106 +248,108 @@ where
};
if let Err(e) = tx_to_aggregator.send(init_msg).await {
log::error!("Error sending message to aggregator: {}", e);
return (tx_to_aggregator, websocket);
return (tx_to_aggregator, ws_send);
}
// Loop, handling new messages from the shard or from the aggregator:
loop {
// Without any special handling, if messages come in every ~2.5ms to each feed, the select! loop
// has to wake up 400 times a second to poll things. If we have 1000 feeds, that's 400,000 wakeups
// per second. Even without any work in the loop, that uses a bunch of CPU. As an example, try
// replacing the loop with this:
//
// ```
// let s = tokio::time::sleep(tokio::time::Duration::from_micros(2500));
// tokio::select! {
// _ = s => {},
// _ = websocket.next() => {}
// }
// continue;
// ```
//
// To combat this, we add a small wait to reduce how often the select loop will be woken up under high load. We
// buffer messages to feeds so that we do as much work as possible during each wakeup, and if the
// wakeup lasts longer than 75ms we don't wait before polling again. This knocks ~80% of a CPU worth of usage
// off on my machine running a soak test with 500 feeds, 4 shards and 100 nodes, doesn't seem to impact
// memory usage much, and still ensures that messages are delivered in a timely fashion.
//
// Increasing the wait to 100ms or more doesn't seem to have much more of a positive impact anyway.
let debounce = tokio::time::sleep_until(tokio::time::Instant::now() + std::time::Duration::from_millis(75));
// Channels to notify each loop if the other closes:
let (recv_closer_tx, mut recv_closer_rx) = tokio::sync::oneshot::channel::<()>();
let (send_closer_tx, mut send_closer_rx) = tokio::sync::oneshot::channel::<()>();
tokio::select! {biased;
// Receive messages from the feed:
let recv_handle = tokio::spawn(async move {
loop {
let mut bytes = Vec::new();
// FRONTEND -> AGGREGATOR (relay messages to the aggregator). Biased, so messages
// from the UI will have priority (especially important with our debounce delay).
msg = websocket.next() => {
// End the loop when connection from feed ends:
let msg = match msg {
Some(msg) => msg,
None => break
};
// Receive a message, or bail if closer called. We don't care about cancel safety;
// if we're halfway through receiving a message, no biggie since we're closing the
// connection anyway.
let msg_info = tokio::select! {
msg_info = ws_recv.receive_data(&mut bytes) => msg_info,
_ = &mut recv_closer_rx => { break }
};
// If we see any errors, log them and end our loop:
let msg = match msg {
Err(e) => {
log::error!("Error in node websocket connection: {}", e);
break;
},
Ok(msg) => msg
};
// Close message? Break and allow connection to be dropped.
if msg.is_close() {
break;
}
// We ignore all but text messages from the frontend:
let text = match msg.to_str() {
Ok(s) => s,
Err(_) => continue
};
// Parse the message into a command we understand and send it to the aggregator:
let cmd = match FromFeedWebsocket::from_str(text) {
Ok(cmd) => cmd,
Err(e) => {
log::warn!("Ignoring invalid command '{}' from the frontend: {}", text, e);
continue
}
};
if let Err(e) = tx_to_aggregator.send(cmd).await {
log::error!("Failed to send message to aggregator; closing feed: {}", e);
break;
}
// Handle the socket closing, or errors receiving the message.
if let Err(soketto::connection::Error::Closed) = msg_info {
break;
}
if let Err(e) = msg_info {
log::error!("Shutting down websocket connection: Failed to receive data: {}", e);
break;
}
// AGGREGATOR -> FRONTEND (buffer messages to the UI)
msgs = rx_from_aggregator_chunks.next() => {
// End the loop when connection from aggregator ends:
let msgs = match msgs {
Some(msgs) => msgs,
None => break
};
// We ignore all but valid UTF8 text messages from the frontend:
let text = match String::from_utf8(bytes) {
Ok(s) => s,
Err(_) => continue
};
// There is only one message type at the mo; bytes to send
// to the websocket. collect them all up to dispatch in one shot.
let all_ws_msgs = msgs.into_iter().map(|msg| {
let bytes = match msg {
ToFeedWebsocket::Bytes(bytes) => bytes
};
Ok(ws::Message::binary(&*bytes))
});
if let Err(e) = websocket.send_all(&mut futures::stream::iter(all_ws_msgs)).await {
log::warn!("Closing feed websocket due to error: {}", e);
break;
// Parse the message into a command we understand and send it to the aggregator:
let cmd = match FromFeedWebsocket::from_str(&text) {
Ok(cmd) => cmd,
Err(e) => {
log::warn!("Ignoring invalid command '{}' from the frontend: {}", text, e);
continue
}
};
if let Err(e) = tx_to_aggregator.send(cmd).await {
log::error!("Failed to send message to aggregator; closing feed: {}", e);
break;
}
}
debounce.await;
}
drop(send_closer_tx); // Kill the send task if this recv task ends
tx_to_aggregator
});
// Send messages to the feed:
let send_handle = tokio::spawn(async move {
loop {
let debounce = tokio::time::sleep_until(tokio::time::Instant::now() + std::time::Duration::from_millis(75));
let msgs = tokio::select! {
msgs = rx_from_aggregator_chunks.next() => msgs,
_ = &mut send_closer_rx => { break }
};
// End the loop when connection from aggregator ends:
let msgs = match msgs {
Some(msgs) => msgs,
None => break
};
// There is only one message type at the mo; bytes to send
// to the websocket. collect them all up to dispatch in one shot.
let all_msg_bytes = msgs.into_iter().map(|msg| {
match msg {
ToFeedWebsocket::Bytes(bytes) => bytes
}
});
for bytes in all_msg_bytes {
if let Err(e) = ws_send.send_binary(&bytes).await {
log::warn!("Closing feed websocket due to error sending data: {}", e);
break;
}
}
if let Err(e) = ws_send.flush().await {
log::warn!("Closing feed websocket due to error flushing data: {}", e);
break;
}
debounce.await;
}
drop(recv_closer_tx); // Kill the recv task if this send task ends
ws_send
});
// If our send/recv tasks are stopped (if one of them dies, they both will),
// collect the bits we need to hand back from them:
let ws_send = send_handle.await.unwrap();
let tx_to_aggregator = recv_handle.await.unwrap();
// loop ended; give socket back to parent:
(tx_to_aggregator, websocket)
(tx_to_aggregator, ws_send)
}