mirror of
https://github.com/pezkuwichain/pezkuwi-telemetry.git
synced 2026-06-13 09:11:03 +00:00
put a deadline on message sending to feeds
This commit is contained in:
@@ -3,6 +3,7 @@ mod feed_message;
|
||||
mod find_location;
|
||||
mod state;
|
||||
use std::str::FromStr;
|
||||
use tokio::time::{ Duration, Instant };
|
||||
|
||||
use aggregator::{
|
||||
Aggregator, FromFeedWebsocket, FromShardWebsocket, ToFeedWebsocket, ToShardWebsocket,
|
||||
@@ -304,7 +305,7 @@ where
|
||||
// Send messages to the feed:
|
||||
let send_handle = tokio::spawn(async move {
|
||||
loop {
|
||||
let debounce = tokio::time::sleep_until(tokio::time::Instant::now() + std::time::Duration::from_millis(75));
|
||||
let debounce = tokio::time::sleep_until(Instant::now() + Duration::from_millis(75));
|
||||
|
||||
let msgs = tokio::select! {
|
||||
msgs = rx_from_aggregator_chunks.next() => msgs,
|
||||
@@ -325,6 +326,11 @@ where
|
||||
}
|
||||
});
|
||||
|
||||
// We have 10 seconds to send and flush messages. If the client isn't keeping up with our
|
||||
// messages, the number we obtain from `ReadyChunksAll` will gradually increase and eventually
|
||||
// we'll hit this deadline and the client will be booted.
|
||||
let message_send_deadline = Instant::now() + Duration::from_secs(10);
|
||||
|
||||
for bytes in all_msg_bytes {
|
||||
if let Err(e) = ws_send.send_binary(&bytes).await {
|
||||
log::warn!("Closing feed websocket due to error sending data: {}", e);
|
||||
@@ -332,7 +338,7 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
if let Err(e) = ws_send.flush().await {
|
||||
if let Err(e) = tokio::time::timeout_at(message_send_deadline, ws_send.flush()).await {
|
||||
log::warn!("Closing feed websocket due to error flushing data: {}", e);
|
||||
break;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user