diff --git a/backend/common/src/internal_messages.rs b/backend/common/src/internal_messages.rs index c5f3d23..941dfeb 100644 --- a/backend/common/src/internal_messages.rs +++ b/backend/common/src/internal_messages.rs @@ -14,26 +14,27 @@ id_type! { pub struct ShardNodeId(usize); } -/// Message sent from the shard to the backend core +/// Message sent from a telemetry shard to the telemetry core #[derive(Deserialize, Serialize, Debug, Clone)] pub enum FromShardAggregator { - /// Get information about a new node, passing IPv4 + /// Get information about a new node, including its IP + /// address and chain genesis hash. AddNode { ip: Option, node: NodeDetails, local_id: ShardNodeId, genesis_hash: BlockHash, }, - /// Send a message payload to update details for a node + /// A message payload with updated details for a node UpdateNode { local_id: ShardNodeId, payload: Payload, }, - /// Inform the core that a node has been removed + /// Inform the telemetry core that a node has been removed RemoveNode { local_id: ShardNodeId }, } -/// Message sent form the backend core to the shard +/// Message sent from the telemetry core to a telemetry shard #[derive(Deserialize, Serialize, Debug, Clone)] pub enum FromTelemetryCore { Mute { diff --git a/backend/telemetry_shard/src/aggregator.rs b/backend/telemetry_shard/src/aggregator.rs index 56f4b15..7afe85a 100644 --- a/backend/telemetry_shard/src/aggregator.rs +++ b/backend/telemetry_shard/src/aggregator.rs @@ -1,4 +1,4 @@ -use crate::connection::{create_ws_connection, Message}; +use crate::connection::{create_ws_connection_to_core, Message}; use common::{ internal_messages::{self, ShardNodeId}, node_message, @@ -7,7 +7,7 @@ use common::{ }; use futures::{channel::mpsc, future}; use futures::{Sink, SinkExt, StreamExt}; -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use std::sync::atomic::AtomicU64; use std::sync::Arc; @@ -23,9 +23,13 @@ type ConnId = u64; /// [`FromWebsocket`] instances. 
#[derive(Clone, Debug)] enum ToAggregator { + /// Sent when the telemetry core is disconnected. DisconnectedFromTelemetryCore, + /// Sent when the telemetry core (re)connects. ConnectedToTelemetryCore, + /// Sent when a message comes in from a substrate node. FromWebsocket(ConnId, FromWebsocket), + /// Sent when a message comes in from the telemetry core. FromTelemetryCore(internal_messages::FromTelemetryCore), } @@ -60,6 +64,8 @@ pub enum FromWebsocket { pub type FromAggregator = internal_messages::FromShardAggregator; +/// The aggregator loop handles incoming messages from nodes, or from the telemetry core. +/// This is where we decide what effect messages will have. #[derive(Clone)] pub struct Aggregator(Arc); @@ -90,7 +96,7 @@ impl Aggregator { }); // Establish a resiliant connection to the core (this retries as needed): - let tx_to_telemetry_core = create_ws_connection(tx_from_connection, telemetry_uri).await; + let tx_to_telemetry_core = create_ws_connection_to_core(tx_from_connection, telemetry_uri).await; // Handle any incoming messages in our handler loop: tokio::spawn(Aggregator::handle_messages( @@ -106,7 +112,7 @@ impl Aggregator { } // This is spawned into a separate task and handles any messages coming - // in to the aggregator. If nobody is tolding the tx side of the channel + // in to the aggregator. If nobody is holding the tx side of the channel // any more, this task will gracefully end. async fn handle_messages( mut rx_from_external: mpsc::Receiver, @@ -118,9 +124,9 @@ impl Aggregator { // or not, and ignore incoming messages while we aren't. let mut connected_to_telemetry_core = false; - // A list of close channels for the current connections. Send an empty tuple to - // these to ask the connections to be closed. - let mut close_connections: Vec> = vec![]; + // A list of close channels for the currently connected substrate nodes. Send an empty + // tuple to these to ask the connections to be closed. 
+ let mut close_connections: HashMap> = HashMap::new(); // Maintain mappings from the connection ID and node message ID to the "local ID" which we // broadcast to the telemetry core. @@ -136,13 +142,13 @@ impl Aggregator { // Take hold of the connection closers and run them all. let closers = close_connections; - for mut closer in closers { + for (_, mut closer) in closers { // if this fails, it probably means the connection has died already anyway. let _ = closer.send(()); } // We've told everything to disconnect. Now, reset our state: - close_connections = vec![]; + close_connections = HashMap::new(); to_local_id.clear(); muted.clear(); @@ -154,13 +160,13 @@ impl Aggregator { log::info!("Disconnected from telemetry core"); } ToAggregator::FromWebsocket( - _conn_id, + conn_id, FromWebsocket::Initialize { close_connection }, ) => { // We boot all connections on a reconnect-to-core to force new systemconnected // messages to be sent. We could boot on muting, but need to be careful not to boot // connections where we mute one set of messages it sends and not others. 
- close_connections.push(close_connection); + close_connections.insert(conn_id, close_connection); } ToAggregator::FromWebsocket( conn_id, @@ -228,8 +234,11 @@ impl Aggregator { .map(|(local_id, _)| local_id) .collect(); + close_connections.remove(&disconnected_conn_id); + for local_id in local_ids_disconnected { to_local_id.remove_by_id(local_id); + muted.remove(&local_id); let _ = tx_to_telemetry_core .send(FromShardAggregator::RemoveNode { local_id }) .await; @@ -239,11 +248,6 @@ impl Aggregator { local_id, reason: _, }) => { - // Ignore incoming messages if we're not connected to the backend: - if !connected_to_telemetry_core { - continue; - } - // Mute the local ID we've been told to: muted.insert(local_id); } diff --git a/backend/telemetry_shard/src/connection.rs b/backend/telemetry_shard/src/connection.rs index 918e102..62e7aa6 100644 --- a/backend/telemetry_shard/src/connection.rs +++ b/backend/telemetry_shard/src/connection.rs @@ -10,11 +10,15 @@ pub enum Message { Data(Out), } -/// Connect to a websocket server, retrying the connection if we're disconnected. -/// - Sends messages when disconnected, reconnected or data received from the connection. +/// Connect to the telemetry core, retrying the connection if we're disconnected. +/// - Sends `Message::Connected` and `Message::Disconnected` when the connection goes up/down. /// - Returns a channel that allows you to send messages to the connection. -/// - Messages all encoded/decoded from bincode. -pub async fn create_ws_connection( +/// - Messages are all encoded/decoded to/from bincode, and so need to support being (de)serialized from +/// a non self-describing encoding. +/// +/// Note: have a look at [`common::internal_messages`] to see the different message types exchanged +/// between aggregator and core. 
+pub async fn create_ws_connection_to_core( mut tx_to_external: S, telemetry_uri: http::Uri, ) -> mpsc::Sender @@ -33,7 +37,8 @@ where loop { // Throw away any pending messages from the incoming channel so that it - // doesn't get blocked up while we're looping and waiting for a reconnection. + // doesn't get filled up and begin blocking while we're looping and waiting + // for a reconnection. while let Ok(Some(_)) = rx_from_external_proxy.try_next() {} // The connection will pass messages back to this. @@ -54,7 +59,7 @@ where while let Some(msg) = rx_from_external_proxy.next().await { if let Err(e) = tx_to_connection.send(msg).await { // Issue forwarding a message to the telemetry core? - // Give up and try to reconnect on the next loop iteration. + // Give up and try to reconnect on the next outer loop iteration. log::error!( "Error sending message to websocker server (will reconnect): {}", e @@ -88,7 +93,7 @@ where } /// This spawns a connection to a websocket server, serializing/deserialziing -/// from bincode as messages are sent or received. +/// to/from bincode as messages are sent or received. async fn create_ws_connection_no_retry( mut tx_to_external: S, telemetry_uri: http::Uri, @@ -107,9 +112,9 @@ where let path = telemetry_uri.path(); let socket = TcpStream::connect((host, port)).await?; - socket.set_nodelay(true).unwrap(); + socket.set_nodelay(true).expect("socket set_nodelay failed"); - // Open a websocket connection with the relemetry core: + // Open a websocket connection with the telemetry core: let mut client = Client::new(socket.compat(), host, &path); let (mut ws_to_connection, mut ws_from_connection) = match client.handshake().await? { ServerResponse::Accepted { .. 
} => client.into_builder().finish(), @@ -124,12 +129,10 @@ where }; // This task reads data sent from the telemetry core and - // forwards it on to our aggregator loop: + // forwards it to our aggregator loop: tokio::spawn(async move { - let mut data = Vec::with_capacity(128); loop { - // Clear the buffer and wait for the next message to arrive: - data.clear(); + let mut data = Vec::new(); if let Err(e) = ws_from_connection.receive_data(&mut data).await { // Couldn't receive data may mean all senders are gone, so log // the error and shut this down: diff --git a/backend/telemetry_shard/src/json_message/hash.rs b/backend/telemetry_shard/src/json_message/hash.rs index 23b583f..4be81af 100644 --- a/backend/telemetry_shard/src/json_message/hash.rs +++ b/backend/telemetry_shard/src/json_message/hash.rs @@ -1,8 +1,13 @@ +//! A hash wrapper which can be deserialized from a hex string as well as from an array of bytes, +//! so that it can deal with the sort of inputs we expect from substrate nodes. + use serde::de::{self, Deserialize, Deserializer, SeqAccess, Unexpected, Visitor}; use serde::ser::{Serialize, Serializer}; use std::fmt::{self, Debug, Display}; use std::str::FromStr; +/// We assume that hashes are 32 bytes long, and in practice that's currently true, +/// but in theory it doesn't need to be. We may need to be more dynamic here. const HASH_BYTES: usize = 32; /// Newtype wrapper for 32-byte hash values, implementing readable `Debug` and `serde::Deserialize`. 
@@ -128,7 +133,7 @@ impl Display for Hash { hex::encode_to_slice(self.0, &mut ascii) .expect("Encoding 32 bytes into 64 bytes of ascii; qed"); - f.write_str(std::str::from_utf8(&ascii).expect("ASCII hex encoded bytes canot fail; qed")) + f.write_str(std::str::from_utf8(&ascii).expect("ASCII hex encoded bytes can't fail; qed")) } } diff --git a/backend/telemetry_shard/src/main.rs b/backend/telemetry_shard/src/main.rs index 4737166..195eaf8 100644 --- a/backend/telemetry_shard/src/main.rs +++ b/backend/telemetry_shard/src/main.rs @@ -147,14 +147,19 @@ where if !msg.is_binary() && !msg.is_text() { continue; } - // Deserialize from JSON, warning if deserialization fails: + // Deserialize from JSON, warning in debug mode if deserialization fails: let bytes = msg.as_bytes(); let node_message: json_message::NodeMessage = match serde_json::from_slice(bytes) { Ok(node_message) => node_message, - Err(_e) => { - // let bytes: &[u8] = bytes.get(..512).unwrap_or_else(|| &bytes); - // let msg_start = std::str::from_utf8(bytes).unwrap_or_else(|_| "INVALID UTF8"); - // log::warn!("Failed to parse node message ({}): {}", msg_start, e); + #[cfg(debug_assertions)] + Err(e) => { + let bytes: &[u8] = bytes.get(..512).unwrap_or_else(|| &bytes); + let msg_start = std::str::from_utf8(bytes).unwrap_or_else(|_| "INVALID UTF8"); + log::warn!("Failed to parse node message ({}): {}", msg_start, e); + continue; + }, + #[cfg(not(debug_assertions))] + Err(_) => { continue; } }; diff --git a/backend/test_utils/src/ws_client.rs b/backend/test_utils/src/ws_client.rs index ea40bf2..d674977 100644 --- a/backend/test_utils/src/ws_client.rs +++ b/backend/test_utils/src/ws_client.rs @@ -93,7 +93,7 @@ pub async fn connect(uri: &http::Uri) -> Result<(Sender, Receiver), ConnectError let path = uri.path(); let socket = TcpStream::connect((host, port)).await?; - socket.set_nodelay(true).unwrap(); + socket.set_nodelay(true).expect("socket set_nodelay failed"); // Establish a WS connection: let mut client = 
Client::new(socket.compat(), host, &path);