Various bits and pieces to address PR comments

This commit is contained in:
James Wilson
2021-07-13 16:51:14 +01:00
parent faedba87d4
commit 9ac5ea7624
6 changed files with 59 additions and 41 deletions
+6 -5
View File
@@ -14,26 +14,27 @@ id_type! {
pub struct ShardNodeId(usize);
}
/// Message sent from the shard to the backend core
/// Message sent from a telemetry shard to the telemetry core
#[derive(Deserialize, Serialize, Debug, Clone)]
pub enum FromShardAggregator {
/// Get information about a new node, passing IPv4
/// Get information about a new node, including its IP
/// address and chain genesis hash.
AddNode {
ip: Option<IpAddr>,
node: NodeDetails,
local_id: ShardNodeId,
genesis_hash: BlockHash,
},
/// Send a message payload to update details for a node
/// A message payload with updated details for a node
UpdateNode {
local_id: ShardNodeId,
payload: Payload,
},
/// Inform the core that a node has been removed
/// Inform the telemetry core that a node has been removed
RemoveNode { local_id: ShardNodeId },
}
/// Message sent form the backend core to the shard
/// Message sent from the telemetry core to a telemetry shard
#[derive(Deserialize, Serialize, Debug, Clone)]
pub enum FromTelemetryCore {
Mute {
+20 -16
View File
@@ -1,4 +1,4 @@
use crate::connection::{create_ws_connection, Message};
use crate::connection::{create_ws_connection_to_core, Message};
use common::{
internal_messages::{self, ShardNodeId},
node_message,
@@ -7,7 +7,7 @@ use common::{
};
use futures::{channel::mpsc, future};
use futures::{Sink, SinkExt, StreamExt};
use std::collections::HashSet;
use std::collections::{HashMap, HashSet};
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
@@ -23,9 +23,13 @@ type ConnId = u64;
/// [`FromWebsocket`] instances.
#[derive(Clone, Debug)]
enum ToAggregator {
/// Sent when the telemetry core is disconnected.
DisconnectedFromTelemetryCore,
/// Sent when the telemetry core (re)connects.
ConnectedToTelemetryCore,
/// Sent when a message comes in from a substrate node.
FromWebsocket(ConnId, FromWebsocket),
/// Send when a message comes in from the telemetry core.
FromTelemetryCore(internal_messages::FromTelemetryCore),
}
@@ -60,6 +64,8 @@ pub enum FromWebsocket {
pub type FromAggregator = internal_messages::FromShardAggregator;
/// The aggregator loop handles incoming messages from nodes, or from the telemetry core.
/// this is where we decide what effect messages will have.
#[derive(Clone)]
pub struct Aggregator(Arc<AggregatorInternal>);
@@ -90,7 +96,7 @@ impl Aggregator {
});
// Establish a resiliant connection to the core (this retries as needed):
let tx_to_telemetry_core = create_ws_connection(tx_from_connection, telemetry_uri).await;
let tx_to_telemetry_core = create_ws_connection_to_core(tx_from_connection, telemetry_uri).await;
// Handle any incoming messages in our handler loop:
tokio::spawn(Aggregator::handle_messages(
@@ -106,7 +112,7 @@ impl Aggregator {
}
// This is spawned into a separate task and handles any messages coming
// in to the aggregator. If nobody is tolding the tx side of the channel
// in to the aggregator. If nobody is holding the tx side of the channel
// any more, this task will gracefully end.
async fn handle_messages(
mut rx_from_external: mpsc::Receiver<ToAggregator>,
@@ -118,9 +124,9 @@ impl Aggregator {
// or not, and ignore incoming messages while we aren't.
let mut connected_to_telemetry_core = false;
// A list of close channels for the current connections. Send an empty tuple to
// these to ask the connections to be closed.
let mut close_connections: Vec<mpsc::Sender<()>> = vec![];
// A list of close channels for the currently connected substrate nodes. Send an empty
// tuple to these to ask the connections to be closed.
let mut close_connections: HashMap<ConnId, mpsc::Sender<()>> = HashMap::new();
// Maintain mappings from the connection ID and node message ID to the "local ID" which we
// broadcast to the telemetry core.
@@ -136,13 +142,13 @@ impl Aggregator {
// Take hold of the connection closers and run them all.
let closers = close_connections;
for mut closer in closers {
for (_, mut closer) in closers {
// if this fails, it probably means the connection has died already anyway.
let _ = closer.send(());
}
// We've told everything to disconnect. Now, reset our state:
close_connections = vec![];
close_connections = HashMap::new();
to_local_id.clear();
muted.clear();
@@ -154,13 +160,13 @@ impl Aggregator {
log::info!("Disconnected from telemetry core");
}
ToAggregator::FromWebsocket(
_conn_id,
conn_id,
FromWebsocket::Initialize { close_connection },
) => {
// We boot all connections on a reconnect-to-core to force new systemconnected
// messages to be sent. We could boot on muting, but need to be careful not to boot
// connections where we mute one set of messages it sends and not others.
close_connections.push(close_connection);
close_connections.insert(conn_id, close_connection);
}
ToAggregator::FromWebsocket(
conn_id,
@@ -228,8 +234,11 @@ impl Aggregator {
.map(|(local_id, _)| local_id)
.collect();
close_connections.remove(&disconnected_conn_id);
for local_id in local_ids_disconnected {
to_local_id.remove_by_id(local_id);
muted.remove(&local_id);
let _ = tx_to_telemetry_core
.send(FromShardAggregator::RemoveNode { local_id })
.await;
@@ -239,11 +248,6 @@ impl Aggregator {
local_id,
reason: _,
}) => {
// Ignore incoming messages if we're not connected to the backend:
if !connected_to_telemetry_core {
continue;
}
// Mute the local ID we've been told to:
muted.insert(local_id);
}
+16 -13
View File
@@ -10,11 +10,15 @@ pub enum Message<Out> {
Data(Out),
}
/// Connect to a websocket server, retrying the connection if we're disconnected.
/// - Sends messages when disconnected, reconnected or data received from the connection.
/// Connect to the telemetry core, retrying the connection if we're disconnected.
/// - Sends `Message::Connected` and `Message::Disconnected` when the connection goes up/down.
/// - Returns a channel that allows you to send messages to the connection.
/// - Messages all encoded/decoded from bincode.
pub async fn create_ws_connection<In, Out, S, E>(
/// - Messages are all encoded/decoded to/from bincode, and so need to support being (de)serialized from
/// a non self-describing encoding.
///
/// Note: have a look at [`common::internal_messages`] to see the different message types exchanged
/// between aggregator and core.
pub async fn create_ws_connection_to_core<In, Out, S, E>(
mut tx_to_external: S,
telemetry_uri: http::Uri,
) -> mpsc::Sender<In>
@@ -33,7 +37,8 @@ where
loop {
// Throw away any pending messages from the incoming channel so that it
// doesn't get blocked up while we're looping and waiting for a reconnection.
// doesn't get filled up and begin blocking while we're looping and waiting
// for a reconnection.
while let Ok(Some(_)) = rx_from_external_proxy.try_next() {}
// The connection will pass messages back to this.
@@ -54,7 +59,7 @@ where
while let Some(msg) = rx_from_external_proxy.next().await {
if let Err(e) = tx_to_connection.send(msg).await {
// Issue forwarding a message to the telemetry core?
// Give up and try to reconnect on the next loop iteration.
// Give up and try to reconnect on the next outer loop iteration.
log::error!(
"Error sending message to websocket server (will reconnect): {}",
e
@@ -88,7 +93,7 @@ where
}
/// This spawns a connection to a websocket server, serializing/deserialziing
/// from bincode as messages are sent or received.
/// to/from bincode as messages are sent or received.
async fn create_ws_connection_no_retry<In, Out, S, E>(
mut tx_to_external: S,
telemetry_uri: http::Uri,
@@ -107,9 +112,9 @@ where
let path = telemetry_uri.path();
let socket = TcpStream::connect((host, port)).await?;
socket.set_nodelay(true).unwrap();
socket.set_nodelay(true).expect("socket set_nodelay failed");
// Open a websocket connection with the relemetry core:
// Open a websocket connection with the telemetry core:
let mut client = Client::new(socket.compat(), host, &path);
let (mut ws_to_connection, mut ws_from_connection) = match client.handshake().await? {
ServerResponse::Accepted { .. } => client.into_builder().finish(),
@@ -124,12 +129,10 @@ where
};
// This task reads data sent from the telemetry core and
// forwards it on to our aggregator loop:
// forwards it to our aggregator loop:
tokio::spawn(async move {
let mut data = Vec::with_capacity(128);
loop {
// Clear the buffer and wait for the next message to arrive:
data.clear();
let mut data = Vec::new();
if let Err(e) = ws_from_connection.receive_data(&mut data).await {
// Couldn't receive data may mean all senders are gone, so log
// the error and shut this down:
@@ -1,8 +1,13 @@
//! A hash wrapper which can be deserialized from a hex string as well as from an array of bytes,
//! so that it can deal with the sort of inputs we expect from substrate nodes.
use serde::de::{self, Deserialize, Deserializer, SeqAccess, Unexpected, Visitor};
use serde::ser::{Serialize, Serializer};
use std::fmt::{self, Debug, Display};
use std::str::FromStr;
/// We assume that hashes are 32 bytes long, and in practice that's currently true,
/// but in theory it doesn't need to be. We may need to be more dynamic here.
const HASH_BYTES: usize = 32;
/// Newtype wrapper for 32-byte hash values, implementing readable `Debug` and `serde::Deserialize`.
@@ -128,7 +133,7 @@ impl Display for Hash {
hex::encode_to_slice(self.0, &mut ascii)
.expect("Encoding 32 bytes into 64 bytes of ascii; qed");
f.write_str(std::str::from_utf8(&ascii).expect("ASCII hex encoded bytes canot fail; qed"))
f.write_str(std::str::from_utf8(&ascii).expect("ASCII hex encoded bytes can't fail; qed"))
}
}
+10 -5
View File
@@ -147,14 +147,19 @@ where
if !msg.is_binary() && !msg.is_text() {
continue;
}
// Deserialize from JSON, warning if deserialization fails:
// Deserialize from JSON, warning in debug mode if deserialization fails:
let bytes = msg.as_bytes();
let node_message: json_message::NodeMessage = match serde_json::from_slice(bytes) {
Ok(node_message) => node_message,
Err(_e) => {
// let bytes: &[u8] = bytes.get(..512).unwrap_or_else(|| &bytes);
// let msg_start = std::str::from_utf8(bytes).unwrap_or_else(|_| "INVALID UTF8");
// log::warn!("Failed to parse node message ({}): {}", msg_start, e);
#[cfg(debug_assertions)]
Err(e) => {
let bytes: &[u8] = bytes.get(..512).unwrap_or_else(|| &bytes);
let msg_start = std::str::from_utf8(bytes).unwrap_or_else(|_| "INVALID UTF8");
log::warn!("Failed to parse node message ({}): {}", msg_start, e);
continue;
},
#[cfg(not(debug_assertions))]
Err(_) => {
continue;
}
};
+1 -1
View File
@@ -93,7 +93,7 @@ pub async fn connect(uri: &http::Uri) -> Result<(Sender, Receiver), ConnectError
let path = uri.path();
let socket = TcpStream::connect((host, port)).await?;
socket.set_nodelay(true).unwrap();
socket.set_nodelay(true).expect("socket set_nodelay failed");
// Establish a WS connection:
let mut client = Client::new(socket.compat(), host, &path);