make possible to test, test, and fix feed cutoff

This commit is contained in:
James Wilson
2021-07-26 16:38:24 +01:00
parent 50935b29fc
commit ecf5fccaab
5 changed files with 129 additions and 23 deletions
+35 -14
View File
@@ -33,12 +33,16 @@ struct Opts {
socket: std::net::SocketAddr,
/// The desired log level; one of 'error', 'warn', 'info', 'debug' or 'trace', where
/// 'error' only logs errors and 'trace' logs everything.
#[structopt(required = false, long = "log", default_value = "info")]
#[structopt(long = "log", default_value = "info")]
log_level: log::LevelFilter,
/// Space delimited list of the names of chains that are not allowed to connect to
/// telemetry. Case sensitive.
#[structopt(required = false, long = "denylist")]
#[structopt(long, required = false)]
denylist: Vec<String>,
/// If it takes longer than this number of seconds to send the current batch of messages
/// to a feed, the feed connection will be closed.
#[structopt(long, default_value = "10")]
feed_timeout: u64
}
#[tokio::main]
@@ -60,7 +64,10 @@ async fn main() {
/// Declare our routes and start the server.
async fn start_server(opts: Opts) -> anyhow::Result<()> {
let aggregator = Aggregator::spawn(opts.denylist).await?;
let server = http_utils::start_server(opts.socket, move |addr, req| {
let socket_addr = opts.socket;
let feed_timeout = opts.feed_timeout;
let server = http_utils::start_server(socket_addr, move |addr, req| {
let aggregator = aggregator.clone();
async move {
match (req.method(), req.uri().path().trim_end_matches('/')) {
@@ -73,7 +80,7 @@ async fn start_server(opts: Opts) -> anyhow::Result<()> {
Ok(http_utils::upgrade_to_websocket(req, move |ws_send, ws_recv| async move {
let tx_to_aggregator = aggregator.subscribe_feed();
let (mut tx_to_aggregator, mut ws_send)
= handle_feed_websocket_connection(ws_send, ws_recv, tx_to_aggregator).await;
= handle_feed_websocket_connection(ws_send, ws_recv, tx_to_aggregator, feed_timeout).await;
log::info!("Closing /feed connection from {:?}", addr);
// Tell the aggregator that this connection has closed, so it can tidy up.
let _ = tx_to_aggregator.send(FromFeedWebsocket::Disconnected).await;
@@ -234,6 +241,7 @@ async fn handle_feed_websocket_connection<S>(
mut ws_send: http_utils::WsSender,
mut ws_recv: http_utils::WsReceiver,
mut tx_to_aggregator: S,
feed_timeout: u64
) -> (S, http_utils::WsSender)
where
S: futures::Sink<FromFeedWebsocket, Error = anyhow::Error> + Unpin + Send + 'static,
@@ -304,7 +312,7 @@ where
// Send messages to the feed:
let send_handle = tokio::spawn(async move {
loop {
'outer: loop {
let debounce = tokio::time::sleep_until(Instant::now() + Duration::from_millis(75));
let msgs = tokio::select! {
@@ -326,21 +334,34 @@ where
}
});
// We have 10 seconds to send and flush messages. If the client isn't keeping up with our
// We have a deadline to send and flush messages. If the client isn't keeping up with our
// messages, the number we obtain from `ReadyChunksAll` will gradually increase and eventually
// we'll hit this deadline and the client will be booted.
let message_send_deadline = Instant::now() + Duration::from_secs(10);
let message_send_deadline = Instant::now() + Duration::from_secs(feed_timeout);
for bytes in all_msg_bytes {
if let Err(e) = ws_send.send_binary(&bytes).await {
log::warn!("Closing feed websocket due to error sending data: {}", e);
break;
match tokio::time::timeout_at(message_send_deadline, ws_send.send_binary(&bytes)).await {
Err(_) => {
log::warn!("Closing feed websocket that was too slow to keep up (1)");
break 'outer;
}
Ok(Err(e)) => {
log::warn!("Closing feed websocket due to error sending data: {}", e);
break 'outer;
}
Ok(_) => {}
}
}
if let Err(e) = tokio::time::timeout_at(message_send_deadline, ws_send.flush()).await {
log::warn!("Closing feed websocket due to error flushing data: {}", e);
break;
match tokio::time::timeout_at(message_send_deadline, ws_send.flush()).await {
Err(_) => {
log::warn!("Closing feed websocket that was too slow to keep up (2)");
break
}
Ok(Err(e)) => {
log::warn!("Closing feed websocket due to error flushing data: {}", e);
break;
}
Ok(_) => {}
}
debounce.await;