diff --git a/backend/common/src/node_types.rs b/backend/common/src/node_types.rs index 4cf3c49..3b7c947 100644 --- a/backend/common/src/node_types.rs +++ b/backend/common/src/node_types.rs @@ -22,7 +22,6 @@ pub struct NodeDetails { pub startup_time: Option>, } - /// #[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] pub struct NodeStats { @@ -42,7 +41,7 @@ impl Serialize for NodeStats { } } -impl <'de> Deserialize<'de> for NodeStats { +impl<'de> Deserialize<'de> for NodeStats { fn deserialize(deserializer: D) -> Result where D: serde::Deserializer<'de>, @@ -52,7 +51,6 @@ impl <'de> Deserialize<'de> for NodeStats { } } - /// #[derive(Default)] pub struct NodeIO { @@ -71,7 +69,6 @@ impl Serialize for NodeIO { } } - /// #[derive(Deserialize, Serialize, Debug, Clone, Copy, PartialEq)] pub struct Block { @@ -88,7 +85,6 @@ impl Block { } } - /// #[derive(Default)] pub struct NodeHardware { @@ -114,7 +110,6 @@ impl Serialize for NodeHardware { } } - /// #[derive(Debug, Clone, PartialEq)] pub struct NodeLocation { @@ -136,17 +131,20 @@ impl Serialize for NodeLocation { } } -impl <'de> Deserialize<'de> for NodeLocation { +impl<'de> Deserialize<'de> for NodeLocation { fn deserialize(deserializer: D) -> Result where D: serde::Deserializer<'de>, { let (latitude, longitude, city) = <(f32, f32, Box)>::deserialize(deserializer)?; - Ok(NodeLocation { latitude, longitude, city }) + Ok(NodeLocation { + latitude, + longitude, + city, + }) } } - /// #[derive(Debug, Clone, Copy, PartialEq)] pub struct BlockDetails { @@ -182,18 +180,20 @@ impl Serialize for BlockDetails { } } -impl <'de> Deserialize<'de> for BlockDetails { +impl<'de> Deserialize<'de> for BlockDetails { fn deserialize(deserializer: D) -> Result where D: serde::Deserializer<'de>, { let tup = <(u64, BlockHash, u64, u64, Option)>::deserialize(deserializer)?; Ok(BlockDetails { - block: Block { height: tup.0, hash: tup.1 }, + block: Block { + height: tup.0, + hash: tup.1, + }, block_time: tup.2, block_timestamp: tup.3, - propagation_time: tup.4 + propagation_time: tup.4, }) } } - diff --git a/backend/telemetry_core/src/aggregator/inner_loop.rs b/backend/telemetry_core/src/aggregator/inner_loop.rs index 747bd46..f645ceb 100644 --- a/backend/telemetry_core/src/aggregator/inner_loop.rs +++ b/backend/telemetry_core/src/aggregator/inner_loop.rs @@ -340,7 +340,8 @@ impl InnerLoop { .collect(); // ... and remove them: - self.remove_nodes_and_broadcast_result(node_ids_to_remove).await; + self.remove_nodes_and_broadcast_result(node_ids_to_remove) + .await; } } } @@ -557,11 +558,7 @@ impl InnerLoop { } /// Send a message to all chain feeds. - fn broadcast_to_chain_feeds( - &mut self, - genesis_hash: &BlockHash, - message: ToFeedWebsocket, - ) { + fn broadcast_to_chain_feeds(&mut self, genesis_hash: &BlockHash, message: ToFeedWebsocket) { if let Some(feeds) = self.chain_to_feed_conn_ids.get(genesis_hash) { for &feed_id in feeds { if let Some(chan) = self.feed_channels.get_mut(&feed_id) { diff --git a/backend/telemetry_core/src/feed_message.rs b/backend/telemetry_core/src/feed_message.rs index 5512768..d6986ca 100644 --- a/backend/telemetry_core/src/feed_message.rs +++ b/backend/telemetry_core/src/feed_message.rs @@ -231,4 +231,4 @@ impl FeedMessageWrite for AddedNode<'_> { &node.startup_time(), )); } -} \ No newline at end of file +} diff --git a/backend/telemetry_core/src/main.rs b/backend/telemetry_core/src/main.rs index 508c8b5..aff1b1f 100644 --- a/backend/telemetry_core/src/main.rs +++ b/backend/telemetry_core/src/main.rs @@ -94,14 +94,15 @@ async fn start_server(opts: Opts) -> anyhow::Result<()> { // We can decide how many messages can be buffered to be sent, but not specifically how // large those messages are cumulatively allowed to be: - ws.max_send_queue(1_000 ).on_upgrade(move |websocket| async move { - let (mut tx_to_aggregator, websocket) = - handle_feed_websocket_connection(websocket, tx_to_aggregator).await; - log::info!("Closing /feed connection from {:?}", addr); - // Tell the aggregator that this connection has closed, so it can tidy up. - let _ = tx_to_aggregator.send(FromFeedWebsocket::Disconnected).await; - let _ = websocket.close().await; - }) + ws.max_send_queue(1_000) + .on_upgrade(move |websocket| async move { + let (mut tx_to_aggregator, websocket) = + handle_feed_websocket_connection(websocket, tx_to_aggregator).await; + log::info!("Closing /feed connection from {:?}", addr); + // Tell the aggregator that this connection has closed, so it can tidy up. + let _ = tx_to_aggregator.send(FromFeedWebsocket::Disconnected).await; + let _ = websocket.close().await; + }) }); // Merge the routes and start our server: diff --git a/backend/telemetry_core/tests/basic_tests.rs b/backend/telemetry_core/tests/basic_tests.rs index ee0ffdb..48ad776 100644 --- a/backend/telemetry_core/tests/basic_tests.rs +++ b/backend/telemetry_core/tests/basic_tests.rs @@ -1,17 +1,19 @@ -use test_utils::{ - feed_message_de::{ FeedMessage, NodeDetails }, - server::{ self, Server }, - assert_contains_matches -}; +use common::node_types::BlockHash; use serde_json::json; use std::time::Duration; -use common::node_types::{ BlockHash }; +use test_utils::{ + assert_contains_matches, + feed_message_de::{FeedMessage, NodeDetails}, + server::{self, Server}, +}; async fn cargo_run_server() -> Server { Server::start(server::StartOpts { shard_command: server::cargo_run_commands::telemetry_shard().expect("valid shard command"), - core_command: server::cargo_run_commands::telemetry_core().expect("valid core command") - }).await.unwrap() + core_command: server::cargo_run_commands::telemetry_core().expect("valid core command"), + }) + .await + .unwrap() } #[tokio::test] @@ -23,7 +25,11 @@ async fn feed_sent_version_on_connect() { // Expect a version response of 31: let feed_messages = feed_rx.recv_feed_messages().await.unwrap(); - assert_eq!(feed_messages, vec![FeedMessage::Version(31)], "expecting version"); + assert_eq!( + feed_messages, + vec![FeedMessage::Version(31)], + "expecting version" + ); // Tidy up: server.shutdown().await; @@ -41,7 +47,12 @@ async fn feed_ping_responded_to_with_pong() { // Expect a pong response: let feed_messages = feed_rx.recv_feed_messages().await.unwrap(); - assert!(feed_messages.contains(&FeedMessage::Pong { msg: "hello!".to_owned() }), "Expecting pong"); + assert!( + feed_messages.contains(&FeedMessage::Pong { + msg: "hello!".to_owned() + }), + "Expecting pong" + ); // Tidy up: server.shutdown().await; @@ -54,57 +65,56 @@ async fn feed_add_and_remove_node() { let shard_id = server.add_shard().await.unwrap(); // Connect a node to the shard: - let (mut node_tx, _node_rx) = server.get_shard(shard_id) + let (mut node_tx, _node_rx) = server + .get_shard(shard_id) .unwrap() .connect() .await .expect("can connect to shard"); // Send a "system connected" message: - node_tx.send_json_text(json!( - { - "id":1, - "ts":"2021-07-12T10:37:47.714666+01:00", - "payload": { - "authority":true, - "chain":"Local Testnet", - "config":"", - "genesis_hash": BlockHash::from_low_u64_ne(1), - "implementation":"Substrate Node", - "msg":"system.connected", - "name":"Alice", - "network_id":"12D3KooWEyoppNCUx8Yx66oV9fJnriXwCcXwDDUA2kj6vnc6iDEp", - "startup_time":"1625565542717", - "version":"2.0.0-07a1af348-aarch64-macos" - }, - } - )).await.unwrap(); + node_tx + .send_json_text(json!( + { + "id":1, + "ts":"2021-07-12T10:37:47.714666+01:00", + "payload": { + "authority":true, + "chain":"Local Testnet", + "config":"", + "genesis_hash": BlockHash::from_low_u64_ne(1), + "implementation":"Substrate Node", + "msg":"system.connected", + "name":"Alice", + "network_id":"12D3KooWEyoppNCUx8Yx66oV9fJnriXwCcXwDDUA2kj6vnc6iDEp", + "startup_time":"1625565542717", + "version":"2.0.0-07a1af348-aarch64-macos" + }, + } + )) + .await + .unwrap(); // Wait a little for this message to propagate to the core // (so that our feed connects after the core knows and not before). tokio::time::sleep(Duration::from_millis(500)).await; // Connect a feed. - let (_feed_tx, mut feed_rx) = server.get_core().connect() - .await.unwrap(); + let (_feed_tx, mut feed_rx) = server.get_core().connect().await.unwrap(); let feed_messages = feed_rx.recv_feed_messages().await.unwrap(); - assert!(feed_messages.contains( - &FeedMessage::AddedChain { - name: "Local Testnet".to_owned(), - node_count: 1 - } - )); + assert!(feed_messages.contains(&FeedMessage::AddedChain { + name: "Local Testnet".to_owned(), + node_count: 1 + })); // Disconnect the node: node_tx.close().await.unwrap(); let feed_messages = feed_rx.recv_feed_messages().await.unwrap(); - assert!(feed_messages.contains( - &FeedMessage::RemovedChain { - name: "Local Testnet".to_owned(), - } - )); + assert!(feed_messages.contains(&FeedMessage::RemovedChain { + name: "Local Testnet".to_owned(), + })); // Tidy up: server.shutdown().await; @@ -115,36 +125,40 @@ async fn feed_add_and_remove_shard() { let mut server = cargo_run_server().await; let mut shards = vec![]; - for id in 1 ..= 2 { + for id in 1..=2 { // Add a shard: let shard_id = server.add_shard().await.unwrap(); // Connect a node to it: - let (mut node_tx, _node_rx) = server.get_shard(shard_id) + let (mut node_tx, _node_rx) = server + .get_shard(shard_id) .unwrap() .connect() .await .expect("can connect to shard"); // Send a "system connected" message: - node_tx.send_json_text(json!( - { - "id":id, - "ts":"2021-07-12T10:37:47.714666+01:00", - "payload": { - "authority":true, - "chain": format!("Local Testnet {}", id), - "config":"", - "genesis_hash": BlockHash::from_low_u64_ne(id), - "implementation":"Substrate Node", - "msg":"system.connected", - "name":"Alice", - "network_id":"12D3KooWEyoppNCUx8Yx66oV9fJnriXwCcXwDDUA2kj6vnc6iDEp", - "startup_time":"1625565542717", - "version":"2.0.0-07a1af348-aarch64-macos" - }, - } - )).await.unwrap(); + node_tx + .send_json_text(json!( + { + "id":id, + "ts":"2021-07-12T10:37:47.714666+01:00", + "payload": { + "authority":true, + "chain": format!("Local Testnet {}", id), + "config":"", + "genesis_hash": BlockHash::from_low_u64_ne(id), + "implementation":"Substrate Node", + "msg":"system.connected", + "name":"Alice", + "network_id":"12D3KooWEyoppNCUx8Yx66oV9fJnriXwCcXwDDUA2kj6vnc6iDEp", + "startup_time":"1625565542717", + "version":"2.0.0-07a1af348-aarch64-macos" + }, + } + )) + .await + .unwrap(); // Keep what we need to to keep connection alive and let us kill a shard: shards.push((shard_id, node_tx)); @@ -155,30 +169,25 @@ async fn feed_add_and_remove_shard() { // The feed should be told about both of the chains that we've sent info about: let feed_messages = feed_rx.recv_feed_messages().await.unwrap(); - assert!(feed_messages.contains( - &FeedMessage::AddedChain { - name: "Local Testnet 1".to_owned(), - node_count: 1 - } - )); - assert!(feed_messages.contains( - &FeedMessage::AddedChain { - name: "Local Testnet 2".to_owned(), - node_count: 1 - } - )); + assert!(feed_messages.contains(&FeedMessage::AddedChain { + name: "Local Testnet 1".to_owned(), + node_count: 1 + })); + assert!(feed_messages.contains(&FeedMessage::AddedChain { + name: "Local Testnet 2".to_owned(), + node_count: 1 + })); // Disconnect the first shard: server.kill_shard(shards[0].0).await; // We should be told about the node connected to that shard disconnecting: let feed_messages = feed_rx.recv_feed_messages().await.unwrap(); - assert!(feed_messages.contains( - &FeedMessage::RemovedChain { - name: "Local Testnet 1".to_owned(), - } - )); - assert!(!feed_messages.contains( // Spot the "!"; this chain was not removed. + assert!(feed_messages.contains(&FeedMessage::RemovedChain { + name: "Local Testnet 1".to_owned(), + })); + assert!(!feed_messages.contains( + // Spot the "!"; this chain was not removed. &FeedMessage::RemovedChain { name: "Local Testnet 2".to_owned(), } @@ -199,24 +208,27 @@ async fn feed_can_subscribe_and_unsubscribe_from_chain() { // Send a "system connected" message for a few nodes/chains: for id in 1..=3 { - node_tx.send_json_text(json!( - { - "id":id, - "ts":"2021-07-12T10:37:47.714666+01:00", - "payload": { - "authority":true, - "chain":format!("Local Testnet {}", id), - "config":"", - "genesis_hash": BlockHash::from_low_u64_ne(id), - "implementation":"Substrate Node", - "msg":"system.connected", - "name":format!("Alice {}", id), - "network_id":"12D3KooWEyoppNCUx8Yx66oV9fJnriXwCcXwDDUA2kj6vnc6iDEp", - "startup_time":"1625565542717", - "version":"2.0.0-07a1af348-aarch64-macos" - }, - } - )).await.unwrap(); + node_tx + .send_json_text(json!( + { + "id":id, + "ts":"2021-07-12T10:37:47.714666+01:00", + "payload": { + "authority":true, + "chain":format!("Local Testnet {}", id), + "config":"", + "genesis_hash": BlockHash::from_low_u64_ne(id), + "implementation":"Substrate Node", + "msg":"system.connected", + "name":format!("Alice {}", id), + "network_id":"12D3KooWEyoppNCUx8Yx66oV9fJnriXwCcXwDDUA2kj6vnc6iDEp", + "startup_time":"1625565542717", + "version":"2.0.0-07a1af348-aarch64-macos" + }, + } + )) + .await + .unwrap(); } // Connect a feed @@ -226,7 +238,10 @@ async fn feed_can_subscribe_and_unsubscribe_from_chain() { assert_contains_matches!(feed_messages, AddedChain { name, node_count: 1 } if name == "Local Testnet 1"); // Subscribe it to a chain - feed_tx.send_command("subscribe", "Local Testnet 1").await.unwrap(); + feed_tx + .send_command("subscribe", "Local Testnet 1") + .await + .unwrap(); let feed_messages = feed_rx.recv_feed_messages().await.unwrap(); assert_contains_matches!( @@ -257,7 +272,10 @@ async fn feed_can_subscribe_and_unsubscribe_from_chain() { .expect_err("Timeout should elapse since no messages sent"); // We can change our subscription: - feed_tx.send_command("subscribe", "Local Testnet 2").await.unwrap(); + feed_tx + .send_command("subscribe", "Local Testnet 2") + .await + .unwrap(); let feed_messages = feed_rx.recv_feed_messages().await.unwrap(); // We are told about the subscription change and given similar on-subscribe messages to above. @@ -282,4 +300,4 @@ async fn feed_can_subscribe_and_unsubscribe_from_chain() { // Tidy up: server.shutdown().await; -} \ No newline at end of file +} diff --git a/backend/telemetry_shard/src/aggregator.rs b/backend/telemetry_shard/src/aggregator.rs index 7afe85a..b23e899 100644 --- a/backend/telemetry_shard/src/aggregator.rs +++ b/backend/telemetry_shard/src/aggregator.rs @@ -96,7 +96,8 @@ impl Aggregator { }); // Establish a resiliant connection to the core (this retries as needed): - let tx_to_telemetry_core = create_ws_connection_to_core(tx_from_connection, telemetry_uri).await; + let tx_to_telemetry_core = + create_ws_connection_to_core(tx_from_connection, telemetry_uri).await; // Handle any incoming messages in our handler loop: tokio::spawn(Aggregator::handle_messages( diff --git a/backend/test_utils/src/feed_message_de.rs b/backend/test_utils/src/feed_message_de.rs index 7e020ce..930e0eb 100644 --- a/backend/test_utils/src/feed_message_de.rs +++ b/backend/test_utils/src/feed_message_de.rs @@ -1,4 +1,6 @@ -use common::node_types::{BlockDetails, BlockHash, BlockNumber, NodeLocation, NodeStats, Timestamp}; +use common::node_types::{ + BlockDetails, BlockHash, BlockNumber, NodeLocation, NodeStats, Timestamp, +}; use serde_json::value::RawValue; #[derive(Debug, PartialEq)] @@ -77,7 +79,7 @@ pub enum FeedMessage { address: String, block_number: BlockNumber, block_hash: BlockHash, - voter: Option + voter: Option, }, AfgReceivedPrecommit { address: String, @@ -85,7 +87,8 @@ pub enum FeedMessage { block_hash: BlockHash, voter: Option, }, - AfgAuthoritySet { // Not used currently; not sure what "address" params are: + AfgAuthoritySet { + // Not used currently; not sure what "address" params are: a1: String, a2: String, a3: String, @@ -102,8 +105,8 @@ pub enum FeedMessage { /// A "special" case when we don't know how to decode an action: UnknownValue { action: u8, - value: String - } + value: String, + }, } #[derive(Debug, PartialEq)] @@ -138,22 +141,30 @@ impl FeedMessage { 0 => { let version = serde_json::from_str(raw_val.get())?; FeedMessage::Version(version) - }, + } // BestBlock 1 => { - let (block_number, timestamp, avg_block_time) = serde_json::from_str(raw_val.get())?; - FeedMessage::BestBlock { block_number, timestamp, avg_block_time } - }, + let (block_number, timestamp, avg_block_time) = + serde_json::from_str(raw_val.get())?; + FeedMessage::BestBlock { + block_number, + timestamp, + avg_block_time, + } + } // BestFinalized 2 => { let (block_number, block_hash) = serde_json::from_str(raw_val.get())?; - FeedMessage::BestFinalized { block_number, block_hash } + FeedMessage::BestFinalized { + block_number, + block_hash, + } } // AddNode 3 => { let ( node_id, - ( name, implementation, version, validator, network_id ), + (name, implementation, version, validator, network_id), stats, io, hardware, @@ -163,113 +174,153 @@ impl FeedMessage { ) = serde_json::from_str(raw_val.get())?; // Give these two types but don't use the results: - let (_,_): (&RawValue, &RawValue) = (io, hardware); + let (_, _): (&RawValue, &RawValue) = (io, hardware); FeedMessage::AddedNode { node_id, - node: NodeDetails { name, implementation, version, validator, network_id }, + node: NodeDetails { + name, + implementation, + version, + validator, + network_id, + }, stats, block_details, location, startup_time, } - }, + } // RemoveNode 4 => { let node_id = serde_json::from_str(raw_val.get())?; FeedMessage::RemovedNode { node_id } - }, + } // LocatedNode 5 => { let (node_id, lat, long, city) = serde_json::from_str(raw_val.get())?; - FeedMessage::LocatedNode { node_id, lat, long, city } - }, + FeedMessage::LocatedNode { + node_id, + lat, + long, + city, + } + } // ImportedBlock 6 => { let (node_id, block_details) = serde_json::from_str(raw_val.get())?; - FeedMessage::ImportedBlock { node_id, block_details } - }, + FeedMessage::ImportedBlock { + node_id, + block_details, + } + } // FinalizedBlock 7 => { let (node_id, block_number, block_hash) = serde_json::from_str(raw_val.get())?; - FeedMessage::FinalizedBlock { node_id, block_number, block_hash } - }, + FeedMessage::FinalizedBlock { + node_id, + block_number, + block_hash, + } + } // NodeStatsUpdate 8 => { let (node_id, stats) = serde_json::from_str(raw_val.get())?; FeedMessage::NodeStatsUpdate { node_id, stats } - }, + } // Hardware 9 => { let (node_id, _hardware): (_, &RawValue) = serde_json::from_str(raw_val.get())?; FeedMessage::Hardware { node_id } - }, + } // TimeSync 10 => { let time = serde_json::from_str(raw_val.get())?; FeedMessage::TimeSync { time } - }, + } // AddedChain 11 => { let (name, node_count) = serde_json::from_str(raw_val.get())?; FeedMessage::AddedChain { name, node_count } - }, + } // RemovedChain 12 => { let name = serde_json::from_str(raw_val.get())?; FeedMessage::RemovedChain { name } - }, + } // SubscribedTo 13 => { let name = serde_json::from_str(raw_val.get())?; FeedMessage::SubscribedTo { name } - }, + } // UnsubscribedFrom 14 => { let name = serde_json::from_str(raw_val.get())?; FeedMessage::UnsubscribedFrom { name } - }, + } // Pong 15 => { let msg = serde_json::from_str(raw_val.get())?; FeedMessage::Pong { msg } - }, + } // AfgFinalized 16 => { let (address, block_number, block_hash) = serde_json::from_str(raw_val.get())?; - FeedMessage::AfgFinalized { address, block_number, block_hash } - }, + FeedMessage::AfgFinalized { + address, + block_number, + block_hash, + } + } // AfgReceivedPrevote 17 => { - let (address, block_number, block_hash, voter) = serde_json::from_str(raw_val.get())?; - FeedMessage::AfgReceivedPrevote { address, block_number, block_hash, voter } - }, + let (address, block_number, block_hash, voter) = + serde_json::from_str(raw_val.get())?; + FeedMessage::AfgReceivedPrevote { + address, + block_number, + block_hash, + voter, + } + } // AfgReceivedPrecommit 18 => { - let (address, block_number, block_hash, voter) = serde_json::from_str(raw_val.get())?; - FeedMessage::AfgReceivedPrecommit { address, block_number, block_hash, voter } - }, + let (address, block_number, block_hash, voter) = + serde_json::from_str(raw_val.get())?; + FeedMessage::AfgReceivedPrecommit { + address, + block_number, + block_hash, + voter, + } + } // AfgAuthoritySet 19 => { let (a1, a2, a3, block_number, block_hash) = serde_json::from_str(raw_val.get())?; - FeedMessage::AfgAuthoritySet { a1, a2, a3, block_number, block_hash } - }, + FeedMessage::AfgAuthoritySet { + a1, + a2, + a3, + block_number, + block_hash, + } + } // StaleNode 20 => { let node_id = serde_json::from_str(raw_val.get())?; FeedMessage::StaleNode { node_id } - }, + } // NodeIOUpdate 21 => { // ignore NodeIO for now: let (node_id, _node_io): (_, &RawValue) = serde_json::from_str(raw_val.get())?; FeedMessage::NodeIOUpdate { node_id } - }, + } // A catchall for messages we don't know/care about yet: _ => { let value = raw_val.to_string(); FeedMessage::UnknownValue { action, value } - }, + } }; Ok(feed_message) @@ -288,7 +339,9 @@ mod test { assert_eq!( FeedMessage::from_bytes(msg.as_bytes()).unwrap(), - vec![FeedMessage::RemovedChain { name: "".to_owned() }] + vec![FeedMessage::RemovedChain { + name: "".to_owned() + }] ); } @@ -300,10 +353,14 @@ mod test { assert_eq!( FeedMessage::from_bytes(msg.as_bytes()).unwrap(), vec![ - FeedMessage::RemovedChain { name: "".to_owned() }, - FeedMessage::AddedChain { name: "Local Testnet".to_owned(), node_count: 1 }, + FeedMessage::RemovedChain { + name: "".to_owned() + }, + FeedMessage::AddedChain { + name: "Local Testnet".to_owned(), + node_count: 1 + }, ] ); } - -} \ No newline at end of file +} diff --git a/backend/test_utils/src/server/cargo_run_commands.rs b/backend/test_utils/src/server/cargo_run_commands.rs index 08b8b2a..c0470a1 100644 --- a/backend/test_utils/src/server/cargo_run_commands.rs +++ b/backend/test_utils/src/server/cargo_run_commands.rs @@ -33,4 +33,4 @@ fn try_find_workspace_dir() -> Result { let mut dir = std::env::current_dir()?; while !dir.ends_with("backend") && dir.pop() {} Ok(dir) -} \ No newline at end of file +} diff --git a/backend/test_utils/src/server/channels.rs b/backend/test_utils/src/server/channels.rs index 5404390..7070553 100644 --- a/backend/test_utils/src/server/channels.rs +++ b/backend/test_utils/src/server/channels.rs @@ -1,45 +1,65 @@ use std::time::Duration; +use crate::feed_message_de::FeedMessage; use crate::ws_client; use futures::{Sink, SinkExt, Stream, StreamExt}; -use crate::feed_message_de::FeedMessage; /// Wrap a `ws_client::Sender` with convenient utility methods for shard connections pub struct ShardSender(ws_client::Sender); impl From for ShardSender { - fn from(c: ws_client::Sender) -> Self { ShardSender(c) } + fn from(c: ws_client::Sender) -> Self { + ShardSender(c) + } } impl ShardSender { /// Close this connection - pub async fn close(&mut self) -> Result<(),ws_client::SendError> { + pub async fn close(&mut self) -> Result<(), ws_client::SendError> { self.0.close().await } } impl Sink for ShardSender { type Error = ws_client::SendError; - fn poll_ready(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll> { + fn poll_ready( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { self.0.poll_ready_unpin(cx) } - fn start_send(mut self: std::pin::Pin<&mut Self>, item: ws_client::Message) -> Result<(), Self::Error> { + fn start_send( + mut self: std::pin::Pin<&mut Self>, + item: ws_client::Message, + ) -> Result<(), Self::Error> { self.0.start_send_unpin(item) } - fn poll_flush(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll> { + fn poll_flush( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { self.0.poll_flush_unpin(cx) } - fn poll_close(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll> { + fn poll_close( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { self.0.poll_close_unpin(cx) } } impl ShardSender { - pub async fn send_json_binary(&mut self, json: serde_json::Value) -> Result<(), ws_client::SendError> { + pub async fn send_json_binary( + &mut self, + json: serde_json::Value, + ) -> Result<(), ws_client::SendError> { let bytes = serde_json::to_vec(&json).expect("valid bytes"); self.send(ws_client::Message::Binary(bytes)).await } - pub async fn send_json_text(&mut self, json: serde_json::Value) -> Result<(), ws_client::SendError> { + pub async fn send_json_text( + &mut self, + json: serde_json::Value, + ) -> Result<(), ws_client::SendError> { let s = serde_json::to_string(&json).expect("valid string"); self.send(ws_client::Message::Text(s)).await } @@ -49,43 +69,70 @@ impl ShardSender { pub struct ShardReceiver(ws_client::Receiver); impl From for ShardReceiver { - fn from(c: ws_client::Receiver) -> Self { ShardReceiver(c) } + fn from(c: ws_client::Receiver) -> Self { + ShardReceiver(c) + } } impl Stream for ShardReceiver { type Item = Result; - fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll> { + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { self.0.poll_next_unpin(cx) } } - /// Wrap a `ws_client::Sender` with convenient utility methods for feed connections pub struct FeedSender(ws_client::Sender); impl From for FeedSender { - fn from(c: ws_client::Sender) -> Self { FeedSender(c) } + fn from(c: ws_client::Sender) -> Self { + FeedSender(c) + } } impl Sink for FeedSender { type Error = ws_client::SendError; - fn poll_ready(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll> { + fn poll_ready( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { self.0.poll_ready_unpin(cx) } - fn start_send(mut self: std::pin::Pin<&mut Self>, item: ws_client::Message) -> Result<(), Self::Error> { + fn start_send( + mut self: std::pin::Pin<&mut Self>, + item: ws_client::Message, + ) -> Result<(), Self::Error> { self.0.start_send_unpin(item) } - fn poll_flush(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll> { + fn poll_flush( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { self.0.poll_flush_unpin(cx) } - fn poll_close(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll> { + fn poll_close( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { self.0.poll_close_unpin(cx) } } impl FeedSender { - pub async fn send_command>(&mut self, command: S, param: S) -> Result<(), ws_client::SendError> { - self.send(ws_client::Message::Text(format!("{}:{}", command.as_ref(), param.as_ref()))).await + pub async fn send_command>( + &mut self, + command: S, + param: S, + ) -> Result<(), ws_client::SendError> { + self.send(ws_client::Message::Text(format!( + "{}:{}", + command.as_ref(), + param.as_ref() + ))) + .await } } @@ -93,12 +140,17 @@ impl FeedSender { pub struct FeedReceiver(ws_client::Receiver); impl From for FeedReceiver { - fn from(c: ws_client::Receiver) -> Self { FeedReceiver(c) } + fn from(c: ws_client::Receiver) -> Self { + FeedReceiver(c) + } } impl Stream for FeedReceiver { type Item = Result; - fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll> { + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { self.0.poll_next_unpin(cx).map_err(|e| e.into()) } } @@ -111,7 +163,8 @@ impl FeedReceiver { /// robust in assuming that messages may not all be delivered at once (unless we are /// specifically testing which messages are buffered together). pub async fn recv_feed_messages_once(&mut self) -> Result, anyhow::Error> { - let msg = self.0 + let msg = self + .0 .next() .await .ok_or_else(|| anyhow::anyhow!("Stream closed: no more messages"))??; @@ -120,7 +173,7 @@ impl FeedReceiver { ws_client::Message::Binary(data) => { let messages = FeedMessage::from_bytes(&data)?; Ok(messages) - }, + } ws_client::Message::Text(text) => { let messages = FeedMessage::from_bytes(text.as_bytes())?; Ok(messages) @@ -135,20 +188,20 @@ impl FeedReceiver { let mut feed_messages = self.recv_feed_messages_once().await?; // Then, loop a little to make sure we catch any additional messages that are sent soon after: loop { - match tokio::time::timeout(Duration::from_millis(250), self.recv_feed_messages_once()).await { + match tokio::time::timeout(Duration::from_millis(250), self.recv_feed_messages_once()) + .await + { // Timeout elapsed; return the messages we have so far Err(_) => { break Ok(feed_messages); - }, + } // Append messages that come back to our vec Ok(Ok(mut msgs)) => { feed_messages.append(&mut msgs); - }, - // Error came back receiving messages; return it - Ok(Err(e)) => { - break Err(e) } + // Error came back receiving messages; return it + Ok(Err(e)) => break Err(e), } } } -} \ No newline at end of file +} diff --git a/backend/test_utils/src/server/mod.rs b/backend/test_utils/src/server/mod.rs index 1b6c8dc..1bff5aa 100644 --- a/backend/test_utils/src/server/mod.rs +++ b/backend/test_utils/src/server/mod.rs @@ -1,6 +1,6 @@ -mod utils; mod server; +mod utils; pub mod cargo_run_commands; pub mod channels; -pub use server::*; \ No newline at end of file +pub use server::*; diff --git a/backend/test_utils/src/server/server.rs b/backend/test_utils/src/server/server.rs index fe1e4aa..f758d37 100644 --- a/backend/test_utils/src/server/server.rs +++ b/backend/test_utils/src/server/server.rs @@ -1,9 +1,9 @@ +use super::{channels, utils}; +use crate::ws_client; +use common::{id_type, DenseMap}; use std::ffi::OsString; use std::marker::PhantomData; -use crate::ws_client; -use tokio::process::{ self, Command as TokioCommand }; -use super::{ channels, utils }; -use common::{ id_type, DenseMap }; +use tokio::process::{self, Command as TokioCommand}; id_type! { /// The ID of a running process. Cannot be constructed externally. @@ -16,7 +16,7 @@ pub struct StartOpts { pub shard_command: Command, /// Command to run to start a telemetry core process. /// The `--listen` and `--log` arguments will be appended within and shouldn't be provided. - pub core_command: Command + pub core_command: Command, } pub struct ConnectToExistingOpts { @@ -38,7 +38,9 @@ pub enum Error { ErrorObtainingPort(anyhow::Error), #[error("Whoops; attempt to kill a process we didn't start (and so have no handle to)")] CannotKillNoHandle, - #[error("Whoops; attempt to add a shard to a server we didn't start (and so have no handle to)")] + #[error( + "Whoops; attempt to add a shard to a server we didn't start (and so have no handle to)" + )] CannotAddShardNoHandle, } @@ -68,20 +70,21 @@ impl Server { } pub fn iter_shards(&self) -> impl Iterator { - self.shards.iter().map(|(_,v)| v) + self.shards.iter().map(|(_, v)| v) } pub async fn kill_shard(&mut self, id: ProcessId) -> bool { let shard = match self.shards.remove(id) { Some(shard) => shard, - None => return false + None => return false, }; // With this, killing will complete even if the promise returned is cancelled // (it should regardless, but just to play it safe..) let _ = tokio::spawn(async move { let _ = shard.kill().await; - }).await; + }) + .await; true } @@ -91,14 +94,9 @@ impl Server { // Spawn so we don't need to await cleanup if we don't care. // Run all kill futs simultaneously. let handle = tokio::spawn(async move { - let shard_kill_futs = self.shards - .into_iter() - .map(|(_,s)| s.kill()); + let shard_kill_futs = self.shards.into_iter().map(|(_, s)| s.kill()); - let _ = tokio::join!( - futures::future::join_all(shard_kill_futs), - self.core.kill() - ); + let _ = tokio::join!(futures::future::join_all(shard_kill_futs), self.core.kill()); }); // You can wait for cleanup but aren't obliged to: @@ -109,10 +107,11 @@ impl Server { pub async fn add_shard(&mut self) -> Result { let core_uri = match &self.core_shard_submit_uri { Some(uri) => uri, - None => return Err(Error::CannotAddShardNoHandle) + None => return Err(Error::CannotAddShardNoHandle), }; - let mut shard_cmd: TokioCommand = self.shard_command + let mut shard_cmd: TokioCommand = self + .shard_command .clone() .ok_or_else(|| Error::CannotAddShardNoHandle)? .into(); @@ -141,8 +140,9 @@ impl Server { let _ = utils::wait_for_line_containing( &mut child_stdout, "Connected to telemetry core", - std::time::Duration::from_secs(5) - ).await; + std::time::Duration::from_secs(5), + ) + .await; // Since we're piping stdout from the child process, we need somewhere for it to go // else the process will get stuck when it tries to produce output: @@ -156,7 +156,7 @@ impl Server { id, handle: Some(shard_process), uri: shard_uri, - _channel_type: PhantomData + _channel_type: PhantomData, }); Ok(pid) @@ -164,7 +164,6 @@ impl Server { /// Start a telemetry_core process. From here, we can add/remove shards as needed. pub async fn start(opts: StartOpts) -> Result { - let mut core_cmd: TokioCommand = opts.core_command.into(); let mut child = core_cmd @@ -194,16 +193,18 @@ impl Server { Ok(Server { shard_command: Some(opts.shard_command), - core_shard_submit_uri: Some(format!("http://127.0.0.1:{}/shard_submit", core_port) - .parse() - .expect("valid shard_submit URI")), + core_shard_submit_uri: Some( + format!("http://127.0.0.1:{}/shard_submit", core_port) + .parse() + .expect("valid shard_submit URI"), + ), shards: DenseMap::new(), core: Process { id: ProcessId(0), handle: Some(child), uri: feed_uri, _channel_type: PhantomData, - } + }, }) } @@ -229,12 +230,11 @@ impl Server { uri: opts.feed_uri, handle: None, _channel_type: PhantomData, - } + }, } } } - /// This represents a running process that we can connect to, which /// may be either a `telemetry_shard` or `telemetry_core`. pub struct Process { @@ -245,7 +245,7 @@ pub struct Process { /// The URI that we can use to connect to the process socket. uri: http::Uri, /// The kind of the process (lets us add methods specific to shard/core). - _channel_type: PhantomData + _channel_type: PhantomData, } /// A shard process with shard-specific methods. @@ -254,7 +254,7 @@ pub type ShardProcess = Process<(channels::ShardSender, channels::ShardReceiver) /// A core process with core-specific methods. pub type CoreProcess = Process<(channels::FeedSender, channels::FeedReceiver)>; -impl Process { +impl Process { /// Get the ID of this process pub fn id(&self) -> ProcessId { self.id @@ -265,25 +265,28 @@ impl Process { async fn kill(self) -> Result<(), Error> { match self.handle { Some(mut handle) => Ok(handle.kill().await?), - None => Err(Error::CannotKillNoHandle) + None => Err(Error::CannotKillNoHandle), } } } -impl , Recv: From> Process<(Send, Recv)> { +impl, Recv: From> Process<(Send, Recv)> { /// Establish a connection to the process pub async fn connect(&self) -> Result<(Send, Recv), Error> { ws_client::connect(&self.uri) .await - .map(|(s,r)| (s.into(), r.into())) + .map(|(s, r)| (s.into(), r.into())) .map_err(|e| e.into()) } /// Establish multiple connections to the process - pub async fn connect_multiple(&self, num_connections: usize) -> Result, Error> { + pub async fn connect_multiple( + &self, + num_connections: usize, + ) -> Result, Error> { utils::connect_multiple_to_uri(&self.uri, num_connections) .await - .map(|v| v.into_iter().map(|(s,r)| (s.into(), r.into())).collect()) + .map(|v| v.into_iter().map(|(s, r)| (s.into(), r.into())).collect()) .map_err(|e| e.into()) } } @@ -294,14 +297,14 @@ impl , Recv: From> Process<(S #[derive(Clone, Debug)] pub struct Command { command: OsString, - args: Vec + args: Vec, } impl Command { pub fn new>(command: S) -> Command { Command { command: command.into(), - args: Vec::new() + args: Vec::new(), } } @@ -317,4 +320,4 @@ impl Into for Command { cmd.args(self.args); cmd } -} \ No newline at end of file +} diff --git a/backend/test_utils/src/server/utils.rs b/backend/test_utils/src/server/utils.rs index 38e6cbe..3946159 100644 --- a/backend/test_utils/src/server/utils.rs +++ b/backend/test_utils/src/server/utils.rs @@ -1,8 +1,8 @@ use crate::ws_client; +use anyhow::{anyhow, Context}; use tokio::io::BufReader; -use tokio::io::{ AsyncRead, AsyncWrite, AsyncBufReadExt }; +use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncWrite}; use tokio::time::Duration; -use anyhow::{ anyhow, Context }; /// Reads from the stdout of the shard/core process to extract the port that was assigned to it, /// with the side benefit that we'll wait for it to start listening before returning. We do this @@ -23,25 +23,35 @@ pub async fn get_port(reader: R) -> Result(reader: R, text: &str, max_wait_between_lines: Duration) -> Result { +pub async fn wait_for_line_containing( + reader: R, + text: &str, + max_wait_between_lines: Duration, +) -> Result { let reader = BufReader::new(reader); let mut reader_lines = reader.lines(); loop { - let line = tokio::time::timeout( - max_wait_between_lines, - reader_lines.next_line() - ).await; + let line = tokio::time::timeout(max_wait_between_lines, reader_lines.next_line()).await; let line = match line { // timeout expired; couldn't get port: - Err(_) => return Err(anyhow!("Timeout expired waiting for output containing: {}", text)), + Err(_) => { + return Err(anyhow!( + "Timeout expired waiting for output containing: {}", + text + )) + } // Something went wrong reading line; bail: Ok(Err(e)) => return Err(anyhow!("Could not read line from stdout: {}", e)), // No more output; process ended? bail: - Ok(Ok(None)) => return Err(anyhow!("No more output from stdout; has the process ended?")), + Ok(Ok(None)) => { + return Err(anyhow!( + "No more output from stdout; has the process ended?" + )) + } // All OK, and a line is given back; phew! - Ok(Ok(Some(line))) => line + Ok(Ok(Some(line))) => line, }; if line.contains(text) { @@ -51,10 +61,12 @@ pub async fn wait_for_line_containing(reader: R, text: &st } /// Establish multiple connections to a URI and return them all. -pub async fn connect_multiple_to_uri(uri: &http::Uri, num_connections: usize) -> Result, ws_client::ConnectError> { - let connect_futs = (0..num_connections) - .map(|_| ws_client::connect(uri)); - let sockets: Result,_> = futures::future::join_all(connect_futs) +pub async fn connect_multiple_to_uri( + uri: &http::Uri, + num_connections: usize, +) -> Result, ws_client::ConnectError> { + let connect_futs = (0..num_connections).map(|_| ws_client::connect(uri)); + let sockets: Result, _> = futures::future::join_all(connect_futs) .await .into_iter() .collect(); @@ -66,9 +78,9 @@ pub async fn connect_multiple_to_uri(uri: &http::Uri, num_connections: usize) -> pub fn drain(mut reader: R, mut writer: W) where R: AsyncRead + Unpin + Send + 'static, - W: AsyncWrite + Unpin + Send + 'static + W: AsyncWrite + Unpin + Send + 'static, { tokio::spawn(async move { let _ = tokio::io::copy(&mut reader, &mut writer).await; }); -} \ No newline at end of file +} diff --git a/backend/test_utils/src/ws_client.rs b/backend/test_utils/src/ws_client.rs index d674977..9da8fd2 100644 --- a/backend/test_utils/src/ws_client.rs +++ b/backend/test_utils/src/ws_client.rs @@ -1,17 +1,17 @@ -use futures::channel::{ mpsc }; -use soketto::handshake::{Client, ServerResponse}; -use tokio_util::compat::{ TokioAsyncReadCompatExt }; -use tokio::net::TcpStream; +use futures::channel::mpsc; use futures::{Sink, SinkExt, Stream, StreamExt}; +use soketto::handshake::{Client, ServerResponse}; +use tokio::net::TcpStream; +use tokio_util::compat::TokioAsyncReadCompatExt; /// Send messages into the connection #[derive(Clone)] pub struct Sender { - inner: mpsc::UnboundedSender + inner: mpsc::UnboundedSender, } impl Sender { - pub async fn close(&mut self) -> Result<(),SendError> { + pub async fn close(&mut self) -> Result<(), SendError> { self.inner.send(SentMessage::Close).await?; Ok(()) } @@ -20,28 +20,39 @@ impl Sender { #[derive(thiserror::Error, Debug, Clone)] pub enum SendError { #[error("Failed to send message: {0}")] - ChannelError(#[from] mpsc::SendError) + ChannelError(#[from] mpsc::SendError), } impl Sink for Sender { type Error = SendError; - fn poll_ready(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll> { + fn poll_ready( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { self.inner.poll_ready_unpin(cx).map_err(|e| e.into()) } fn start_send(mut self: std::pin::Pin<&mut Self>, item: Message) -> Result<(), Self::Error> { - self.inner.start_send_unpin(SentMessage::Message(item)).map_err(|e| e.into()) + self.inner + .start_send_unpin(SentMessage::Message(item)) + .map_err(|e| e.into()) } - fn poll_flush(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll> { + fn poll_flush( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { self.inner.poll_flush_unpin(cx).map_err(|e| e.into()) } - fn poll_close(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll> { + fn poll_close( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { self.inner.poll_close_unpin(cx).map_err(|e| e.into()) } } /// Receive messages out of a connection pub struct Receiver { - inner: mpsc::UnboundedReceiver> + inner: mpsc::UnboundedReceiver>, } #[derive(thiserror::Error, Debug)] @@ -49,12 +60,15 @@ pub enum RecvError { #[error("Text message contains invalid UTF8: {0}")] InvalidUtf8(#[from] std::string::FromUtf8Error), #[error("Stream finished")] - StreamFinished + StreamFinished, } impl Stream for Receiver { type Item = Result; - fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll> { + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { self.inner.poll_next_unpin(cx).map_err(|e| e.into()) } } @@ -62,13 +76,13 @@ impl Stream for Receiver { /// A message type that can be sent or received from the connection pub enum Message { Text(String), - Binary(Vec) + Binary(Vec), } /// Sent messages can be anything publically visible, or a close message. enum SentMessage { Message(Message), - Close + Close, } #[derive(thiserror::Error, Debug)] @@ -98,12 +112,10 @@ pub async fn connect(uri: &http::Uri) -> Result<(Sender, Receiver), ConnectError // Establish a WS connection: let mut client = Client::new(socket.compat(), host, &path); let (mut ws_to_connection, mut ws_from_connection) = match client.handshake().await? { - ServerResponse::Accepted { .. } => { - client.into_builder().finish() - }, + ServerResponse::Accepted { .. } => client.into_builder().finish(), ServerResponse::Redirect { status_code, .. } => { return Err(ConnectError::ConnectionFailedRedirect { status_code }) - }, + } ServerResponse::Rejected { status_code } => { return Err(ConnectError::ConnectionFailedRejected { status_code }) } @@ -124,23 +136,20 @@ pub async fn connect(uri: &http::Uri) -> Result<(Sender, Receiver), ConnectError Err(e) => { // Couldn't receive data may mean all senders are gone, so log // the error and shut this down: - log::error!("Shutting down websocket connection: Failed to receive data: {}", e); + log::error!( + "Shutting down websocket connection: Failed to receive data: {}", + e + ); break; - }, - Ok(data) => { - data } + Ok(data) => data, }; let msg = match message_data { - soketto::Data::Text(_) => { - Ok(Message::Binary(data)) - }, - soketto::Data::Binary(_) => { - String::from_utf8(data) - .map(|s| Message::Text(s)) - .map_err(|e| e.into()) - }, + soketto::Data::Text(_) => Ok(Message::Binary(data)), + soketto::Data::Binary(_) => String::from_utf8(data) + .map(|s| Message::Text(s)) + .map_err(|e| e.into()), }; data = Vec::with_capacity(128); @@ -148,7 +157,10 @@ pub async fn connect(uri: &http::Uri) -> Result<(Sender, Receiver), ConnectError if let Err(e) = tx_to_external.send(msg).await { // Failure to send likely means that the recv has been dropped, // so let's drop this loop too. - log::error!("Shutting down websocket connection: Failed to send data out: {}", e); + log::error!( + "Shutting down websocket connection: Failed to send data out: {}", + e + ); break; } } @@ -161,16 +173,22 @@ pub async fn connect(uri: &http::Uri) -> Result<(Sender, Receiver), ConnectError match msg { SentMessage::Message(Message::Text(s)) => { if let Err(e) = ws_to_connection.send_text_owned(s).await { - log::error!("Shutting down websocket connection: Failed to send text data: {}", e); + log::error!( + "Shutting down websocket connection: Failed to send text data: {}", + e + ); break; } - }, + } SentMessage::Message(Message::Binary(bytes)) => { if let Err(e) = ws_to_connection.send_binary_mut(bytes).await { - log::error!("Shutting down websocket connection: Failed to send binary data: {}", e); + log::error!( + "Shutting down websocket connection: Failed to send binary data: {}", + e + ); break; } - }, + } SentMessage::Close => { if let Err(e) = ws_to_connection.close().await { log::error!("Error attempting to close connection: {}", e); @@ -180,14 +198,14 @@ pub async fn connect(uri: &http::Uri) -> Result<(Sender, Receiver), ConnectError } if let Err(e) = ws_to_connection.flush().await { - log::error!("Shutting down websocket connection: Failed to flush data: {}", e); + log::error!( + "Shutting down websocket connection: Failed to flush data: {}", + e + ); break; } } }); - Ok(( - Sender { inner: tx_to_ws }, - Receiver { inner: rx_from_ws } - )) + Ok((Sender { inner: tx_to_ws }, Receiver { inner: rx_from_ws })) }