rename binaries for clarity, and first pass of connect_to_servers test util

This commit is contained in:
James Wilson
2021-07-07 12:49:03 +01:00
parent 8bf412cad9
commit f2adead2e9
25 changed files with 212 additions and 48 deletions
+271
View File
@@ -0,0 +1,271 @@
use crate::connection::{create_ws_connection, Message};
use common::{
internal_messages::{self, ShardNodeId},
node_message,
node_types::BlockHash,
AssignId,
};
use futures::{channel::mpsc, future};
use futures::{Sink, SinkExt, StreamExt};
use std::collections::HashSet;
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
/// A unique ID is assigned per websocket connection (or more accurately,
/// per thing-that-subscribes-to-the-aggregator; see `Aggregator::subscribe_node`).
/// That connection might send data on behalf of multiple chains, so this ID is
/// local to the aggregator, and a unique ID is assigned per batch of data too
/// ([`internal_messages::LocalId`]).
type ConnId = u64;
/// Incoming messages are either from websocket connections or
/// from the telemetry core. This can be private since the only
/// external messages are via subscriptions that take
/// [`FromWebsocket`] instances.
#[derive(Clone, Debug)]
enum ToAggregator {
/// The connection to the telemetry core was lost.
DisconnectedFromTelemetryCore,
/// A connection to the telemetry core was (re)established.
ConnectedToTelemetryCore,
/// A message from a subscriber, tagged with the ID of the connection it came from.
FromWebsocket(ConnId, FromWebsocket),
/// A message that the telemetry core sent to us.
FromTelemetryCore(internal_messages::FromTelemetryCore),
}
/// An incoming socket connection can provide these messages.
/// Until a node has been Added via [`FromWebsocket::Add`],
/// messages from it will be ignored.
#[derive(Clone, Debug)]
pub enum FromWebsocket {
/// Fire this when the connection is established.
Initialize {
/// When a message is sent back up this channel, we terminate
/// the websocket connection and force the node to reconnect
/// so that it sends its system info again in case the telemetry
/// core has restarted.
close_connection: mpsc::Sender<()>,
},
/// Tell the aggregator about a new node.
Add {
/// Together with the connection ID, this identifies the node in
/// subsequent [`FromWebsocket::Update`] messages.
message_id: node_message::NodeMessageId,
/// Passed on to the telemetry core along with the node details.
ip: Option<std::net::IpAddr>,
/// Details about the node, passed on to the telemetry core.
node: common::node_types::NodeDetails,
/// Passed on to the telemetry core along with the node details.
genesis_hash: BlockHash,
},
/// Update/pass through details about a node.
Update {
/// The ID that the node was added with.
message_id: node_message::NodeMessageId,
/// The update itself, passed through to the telemetry core.
payload: node_message::Payload,
},
/// Make a note when the node disconnects.
Disconnected,
}
/// The messages that the aggregator sends on to the telemetry core.
pub type FromAggregator = internal_messages::FromShardAggregator;
/// A cheaply cloneable handle to the aggregator; every clone shares the same
/// [`AggregatorInternal`] state via an `Arc`.
#[derive(Clone)]
pub struct Aggregator(Arc<AggregatorInternal>);
/// State shared between all [`Aggregator`] handles.
struct AggregatorInternal {
/// Nodes that connect are each assigned a unique connection ID. Nodes
/// can send messages on behalf of more than one chain, and so this ID is
/// only really used inside the Aggregator in conjunction with a per-message
/// ID. This counter holds the next ID to hand out.
conn_id: AtomicU64,
/// Send messages to the aggregator from websockets via this. This is
/// stored here so that anybody holding an `Aggregator` handle can
/// make use of it.
tx_to_aggregator: mpsc::Sender<ToAggregator>,
}
impl Aggregator {
    /// Spawn a new Aggregator. This opens a (retrying) connection to the telemetry
    /// core at `telemetry_uri`, starts the message handling loop in a background
    /// task, and returns a handle that nodes can be subscribed through.
    pub async fn spawn(telemetry_uri: http::Uri) -> anyhow::Result<Aggregator> {
        let (tx_to_aggregator, rx_from_external) = mpsc::channel(10);

        // Map responses from our connection into messages that will be sent to the aggregator:
        let tx_from_connection = tx_to_aggregator.clone().with(|msg| {
            future::ok::<_, mpsc::SendError>(match msg {
                Message::Connected => ToAggregator::ConnectedToTelemetryCore,
                Message::Disconnected => ToAggregator::DisconnectedFromTelemetryCore,
                Message::Data(data) => ToAggregator::FromTelemetryCore(data),
            })
        });

        // Establish a resilient connection to the core (this retries as needed):
        let tx_to_telemetry_core = create_ws_connection(tx_from_connection, telemetry_uri).await;

        // Handle any incoming messages in our handler loop:
        tokio::spawn(Aggregator::handle_messages(
            rx_from_external,
            tx_to_telemetry_core,
        ));

        // Return a handle to our aggregator:
        Ok(Aggregator(Arc::new(AggregatorInternal {
            conn_id: AtomicU64::new(1),
            tx_to_aggregator,
        })))
    }

    // This is spawned into a separate task and handles any messages coming
    // in to the aggregator. If nobody is holding the tx side of the channel
    // any more, this task will gracefully end.
    async fn handle_messages(
        mut rx_from_external: mpsc::Receiver<ToAggregator>,
        mut tx_to_telemetry_core: mpsc::Sender<FromAggregator>,
    ) {
        use internal_messages::{FromShardAggregator, FromTelemetryCore};

        // Just as an optimisation, we can keep track of whether we're connected to the backend
        // or not, and ignore incoming messages while we aren't.
        let mut connected_to_telemetry_core = false;

        // Close channels for the current connections, keyed by connection ID so that the
        // entry can be dropped again when that connection goes away. Send an empty tuple
        // to these to ask the connections to be closed.
        let mut close_connections: std::collections::HashMap<ConnId, mpsc::Sender<()>> =
            std::collections::HashMap::new();

        // Maintain mappings from the connection ID and node message ID to the "local ID" which we
        // broadcast to the telemetry core.
        let mut to_local_id = AssignId::new();

        // Any messages coming from nodes that have been muted are ignored:
        let mut muted: HashSet<ShardNodeId> = HashSet::new();

        // Now, loop and receive messages to handle.
        while let Some(msg) = rx_from_external.next().await {
            match msg {
                ToAggregator::ConnectedToTelemetryCore => {
                    // Take hold of the connection closers and run them all. We use the
                    // synchronous `try_send` so that the message is actually handed to the
                    // channel (a `send` future that is never awaited does nothing) and so
                    // that a slow connection can never stall this loop.
                    for (_, mut closer) in close_connections.drain() {
                        // if this fails, it probably means the connection has died already anyway.
                        let _ = closer.try_send(());
                    }

                    // We've told everything to disconnect. Now, reset our state:
                    to_local_id.clear();
                    muted.clear();

                    connected_to_telemetry_core = true;
                    log::info!("Connected to telemetry core");
                }
                ToAggregator::DisconnectedFromTelemetryCore => {
                    connected_to_telemetry_core = false;
                    log::info!("Disconnected from telemetry core");
                }
                ToAggregator::FromWebsocket(
                    conn_id,
                    FromWebsocket::Initialize { close_connection },
                ) => {
                    // We boot all connections on a reconnect-to-core to force new systemconnected
                    // messages to be sent. We could boot on muting, but need to be careful not to boot
                    // connections where we mute one set of messages it sends and not others.
                    close_connections.insert(conn_id, close_connection);
                }
                ToAggregator::FromWebsocket(
                    conn_id,
                    FromWebsocket::Add {
                        message_id,
                        ip,
                        node,
                        genesis_hash,
                    },
                ) => {
                    // Don't bother doing anything else if we're disconnected, since we'll force the
                    // node to reconnect anyway when the backend does:
                    if !connected_to_telemetry_core {
                        continue;
                    }

                    // Generate a new "local ID" for messages from this connection:
                    let local_id = to_local_id.assign_id((conn_id, message_id));

                    // Send the message to the telemetry core with this local ID:
                    let _ = tx_to_telemetry_core
                        .send(FromShardAggregator::AddNode {
                            ip,
                            node,
                            genesis_hash,
                            local_id,
                        })
                        .await;
                }
                ToAggregator::FromWebsocket(
                    conn_id,
                    FromWebsocket::Update {
                        message_id,
                        payload,
                    },
                ) => {
                    // Ignore incoming messages if we're not connected to the backend:
                    if !connected_to_telemetry_core {
                        continue;
                    }

                    // Get the local ID, ignoring the message if none match:
                    let local_id = match to_local_id.get_id(&(conn_id, message_id)) {
                        Some(id) => id,
                        None => continue,
                    };

                    // ignore the message if this node has been muted:
                    if muted.contains(&local_id) {
                        continue;
                    }

                    // Send the message to the telemetry core with this local ID:
                    let _ = tx_to_telemetry_core
                        .send(FromShardAggregator::UpdateNode { local_id, payload })
                        .await;
                }
                ToAggregator::FromWebsocket(disconnected_conn_id, FromWebsocket::Disconnected) => {
                    // The connection is gone, so we no longer need a way to close it. Without
                    // this, closers would pile up until the next reconnect to the core.
                    close_connections.remove(&disconnected_conn_id);

                    // Find all of the local IDs corresponding to the disconnected connection ID and
                    // remove them, telling Telemetry Core about them too. This could be more efficient,
                    // but the mapping isn't currently cached and it's not a super frequent op.
                    let local_ids_disconnected: Vec<_> = to_local_id
                        .iter()
                        .filter(|(_, &(conn_id, _))| disconnected_conn_id == conn_id)
                        .map(|(local_id, _)| local_id)
                        .collect();

                    for local_id in local_ids_disconnected {
                        to_local_id.remove_by_id(local_id);
                        // Forget any mute for this node too, so the set doesn't grow forever:
                        muted.remove(&local_id);
                        let _ = tx_to_telemetry_core
                            .send(FromShardAggregator::RemoveNode { local_id })
                            .await;
                    }
                }
                ToAggregator::FromTelemetryCore(FromTelemetryCore::Mute {
                    local_id,
                    reason: _,
                }) => {
                    // Ignore incoming messages if we're not connected to the backend:
                    if !connected_to_telemetry_core {
                        continue;
                    }

                    // Mute the local ID we've been told to:
                    muted.insert(local_id);
                }
            }
        }
    }

    /// Return a sink that a node can send messages into to be handled by the aggregator.
    pub fn subscribe_node(&self) -> impl Sink<FromWebsocket, Error = anyhow::Error> + Unpin {
        // Assign a unique aggregator-local ID to each connection that subscribes, and pass
        // that along with every message to the aggregator loop:
        let conn_id: ConnId = self
            .0
            .conn_id
            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        let tx_to_aggregator = self.0.tx_to_aggregator.clone();

        // Calling `send` on this Sink requires Unpin. There may be a nicer way than this,
        // but pinning by boxing is the easy solution for now:
        Box::pin(
            tx_to_aggregator
                .with(move |msg| async move { Ok(ToAggregator::FromWebsocket(conn_id, msg)) }),
        )
    }
}
+195
View File
@@ -0,0 +1,195 @@
use futures::channel::mpsc;
use futures::{Sink, SinkExt, StreamExt};
use tokio::net::TcpStream;
use tokio_util::compat::TokioAsyncReadCompatExt;
/// The events that [`create_ws_connection`] reports to the sink it is handed.
#[derive(Clone, Debug)]
pub enum Message<Out> {
/// A connection to the server has been established.
Connected,
/// A previously established connection is no longer in use.
Disconnected,
/// A message was received from the server and successfully decoded.
Data(Out),
}
/// Connect to a websocket server, retrying the connection if we're disconnected.
/// - Sends messages when disconnected, reconnected or data received from the connection.
/// - Returns a channel that allows you to send messages to the connection.
/// - Messages all encoded/decoded from bincode.
pub async fn create_ws_connection<In, Out, S, E>(
mut tx_to_external: S,
telemetry_uri: http::Uri,
) -> mpsc::Sender<In>
where
S: Sink<Message<Out>, Error = E> + Unpin + Send + Clone + 'static,
E: std::fmt::Debug + std::fmt::Display + Send + 'static,
In: serde::Serialize + Send + 'static,
Out: serde::de::DeserializeOwned + Send + 'static,
{
// Set up a proxy channel to relay messages to the telemetry core, and return one end of it.
// Once a connection to the backend is established, we pass messages along to it. If the connection
// fails, we
let (tx_to_connection_proxy, mut rx_from_external_proxy) = mpsc::channel(10);
tokio::spawn(async move {
let mut connected = false;
loop {
// Throw away any pending messages from the incoming channel so that it
// doesn't get blocked up while we're looping and waiting for a reconnection.
while let Ok(Some(_)) = rx_from_external_proxy.try_next() {}
// The connection will pass messages back to this.
let tx_from_connection = tx_to_external.clone();
// Attempt to reconnect.
match create_ws_connection_no_retry(tx_from_connection, telemetry_uri.clone()).await {
Ok(mut tx_to_connection) => {
connected = true;
// Inform the handler loop that we've reconnected.
tx_to_external
.send(Message::Connected)
.await
.expect("must be able to send reconnect msg");
// Start forwarding messages on to the backend.
while let Some(msg) = rx_from_external_proxy.next().await {
if let Err(e) = tx_to_connection.send(msg).await {
// Issue forwarding a message to the telemetry core?
// Give up and try to reconnect on the next loop iteration.
log::error!(
"Error sending message to websocker server (will reconnect): {}",
e
);
break;
}
}
}
Err(e) => {
// Issue connecting? Wait and try again on the next loop iteration.
log::error!(
"Error connecting to websocker server (will reconnect): {}",
e
);
}
};
// Tell the aggregator that we're disconnected so that, if we like, we can discard
// messages without doing any futher processing on them.
if connected {
connected = false;
let _ = tx_to_external.send(Message::Disconnected).await;
}
// Wait a little before trying to reconnect.
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
});
tx_to_connection_proxy
}
/// This spawns a connection to a websocket server, serializing/deserializing
/// from bincode as messages are sent or received.
///
/// The host defaults to `127.0.0.1` and the port to `8000` when `telemetry_uri`
/// does not provide them.
///
/// # Errors
///
/// Returns an error if the TCP connection cannot be opened or configured, if the
/// websocket handshake fails, or if the server redirects or rejects the handshake.
async fn create_ws_connection_no_retry<In, Out, S, E>(
    mut tx_to_external: S,
    telemetry_uri: http::Uri,
) -> anyhow::Result<mpsc::Sender<In>>
where
    S: Sink<Message<Out>, Error = E> + Unpin + Send + 'static,
    E: std::fmt::Debug + std::fmt::Display,
    In: serde::Serialize + Send + 'static,
    Out: serde::de::DeserializeOwned + Send + 'static,
{
    use bincode::Options;
    use soketto::handshake::{Client, ServerResponse};

    let host = telemetry_uri.host().unwrap_or("127.0.0.1");
    let port = telemetry_uri.port_u16().unwrap_or(8000);
    let path = telemetry_uri.path();

    let socket = TcpStream::connect((host, port)).await?;
    // Failing to configure the socket is an I/O error like any other, so hand it
    // back to the caller (who will retry) rather than panicking:
    socket.set_nodelay(true)?;

    // Open a websocket connection with the telemetry core:
    let mut client = Client::new(socket.compat(), host, path);
    let (mut ws_to_connection, mut ws_from_connection) = match client.handshake().await? {
        ServerResponse::Accepted { .. } => client.into_builder().finish(),
        ServerResponse::Redirect { status_code, .. } | ServerResponse::Rejected { status_code } => {
            return Err(anyhow::anyhow!(
                "Failed to connect to {}{}, status code: {}",
                host,
                path,
                status_code
            ));
        }
    };

    // This task reads data sent from the telemetry core and
    // forwards it on to our aggregator loop:
    tokio::spawn(async move {
        let mut data = Vec::with_capacity(128);
        loop {
            // Clear the buffer and wait for the next message to arrive:
            data.clear();
            if let Err(e) = ws_from_connection.receive_data(&mut data).await {
                // Couldn't receive data may mean all senders are gone, so log
                // the error and shut this down:
                log::error!(
                    "Shutting down websocket connection: Failed to receive data: {}",
                    e
                );
                return;
            }

            // Attempt to deserialize, and send to our handler loop:
            match bincode::options().deserialize(&data) {
                Ok(msg) => {
                    if let Err(e) = tx_to_external.send(Message::Data(msg)).await {
                        // Failure to send to our loop likely means it's hit an
                        // issue and shut down, so bail on this loop as well:
                        log::error!(
                            "Shutting down websocket connection: Failed to send data out: {}",
                            e
                        );
                        return;
                    }
                }
                Err(err) => {
                    // Log the error but otherwise ignore it and keep running:
                    log::warn!("Failed to decode message from Backend Core: {:?}", err);
                }
            }
        }
    });

    // This task receives messages from the aggregator,
    // encodes them and sends them to the telemetry core:
    let (tx_to_connection, mut rx_from_aggregator) = mpsc::channel(10);
    tokio::spawn(async move {
        while let Some(msg) = rx_from_aggregator.next().await {
            let bytes = match bincode::options().serialize(&msg) {
                Ok(bytes) => bytes,
                Err(err) => {
                    // As with decoding above: log the error, skip this message and
                    // keep the connection running rather than panicking this task.
                    log::warn!("Failed to encode message for Backend Core: {:?}", err);
                    continue;
                }
            };

            // Any errors sending the message leads to this task ending, which should cascade to
            // the entire connection being ended.
            if let Err(e) = ws_to_connection.send_binary_mut(bytes).await {
                log::error!(
                    "Shutting down websocket connection: Failed to send data in: {}",
                    e
                );
                return;
            }
            if let Err(e) = ws_to_connection.flush().await {
                log::error!(
                    "Shutting down websocket connection: Failed to flush data: {}",
                    e
                );
                return;
            }
        }
    });

    // We return a channel that you can send messages down in order to have
    // them sent to the telemetry core:
    Ok(tx_to_connection)
}
@@ -0,0 +1,214 @@
use serde::de::{self, Deserialize, Deserializer, SeqAccess, Unexpected, Visitor};
use serde::ser::{Serialize, Serializer};
use std::fmt::{self, Debug, Display};
use std::str::FromStr;
/// The number of bytes in a [`Hash`].
const HASH_BYTES: usize = 32;
/// Newtype wrapper for 32-byte hash values, implementing readable `Debug` and `serde::Deserialize`.
/// This can deserialize from a JSON string (`0x`-prefixed hex) or a JSON array of
/// 32 byte values. `Display` and `Debug` both print the hash as `0x`-prefixed hex.
#[derive(Hash, PartialEq, Eq, Clone, Copy)]
pub struct Hash([u8; HASH_BYTES]);
impl From<Hash> for common::node_types::BlockHash {
fn from(hash: Hash) -> Self {
hash.0.into()
}
}
impl From<common::node_types::BlockHash> for Hash {
    /// Wrap the raw bytes of an internal block hash in the JSON-facing hash type.
    fn from(hash: common::node_types::BlockHash) -> Self {
        let bytes = hash.0;
        Hash(bytes)
    }
}
struct HashVisitor;
impl<'de> Visitor<'de> for HashVisitor {
type Value = Hash;
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str(
"byte array of length 32, or hexidecimal string of 32 bytes beginning with 0x",
)
}
fn visit_str<E>(self, value: &str) -> Result<Self::Value, E>
where
E: de::Error,
{
value
.parse()
.map_err(|_| de::Error::invalid_value(Unexpected::Str(value), &self))
}
fn visit_bytes<E>(self, value: &[u8]) -> Result<Self::Value, E>
where
E: de::Error,
{
if value.len() == HASH_BYTES {
let mut hash = [0; HASH_BYTES];
hash.copy_from_slice(value);
return Ok(Hash(hash));
}
Hash::from_ascii(value)
.map_err(|_| de::Error::invalid_value(Unexpected::Bytes(value), &self))
}
fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
where
A: SeqAccess<'de>,
{
let mut hash = [0u8; HASH_BYTES];
for (i, byte) in hash.iter_mut().enumerate() {
match seq.next_element()? {
Some(b) => *byte = b,
None => return Err(de::Error::invalid_length(i, &"an array of 32 bytes")),
}
}
if seq.next_element::<u8>()?.is_some() {
return Err(de::Error::invalid_length(33, &"an array of 32 bytes"));
}
Ok(Hash(hash))
}
}
impl Hash {
    /// Parse a hash from the ASCII bytes of a `0x`-prefixed hex string.
    ///
    /// # Errors
    ///
    /// Returns [`HashParseError::InvalidPrefix`] if `value` does not begin with `0x`,
    /// and [`HashParseError::HexError`] if what follows does not hex-decode into
    /// exactly 32 bytes.
    pub fn from_ascii(value: &[u8]) -> Result<Self, HashParseError> {
        const PREFIX: &[u8] = b"0x";

        if !value.starts_with(PREFIX) {
            return Err(HashParseError::InvalidPrefix);
        }

        let hex_digits = &value[PREFIX.len()..];
        let mut decoded = [0; HASH_BYTES];
        match hex::decode_to_slice(hex_digits, &mut decoded) {
            Ok(()) => Ok(Hash(decoded)),
            Err(e) => Err(HashParseError::HexError(e)),
        }
    }
}
impl FromStr for Hash {
    type Err = HashParseError;

    /// Parse a `0x`-prefixed hex string; see [`Hash::from_ascii`].
    fn from_str(value: &str) -> Result<Self, Self::Err> {
        let ascii = value.as_bytes();
        Self::from_ascii(ascii)
    }
}
impl<'de> Deserialize<'de> for Hash {
    /// Ask the deserializer for bytes, letting [`HashVisitor`] cope with whichever
    /// representation (string, bytes or sequence) it actually hands over.
    fn deserialize<D>(deserializer: D) -> Result<Hash, D::Error>
    where
        D: Deserializer<'de>,
    {
        let visitor = HashVisitor;
        deserializer.deserialize_bytes(visitor)
    }
}
impl Serialize for Hash {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
serializer.serialize_bytes(&self.0)
}
}
impl Display for Hash {
    /// Write the hash as `0x` followed by 64 lowercase hex digits.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_str("0x")?;

        // Hex-encode into a stack buffer so that formatting needs no allocation:
        let mut hex_digits = [0; HASH_BYTES * 2];
        hex::encode_to_slice(self.0, &mut hex_digits)
            .expect("Encoding 32 bytes into 64 bytes of ascii; qed");

        let text =
            std::str::from_utf8(&hex_digits).expect("ASCII hex encoded bytes canot fail; qed");
        f.write_str(text)
    }
}
impl Debug for Hash {
    /// Debug output is the same `0x`-prefixed hex that `Display` produces.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        <Self as Display>::fmt(self, f)
    }
}
/// The ways in which parsing a [`Hash`] from `0x`-prefixed hex can fail.
#[derive(thiserror::Error, Debug)]
pub enum HashParseError {
/// The text after the prefix could not be hex-decoded into the hash.
#[error("Error parsing string into hex: {0}")]
HexError(hex::FromHexError),
/// The input did not begin with `0x`.
#[error("Invalid hex prefix: expected '0x'")]
InvalidPrefix,
}
#[cfg(test)]
mod tests {
use super::Hash;
use bincode::Options;
// A hash whose first four bytes are 0xDEADBEEF and whose remaining bytes are zero.
const DUMMY: Hash = {
let mut hash = [0; 32];
hash[0] = 0xDE;
hash[1] = 0xAD;
hash[2] = 0xBE;
hash[3] = 0xEF;
Hash(hash)
};
// A `0x`-prefixed hex string (in mixed case) deserializes to the expected hash.
#[test]
fn deserialize_json_hash_str() {
let json = r#""0xdeadBEEF00000000000000000000000000000000000000000000000000000000""#;
let hash: Hash = serde_json::from_str(json).unwrap();
assert_eq!(hash, DUMMY);
}
// A JSON array of exactly 32 byte values deserializes to the expected hash.
#[test]
fn deserialize_json_array() {
let json = r#"[222,173,190,239,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]"#;
let hash: Hash = serde_json::from_str(json).unwrap();
assert_eq!(hash, DUMMY);
}
// An array with only 31 elements is rejected.
#[test]
fn deserialize_json_array_too_short() {
let json = r#"[222,173,190,239,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]"#;
let res = serde_json::from_str::<Hash>(json);
assert!(res.is_err());
}
// An array with 33 elements is rejected.
#[test]
fn deserialize_json_array_too_long() {
let json = r#"[222,173,190,239,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]"#;
let res = serde_json::from_str::<Hash>(json);
assert!(res.is_err());
}
// The hash encodes to a one-byte length followed by the 32 raw bytes in bincode, and
// decodes back to the same value.
#[test]
fn bincode() {
let bytes = bincode::options().serialize(&DUMMY).unwrap();
let mut expected = [0; 33];
expected[0] = 32; // length
expected[1..].copy_from_slice(&DUMMY.0);
assert_eq!(bytes, &expected);
let deserialized: Hash = bincode::options().deserialize(&bytes).unwrap();
assert_eq!(DUMMY, deserialized);
}
}
@@ -0,0 +1,7 @@
//! This module contains the types we need to deserialize JSON messages from nodes
mod hash;
mod node_message;
pub use hash::Hash;
pub use node_message::*;
@@ -0,0 +1,344 @@
//! The structs and enums defined in this module are largely identical to those
//! we'll use elsewhere internally, but are kept separate so that the JSON structure
//! is defined (almost) from just this file, and we don't have to worry about breaking
//! compatibility with the input data when we make changes to our internal data
//! structures (for example, to support bincode better).
use super::hash::Hash;
use common::node_message as internal;
use common::node_types;
use serde::Deserialize;
/// This enum represents a telemetry message sent from a node as
/// a JSON payload. Since JSON is self describing, we can use attributes
/// like serde(untagged) and serde(flatten) without issue.
///
/// Internally, we want to minimise the amount of data sent from shards to
/// the core node. For that reason, we use a non-self-describing serialization
/// format like bincode, which doesn't support things like `[serde(flatten)]` (which
/// internally wants to serialize to a map of unknown length) or `[serde(tag/untagged)]`
/// (which relies on the data to know which variant to deserialize to.)
///
/// So, this can be converted fairly cheaply into an enum we'll use internally
/// which is compatible with formats like bincode.
#[derive(Deserialize, Debug)]
#[serde(untagged)]
pub enum NodeMessage {
/// The payload's fields (including `msg`) sit at the top level of the JSON object.
V1 {
#[serde(flatten)]
payload: Payload,
},
/// The JSON object has an `id` field, and the payload is nested under `payload`.
V2 {
id: NodeMessageId,
payload: Payload,
},
}
impl From<NodeMessage> for internal::NodeMessage {
fn from(msg: NodeMessage) -> Self {
match msg {
NodeMessage::V1 { payload } => internal::NodeMessage::V1 {
payload: payload.into(),
},
NodeMessage::V2 { id, payload } => internal::NodeMessage::V2 {
id,
payload: payload.into(),
},
}
}
}
/// The body of a node message. The variant is selected by the value of the `msg`
/// field in the JSON; variants without data ignore any other fields present.
#[derive(Deserialize, Debug)]
#[serde(tag = "msg")]
pub enum Payload {
#[serde(rename = "system.connected")]
SystemConnected(SystemConnected),
#[serde(rename = "system.interval")]
SystemInterval(SystemInterval),
#[serde(rename = "block.import")]
BlockImport(Block),
#[serde(rename = "notify.finalized")]
NotifyFinalized(Finalized),
#[serde(rename = "txpool.import")]
TxPoolImport,
#[serde(rename = "afg.finalized")]
AfgFinalized(AfgFinalized),
#[serde(rename = "afg.received_precommit")]
AfgReceivedPrecommit(AfgReceived),
#[serde(rename = "afg.received_prevote")]
AfgReceivedPrevote(AfgReceived),
#[serde(rename = "afg.received_commit")]
AfgReceivedCommit(AfgReceived),
#[serde(rename = "afg.authority_set")]
AfgAuthoritySet(AfgAuthoritySet),
#[serde(rename = "afg.finalized_blocks_up_to")]
AfgFinalizedBlocksUpTo,
#[serde(rename = "aura.pre_sealed_block")]
AuraPreSealedBlock,
#[serde(rename = "prepared_block_for_proposing")]
PreparedBlockForProposing,
}
impl From<Payload> for internal::Payload {
fn from(msg: Payload) -> Self {
match msg {
Payload::SystemConnected(m) => internal::Payload::SystemConnected(m.into()),
Payload::SystemInterval(m) => internal::Payload::SystemInterval(m.into()),
Payload::BlockImport(m) => internal::Payload::BlockImport(m.into()),
Payload::NotifyFinalized(m) => internal::Payload::NotifyFinalized(m.into()),
Payload::TxPoolImport => internal::Payload::TxPoolImport,
Payload::AfgFinalized(m) => internal::Payload::AfgFinalized(m.into()),
Payload::AfgReceivedPrecommit(m) => internal::Payload::AfgReceivedPrecommit(m.into()),
Payload::AfgReceivedPrevote(m) => internal::Payload::AfgReceivedPrevote(m.into()),
Payload::AfgReceivedCommit(m) => internal::Payload::AfgReceivedCommit(m.into()),
Payload::AfgAuthoritySet(m) => internal::Payload::AfgAuthoritySet(m.into()),
Payload::AfgFinalizedBlocksUpTo => internal::Payload::AfgFinalizedBlocksUpTo,
Payload::AuraPreSealedBlock => internal::Payload::AuraPreSealedBlock,
Payload::PreparedBlockForProposing => internal::Payload::PreparedBlockForProposing,
}
}
}
/// The data carried by a `system.connected` message.
#[derive(Deserialize, Debug)]
pub struct SystemConnected {
pub genesis_hash: Hash,
/// The node details are read from the same JSON object as `genesis_hash`.
#[serde(flatten)]
pub node: NodeDetails,
}
impl From<SystemConnected> for internal::SystemConnected {
fn from(msg: SystemConnected) -> Self {
internal::SystemConnected {
genesis_hash: msg.genesis_hash.into(),
node: msg.node.into(),
}
}
}
/// The data carried by a `system.interval` message. Every field is optional.
#[derive(Deserialize, Debug)]
pub struct SystemInterval {
pub peers: Option<u64>,
pub txcount: Option<u64>,
pub bandwidth_upload: Option<f64>,
pub bandwidth_download: Option<f64>,
pub finalized_height: Option<BlockNumber>,
pub finalized_hash: Option<Hash>,
/// The block's fields (`best` and `height`) are read from the same JSON
/// object as the other fields here.
#[serde(flatten)]
pub block: Option<Block>,
pub used_state_cache_size: Option<f32>,
}
impl From<SystemInterval> for internal::SystemInterval {
fn from(msg: SystemInterval) -> Self {
internal::SystemInterval {
peers: msg.peers,
txcount: msg.txcount,
bandwidth_upload: msg.bandwidth_upload,
bandwidth_download: msg.bandwidth_download,
finalized_height: msg.finalized_height,
finalized_hash: msg.finalized_hash.map(|h| h.into()),
block: msg.block.map(|b| b.into()),
used_state_cache_size: msg.used_state_cache_size,
}
}
}
/// The data carried by a `notify.finalized` message.
#[derive(Deserialize, Debug)]
pub struct Finalized {
/// Read from the `best` field of the JSON.
#[serde(rename = "best")]
pub hash: Hash,
/// Note that the height arrives as a string here, not a number.
pub height: Box<str>,
}
impl From<Finalized> for internal::Finalized {
fn from(msg: Finalized) -> Self {
internal::Finalized {
hash: msg.hash.into(),
height: msg.height,
}
}
}
/// The data carried by an `afg.authority_set` message. All fields arrive as strings.
#[derive(Deserialize, Debug)]
pub struct AfgAuthoritySet {
pub authority_id: Box<str>,
pub authorities: Box<str>,
pub authority_set_id: Box<str>,
}
impl From<AfgAuthoritySet> for internal::AfgAuthoritySet {
fn from(msg: AfgAuthoritySet) -> Self {
internal::AfgAuthoritySet {
authority_id: msg.authority_id,
authorities: msg.authorities,
authority_set_id: msg.authority_set_id,
}
}
}
/// The data carried by an `afg.finalized` message.
#[derive(Deserialize, Debug, Clone)]
pub struct AfgFinalized {
pub finalized_hash: Hash,
/// Note that the number arrives as a string here.
pub finalized_number: Box<str>,
}
impl From<AfgFinalized> for internal::AfgFinalized {
fn from(msg: AfgFinalized) -> Self {
internal::AfgFinalized {
finalized_hash: msg.finalized_hash.into(),
finalized_number: msg.finalized_number,
}
}
}
/// The data carried by the `afg.received_precommit`, `afg.received_prevote` and
/// `afg.received_commit` messages.
#[derive(Deserialize, Debug, Clone)]
pub struct AfgReceived {
pub target_hash: Hash,
/// Note that the number arrives as a string here.
pub target_number: Box<str>,
pub voter: Option<Box<str>>,
}
impl From<AfgReceived> for internal::AfgReceived {
fn from(msg: AfgReceived) -> Self {
internal::AfgReceived {
target_hash: msg.target_hash.into(),
target_number: msg.target_number,
voter: msg.voter,
}
}
}
/// A block hash and height, as carried by a `block.import` message and (optionally)
/// by a `system.interval` message.
#[derive(Deserialize, Debug, Clone, Copy)]
pub struct Block {
/// Read from the `best` field of the JSON.
#[serde(rename = "best")]
pub hash: Hash,
pub height: BlockNumber,
}
impl From<Block> for node_types::Block {
fn from(block: Block) -> Self {
node_types::Block {
hash: block.hash.into(),
height: block.height,
}
}
}
/// Details that a node provides about itself; these arrive flattened into
/// a `system.connected` message (see [`SystemConnected`]).
#[derive(Deserialize, Debug, Clone)]
pub struct NodeDetails {
pub chain: Box<str>,
pub name: Box<str>,
pub implementation: Box<str>,
pub version: Box<str>,
pub validator: Option<Box<str>>,
pub network_id: Option<Box<str>>,
pub startup_time: Option<Box<str>>,
}
impl From<NodeDetails> for node_types::NodeDetails {
fn from(details: NodeDetails) -> Self {
node_types::NodeDetails {
chain: details.chain,
name: details.name,
implementation: details.implementation,
version: details.version,
validator: details.validator,
network_id: details.network_id,
startup_time: details.startup_time,
}
}
}
/// The type of the `id` field in a [`NodeMessage::V2`] message.
type NodeMessageId = u64;
/// The type used for numeric block heights in these messages.
type BlockNumber = u64;
#[cfg(test)]
mod tests {
use super::*;
// A message whose payload fields sit at the top level is parsed as V1.
#[test]
fn message_v1() {
let json = r#"{
"msg":"notify.finalized",
"level":"INFO",
"ts":"2021-01-13T12:38:25.410794650+01:00",
"best":"0x031c3521ca2f9c673812d692fc330b9a18e18a2781e3f9976992f861fd3ea0cb",
"height":"50"
}"#;
assert!(
matches!(
serde_json::from_str::<NodeMessage>(json).unwrap(),
NodeMessage::V1 { .. },
),
"message did not match variant V1",
);
}
// A message with an `id` and a nested `payload` is parsed as V2.
#[test]
fn message_v2() {
let json = r#"{
"id":1,
"ts":"2021-01-13T12:22:20.053527101+01:00",
"payload":{
"best":"0xcc41708573f2acaded9dd75e07dac2d4163d136ca35b3061c558d7a35a09dd8d",
"height":"209",
"msg":"notify.finalized"
}
}"#;
assert!(
matches!(
serde_json::from_str::<NodeMessage>(json).unwrap(),
NodeMessage::V2 { .. },
),
"message did not match variant V2",
);
}
// The `msg` tag selects the right payload variant even when it is the last field.
#[test]
fn message_v2_received_precommit() {
let json = r#"{
"id":1,
"ts":"2021-01-13T12:22:20.053527101+01:00",
"payload":{
"target_hash":"0xcc41708573f2acaded9dd75e07dac2d4163d136ca35b3061c558d7a35a09dd8d",
"target_number":"209",
"voter":"foo",
"msg":"afg.received_precommit"
}
}"#;
assert!(
matches!(
serde_json::from_str::<NodeMessage>(json).unwrap(),
NodeMessage::V2 {
payload: Payload::AfgReceivedPrecommit(..),
..
},
),
"message did not match the expected output",
);
}
#[test]
fn message_v2_tx_pool_import() {
// We should happily ignore any fields we don't care about.
let json = r#"{
"id":1,
"ts":"2021-01-13T12:22:20.053527101+01:00",
"payload":{
"foo":"Something",
"bar":123,
"wibble":"wobble",
"msg":"txpool.import"
}
}"#;
assert!(
matches!(
serde_json::from_str::<NodeMessage>(json).unwrap(),
NodeMessage::V2 {
payload: Payload::TxPoolImport,
..
},
),
"message did not match the expected output",
);
}
}
+191
View File
@@ -0,0 +1,191 @@
mod aggregator;
mod connection;
mod json_message;
mod real_ip;
use std::net::IpAddr;
use aggregator::{Aggregator, FromWebsocket};
use common::node_message;
use futures::{channel::mpsc, SinkExt, StreamExt};
use http::Uri;
use real_ip::real_ip;
use simple_logger::SimpleLogger;
use structopt::StructOpt;
use warp::filters::ws;
use warp::Filter;
// Metadata shown by the command line interface (see the `structopt` attribute on `Opts`).
// The version and authors are taken from the crate's Cargo manifest at compile time.
const VERSION: &str = env!("CARGO_PKG_VERSION");
const AUTHORS: &str = env!("CARGO_PKG_AUTHORS");
const NAME: &str = "Substrate Telemetry Backend Shard";
const ABOUT: &str = "This is the Telemetry Backend Shard that forwards the \
data sent by Substrate/Polkadot nodes to the Backend Core";
// Command line options for the shard. Note that the doc comments on the fields
// below double up as the help text shown for each option, so they are written
// for the person running the binary.
#[derive(StructOpt, Debug)]
#[structopt(name = NAME, version = VERSION, author = AUTHORS, about = ABOUT)]
struct Opts {
    /// This is the socket address that this shard is listening to. This is restricted to
    /// localhost (127.0.0.1) by default and should be fine for most use cases. If
    /// you are using Telemetry in a container, you likely want to set this to '0.0.0.0:8001'
    #[structopt(short = "l", long = "listen", default_value = "127.0.0.1:8001")]
    socket: std::net::SocketAddr,
    /// The desired log level; one of 'error', 'warn', 'info', 'debug' or 'trace', where
    /// 'error' only logs errors and 'trace' logs everything.
    #[structopt(required = false, long = "log", default_value = "info")]
    log_level: log::LevelFilter,
    /// Url to the Backend Core endpoint accepting shard connections
    #[structopt(
        short = "c",
        long = "core",
        default_value = "ws://127.0.0.1:8000/shard_submit/"
    )]
    core_url: Uri,
}
#[tokio::main]
async fn main() {
let opts = Opts::from_args();
SimpleLogger::new()
.with_level(opts.log_level)
.init()
.expect("Must be able to start a logger");
log::info!("Starting Telemetry Shard version: {}", VERSION);
if let Err(e) = start_server(opts).await {
log::error!("Error starting server: {}", e);
}
}
/// Declare our routes and start the server.
async fn start_server(opts: Opts) -> anyhow::Result<()> {
let aggregator = Aggregator::spawn(opts.core_url).await?;
// Handle requests to /health by returning OK.
let health_route = warp::path("health").map(|| "OK");
// Handle websocket requests to /submit.
let ws_route = warp::path("submit").and(warp::ws()).and(real_ip()).map(
move |ws: ws::Ws, addr: Option<IpAddr>| {
// Send messages from the websocket connection to this sink
// to have them pass to the aggregator.
let tx_to_aggregator = aggregator.subscribe_node();
log::info!("Opening /submit connection from {:?}", addr);
ws.on_upgrade(move |websocket| async move {
let (mut tx_to_aggregator, websocket) =
handle_websocket_connection(websocket, tx_to_aggregator, addr).await;
log::info!("Closing /submit connection from {:?}", addr);
// Tell the aggregator that this connection has closed, so it can tidy up.
let _ = tx_to_aggregator.send(FromWebsocket::Disconnected).await;
// Note: IF we want to close with a status code and reason, we need to construct
// a ws::Message using `ws::Message::close_with`, rather than using this method:
let _ = websocket.close().await;
})
},
);
// Merge the routes and start our server:
let routes = ws_route.or(health_route);
warp::serve(routes).run(opts.socket).await;
Ok(())
}
/// Drive a single established `/submit` websocket connection until it ends.
///
/// On entry a [`FromWebsocket::Initialize`] message is sent to the aggregator,
/// carrying a channel the aggregator can use to ask for this connection to be
/// closed. After that, every binary/text frame received from the node is parsed
/// as JSON and forwarded to the aggregator as either:
///
/// - [`FromWebsocket::Add`], when the payload is `SystemConnected`, or
/// - [`FromWebsocket::Update`], for every other payload.
///
/// The loop ends when the aggregator signals on the close channel, the websocket
/// stream ends or yields an error, or the node sends a close frame.
///
/// # Parameters
/// - `websocket`: the upgraded websocket connection to read from.
/// - `tx_to_aggregator`: sink that delivers [`FromWebsocket`] messages to the aggregator.
/// - `addr`: best-guess IP address of the node, if known; used for logging and
///   attached to `Add` messages.
///
/// # Returns
/// The sink and the websocket, handed back so that the caller can notify the
/// aggregator of the disconnect and close the socket gracefully.
async fn handle_websocket_connection<S>(
    mut websocket: ws::WebSocket,
    mut tx_to_aggregator: S,
    addr: Option<IpAddr>,
) -> (S, ws::WebSocket)
where
    S: futures::Sink<FromWebsocket, Error = anyhow::Error> + Unpin,
{
    // This could be a oneshot channel, but it's useful to be able to clone
    // messages, and we can't clone oneshot channel senders.
    let (close_connection_tx, mut close_connection_rx) = mpsc::channel(0);

    // Tell the aggregator about this new connection, and give it a way to close this connection:
    let init_msg = FromWebsocket::Initialize {
        close_connection: close_connection_tx,
    };
    if let Err(e) = tx_to_aggregator.send(init_msg).await {
        log::error!("Error sending message to aggregator: {}", e);
        return (tx_to_aggregator, websocket);
    }

    // Now we've "initialized", wait for messages from the node. Messages will
    // either be `SystemConnected` type messages that inform us that a new set
    // of messages with some message ID will be sent (a node could have more
    // than one of these), or updates linked to a specific message_id.
    loop {
        tokio::select! {
            // The close channel has fired, so end the loop:
            _ = close_connection_rx.next() => {
                log::info!("connection to {:?} being closed by aggregator", addr);
                break
            },
            // A message was received; handle it:
            msg = websocket.next() => {
                let msg = match msg {
                    Some(msg) => msg,
                    None => { log::warn!("Websocket connection from {:?} closed", addr); break }
                };

                // If we see any errors, log them and end our loop:
                let msg = match msg {
                    Err(e) => { log::error!("Error in node websocket connection: {}", e); break },
                    Ok(msg) => msg,
                };

                // Close message? Break to close connection.
                if msg.is_close() {
                    break;
                }

                // If the message isn't something we want to handle, just ignore it.
                // This includes system messages like "pings" and such, so don't log anything.
                if !msg.is_binary() && !msg.is_text() {
                    continue;
                }

                // Deserialize from JSON. Messages that fail to parse are skipped
                // without logging (presumably to keep malformed input from
                // flooding the logs -- TODO confirm).
                let bytes = msg.as_bytes();
                let node_message: json_message::NodeMessage = match serde_json::from_slice(bytes) {
                    Ok(node_message) => node_message,
                    Err(_) => continue,
                };

                // Pull relevant details from the message:
                let node_message: node_message::NodeMessage = node_message.into();
                let message_id = node_message.id();
                let payload = node_message.into_payload();

                // Until the aggregator receives an `Add` message, which we can create once
                // we see a `SystemConnected` payload, it will ignore messages with the
                // corresponding message_id. Anything that's not an "Add" is an "Update".
                let to_aggregator = match payload {
                    node_message::Payload::SystemConnected(info) => FromWebsocket::Add {
                        message_id,
                        ip: addr,
                        node: info.node,
                        genesis_hash: info.genesis_hash,
                    },
                    payload => FromWebsocket::Update { message_id, payload },
                };

                // A failed send is logged for both kinds of message; we keep the
                // connection open and carry on with the next frame.
                if let Err(e) = tx_to_aggregator.send(to_aggregator).await {
                    log::error!("Failed to send node message to aggregator: {}", e);
                }
            }
        }
    }

    // Return what we need to close the connection gracefully:
    (tx_to_aggregator, websocket)
}
+136
View File
@@ -0,0 +1,136 @@
use std::net::{IpAddr, SocketAddr};
use warp::filters::addr;
use warp::filters::header;
use warp::Filter;
/**
A warp filter that extracts the "real" IP address of a connection, taking into account
headers set by proxies (inspired by Actix Web's implementation of the same feature).

The filter gathers four possible sources and hands them to `pick_best_ip_from_options`,
which consults them in this order of preference:

1. The standardised "Forwarded" header, which looks something like
   "Forwarded: for=12.34.56.78;host=example.com;proto=https, for=23.45.67.89".
   Each proxy can append to this comma separated list; the first "for" address is used.
2. The non-standard but common X-Forwarded-For header: a comma separated list of
   addresses, of which the first is used.
3. The X-Real-IP header, expected to contain a single IP address.
4. The socket address of the connection itself.

The extracted value is `None` if none of these yields an address.
*/
pub fn real_ip() -> impl warp::Filter<Extract = (Option<IpAddr>,), Error = warp::Rejection> + Clone
{
    // Each header is optional; a missing header is extracted as `None`.
    let forwarded = header::optional::<String>("forwarded");
    let x_forwarded_for = header::optional::<String>("x-forwarded-for");
    let x_real_ip = header::optional::<String>("x-real-ip");
    let socket_addr = addr::remote();

    forwarded
        .and(x_forwarded_for)
        .and(x_real_ip)
        .and(socket_addr)
        .map(pick_best_ip_from_options)
}
fn pick_best_ip_from_options(
// Forwarded header value (if present)
forwarded: Option<String>,
// X-Forwarded-For header value (if present)
forwarded_for: Option<String>,
// X-Real-IP header value (if present)
real_ip: Option<String>,
// socket address (if known)
addr: Option<SocketAddr>,
) -> Option<IpAddr> {
let realip = forwarded
.as_ref()
.and_then(|val| get_first_addr_from_forwarded_header(val))
.or_else(|| {
// fall back to X-Forwarded-For
forwarded_for
.as_ref()
.and_then(|val| get_first_addr_from_x_forwarded_for_header(val))
})
.or_else(|| {
// fall back to X-Real-IP
real_ip.as_ref().map(|val| val.trim())
})
.and_then(|ip| {
// Try parsing assuming it may have a port first,
// and then assuming it doesn't.
ip.parse::<SocketAddr>()
.map(|s| s.ip())
.or_else(|_| ip.parse::<IpAddr>())
.ok()
})
// Fall back to local IP address if the above fails
.or(addr.map(|a| a.ip()));
realip
}
/// Follow https://datatracker.ietf.org/doc/html/rfc7239 to decode the Forwarded header value.
/// Roughly, proxies can add new sets of values by appending a comma to the existing list
/// (so we have something like "values1, values2, values3" from proxy1, proxy2 and proxy3 for
/// instance) and then the values themselves are ';' separated name=value pairs. The value in each
/// pair may or may not be surrounded in double quotes.
///
/// Only the first comma separated set of values is inspected. Within it, the value of
/// the first pair whose name is `for` (compared case-insensitively) is returned, with one
/// pair of surrounding double quotes removed if present. Segments that are not
/// `name=value` pairs are skipped. Returns `None` if there is no `for` pair.
///
/// The returned string is not validated; it may be an IP address (with or without a
/// port) or an obfuscated identifier.
///
/// Examples from the RFC:
///
/// Forwarded: for="_gazonk"
/// Forwarded: For="[2001:db8:cafe::17]:4711"
/// Forwarded: for=192.0.2.60;proto=http;by=203.0.113.43
/// Forwarded: for=192.0.2.43, for=198.51.100.17
fn get_first_addr_from_forwarded_header(value: &str) -> Option<&str> {
    let first_values = value.split(',').next()?;

    for pair in first_values.split(';') {
        let mut parts = pair.trim().splitn(2, '=');
        let (key, value) = match (parts.next(), parts.next()) {
            (Some(key), Some(value)) => (key, value),
            // Not a `name=value` pair (e.g. an empty segment); keep looking.
            _ => continue,
        };

        if !key.eq_ignore_ascii_case("for") {
            continue;
        }

        // Trim double quotes if they surround the value. Stripping prefix and
        // suffix separately means a lone `"` is left as-is rather than being
        // treated as both the opening and the closing quote.
        let unquoted = value
            .strip_prefix('"')
            .and_then(|inner| inner.strip_suffix('"'))
            .unwrap_or(value);
        return Some(unquoted);
    }

    None
}
/// Return the first entry of a comma separated X-Forwarded-For header value,
/// with surrounding whitespace removed. The entry is not validated in any way.
fn get_first_addr_from_x_forwarded_for_header(value: &str) -> Option<&str> {
    let mut entries = value.split(',');
    entries.next().map(str::trim)
}
#[cfg(test)]
mod test {
    use super::*;

    /// The example header values listed in RFC 7239 should each yield the
    /// first `for` value, with any surrounding double quotes removed.
    #[test]
    fn get_addr_from_forwarded_rfc_examples() {
        // (header value, expected first "for" address)
        let cases = [
            (r#"for="_gazonk""#, "_gazonk"),
            (
                r#"For="[2001:db8:cafe::17]:4711""#,
                "[2001:db8:cafe::17]:4711",
            ),
            (r#"for=192.0.2.60;proto=http;by=203.0.113.43"#, "192.0.2.60"),
            (r#"for=192.0.2.43, for=198.51.100.17"#, "192.0.2.43"),
        ];

        for &(header, want) in cases.iter() {
            let got = get_first_addr_from_forwarded_header(header);
            assert_eq!(got, Some(want), "Header value: {}", header);
        }
    }
}