diff --git a/backend/common/src/ws_client/connect.rs b/backend/common/src/ws_client/connect.rs index f9225c6..7a399ca 100644 --- a/backend/common/src/ws_client/connect.rs +++ b/backend/common/src/ws_client/connect.rs @@ -67,8 +67,8 @@ impl Connection { }; let msg = match message_data { - soketto::Data::Text(_) => Ok(RecvMessage::Binary(data)), - soketto::Data::Binary(_) => String::from_utf8(data) + soketto::Data::Binary(_) => Ok(RecvMessage::Binary(data)), + soketto::Data::Text(_) => String::from_utf8(data) .map(|s| RecvMessage::Text(s)) .map_err(|e| e.into()), }; diff --git a/backend/telemetry_core/src/main.rs b/backend/telemetry_core/src/main.rs index 251a3a1..3824e71 100644 --- a/backend/telemetry_core/src/main.rs +++ b/backend/telemetry_core/src/main.rs @@ -33,12 +33,16 @@ struct Opts { socket: std::net::SocketAddr, /// The desired log level; one of 'error', 'warn', 'info', 'debug' or 'trace', where /// 'error' only logs errors and 'trace' logs everything. - #[structopt(required = false, long = "log", default_value = "info")] + #[structopt(long = "log", default_value = "info")] log_level: log::LevelFilter, /// Space delimited list of the names of chains that are not allowed to connect to /// telemetry. Case sensitive. - #[structopt(required = false, long = "denylist")] + #[structopt(long, required = false)] denylist: Vec<String>, + /// If it takes longer than this number of seconds to send the current batch of messages + /// to a feed, the feed connection will be closed. + #[structopt(long, default_value = "10")] + feed_timeout: u64 } #[tokio::main] @@ -60,7 +64,10 @@ async fn main() { /// Declare our routes and start the server.
async fn start_server(opts: Opts) -> anyhow::Result<()> { let aggregator = Aggregator::spawn(opts.denylist).await?; - let server = http_utils::start_server(opts.socket, move |addr, req| { + let socket_addr = opts.socket; + let feed_timeout = opts.feed_timeout; + + let server = http_utils::start_server(socket_addr, move |addr, req| { let aggregator = aggregator.clone(); async move { match (req.method(), req.uri().path().trim_end_matches('/')) { @@ -73,7 +80,7 @@ async fn start_server(opts: Opts) -> anyhow::Result<()> { Ok(http_utils::upgrade_to_websocket(req, move |ws_send, ws_recv| async move { let tx_to_aggregator = aggregator.subscribe_feed(); let (mut tx_to_aggregator, mut ws_send) - = handle_feed_websocket_connection(ws_send, ws_recv, tx_to_aggregator).await; + = handle_feed_websocket_connection(ws_send, ws_recv, tx_to_aggregator, feed_timeout).await; log::info!("Closing /feed connection from {:?}", addr); // Tell the aggregator that this connection has closed, so it can tidy up. let _ = tx_to_aggregator.send(FromFeedWebsocket::Disconnected).await; @@ -234,6 +241,7 @@ async fn handle_feed_websocket_connection( mut ws_send: http_utils::WsSender, mut ws_recv: http_utils::WsReceiver, mut tx_to_aggregator: S, + feed_timeout: u64 ) -> (S, http_utils::WsSender) where S: futures::Sink + Unpin + Send + 'static, @@ -304,7 +312,7 @@ where // Send messages to the feed: let send_handle = tokio::spawn(async move { - loop { + 'outer: loop { let debounce = tokio::time::sleep_until(Instant::now() + Duration::from_millis(75)); let msgs = tokio::select! { @@ -326,21 +334,34 @@ where } }); - // We have 10 seconds to send and flush messages. If the client isn't keeping up with our + // We have a deadline to send and flush messages. If the client isn't keeping up with our // messages, the number we obtain from `ReadyChunksAll` will gradually increase and eventually // we'll hit this deadline and the client will be booted. 
- let message_send_deadline = Instant::now() + Duration::from_secs(10); + let message_send_deadline = Instant::now() + Duration::from_secs(feed_timeout); for bytes in all_msg_bytes { - if let Err(e) = ws_send.send_binary(&bytes).await { - log::warn!("Closing feed websocket due to error sending data: {}", e); - break; + match tokio::time::timeout_at(message_send_deadline, ws_send.send_binary(&bytes)).await { + Err(_) => { + log::warn!("Closing feed websocket that was too slow to keep up (1)"); + break 'outer; + } + Ok(Err(e)) => { + log::warn!("Closing feed websocket due to error sending data: {}", e); + break 'outer; + } + Ok(_) => {} } } - - if let Err(e) = tokio::time::timeout_at(message_send_deadline, ws_send.flush()).await { - log::warn!("Closing feed websocket due to error flushing data: {}", e); - break; + match tokio::time::timeout_at(message_send_deadline, ws_send.flush()).await { + Err(_) => { + log::warn!("Closing feed websocket that was too slow to keep up (2)"); + break + } + Ok(Err(e)) => { + log::warn!("Closing feed websocket due to error flushing data: {}", e); + break; + } + Ok(_) => {} } debounce.await; diff --git a/backend/telemetry_core/tests/e2e_tests.rs b/backend/telemetry_core/tests/e2e_tests.rs index 22e66dc..ddc1238 100644 --- a/backend/telemetry_core/tests/e2e_tests.rs +++ b/backend/telemetry_core/tests/e2e_tests.rs @@ -6,7 +6,7 @@ use std::time::Duration; use test_utils::{ assert_contains_matches, feed_message_de::{FeedMessage, NodeDetails}, - workspace::start_server_debug + workspace::{ start_server, CoreOpts, start_server_debug } }; /// The simplest test we can run; the main benefit of this test (since we check similar) @@ -476,3 +476,64 @@ async fn feed_can_subscribe_and_unsubscribe_from_chain() { // Tidy up: server.shutdown().await; } + +/// Feeds will be disconnected if they can't receive messages quickly enough. 
+#[tokio::test] +async fn slow_feeds_are_disconnected() { + // Start server in release mode with a 1s feed timeout (to make the test run faster): + let mut server = start_server( + true, + CoreOpts { feed_timeout: Some(1) } + ).await; + + // Give us a shard to talk to: + let shard_id = server.add_shard().await.unwrap(); + let (mut node_tx, _node_rx) = server.get_shard(shard_id).unwrap().connect_node().await.unwrap(); + + // Add a load of nodes from this shard so there's plenty of data to give to a feed. + // We want to exhaust any buffers between core and feed (eg BufWriters). + for n in 1..50_000 { + node_tx.send_json_text(json!({ + "id":n, + "ts":"2021-07-12T10:37:47.714666+01:00", + "payload": { + "authority":true, + "chain":"Polkadot", + "config":"", + "genesis_hash": BlockHash::from_low_u64_ne(1), + "implementation":"Substrate Node", + "msg":"system.connected", + "name": format!("Alice {}", n), + "network_id":"12D3KooWEyoppNCUx8Yx66oV9fJnriXwCcXwDDUA2kj6vnc6iDEp", + "startup_time":"1625565542717", + "version":"2.0.0-07a1af348-aarch64-macos" + } + })).unwrap(); + } + + // Connect a raw feed so that we can control how fast we consume data from the websocket + let (mut raw_feed_tx, mut raw_feed_rx) = server.get_core().connect_feed_raw().await.unwrap(); + + // Subscribe the feed: + raw_feed_tx.send_text("subscribe:Polkadot").await.unwrap(); + + // Wait a little.. the feed hasn't been receiving messages so it should + // be booted after ~a second. + tokio::time::sleep(Duration::from_secs(2)).await; + + let mut v = Vec::new(); + + // Drain anything out and expect to hit a "closed" error. 
+ let res = loop { + if let Err(e) = raw_feed_rx.receive_data(&mut v).await { + break e + } + }; + assert!( + matches!(res, soketto::connection::Error::Closed), + "Should be Closed error, but is {:?}", res + ); + + // Tidy up: + server.shutdown().await; +} \ No newline at end of file diff --git a/backend/test_utils/src/workspace/mod.rs b/backend/test_utils/src/workspace/mod.rs index b9803ff..e0d2f0f 100644 --- a/backend/test_utils/src/workspace/mod.rs +++ b/backend/test_utils/src/workspace/mod.rs @@ -1,4 +1,4 @@ mod commands; mod start_server; -pub use start_server::{ start_server_debug, start_server_release }; \ No newline at end of file +pub use start_server::*; \ No newline at end of file diff --git a/backend/test_utils/src/workspace/start_server.rs b/backend/test_utils/src/workspace/start_server.rs index 8bbf590..1c0f4e4 100644 --- a/backend/test_utils/src/workspace/start_server.rs +++ b/backend/test_utils/src/workspace/start_server.rs @@ -1,6 +1,19 @@ use super::commands; use crate::server::{self, Server, Command}; +/// Additional options to pass to the feed command. +pub struct CoreOpts { + pub feed_timeout: Option<u64> +} + +impl Default for CoreOpts { + fn default() -> Self { + Self { + feed_timeout: None + } + } +} + /// Start a telemetry server. We'll use `cargo run` by default, but you can also provide /// env vars to configure the binary that runs for the shard and core process. Either: /// @@ -18,7 +31,7 @@ use crate::server::{self, Server, Command}; /// - `TELEMETRY_SUBMIT_HOSTS` - hosts (comma separated) to connect to for telemetry `/submit`s.
/// - `TELEMETRY_FEED_HOST` - host to connect to for feeds (eg 127.0.0.1:3000) /// -pub async fn start_server(release_mode: bool) -> Server { +pub async fn start_server(release_mode: bool, core_opts: CoreOpts) -> Server { // Start to a single process: if let Ok(bin) = std::env::var("TELEMETRY_BIN") { return Server::start(server::StartOpts::SingleProcess { @@ -38,13 +51,24 @@ pub async fn start_server(release_mode: bool) -> Server { }).await.unwrap(); } - // Start a shard and core process: + // Build the shard command let shard_command = std::env::var("TELEMETRY_SHARD_BIN") .map(|val| Command::new(val)) .unwrap_or_else(|_| commands::cargo_run_telemetry_shard(release_mode).expect("must be in rust workspace to run shard command")); - let core_command = std::env::var("TELEMETRY_CORE_BIN") + + // Build the core command + let mut core_command = std::env::var("TELEMETRY_CORE_BIN") .map(|val| Command::new(val)) .unwrap_or_else(|_| commands::cargo_run_telemetry_core(release_mode).expect("must be in rust workspace to run core command")); + + // Append additional opts to the core command + if let Some(feed_timeout) = core_opts.feed_timeout { + core_command = core_command + .arg("--feed-timeout") + .arg(feed_timeout.to_string()); + } + + // Start the server Server::start(server::StartOpts::ShardAndCore { shard_command, core_command @@ -53,10 +77,10 @@ pub async fn start_server(release_mode: bool) -> Server { /// Start a telemetry core server in debug mode. see [`start_server`] for details. pub async fn start_server_debug() -> Server { - start_server(false).await + start_server(false, CoreOpts::default()).await } /// Start a telemetry core server in release mode. see [`start_server`] for details. pub async fn start_server_release() -> Server { - start_server(true).await + start_server(true, CoreOpts::default()).await } \ No newline at end of file