diff --git a/backend/common/src/assign_id.rs b/backend/common/src/assign_id.rs index e6d489d..ea35dca 100644 --- a/backend/common/src/assign_id.rs +++ b/backend/common/src/assign_id.rs @@ -1,5 +1,5 @@ -use std::hash::Hash; use bimap::BiMap; +use std::hash::Hash; /// A struct that allows you to assign an ID to an arbitrary set of /// details (so long as they are Eq+Hash+Clone), and then access @@ -9,20 +9,20 @@ use bimap::BiMap; pub struct AssignId { current_id: usize, mapping: BiMap, - _id_type: std::marker::PhantomData + _id_type: std::marker::PhantomData, } -impl AssignId +impl AssignId where Details: Eq + Hash, Id: From + Copy, - usize: From + usize: From, { pub fn new() -> Self { Self { current_id: 0, mapping: BiMap::new(), - _id_type: std::marker::PhantomData + _id_type: std::marker::PhantomData, } } @@ -42,11 +42,15 @@ where } pub fn remove_by_id(&mut self, id: Id) -> Option
{ - self.mapping.remove_by_left(&id.into()).map(|(_,details)| details) + self.mapping + .remove_by_left(&id.into()) + .map(|(_, details)| details) } pub fn remove_by_details(&mut self, details: &Details) -> Option { - self.mapping.remove_by_right(&details).map(|(id,_)| id.into()) + self.mapping + .remove_by_right(&details) + .map(|(id, _)| id.into()) } pub fn clear(&mut self) { @@ -54,6 +58,8 @@ where } pub fn iter(&self) -> impl Iterator { - self.mapping.iter().map(|(&id, details)| (id.into(), details)) + self.mapping + .iter() + .map(|(&id, details)| (id.into(), details)) } -} \ No newline at end of file +} diff --git a/backend/common/src/dense_map.rs b/backend/common/src/dense_map.rs index 99cc6f4..f1fe33b 100644 --- a/backend/common/src/dense_map.rs +++ b/backend/common/src/dense_map.rs @@ -4,19 +4,19 @@ pub struct DenseMap { /// All items items: Vec>, /// Our ID type - _id_type: std::marker::PhantomData + _id_type: std::marker::PhantomData, } impl DenseMap where Id: From + Copy, - usize: From + usize: From, { pub fn new() -> Self { DenseMap { retired: Vec::new(), items: Vec::new(), - _id_type: std::marker::PhantomData + _id_type: std::marker::PhantomData, } } @@ -90,12 +90,8 @@ where /// Return the next Id that will be assigned. pub fn next_id(&self) -> usize { match self.retired.last() { - Some(id) => { - *id - } - None => { - self.items.len() - } + Some(id) => *id, + None => self.items.len(), } } } diff --git a/backend/common/src/id_type.rs b/backend/common/src/id_type.rs index b573b98..669fed6 100644 --- a/backend/common/src/id_type.rs +++ b/backend/common/src/id_type.rs @@ -33,7 +33,7 @@ mod test { #[test] fn create_and_use_new_id_type() { - id_type!{ + id_type! { Foo(usize) }; let _ = Foo::new(123); @@ -41,16 +41,15 @@ mod test { let _: usize = id.into(); // Check that these don't lead to compile errors: - id_type!{ + id_type! { Bar(usize); }; - id_type!{ + id_type! { pub Wibble(u64) }; - id_type!{ + id_type! { /// We can have doc strings, too pub(crate) Wobble(u16) }; } - -} \ No newline at end of file +} diff --git a/backend/common/src/internal_messages.rs b/backend/common/src/internal_messages.rs index 17cd827..f111174 100644 --- a/backend/common/src/internal_messages.rs +++ b/backend/common/src/internal_messages.rs @@ -1,8 +1,8 @@ use std::net::IpAddr; -use crate::node_message::Payload; -use crate::node_types::{NodeDetails, BlockHash}; use crate::id_type; +use crate::node_message::Payload; +use crate::node_types::{BlockHash, NodeDetails}; use serde::{Deserialize, Serialize}; id_type! { @@ -12,16 +12,15 @@ id_type! { pub ShardNodeId(usize); } - /// Message sent from the shard to the backend core #[derive(Deserialize, Serialize, Debug, Clone)] pub enum FromShardAggregator { /// Get information about a new node, passing IPv4 AddNode { - ip: Option, - node: NodeDetails, - local_id: ShardNodeId, - genesis_hash: BlockHash + ip: Option, + node: NodeDetails, + local_id: ShardNodeId, + genesis_hash: BlockHash, }, /// Send a message payload to update details for a node UpdateNode { @@ -29,23 +28,21 @@ pub enum FromShardAggregator { payload: Payload, }, /// Inform the core that a node has been removed - RemoveNode { - local_id: ShardNodeId - } + RemoveNode { local_id: ShardNodeId }, } /// Message sent form the backend core to the shard #[derive(Deserialize, Serialize, Debug, Clone)] pub enum FromTelemetryCore { - Mute { - local_id: ShardNodeId, - reason: MuteReason - } + Mute { + local_id: ShardNodeId, + reason: MuteReason, + }, } /// Why is the thing being muted? #[derive(Deserialize, Serialize, Debug, Clone)] pub enum MuteReason { Overquota, - ChainNotAllowed -} \ No newline at end of file + ChainNotAllowed, +} diff --git a/backend/common/src/lib.rs b/backend/common/src/lib.rs index 9b55855..6d09ed2 100644 --- a/backend/common/src/lib.rs +++ b/backend/common/src/lib.rs @@ -1,20 +1,20 @@ -pub mod node_message; -pub mod internal_messages; -pub mod node_types; pub mod id_type; +pub mod internal_messages; +pub mod node_message; +pub mod node_types; pub mod time; -mod log_level; mod assign_id; -mod most_seen; mod dense_map; +mod log_level; mod mean_list; +mod most_seen; mod num_stats; // Export a bunch of common bits at the top level for ease of import: pub use assign_id::AssignId; pub use dense_map::DenseMap; +pub use log_level::LogLevel; pub use mean_list::MeanList; -pub use num_stats::NumStats; pub use most_seen::MostSeen; -pub use log_level::LogLevel; \ No newline at end of file +pub use num_stats::NumStats; diff --git a/backend/common/src/log_level.rs b/backend/common/src/log_level.rs index 1de2f05..73e2103 100644 --- a/backend/common/src/log_level.rs +++ b/backend/common/src/log_level.rs @@ -14,11 +14,11 @@ impl std::str::FromStr for LogLevel { fn from_str(s: &str) -> Result { match s { "error" => Ok(LogLevel::Error), - "warn" => Ok(LogLevel::Warn), - "info" => Ok(LogLevel::Info), + "warn" => Ok(LogLevel::Warn), + "info" => Ok(LogLevel::Info), "debug" => Ok(LogLevel::Debug), "trace" => Ok(LogLevel::Trace), - _ => Err("expected 'error', 'warn', 'info', 'debug' or 'trace'") + _ => Err("expected 'error', 'warn', 'info', 'debug' or 'trace'"), } } } @@ -33,4 +33,4 @@ impl From<&LogLevel> for log::LevelFilter { LogLevel::Trace => log::LevelFilter::Trace, } } -} \ No newline at end of file +} diff --git a/backend/common/src/most_seen.rs b/backend/common/src/most_seen.rs index 5e8da1a..393022d 100644 --- a/backend/common/src/most_seen.rs +++ b/backend/common/src/most_seen.rs @@ -7,25 +7,25 @@ use std::hash::Hash; pub struct MostSeen { current_best: T, current_count: usize, - others: HashMap + others: HashMap, } -impl Default for MostSeen { +impl Default for MostSeen { fn default() -> Self { Self { current_best: T::default(), current_count: 0, - others: HashMap::new() + others: HashMap::new(), } } } -impl MostSeen { +impl MostSeen { pub fn new(item: T) -> Self { Self { current_best: item, current_count: 1, - others: HashMap::new() + others: HashMap::new(), } } pub fn best(&self) -> &T { @@ -36,7 +36,7 @@ impl MostSeen { } } -impl MostSeen { +impl MostSeen { pub fn insert(&mut self, item: &T) -> ChangeResult { if &self.current_best == item { // Item already the best one; bump count. @@ -50,9 +50,7 @@ impl MostSeen { // Is item now the best? if *item_count > self.current_count { - let (mut item, mut count) = self.others - .remove_entry(item) - .expect("item added above"); + let (mut item, mut count) = self.others.remove_entry(item).expect("item added above"); // Swap the current best for the new best: std::mem::swap(&mut item, &mut self.current_best); @@ -72,13 +70,11 @@ impl MostSeen { self.current_count = self.current_count.saturating_sub(1); // Is there a new best? - let other_best = self.others - .iter() - .max_by_key(|f| f.1); + let other_best = self.others.iter().max_by_key(|f| f.1); let (other_item, &other_count) = match other_best { Some(item) => item, - None => { return ChangeResult::NoChange } + None => return ChangeResult::NoChange, }; if other_count > self.current_count { @@ -87,7 +83,8 @@ impl MostSeen { // instead, but most of the time there is no change, so I'm // aiming to keep that path cheaper. let other_item = other_item.clone(); - let (mut other_item, mut other_count) = self.others + let (mut other_item, mut other_count) = self + .others .remove_entry(&other_item) .expect("item returned above, so def exists"); @@ -113,19 +110,19 @@ impl MostSeen { } /// Record the result of adding/removing an entry -#[derive(Clone,Copy)] +#[derive(Clone, Copy)] pub enum ChangeResult { /// The best item has remained the same. NoChange, /// There is a new best item now. - NewMostSeenItem + NewMostSeenItem, } impl ChangeResult { pub fn has_changed(self) -> bool { match self { ChangeResult::NewMostSeenItem => true, - ChangeResult::NoChange => false + ChangeResult::NoChange => false, } } } @@ -214,7 +211,7 @@ mod test { a.insert(&"Second"); a.insert(&"Second"); // 3 - a.insert(&"First"); // 2 + a.insert(&"First"); // 2 assert_eq!(*a.best(), "Second"); assert_eq!(a.best_count(), 3); @@ -231,5 +228,4 @@ mod test { assert_eq!(a.best_count(), 2); assert_eq!(*a.best(), "First"); // First is now ahead } - -} \ No newline at end of file +} diff --git a/backend/common/src/node_message.rs b/backend/common/src/node_message.rs index 5c84391..25c7cd9 100644 --- a/backend/common/src/node_message.rs +++ b/backend/common/src/node_message.rs @@ -5,13 +5,8 @@ pub type NodeMessageId = u64; #[derive(Serialize, Deserialize, Debug)] pub enum NodeMessage { - V1 { - payload: Payload, - }, - V2 { - id: NodeMessageId, - payload: Payload, - }, + V1 { payload: Payload }, + V2 { id: NodeMessageId, payload: Payload }, } impl NodeMessage { @@ -26,8 +21,7 @@ impl NodeMessage { /// Return the payload associated with the message. pub fn into_payload(self) -> Payload { match self { - NodeMessage::V1 { payload, .. } | - NodeMessage::V2 { payload, .. } => payload, + NodeMessage::V1 { payload, .. } | NodeMessage::V2 { payload, .. } => payload, } } } @@ -133,7 +127,8 @@ mod tests { // we test the different types we want to (de)serialize ourselves. We just need to test each // type, not each variant. fn bincode_can_serialize_and_deserialize<'de, T>(item: T) - where T: Serialize + serde::de::DeserializeOwned + where + T: Serialize + serde::de::DeserializeOwned, { let bytes = bincode::serialize(&item).expect("Serialization should work"); let _: T = bincode::deserialize(&bytes).expect("Deserialization should work"); @@ -141,111 +136,95 @@ mod tests { #[test] fn bincode_can_serialize_and_deserialize_node_message_system_connected() { - bincode_can_serialize_and_deserialize( - NodeMessage::V1 { - payload: Payload::SystemConnected(SystemConnected { - genesis_hash: BlockHash::zero(), - node: NodeDetails { - chain: "foo".into(), - name: "foo".into(), - implementation: "foo".into(), - version: "foo".into(), - validator: None, - network_id: None, - startup_time: None, - }, - }) - } - ); + bincode_can_serialize_and_deserialize(NodeMessage::V1 { + payload: Payload::SystemConnected(SystemConnected { + genesis_hash: BlockHash::zero(), + node: NodeDetails { + chain: "foo".into(), + name: "foo".into(), + implementation: "foo".into(), + version: "foo".into(), + validator: None, + network_id: None, + startup_time: None, + }, + }), + }); } #[test] fn bincode_can_serialize_and_deserialize_node_message_system_interval() { - bincode_can_serialize_and_deserialize( - NodeMessage::V1 { - payload: Payload::SystemInterval(SystemInterval { - peers: None, - txcount: None, - bandwidth_upload: None, - bandwidth_download: None, - finalized_height: None, - finalized_hash: None, - block: None, - used_state_cache_size: None, - }) - } - ); + bincode_can_serialize_and_deserialize(NodeMessage::V1 { + payload: Payload::SystemInterval(SystemInterval { + peers: None, + txcount: None, + bandwidth_upload: None, + bandwidth_download: None, + finalized_height: None, + finalized_hash: None, + block: None, + used_state_cache_size: None, + }), + }); } #[test] fn bincode_can_serialize_and_deserialize_node_message_block_import() { - bincode_can_serialize_and_deserialize( - NodeMessage::V1 { - payload: Payload::BlockImport(Block { - hash: BlockHash([0; 32]), - height: 0, - }) - } - ); + bincode_can_serialize_and_deserialize(NodeMessage::V1 { + payload: Payload::BlockImport(Block { + hash: BlockHash([0; 32]), + height: 0, + }), + }); } #[test] fn bincode_can_serialize_and_deserialize_node_message_notify_finalized() { - bincode_can_serialize_and_deserialize( - NodeMessage::V1 { - payload: Payload::NotifyFinalized(Finalized { - hash: BlockHash::zero(), - height: "foo".into(), - }) - } - ); + bincode_can_serialize_and_deserialize(NodeMessage::V1 { + payload: Payload::NotifyFinalized(Finalized { + hash: BlockHash::zero(), + height: "foo".into(), + }), + }); } #[test] fn bincode_can_serialize_and_deserialize_node_message_tx_pool_import() { - bincode_can_serialize_and_deserialize( - NodeMessage::V1 { - payload: Payload::TxPoolImport - } - ); + bincode_can_serialize_and_deserialize(NodeMessage::V1 { + payload: Payload::TxPoolImport, + }); } #[test] fn bincode_can_serialize_and_deserialize_node_message_afg_finalized() { - bincode_can_serialize_and_deserialize( - NodeMessage::V1 { - payload: Payload::AfgFinalized(AfgFinalized { - finalized_hash: BlockHash::zero(), - finalized_number: "foo".into(), - }) - } - ); + bincode_can_serialize_and_deserialize(NodeMessage::V1 { + payload: Payload::AfgFinalized(AfgFinalized { + finalized_hash: BlockHash::zero(), + finalized_number: "foo".into(), + }), + }); } #[test] fn bincode_can_serialize_and_deserialize_node_message_afg_received() { - bincode_can_serialize_and_deserialize( - NodeMessage::V1 { - payload: Payload::AfgReceivedPrecommit(AfgReceived { - target_hash: BlockHash::zero(), - target_number: "foo".into(), - voter: None, - }) - } - ); + bincode_can_serialize_and_deserialize(NodeMessage::V1 { + payload: Payload::AfgReceivedPrecommit(AfgReceived { + target_hash: BlockHash::zero(), + target_number: "foo".into(), + voter: None, + }), + }); } #[test] fn bincode_can_serialize_and_deserialize_node_message_afg_authority_set() { - bincode_can_serialize_and_deserialize( - NodeMessage::V1 { - payload: Payload::AfgAuthoritySet(AfgAuthoritySet { - authority_id: "foo".into(), - authorities: "foo".into(), - authority_set_id: "foo".into(), - }) - } - ); + bincode_can_serialize_and_deserialize(NodeMessage::V1 { + payload: Payload::AfgAuthoritySet(AfgAuthoritySet { + authority_id: "foo".into(), + authorities: "foo".into(), + authority_set_id: "foo".into(), + }), + }); } #[test] diff --git a/backend/shard/src/aggregator.rs b/backend/shard/src/aggregator.rs index afc274a..56f4b15 100644 --- a/backend/shard/src/aggregator.rs +++ b/backend/shard/src/aggregator.rs @@ -1,10 +1,15 @@ -use common::{internal_messages::{self, ShardNodeId}, node_message, AssignId, node_types::BlockHash}; -use std::sync::Arc; -use std::sync::atomic::AtomicU64; +use crate::connection::{create_ws_connection, Message}; +use common::{ + internal_messages::{self, ShardNodeId}, + node_message, + node_types::BlockHash, + AssignId, +}; use futures::{channel::mpsc, future}; -use futures::{ Sink, SinkExt, StreamExt }; -use std::collections::{ HashSet }; -use crate::connection::{ create_ws_connection, Message }; +use futures::{Sink, SinkExt, StreamExt}; +use std::collections::HashSet; +use std::sync::atomic::AtomicU64; +use std::sync::Arc; /// A unique Id is assigned per websocket connection (or more accurately, /// per thing-that-subscribes-to-the-aggregator). That connection might send @@ -16,18 +21,18 @@ type ConnId = u64; /// from the telemetry core. This can be private since the only /// external messages are via subscriptions that take /// [`FromWebsocket`] instances. -#[derive(Clone,Debug)] +#[derive(Clone, Debug)] enum ToAggregator { DisconnectedFromTelemetryCore, ConnectedToTelemetryCore, FromWebsocket(ConnId, FromWebsocket), - FromTelemetryCore(internal_messages::FromTelemetryCore) + FromTelemetryCore(internal_messages::FromTelemetryCore), } /// An incoming socket connection can provide these messages. /// Until a node has been Added via [`FromWebsocket::Add`], /// messages from it will be ignored. -#[derive(Clone,Debug)] +#[derive(Clone, Debug)] pub enum FromWebsocket { /// Fire this when the connection is established. Initialize { @@ -35,22 +40,22 @@ pub enum FromWebsocket { /// the websocket connection and force the node to reconnect /// so that it sends its system info again incase the telemetry /// core has restarted. - close_connection: mpsc::Sender<()> + close_connection: mpsc::Sender<()>, }, /// Tell the aggregator about a new node. Add { message_id: node_message::NodeMessageId, ip: Option, node: common::node_types::NodeDetails, - genesis_hash: BlockHash + genesis_hash: BlockHash, }, /// Update/pass through details about a node. Update { message_id: node_message::NodeMessageId, - payload: node_message::Payload + payload: node_message::Payload, }, /// Make a note when the node disconnects. - Disconnected + Disconnected, } pub type FromAggregator = internal_messages::FromShardAggregator; @@ -67,7 +72,7 @@ struct AggregatorInternal { /// Send messages to the aggregator from websockets via this. This is /// stored here so that anybody holding an `Aggregator` handle can /// make use of it. - tx_to_aggregator: mpsc::Sender + tx_to_aggregator: mpsc::Sender, } impl Aggregator { @@ -77,21 +82,21 @@ impl Aggregator { // Map responses from our connection into messages that will be sent to the aggregator: let tx_from_connection = tx_to_aggregator.clone().with(|msg| { - future::ok::<_,mpsc::SendError>(match msg { + future::ok::<_, mpsc::SendError>(match msg { Message::Connected => ToAggregator::ConnectedToTelemetryCore, Message::Disconnected => ToAggregator::DisconnectedFromTelemetryCore, - Message::Data(data) => ToAggregator::FromTelemetryCore(data) + Message::Data(data) => ToAggregator::FromTelemetryCore(data), }) }); // Establish a resiliant connection to the core (this retries as needed): - let tx_to_telemetry_core = create_ws_connection( - tx_from_connection, - telemetry_uri - ).await; + let tx_to_telemetry_core = create_ws_connection(tx_from_connection, telemetry_uri).await; // Handle any incoming messages in our handler loop: - tokio::spawn(Aggregator::handle_messages(rx_from_external, tx_to_telemetry_core)); + tokio::spawn(Aggregator::handle_messages( + rx_from_external, + tx_to_telemetry_core, + )); // Return a handle to our aggregator: Ok(Aggregator(Arc::new(AggregatorInternal { @@ -103,8 +108,11 @@ impl Aggregator { // This is spawned into a separate task and handles any messages coming // in to the aggregator. If nobody is tolding the tx side of the channel // any more, this task will gracefully end. - async fn handle_messages(mut rx_from_external: mpsc::Receiver, mut tx_to_telemetry_core: mpsc::Sender) { - use internal_messages::{ FromShardAggregator, FromTelemetryCore }; + async fn handle_messages( + mut rx_from_external: mpsc::Receiver, + mut tx_to_telemetry_core: mpsc::Sender, + ) { + use internal_messages::{FromShardAggregator, FromTelemetryCore}; // Just as an optimisation, we can keep track of whether we're connected to the backend // or not, and ignore incoming messages while we aren't. @@ -140,41 +148,64 @@ impl Aggregator { connected_to_telemetry_core = true; log::info!("Connected to telemetry core"); - }, + } ToAggregator::DisconnectedFromTelemetryCore => { connected_to_telemetry_core = false; log::info!("Disconnected from telemetry core"); - }, - ToAggregator::FromWebsocket(_conn_id, FromWebsocket::Initialize { close_connection }) => { + } + ToAggregator::FromWebsocket( + _conn_id, + FromWebsocket::Initialize { close_connection }, + ) => { // We boot all connections on a reconnect-to-core to force new systemconnected // messages to be sent. We could boot on muting, but need to be careful not to boot // connections where we mute one set of messages it sends and not others. close_connections.push(close_connection); - }, - ToAggregator::FromWebsocket(conn_id, FromWebsocket::Add { message_id, ip, node, genesis_hash }) => { + } + ToAggregator::FromWebsocket( + conn_id, + FromWebsocket::Add { + message_id, + ip, + node, + genesis_hash, + }, + ) => { // Don't bother doing anything else if we're disconnected, since we'll force the // node to reconnect anyway when the backend does: - if !connected_to_telemetry_core { continue } + if !connected_to_telemetry_core { + continue; + } // Generate a new "local ID" for messages from this connection: let local_id = to_local_id.assign_id((conn_id, message_id)); // Send the message to the telemetry core with this local ID: - let _ = tx_to_telemetry_core.send(FromShardAggregator::AddNode { - ip, - node, - genesis_hash, - local_id - }).await; - }, - ToAggregator::FromWebsocket(conn_id, FromWebsocket::Update { message_id, payload }) => { + let _ = tx_to_telemetry_core + .send(FromShardAggregator::AddNode { + ip, + node, + genesis_hash, + local_id, + }) + .await; + } + ToAggregator::FromWebsocket( + conn_id, + FromWebsocket::Update { + message_id, + payload, + }, + ) => { // Ignore incoming messages if we're not connected to the backend: - if !connected_to_telemetry_core { continue } + if !connected_to_telemetry_core { + continue; + } // Get the local ID, ignoring the message if none match: let local_id = match to_local_id.get_id(&(conn_id, message_id)) { Some(id) => id, - None => continue + None => continue, }; // ignore the message if this node has been muted: @@ -183,28 +214,35 @@ impl Aggregator { } // Send the message to the telemetry core with this local ID: - let _ = tx_to_telemetry_core.send(FromShardAggregator::UpdateNode { - local_id, - payload - }).await; - }, + let _ = tx_to_telemetry_core + .send(FromShardAggregator::UpdateNode { local_id, payload }) + .await; + } ToAggregator::FromWebsocket(disconnected_conn_id, FromWebsocket::Disconnected) => { // Find all of the local IDs corresponding to the disconnected connection ID and // remove them, telling Telemetry Core about them too. This could be more efficient, // but the mapping isn't currently cached and it's not a super frequent op. - let local_ids_disconnected: Vec<_> = to_local_id.iter() + let local_ids_disconnected: Vec<_> = to_local_id + .iter() .filter(|(_, &(conn_id, _))| disconnected_conn_id == conn_id) .map(|(local_id, _)| local_id) .collect(); for local_id in local_ids_disconnected { to_local_id.remove_by_id(local_id); - let _ = tx_to_telemetry_core.send(FromShardAggregator::RemoveNode { local_id }).await; + let _ = tx_to_telemetry_core + .send(FromShardAggregator::RemoveNode { local_id }) + .await; } - }, - ToAggregator::FromTelemetryCore(FromTelemetryCore::Mute { local_id, reason: _ }) => { + } + ToAggregator::FromTelemetryCore(FromTelemetryCore::Mute { + local_id, + reason: _, + }) => { // Ignore incoming messages if we're not connected to the backend: - if !connected_to_telemetry_core { continue } + if !connected_to_telemetry_core { + continue; + } // Mute the local ID we've been told to: muted.insert(local_id); @@ -217,13 +255,17 @@ impl Aggregator { pub fn subscribe_node(&self) -> impl Sink + Unpin { // Assign a unique aggregator-local ID to each connection that subscribes, and pass // that along with every message to the aggregator loop: - let conn_id: ConnId = self.0.conn_id.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + let conn_id: ConnId = self + .0 + .conn_id + .fetch_add(1, std::sync::atomic::Ordering::Relaxed); let tx_to_aggregator = self.0.tx_to_aggregator.clone(); // Calling `send` on this Sink requires Unpin. There may be a nicer way than this, // but pinning by boxing is the easy solution for now: - Box::pin(tx_to_aggregator.with(move |msg| async move { - Ok(ToAggregator::FromWebsocket(conn_id, msg)) - })) + Box::pin( + tx_to_aggregator + .with(move |msg| async move { Ok(ToAggregator::FromWebsocket(conn_id, msg)) }), + ) } -} \ No newline at end of file +} diff --git a/backend/shard/src/connection.rs b/backend/shard/src/connection.rs index 89df934..918e102 100644 --- a/backend/shard/src/connection.rs +++ b/backend/shard/src/connection.rs @@ -1,25 +1,28 @@ -use futures::channel::{ mpsc }; -use futures::{ Sink, SinkExt, StreamExt }; +use futures::channel::mpsc; +use futures::{Sink, SinkExt, StreamExt}; use tokio::net::TcpStream; -use tokio_util::compat::{ TokioAsyncReadCompatExt }; +use tokio_util::compat::TokioAsyncReadCompatExt; -#[derive(Clone,Debug)] +#[derive(Clone, Debug)] pub enum Message { Connected, Disconnected, - Data(Out) + Data(Out), } /// Connect to a websocket server, retrying the connection if we're disconnected. /// - Sends messages when disconnected, reconnected or data received from the connection. /// - Returns a channel that allows you to send messages to the connection. /// - Messages all encoded/decoded from bincode. -pub async fn create_ws_connection(mut tx_to_external: S, telemetry_uri: http::Uri) -> mpsc::Sender +pub async fn create_ws_connection( + mut tx_to_external: S, + telemetry_uri: http::Uri, +) -> mpsc::Sender where S: Sink, Error = E> + Unpin + Send + Clone + 'static, E: std::fmt::Debug + std::fmt::Display + Send + 'static, In: serde::Serialize + Send + 'static, - Out: serde::de::DeserializeOwned + Send + 'static + Out: serde::de::DeserializeOwned + Send + 'static, { // Set up a proxy channel to relay messages to the telemetry core, and return one end of it. // Once a connection to the backend is established, we pass messages along to it. If the connection @@ -42,7 +45,8 @@ where connected = true; // Inform the handler loop that we've reconnected. - tx_to_external.send(Message::Connected) + tx_to_external + .send(Message::Connected) .await .expect("must be able to send reconnect msg"); @@ -51,14 +55,20 @@ where if let Err(e) = tx_to_connection.send(msg).await { // Issue forwarding a message to the telemetry core? // Give up and try to reconnect on the next loop iteration. - log::error!("Error sending message to websocker server (will reconnect): {}", e); + log::error!( + "Error sending message to websocker server (will reconnect): {}", + e + ); break; } } - }, + } Err(e) => { // Issue connecting? Wait and try again on the next loop iteration. - log::error!("Error connecting to websocker server (will reconnect): {}", e); + log::error!( + "Error connecting to websocker server (will reconnect): {}", + e + ); } }; @@ -79,15 +89,18 @@ where /// This spawns a connection to a websocket server, serializing/deserialziing /// from bincode as messages are sent or received. -async fn create_ws_connection_no_retry(mut tx_to_external: S, telemetry_uri: http::Uri) -> anyhow::Result> +async fn create_ws_connection_no_retry( + mut tx_to_external: S, + telemetry_uri: http::Uri, +) -> anyhow::Result> where S: Sink, Error = E> + Unpin + Send + 'static, E: std::fmt::Debug + std::fmt::Display, In: serde::Serialize + Send + 'static, - Out: serde::de::DeserializeOwned + Send + 'static + Out: serde::de::DeserializeOwned + Send + 'static, { - use soketto::handshake::{Client, ServerResponse}; use bincode::Options; + use soketto::handshake::{Client, ServerResponse}; let host = telemetry_uri.host().unwrap_or("127.0.0.1"); let port = telemetry_uri.port_u16().unwrap_or(8000); @@ -100,9 +113,13 @@ where let mut client = Client::new(socket.compat(), host, &path); let (mut ws_to_connection, mut ws_from_connection) = match client.handshake().await? { ServerResponse::Accepted { .. } => client.into_builder().finish(), - ServerResponse::Redirect { status_code, .. } | - ServerResponse::Rejected { status_code } => { - return Err(anyhow::anyhow!("Failed to connect to {}{}, status code: {}", host, path, status_code)); + ServerResponse::Redirect { status_code, .. } | ServerResponse::Rejected { status_code } => { + return Err(anyhow::anyhow!( + "Failed to connect to {}{}, status code: {}", + host, + path, + status_code + )); } }; @@ -116,7 +133,10 @@ where if let Err(e) = ws_from_connection.receive_data(&mut data).await { // Couldn't receive data may mean all senders are gone, so log // the error and shut this down: - log::error!("Shutting down websocket connection: Failed to receive data: {}", e); + log::error!( + "Shutting down websocket connection: Failed to receive data: {}", + e + ); return; } @@ -126,10 +146,13 @@ where if let Err(e) = tx_to_external.send(Message::Data(msg)).await { // Failure to send to our loop likely means it's hit an // issue and shut down, so bail on this loop as well: - log::error!("Shutting down websocket connection: Failed to send data out: {}", e); + log::error!( + "Shutting down websocket connection: Failed to send data out: {}", + e + ); return; } - }, + } Err(err) => { // Log the error but otherwise ignore it and keep running: log::warn!("Failed to decode message from Backend Core: {:?}", err); @@ -150,11 +173,17 @@ where // Any errors sending the message leads to this task ending, which should cascade to // the entire connection being ended. if let Err(e) = ws_to_connection.send_binary_mut(bytes).await { - log::error!("Shutting down websocket connection: Failed to send data in: {}", e); + log::error!( + "Shutting down websocket connection: Failed to send data in: {}", + e + ); return; } if let Err(e) = ws_to_connection.flush().await { - log::error!("Shutting down websocket connection: Failed to flush data: {}", e); + log::error!( + "Shutting down websocket connection: Failed to flush data: {}", + e + ); return; } } @@ -163,4 +192,4 @@ where // We return a channel that you can send messages down in order to have // them sent to the telemetry core: Ok(tx_to_connection) -} \ No newline at end of file +} diff --git a/backend/shard/src/json_message/hash.rs b/backend/shard/src/json_message/hash.rs index b075ee1..23b583f 100644 --- a/backend/shard/src/json_message/hash.rs +++ b/backend/shard/src/json_message/hash.rs @@ -1,7 +1,7 @@ +use serde::de::{self, Deserialize, Deserializer, SeqAccess, Unexpected, Visitor}; +use serde::ser::{Serialize, Serializer}; use std::fmt::{self, Debug, Display}; use std::str::FromStr; -use serde::ser::{Serialize, Serializer}; -use serde::de::{self, Deserialize, Deserializer, Unexpected, Visitor, SeqAccess}; const HASH_BYTES: usize = 32; @@ -28,7 +28,9 @@ impl<'de> Visitor<'de> for HashVisitor { type Value = Hash; fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { - formatter.write_str("byte array of length 32, or hexidecimal string of 32 bytes beginning with 0x") + formatter.write_str( + "byte array of length 32, or hexidecimal string of 32 bytes beginning with 0x", + ) } fn visit_str(self, value: &str) -> Result @@ -65,7 +67,7 @@ impl<'de> Visitor<'de> for HashVisitor { for (i, byte) in hash.iter_mut().enumerate() { match seq.next_element()? { Some(b) => *byte = b, - None => return Err(de::Error::invalid_length(i, &"an array of 32 bytes")) + None => return Err(de::Error::invalid_length(i, &"an array of 32 bytes")), } } @@ -176,7 +178,6 @@ mod tests { assert_eq!(hash, DUMMY); } - #[test] fn deserialize_json_array_too_short() { let json = r#"[222,173,190,239,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]"#; diff --git a/backend/shard/src/json_message/mod.rs b/backend/shard/src/json_message/mod.rs index 22f233f..60b5d3b 100644 --- a/backend/shard/src/json_message/mod.rs +++ b/backend/shard/src/json_message/mod.rs @@ -3,5 +3,5 @@ mod hash; mod node_message; +pub use hash::Hash; pub use node_message::*; -pub use hash::Hash; \ No newline at end of file diff --git a/backend/shard/src/json_message/node_message.rs b/backend/shard/src/json_message/node_message.rs index 80386d2..41ba1b0 100644 --- a/backend/shard/src/json_message/node_message.rs +++ b/backend/shard/src/json_message/node_message.rs @@ -4,9 +4,9 @@ //! compatibility with the input data when we make changes to our internal data //! structures (for example, to support bincode better). use super::hash::Hash; -use serde::{Deserialize}; use common::node_message as internal; use common::node_types; +use serde::Deserialize; /// This struct represents a telemetry message sent from a node as /// a JSON payload. Since JSON is self describing, we can use attributes @@ -36,11 +36,12 @@ pub enum NodeMessage { impl From for internal::NodeMessage { fn from(msg: NodeMessage) -> Self { match msg { - NodeMessage::V1 { payload } => { - internal::NodeMessage::V1 { payload: payload.into() } + NodeMessage::V1 { payload } => internal::NodeMessage::V1 { + payload: payload.into(), }, - NodeMessage::V2 { id, payload } => { - internal::NodeMessage::V2 { id, payload: payload.into() } + NodeMessage::V2 { id, payload } => internal::NodeMessage::V2 { + id, + payload: payload.into(), }, } } @@ -80,45 +81,19 @@ pub enum Payload { impl From for internal::Payload { fn from(msg: Payload) -> Self { match msg { - Payload::SystemConnected(m) => { - internal::Payload::SystemConnected(m.into()) - }, - Payload::SystemInterval(m) => { - internal::Payload::SystemInterval(m.into()) - }, - Payload::BlockImport(m) => { - internal::Payload::BlockImport(m.into()) - }, - Payload::NotifyFinalized(m) => { - internal::Payload::NotifyFinalized(m.into()) - }, - Payload::TxPoolImport => { - internal::Payload::TxPoolImport - }, - Payload::AfgFinalized(m) => { - internal::Payload::AfgFinalized(m.into()) - }, - Payload::AfgReceivedPrecommit(m) => { - internal::Payload::AfgReceivedPrecommit(m.into()) - }, - Payload::AfgReceivedPrevote(m) => { - internal::Payload::AfgReceivedPrevote(m.into()) - }, - Payload::AfgReceivedCommit(m) => { - internal::Payload::AfgReceivedCommit(m.into()) - }, - Payload::AfgAuthoritySet(m) => { - internal::Payload::AfgAuthoritySet(m.into()) - }, - Payload::AfgFinalizedBlocksUpTo => { - internal::Payload::AfgFinalizedBlocksUpTo - }, - Payload::AuraPreSealedBlock => { - internal::Payload::AuraPreSealedBlock - }, - Payload::PreparedBlockForProposing => { - internal::Payload::PreparedBlockForProposing - }, + Payload::SystemConnected(m) => internal::Payload::SystemConnected(m.into()), + Payload::SystemInterval(m) => internal::Payload::SystemInterval(m.into()), + Payload::BlockImport(m) => internal::Payload::BlockImport(m.into()), + Payload::NotifyFinalized(m) => internal::Payload::NotifyFinalized(m.into()), + Payload::TxPoolImport => internal::Payload::TxPoolImport, + Payload::AfgFinalized(m) => internal::Payload::AfgFinalized(m.into()), + Payload::AfgReceivedPrecommit(m) => internal::Payload::AfgReceivedPrecommit(m.into()), + Payload::AfgReceivedPrevote(m) => internal::Payload::AfgReceivedPrevote(m.into()), + Payload::AfgReceivedCommit(m) => internal::Payload::AfgReceivedCommit(m.into()), + Payload::AfgAuthoritySet(m) => internal::Payload::AfgAuthoritySet(m.into()), + Payload::AfgFinalizedBlocksUpTo => internal::Payload::AfgFinalizedBlocksUpTo, + Payload::AuraPreSealedBlock => internal::Payload::AuraPreSealedBlock, + Payload::PreparedBlockForProposing => internal::Payload::PreparedBlockForProposing, } } } @@ -134,7 +109,7 @@ impl From for internal::SystemConnected { fn from(msg: SystemConnected) -> Self { internal::SystemConnected { genesis_hash: msg.genesis_hash.into(), - node: msg.node.into() + node: msg.node.into(), } } } @@ -243,7 +218,7 @@ impl From for node_types::Block { fn from(block: Block) -> Self { node_types::Block { hash: block.hash.into(), - height: block.height + height: block.height, } } } diff --git a/backend/shard/src/main.rs b/backend/shard/src/main.rs index 89104f5..6073566 100644 --- a/backend/shard/src/main.rs +++ b/backend/shard/src/main.rs @@ -1,19 +1,19 @@ mod aggregator; mod connection; -mod real_ip; mod json_message; +mod real_ip; use std::net::IpAddr; -use structopt::StructOpt; -use http::Uri; -use simple_logger::SimpleLogger; -use futures::{StreamExt, SinkExt, channel::mpsc}; -use warp::Filter; -use warp::filters::ws; +use aggregator::{Aggregator, FromWebsocket}; use common::{node_message, LogLevel}; -use aggregator::{ Aggregator, FromWebsocket }; +use futures::{channel::mpsc, SinkExt, StreamExt}; +use http::Uri; use real_ip::real_ip; +use simple_logger::SimpleLogger; +use structopt::StructOpt; +use warp::filters::ws; +use warp::Filter; const VERSION: &str = env!("CARGO_PKG_VERSION"); const AUTHORS: &str = env!("CARGO_PKG_AUTHORS"); @@ -27,25 +27,17 @@ struct Opts { /// This is the socket address that this shard is listening to. This is restricted to /// localhost (127.0.0.1) by default and should be fine for most use cases. If /// you are using Telemetry in a container, you likely want to set this to '0.0.0.0:8000' - #[structopt( - short = "l", - long = "listen", - default_value = "127.0.0.1:8001", - )] + #[structopt(short = "l", long = "listen", default_value = "127.0.0.1:8001")] socket: std::net::SocketAddr, /// The desired log level; one of 'error', 'warn', 'info', 'debug' or 'trace', where /// 'error' only logs errors and 'trace' logs everything. - #[structopt( - required = false, - long = "log", - default_value = "info", - )] + #[structopt(required = false, long = "log", default_value = "info")] log_level: LogLevel, /// Url to the Backend Core endpoint accepting shard connections #[structopt( - short = "c", - long = "core", - default_value = "ws://127.0.0.1:8000/shard_submit/", + short = "c", + long = "core", + default_value = "ws://127.0.0.1:8000/shard_submit/" )] core_url: Uri, } @@ -60,10 +52,7 @@ async fn main() { .init() .expect("Must be able to start a logger"); - log::info!( - "Starting Telemetry Shard version: {}", - VERSION - ); + log::info!("Starting Telemetry Shard version: {}", VERSION); if let Err(e) = start_server(opts).await { log::error!("Error starting server: {}", e); @@ -72,26 +61,21 @@ async fn main() { /// Declare our routes and start the server. async fn start_server(opts: Opts) -> anyhow::Result<()> { - let aggregator = Aggregator::spawn(opts.core_url).await?; // Handle requests to /health by returning OK. - let health_route = - warp::path("health") - .map(|| "OK"); + let health_route = warp::path("health").map(|| "OK"); // Handle websocket requests to /submit. - let ws_route = - warp::path("submit") - .and(warp::ws()) - .and(real_ip()) - .map(move |ws: ws::Ws, addr: Option| { + let ws_route = warp::path("submit").and(warp::ws()).and(real_ip()).map( + move |ws: ws::Ws, addr: Option| { // Send messages from the websocket connection to this sink // to have them pass to the aggregator. let tx_to_aggregator = aggregator.subscribe_node(); log::info!("Opening /submit connection from {:?}", addr); ws.on_upgrade(move |websocket| async move { - let (mut tx_to_aggregator, websocket) = handle_websocket_connection(websocket, tx_to_aggregator, addr).await; + let (mut tx_to_aggregator, websocket) = + handle_websocket_connection(websocket, tx_to_aggregator, addr).await; log::info!("Closing /submit connection from {:?}", addr); // Tell the aggregator that this connection has closed, so it can tidy up. let _ = tx_to_aggregator.send(FromWebsocket::Disconnected).await; @@ -99,7 +83,8 @@ async fn start_server(opts: Opts) -> anyhow::Result<()> { // a ws::Message using `ws::Message::close_with`, rather than using this method: let _ = websocket.close().await; }) - }); + }, + ); // Merge the routes and start our server: let routes = ws_route.or(health_route); @@ -108,8 +93,13 @@ async fn start_server(opts: Opts) -> anyhow::Result<()> { } /// This takes care of handling messages from an established socket connection. -async fn handle_websocket_connection(mut websocket: ws::WebSocket, mut tx_to_aggregator: S, addr: Option) -> (S, ws::WebSocket) - where S: futures::Sink + Unpin +async fn handle_websocket_connection( + mut websocket: ws::WebSocket, + mut tx_to_aggregator: S, + addr: Option, +) -> (S, ws::WebSocket) +where + S: futures::Sink + Unpin, { // This could be a oneshot channel, but it's useful to be able to clone // messages, and we can't clone oneshot channel senders. @@ -117,7 +107,7 @@ async fn handle_websocket_connection(mut websocket: ws::WebSocket, mut tx_to_ // Tell the aggregator about this new connection, and give it a way to close this connection: let init_msg = FromWebsocket::Initialize { - close_connection: close_connection_tx + close_connection: close_connection_tx, }; if let Err(e) = tx_to_aggregator.send(init_msg).await { log::error!("Error sending message to aggregator: {}", e); @@ -179,13 +169,15 @@ async fn handle_websocket_connection(mut websocket: ws::WebSocket, mut tx_to_ /// Deserialize an incoming websocket message, returning an error if something /// fatal went wrong, [`Some`] message if all went well, and [`None`] if a non-fatal /// issue was encountered and the message should simply be ignored. -fn deserialize_ws_message(msg: Result) -> anyhow::Result> { +fn deserialize_ws_message( + msg: Result, +) -> anyhow::Result> { // If we see any errors, log them and end our loop: let msg = match msg { Err(e) => { return Err(anyhow::anyhow!("Error in node websocket connection: {}", e)); - }, - Ok(msg) => msg + } + Ok(msg) => msg, }; // If the message isn't something we want to handle, just ignore it. @@ -202,11 +194,11 @@ fn deserialize_ws_message(msg: Result) -> anyhow::Resu // let bytes: &[u8] = bytes.get(..512).unwrap_or_else(|| &bytes); // let msg_start = std::str::from_utf8(bytes).unwrap_or_else(|_| "INVALID UTF8"); // log::warn!("Failed to parse node message ({}): {}", msg_start, e); - return Ok(None) + return Ok(None); } }; // Pull relevant details from the message: let node_message: node_message::NodeMessage = node_message.into(); Ok(Some(node_message)) -} \ No newline at end of file +} diff --git a/backend/shard/src/real_ip.rs b/backend/shard/src/real_ip.rs index 7d64af4..604a7ff 100644 --- a/backend/shard/src/real_ip.rs +++ b/backend/shard/src/real_ip.rs @@ -1,6 +1,6 @@ -use std::net::{ SocketAddr, IpAddr }; -use warp::filters::header; +use std::net::{IpAddr, SocketAddr}; use warp::filters::addr; +use warp::filters::header; use warp::Filter; /** @@ -24,7 +24,8 @@ If that _still_ doesn't work, fall back to the socket address of the connection. Return `None` if all of this fails to yield an address. */ -pub fn real_ip() -> impl warp::Filter,), Error = warp::Rejection> + Clone { +pub fn real_ip() -> impl warp::Filter,), Error = warp::Rejection> + Clone +{ header::optional("forwarded") .and(header::optional("x-forwarded-for")) .and(header::optional("x-real-ip")) @@ -40,12 +41,16 @@ fn pick_best_ip_from_options( // X-Real-IP header value (if present) real_ip: Option, // socket address (if known) - addr: Option + addr: Option, ) -> Option { - let realip = forwarded.as_ref().and_then(|val| get_first_addr_from_forwarded_header(val)) + let realip = forwarded + .as_ref() + .and_then(|val| get_first_addr_from_forwarded_header(val)) .or_else(|| { // fall back to X-Forwarded-For - forwarded_for.as_ref().and_then(|val| get_first_addr_from_x_forwarded_for_header(val)) + forwarded_for + .as_ref() + .and_then(|val| get_first_addr_from_x_forwarded_for_header(val)) }) .or_else(|| { // fall back to X-Real-IP @@ -54,7 +59,8 @@ fn pick_best_ip_from_options( .and_then(|ip| { // Try parsing assuming it may have a port first, // and then assuming it doesn't. - ip.parse::().map(|s| s.ip()) + ip.parse::() + .map(|s| s.ip()) .or_else(|_| ip.parse::()) .ok() }) @@ -110,7 +116,10 @@ mod test { fn get_addr_from_forwarded_rfc_examples() { let examples = vec![ (r#"for="_gazonk""#, "_gazonk"), - (r#"For="[2001:db8:cafe::17]:4711""#, "[2001:db8:cafe::17]:4711"), + ( + r#"For="[2001:db8:cafe::17]:4711""#, + "[2001:db8:cafe::17]:4711", + ), (r#"for=192.0.2.60;proto=http;by=203.0.113.43"#, "192.0.2.60"), (r#"for=192.0.2.43, for=198.51.100.17"#, "192.0.2.43"), ]; @@ -119,9 +128,9 @@ mod test { assert_eq!( get_first_addr_from_forwarded_header(value), Some(expected), - "Header value: {}", value + "Header value: {}", + value ); } } - -} \ No newline at end of file +} diff --git a/backend/telemetry/src/aggregator/aggregator.rs b/backend/telemetry/src/aggregator/aggregator.rs index dcba57c..a8ae177 100644 --- a/backend/telemetry/src/aggregator/aggregator.rs +++ b/backend/telemetry/src/aggregator/aggregator.rs @@ -1,12 +1,12 @@ -use std::sync::Arc; -use std::sync::atomic::AtomicU64; -use futures::channel::mpsc; -use futures::{ future, Sink, SinkExt }; use super::inner_loop; use crate::find_location::find_location; use crate::state::NodeId; use common::id_type; +use futures::channel::mpsc; +use futures::{future, Sink, SinkExt}; use std::net::Ipv4Addr; +use std::sync::atomic::AtomicU64; +use std::sync::Arc; id_type! { /// A unique Id is assigned per websocket connection (or more accurately, @@ -28,7 +28,7 @@ struct AggregatorInternal { /// Send messages in to the aggregator from the outside via this. This is /// stored here so that anybody holding an `Aggregator` handle can /// make use of it. - tx_to_aggregator: mpsc::Sender + tx_to_aggregator: mpsc::Sender, } impl Aggregator { @@ -38,11 +38,17 @@ impl Aggregator { // Kick off a locator task to locate nodes, which hands back a channel to make location requests let tx_to_locator = find_location(tx_to_aggregator.clone().with(|(node_id, msg)| { - future::ok::<_,mpsc::SendError>(inner_loop::ToAggregator::FromFindLocation(node_id, msg)) + future::ok::<_, mpsc::SendError>(inner_loop::ToAggregator::FromFindLocation( + node_id, msg, + )) })); // Handle any incoming messages in our handler loop: - tokio::spawn(Aggregator::handle_messages(rx_from_external, tx_to_locator, denylist)); + tokio::spawn(Aggregator::handle_messages( + rx_from_external, + tx_to_locator, + denylist, + )); // Return a handle to our aggregator: Ok(Aggregator(Arc::new(AggregatorInternal { @@ -58,37 +64,54 @@ impl Aggregator { async fn handle_messages( rx_from_external: mpsc::Receiver, tx_to_aggregator: mpsc::UnboundedSender<(NodeId, Ipv4Addr)>, - denylist: Vec + denylist: Vec, ) { - inner_loop::InnerLoop::new(rx_from_external, tx_to_aggregator, denylist).handle().await; + inner_loop::InnerLoop::new(rx_from_external, tx_to_aggregator, denylist) + .handle() + .await; } /// Return a sink that a shard can send messages into to be handled by the aggregator. - pub fn subscribe_shard(&self) -> impl Sink + Unpin { + pub fn subscribe_shard( + &self, + ) -> impl Sink + Unpin { // Assign a unique aggregator-local ID to each connection that subscribes, and pass // that along with every message to the aggregator loop: - let shard_conn_id = self.0.shard_conn_id.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + let shard_conn_id = self + .0 + .shard_conn_id + .fetch_add(1, std::sync::atomic::Ordering::Relaxed); let tx_to_aggregator = self.0.tx_to_aggregator.clone(); // Calling `send` on this Sink requires Unpin. There may be a nicer way than this, // but pinning by boxing is the easy solution for now: Box::pin(tx_to_aggregator.with(move |msg| async move { - Ok(inner_loop::ToAggregator::FromShardWebsocket(shard_conn_id.into(), msg)) + Ok(inner_loop::ToAggregator::FromShardWebsocket( + shard_conn_id.into(), + msg, + )) })) } /// Return a sink that a feed can send messages into to be handled by the aggregator. - pub fn subscribe_feed(&self) -> impl Sink + Unpin { + pub fn subscribe_feed( + &self, + ) -> impl Sink + Unpin { // Assign a unique aggregator-local ID to each connection that subscribes, and pass // that along with every message to the aggregator loop: - let feed_conn_id = self.0.feed_conn_id.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + let feed_conn_id = self + .0 + .feed_conn_id + .fetch_add(1, std::sync::atomic::Ordering::Relaxed); let tx_to_aggregator = self.0.tx_to_aggregator.clone(); // Calling `send` on this Sink requires Unpin. There may be a nicer way than this, // but pinning by boxing is the easy solution for now: Box::pin(tx_to_aggregator.with(move |msg| async move { - Ok(inner_loop::ToAggregator::FromFeedWebsocket(feed_conn_id.into(), msg)) + Ok(inner_loop::ToAggregator::FromFeedWebsocket( + feed_conn_id.into(), + msg, + )) })) } - -} \ No newline at end of file +} diff --git a/backend/telemetry/src/aggregator/inner_loop.rs b/backend/telemetry/src/aggregator/inner_loop.rs index 347ac9b..91d40d2 100644 --- a/backend/telemetry/src/aggregator/inner_loop.rs +++ b/backend/telemetry/src/aggregator/inner_loop.rs @@ -1,33 +1,32 @@ -use common::{ - internal_messages::{ - self, - ShardNodeId, - MuteReason - }, - node_types::BlockHash, - node_message, - time -}; -use bimap::BiMap; -use std::{net::{IpAddr, Ipv4Addr}, str::FromStr}; -use futures::channel::{ mpsc }; -use futures::{ SinkExt, StreamExt }; -use std::collections::{ HashMap, HashSet }; -use crate::state::{ self, State, NodeId }; -use crate::feed_message::{ self, FeedMessageSerializer }; -use crate::find_location; use super::aggregator::ConnId; +use crate::feed_message::{self, FeedMessageSerializer}; +use crate::find_location; +use crate::state::{self, NodeId, State}; +use bimap::BiMap; +use common::{ + internal_messages::{self, MuteReason, ShardNodeId}, + node_message, + node_types::BlockHash, + time, +}; +use futures::channel::mpsc; +use futures::{SinkExt, StreamExt}; +use std::collections::{HashMap, HashSet}; +use std::{ + net::{IpAddr, Ipv4Addr}, + str::FromStr, +}; /// Incoming messages come via subscriptions, and end up looking like this. -#[derive(Clone,Debug)] +#[derive(Clone, Debug)] pub enum ToAggregator { FromShardWebsocket(ConnId, FromShardWebsocket), FromFeedWebsocket(ConnId, FromFeedWebsocket), - FromFindLocation(NodeId, find_location::Location) + FromFindLocation(NodeId, find_location::Location), } /// An incoming shard connection can send these messages to the aggregator. -#[derive(Clone,Debug)] +#[derive(Clone, Debug)] pub enum FromShardWebsocket { /// When the socket is opened, it'll send this first /// so that we have a way to communicate back to it. @@ -39,19 +38,17 @@ pub enum FromShardWebsocket { local_id: ShardNodeId, ip: Option, node: common::node_types::NodeDetails, - genesis_hash: common::node_types::BlockHash + genesis_hash: common::node_types::BlockHash, }, /// Update/pass through details about a node. Update { local_id: ShardNodeId, - payload: node_message::Payload + payload: node_message::Payload, }, /// Tell the aggregator that a node has been removed when it disconnects. - Remove { - local_id: ShardNodeId, - }, + Remove { local_id: ShardNodeId }, /// The shard is disconnected. - Disconnected + Disconnected, } /// The aggregator can these messages back to a shard connection. @@ -60,12 +57,12 @@ pub enum ToShardWebsocket { /// Mute messages to the core by passing the shard-local ID of them. Mute { local_id: ShardNodeId, - reason: internal_messages::MuteReason - } + reason: internal_messages::MuteReason, + }, } /// An incoming feed connection can send these messages to the aggregator. -#[derive(Clone,Debug)] +#[derive(Clone, Debug)] pub enum FromFeedWebsocket { /// When the socket is opened, it'll send this first /// so that we have a way to communicate back to it. @@ -76,19 +73,15 @@ pub enum FromFeedWebsocket { }, /// The feed can subscribe to a chain to receive /// messages relating to it. - Subscribe { - chain: Box - }, + Subscribe { chain: Box }, /// The feed wants finality info for the chain, too. SendFinality, /// The feed doesn't want any more finality info for the chain. NoMoreFinality, /// An explicit ping message. - Ping { - value: Box - }, + Ping { value: Box }, /// The feed is disconnected. - Disconnected + Disconnected, } // The frontend sends text based commands; parse them into these messages: @@ -96,23 +89,23 @@ impl FromStr for FromFeedWebsocket { type Err = anyhow::Error; fn from_str(s: &str) -> Result { let (cmd, value) = match s.find(':') { - Some(idx) => (&s[..idx], s[idx+1..].into()), - None => return Err(anyhow::anyhow!("Expecting format `CMD:CHAIN_NAME`")) + Some(idx) => (&s[..idx], s[idx + 1..].into()), + None => return Err(anyhow::anyhow!("Expecting format `CMD:CHAIN_NAME`")), }; match cmd { "ping" => Ok(FromFeedWebsocket::Ping { value }), "subscribe" => Ok(FromFeedWebsocket::Subscribe { chain: value }), "send-finality" => Ok(FromFeedWebsocket::SendFinality), "no-more-finality" => Ok(FromFeedWebsocket::NoMoreFinality), - _ => return Err(anyhow::anyhow!("Command {} not recognised", cmd)) + _ => return Err(anyhow::anyhow!("Command {} not recognised", cmd)), } } } /// The aggregator can these messages back to a feed connection. -#[derive(Clone,Debug)] +#[derive(Clone, Debug)] pub enum ToFeedWebsocket { - Bytes(Vec) + Bytes(Vec), } /// Instances of this are responsible for handling incoming and @@ -143,7 +136,7 @@ pub struct InnerLoop { feed_conn_id_finality: HashSet, /// Send messages here to make geographical location requests. - tx_to_locator: mpsc::UnboundedSender<(NodeId, Ipv4Addr)> + tx_to_locator: mpsc::UnboundedSender<(NodeId, Ipv4Addr)>, } impl InnerLoop { @@ -151,7 +144,7 @@ impl InnerLoop { pub fn new( rx_from_external: mpsc::Receiver, tx_to_locator: mpsc::UnboundedSender<(NodeId, Ipv4Addr)>, - denylist: Vec + denylist: Vec, ) -> Self { InnerLoop { rx_from_external, @@ -162,7 +155,7 @@ impl InnerLoop { feed_conn_id_to_chain: HashMap::new(), chain_to_feed_conn_ids: HashMap::new(), feed_conn_id_finality: HashSet::new(), - tx_to_locator + tx_to_locator, } } @@ -172,10 +165,10 @@ impl InnerLoop { match msg { ToAggregator::FromFeedWebsocket(feed_conn_id, msg) => { self.handle_from_feed(feed_conn_id, msg).await - }, + } ToAggregator::FromShardWebsocket(shard_conn_id, msg) => { self.handle_from_shard(shard_conn_id, msg).await - }, + } ToAggregator::FromFindLocation(node_id, location) => { self.handle_from_find_location(node_id, location).await } @@ -184,8 +177,13 @@ impl InnerLoop { } /// Handle messages that come from the node geographical locator. - async fn handle_from_find_location(&mut self, node_id: NodeId, location: find_location::Location) { - self.node_state.update_node_location(node_id, location.clone()); + async fn handle_from_find_location( + &mut self, + node_id: NodeId, + location: find_location::Location, + ) { + self.node_state + .update_node_location(node_id, location.clone()); if let Some(loc) = location { let mut feed_message_serializer = FeedMessageSerializer::new(); @@ -193,15 +191,20 @@ impl InnerLoop { node_id.get_chain_node_id().into(), loc.latitude, loc.longitude, - &loc.city + &loc.city, )); - let chain_genesis_hash = self.node_state + let chain_genesis_hash = self + .node_state .get_chain_by_node_id(node_id) .map(|chain| *chain.genesis_hash()); if let Some(chain_genesis_hash) = chain_genesis_hash { - self.finalize_and_broadcast_to_chain_feeds(&chain_genesis_hash, feed_message_serializer).await; + self.finalize_and_broadcast_to_chain_feeds( + &chain_genesis_hash, + feed_message_serializer, + ) + .await; } } } @@ -213,25 +216,34 @@ impl InnerLoop { match msg { FromShardWebsocket::Initialize { channel } => { self.shard_channels.insert(shard_conn_id, channel); - }, - FromShardWebsocket::Add { local_id, ip, node, genesis_hash } => { + } + FromShardWebsocket::Add { + local_id, + ip, + node, + genesis_hash, + } => { match self.node_state.add_node(genesis_hash, node) { state::AddNodeResult::ChainOnDenyList => { if let Some(shard_conn) = self.shard_channels.get_mut(&shard_conn_id) { - let _ = shard_conn.send(ToShardWebsocket::Mute { - local_id, - reason: MuteReason::ChainNotAllowed - }).await; + let _ = shard_conn + .send(ToShardWebsocket::Mute { + local_id, + reason: MuteReason::ChainNotAllowed, + }) + .await; } - }, + } state::AddNodeResult::ChainOverQuota => { if let Some(shard_conn) = self.shard_channels.get_mut(&shard_conn_id) { - let _ = shard_conn.send(ToShardWebsocket::Mute { - local_id, - reason: MuteReason::Overquota - }).await; + let _ = shard_conn + .send(ToShardWebsocket::Mute { + local_id, + reason: MuteReason::Overquota, + }) + .await; } - }, + } state::AddNodeResult::NodeAddedToChain(details) => { let node_id = details.id; @@ -246,66 +258,96 @@ impl InnerLoop { // Tell chain subscribers about the node we've just added: let mut feed_messages_for_chain = FeedMessageSerializer::new(); - feed_messages_for_chain.push(feed_message::AddedNode(node_id.get_chain_node_id().into(), &details.node)); - self.finalize_and_broadcast_to_chain_feeds(&genesis_hash, feed_messages_for_chain).await; + feed_messages_for_chain.push(feed_message::AddedNode( + node_id.get_chain_node_id().into(), + &details.node, + )); + self.finalize_and_broadcast_to_chain_feeds( + &genesis_hash, + feed_messages_for_chain, + ) + .await; // Tell everybody about the new node count and potential rename: let mut feed_messages_for_all = FeedMessageSerializer::new(); if has_chain_label_changed { - feed_messages_for_all.push(feed_message::RemovedChain(&old_chain_label)); + feed_messages_for_all + .push(feed_message::RemovedChain(&old_chain_label)); } - feed_messages_for_all.push(feed_message::AddedChain(&new_chain_label, chain_node_count)); - self.finalize_and_broadcast_to_all_feeds(feed_messages_for_all).await; + feed_messages_for_all + .push(feed_message::AddedChain(&new_chain_label, chain_node_count)); + self.finalize_and_broadcast_to_all_feeds(feed_messages_for_all) + .await; // Ask for the grographical location of the node. // Currently we only geographically locate IPV4 addresses so ignore IPV6. if let Some(IpAddr::V4(ip_v4)) = ip { let _ = self.tx_to_locator.send((node_id, ip_v4)).await; } - }, + } } - }, + } FromShardWebsocket::Remove { local_id } => { let node_id = match self.node_ids.remove_by_right(&(shard_conn_id, local_id)) { Some((node_id, _)) => node_id, None => { - log::error!("Cannot find ID for node with shard/connectionId of {:?}/{:?}", shard_conn_id, local_id); - return + log::error!( + "Cannot find ID for node with shard/connectionId of {:?}/{:?}", + shard_conn_id, + local_id + ); + return; } }; self.remove_nodes_and_broadcast_result(Some(node_id)).await; - }, + } FromShardWebsocket::Update { local_id, payload } => { let node_id = match self.node_ids.get_by_right(&(shard_conn_id, local_id)) { Some(id) => *id, None => { - log::error!("Cannot find ID for node with shard/connectionId of {:?}/{:?}", shard_conn_id, local_id); - return + log::error!( + "Cannot find ID for node with shard/connectionId of {:?}/{:?}", + shard_conn_id, + local_id + ); + return; } }; let mut feed_message_serializer = FeedMessageSerializer::new(); - let broadcast_finality = self.node_state.update_node(node_id, payload, &mut feed_message_serializer); + let broadcast_finality = + self.node_state + .update_node(node_id, payload, &mut feed_message_serializer); if let Some(chain) = self.node_state.get_chain_by_node_id(node_id) { let genesis_hash = *chain.genesis_hash(); if broadcast_finality { - self.finalize_and_broadcast_to_chain_finality_feeds(&genesis_hash, feed_message_serializer).await; + self.finalize_and_broadcast_to_chain_finality_feeds( + &genesis_hash, + feed_message_serializer, + ) + .await; } else { - self.finalize_and_broadcast_to_chain_feeds(&genesis_hash, feed_message_serializer).await; + self.finalize_and_broadcast_to_chain_feeds( + &genesis_hash, + feed_message_serializer, + ) + .await; } } - }, + } FromShardWebsocket::Disconnected => { // Find all nodes associated with this shard connection ID: - let node_ids_to_remove: Vec = self.node_ids + let node_ids_to_remove: Vec = self + .node_ids .iter() .filter(|(_, &(this_shard_conn_id, _))| shard_conn_id == this_shard_conn_id) - .map(|(&node_id,_)| node_id) + .map(|(&node_id, _)| node_id) .collect(); // ... and remove them: - self.remove_nodes_and_broadcast_result(node_ids_to_remove).await; + self.remove_nodes_and_broadcast_result(node_ids_to_remove) + .await; } } } @@ -321,21 +363,19 @@ impl InnerLoop { let mut feed_serializer = FeedMessageSerializer::new(); feed_serializer.push(feed_message::Version(31)); for chain in self.node_state.iter_chains() { - feed_serializer.push(feed_message::AddedChain( - chain.label(), - chain.node_count() - )); + feed_serializer + .push(feed_message::AddedChain(chain.label(), chain.node_count())); } // Send this to the channel that subscribed: if let Some(bytes) = feed_serializer.into_finalized() { let _ = channel.send(ToFeedWebsocket::Bytes(bytes)).await; } - }, + } FromFeedWebsocket::Ping { value } => { let feed_channel = match self.feed_channels.get_mut(&feed_conn_id) { Some(chan) => chan, - None => return + None => return, }; // Pong! @@ -344,11 +384,11 @@ impl InnerLoop { if let Some(bytes) = feed_serializer.into_finalized() { let _ = feed_channel.send(ToFeedWebsocket::Bytes(bytes)).await; } - }, + } FromFeedWebsocket::Subscribe { chain } => { let feed_channel = match self.feed_channels.get_mut(&feed_conn_id) { Some(chan) => chan, - None => return + None => return, }; // Unsubscribe from previous chain if subscribed to one: @@ -364,13 +404,13 @@ impl InnerLoop { // Get old chain if there was one: let node_state = &self.node_state; - let old_chain = old_genesis_hash - .and_then(|hash| node_state.get_chain_by_genesis_hash(&hash)); + let old_chain = + old_genesis_hash.and_then(|hash| node_state.get_chain_by_genesis_hash(&hash)); // Get new chain, ignoring the rest if it doesn't exist. let new_chain = match self.node_state.get_chain_by_label(&chain) { Some(chain) => chain, - None => return + None => return, }; // Send messages to the feed about this subscription: @@ -380,14 +420,14 @@ impl InnerLoop { } feed_serializer.push(feed_message::SubscribedTo(new_chain.label())); feed_serializer.push(feed_message::TimeSync(time::now())); - feed_serializer.push(feed_message::BestBlock ( + feed_serializer.push(feed_message::BestBlock( new_chain.best_block().height, new_chain.timestamp(), - new_chain.average_block_time() + new_chain.average_block_time(), )); - feed_serializer.push(feed_message::BestFinalized ( + feed_serializer.push(feed_message::BestFinalized( new_chain.finalized_block().height, - new_chain.finalized_block().hash + new_chain.finalized_block().hash, )); for (idx, (chain_node_id, node)) in new_chain.iter_nodes().enumerate() { let chain_node_id = chain_node_id.into(); @@ -415,15 +455,19 @@ impl InnerLoop { // Actually make a note of the new chain subsciption: let new_genesis_hash = *new_chain.genesis_hash(); - self.feed_conn_id_to_chain.insert(feed_conn_id, new_genesis_hash); - self.chain_to_feed_conn_ids.entry(new_genesis_hash).or_default().insert(feed_conn_id); - }, + self.feed_conn_id_to_chain + .insert(feed_conn_id, new_genesis_hash); + self.chain_to_feed_conn_ids + .entry(new_genesis_hash) + .or_default() + .insert(feed_conn_id); + } FromFeedWebsocket::SendFinality => { self.feed_conn_id_finality.insert(feed_conn_id); - }, + } FromFeedWebsocket::NoMoreFinality => { self.feed_conn_id_finality.remove(&feed_conn_id); - }, + } FromFeedWebsocket::Disconnected => { // The feed has disconnected; clean up references to it: if let Some(chain) = self.feed_conn_id_to_chain.remove(&feed_conn_id) { @@ -431,18 +475,23 @@ impl InnerLoop { } self.feed_channels.remove(&feed_conn_id); self.feed_conn_id_finality.remove(&feed_conn_id); - }, + } } } /// Remove all of the node IDs provided and broadcast messages to feeds as needed. - async fn remove_nodes_and_broadcast_result(&mut self, node_ids: impl IntoIterator) { - + async fn remove_nodes_and_broadcast_result( + &mut self, + node_ids: impl IntoIterator, + ) { // Group by chain to simplify the handling of feed messages: - let mut node_ids_per_chain: HashMap> = HashMap::new(); + let mut node_ids_per_chain: HashMap> = HashMap::new(); for node_id in node_ids.into_iter() { if let Some(chain) = self.node_state.get_chain_by_node_id(node_id) { - node_ids_per_chain.entry(*chain.genesis_hash()).or_default().push(node_id); + node_ids_per_chain + .entry(*chain.genesis_hash()) + .or_default() + .push(node_id); } } @@ -454,12 +503,14 @@ impl InnerLoop { self.remove_node( node_id, &mut feed_messages_for_chain, - &mut feed_messages_for_all + &mut feed_messages_for_all, ); } - self.finalize_and_broadcast_to_chain_feeds(&chain_label, feed_messages_for_chain).await; + self.finalize_and_broadcast_to_chain_feeds(&chain_label, feed_messages_for_chain) + .await; } - self.finalize_and_broadcast_to_all_feeds(feed_messages_for_all).await; + self.finalize_and_broadcast_to_all_feeds(feed_messages_for_all) + .await; } /// Remove a single node by its ID, pushing any messages we'd want to send @@ -469,7 +520,7 @@ impl InnerLoop { &mut self, node_id: NodeId, feed_for_chain: &mut FeedMessageSerializer, - feed_for_all: &mut FeedMessageSerializer + feed_for_all: &mut FeedMessageSerializer, ) { // Remove our top level association (this may already have been done). self.node_ids.remove_by_left(&node_id); @@ -478,41 +529,49 @@ impl InnerLoop { Some(remove_details) => remove_details, None => { log::error!("Could not find node {:?}", node_id); - return + return; } }; // The chain has been removed (no nodes left in it, or it was renamed): if removed_details.chain_node_count == 0 || removed_details.has_chain_label_changed { - feed_for_all.push(feed_message::RemovedChain( - &removed_details.old_chain_label - )); + feed_for_all.push(feed_message::RemovedChain(&removed_details.old_chain_label)); } // If the chain still exists, tell everybody about the new label or updated node count: if removed_details.chain_node_count != 0 { - feed_for_all.push( - feed_message::AddedChain(&removed_details.new_chain_label, removed_details.chain_node_count) - ); + feed_for_all.push(feed_message::AddedChain( + &removed_details.new_chain_label, + removed_details.chain_node_count, + )); } // Assuming the chain hasn't gone away, tell chain subscribers about the node removal if removed_details.chain_node_count != 0 { - feed_for_chain.push( - feed_message::RemovedNode(node_id.get_chain_node_id().into()) - ); + feed_for_chain.push(feed_message::RemovedNode( + node_id.get_chain_node_id().into(), + )); } } /// Finalize a [`FeedMessageSerializer`] and broadcast the result to feeds for the chain. - async fn finalize_and_broadcast_to_chain_feeds(&mut self, genesis_hash: &BlockHash, serializer: FeedMessageSerializer) { + async fn finalize_and_broadcast_to_chain_feeds( + &mut self, + genesis_hash: &BlockHash, + serializer: FeedMessageSerializer, + ) { if let Some(bytes) = serializer.into_finalized() { - self.broadcast_to_chain_feeds(genesis_hash, ToFeedWebsocket::Bytes(bytes)).await; + self.broadcast_to_chain_feeds(genesis_hash, ToFeedWebsocket::Bytes(bytes)) + .await; } } /// Send a message to all chain feeds. - async fn broadcast_to_chain_feeds(&mut self, genesis_hash: &BlockHash, message: ToFeedWebsocket) { + async fn broadcast_to_chain_feeds( + &mut self, + genesis_hash: &BlockHash, + message: ToFeedWebsocket, + ) { if let Some(feeds) = self.chain_to_feed_conn_ids.get(genesis_hash) { for &feed_id in feeds { if let Some(chan) = self.feed_channels.get_mut(&feed_id) { @@ -525,7 +584,8 @@ impl InnerLoop { /// Finalize a [`FeedMessageSerializer`] and broadcast the result to all feeds async fn finalize_and_broadcast_to_all_feeds(&mut self, serializer: FeedMessageSerializer) { if let Some(bytes) = serializer.into_finalized() { - self.broadcast_to_all_feeds(ToFeedWebsocket::Bytes(bytes)).await; + self.broadcast_to_all_feeds(ToFeedWebsocket::Bytes(bytes)) + .await; } } @@ -537,14 +597,23 @@ impl InnerLoop { } /// Finalize a [`FeedMessageSerializer`] and broadcast the result to chain finality feeds - async fn finalize_and_broadcast_to_chain_finality_feeds(&mut self, genesis_hash: &BlockHash, serializer: FeedMessageSerializer) { + async fn finalize_and_broadcast_to_chain_finality_feeds( + &mut self, + genesis_hash: &BlockHash, + serializer: FeedMessageSerializer, + ) { if let Some(bytes) = serializer.into_finalized() { - self.broadcast_to_chain_finality_feeds(genesis_hash, ToFeedWebsocket::Bytes(bytes)).await; + self.broadcast_to_chain_finality_feeds(genesis_hash, ToFeedWebsocket::Bytes(bytes)) + .await; } } /// Send a message to all chain finality feeds. - async fn broadcast_to_chain_finality_feeds(&mut self, genesis_hash: &BlockHash, message: ToFeedWebsocket) { + async fn broadcast_to_chain_finality_feeds( + &mut self, + genesis_hash: &BlockHash, + message: ToFeedWebsocket, + ) { if let Some(feeds) = self.chain_to_feed_conn_ids.get(genesis_hash) { // Get all feeds for the chain, but only broadcast to those feeds that // are also subscribed to receive finality updates. @@ -555,4 +624,4 @@ impl InnerLoop { } } } -} \ No newline at end of file +} diff --git a/backend/telemetry/src/aggregator/mod.rs b/backend/telemetry/src/aggregator/mod.rs index 1ac1108..c73fba3 100644 --- a/backend/telemetry/src/aggregator/mod.rs +++ b/backend/telemetry/src/aggregator/mod.rs @@ -2,6 +2,6 @@ mod aggregator; mod inner_loop; // Expose the various message types that can be worked with externally: -pub use inner_loop::{ FromFeedWebsocket, FromShardWebsocket, ToFeedWebsocket, ToShardWebsocket }; +pub use inner_loop::{FromFeedWebsocket, FromShardWebsocket, ToFeedWebsocket, ToShardWebsocket}; -pub use aggregator::*; \ No newline at end of file +pub use aggregator::*; diff --git a/backend/telemetry/src/feed_message.rs b/backend/telemetry/src/feed_message.rs index 756f293..d6986ca 100644 --- a/backend/telemetry/src/feed_message.rs +++ b/backend/telemetry/src/feed_message.rs @@ -5,11 +5,10 @@ use serde::Serialize; use std::mem; use crate::state::Node; -use serde_json::to_writer; use common::node_types::{ - BlockDetails, BlockHash, BlockNumber, NodeHardware, NodeIO, NodeStats, - Timestamp + BlockDetails, BlockHash, BlockNumber, NodeHardware, NodeIO, NodeStats, Timestamp, }; +use serde_json::to_writer; type Address = Box; type FeedNodeId = usize; diff --git a/backend/telemetry/src/find_location.rs b/backend/telemetry/src/find_location.rs index 9610d14..e2aa661 100644 --- a/backend/telemetry/src/find_location.rs +++ b/backend/telemetry/src/find_location.rs @@ -1,11 +1,11 @@ use std::net::Ipv4Addr; use std::sync::Arc; +use futures::channel::mpsc; +use futures::{Sink, SinkExt, StreamExt}; use parking_lot::RwLock; use rustc_hash::FxHashMap; use serde::Deserialize; -use futures::{Sink, SinkExt, StreamExt}; -use futures::channel::mpsc; use common::node_types::NodeLocation; use tokio::sync::Semaphore; @@ -18,7 +18,7 @@ pub type Location = Option>; pub fn find_location(response_chan: R) -> mpsc::UnboundedSender<(Id, Ipv4Addr)> where R: Sink<(Id, Option>)> + Unpin + Send + Clone + 'static, - Id: Clone + Send + 'static + Id: Clone + Send + 'static, { let (tx, mut rx) = mpsc::unbounded(); @@ -40,14 +40,12 @@ where // Spawn a loop to handle location requests tokio::spawn(async move { - // Allow 4 requests at a time. acquiring a token will block while the // number of concurrent location requests is more than this. let semaphore = Arc::new(Semaphore::new(4)); loop { while let Some((id, ip_address)) = rx.next().await { - let permit = semaphore.clone().acquire_owned().await.unwrap(); let mut response_chan = response_chan.clone(); let locator = locator.clone(); @@ -57,8 +55,8 @@ where tokio::spawn(async move { match locator.locate(ip_address).await { Ok(loc) => { - let _ = response_chan.send((id,loc)).await; - }, + let _ = response_chan.send((id, loc)).await; + } Err(e) => { log::debug!("GET error for ip location: {:?}", e); } @@ -88,7 +86,7 @@ impl Locator { Locator { client, - cache: Arc::new(RwLock::new(cache)) + cache: Arc::new(RwLock::new(cache)), } } @@ -113,7 +111,10 @@ impl Locator { Ok(location) } - async fn iplocate_ipapi_co(&self, ip: Ipv4Addr) -> Result>, reqwest::Error> { + async fn iplocate_ipapi_co( + &self, + ip: Ipv4Addr, + ) -> Result>, reqwest::Error> { let location = self .query(&format!("https://ipapi.co/{}/json", ip)) .await? @@ -122,7 +123,10 @@ impl Locator { Ok(location) } - async fn iplocate_ipinfo_io(&self, ip: Ipv4Addr) -> Result>, reqwest::Error> { + async fn iplocate_ipinfo_io( + &self, + ip: Ipv4Addr, + ) -> Result>, reqwest::Error> { let location = self .query(&format!("https://ipinfo.io/{}/json", ip)) .await? @@ -132,7 +136,8 @@ impl Locator { } async fn query(&self, url: &str) -> Result, reqwest::Error> - where for<'de> T: Deserialize<'de> + where + for<'de> T: Deserialize<'de>, { match self.client.get(url).send().await?.json::().await { Ok(result) => Ok(Some(result)), @@ -203,4 +208,4 @@ mod tests { assert!(location.is_none()); } -} \ No newline at end of file +} diff --git a/backend/telemetry/src/main.rs b/backend/telemetry/src/main.rs index c1caf39..aeefd4e 100644 --- a/backend/telemetry/src/main.rs +++ b/backend/telemetry/src/main.rs @@ -1,19 +1,21 @@ mod aggregator; -mod state; mod feed_message; mod find_location; +mod state; use std::net::SocketAddr; use std::str::FromStr; +use aggregator::{ + Aggregator, FromFeedWebsocket, FromShardWebsocket, ToFeedWebsocket, ToShardWebsocket, +}; use bincode::Options; -use structopt::StructOpt; +use common::{internal_messages, LogLevel}; +use futures::{channel::mpsc, SinkExt, StreamExt}; use simple_logger::SimpleLogger; -use futures::{StreamExt, SinkExt, channel::mpsc}; -use warp::Filter; +use structopt::StructOpt; use warp::filters::ws; -use common::{ internal_messages, LogLevel }; -use aggregator::{ Aggregator, FromFeedWebsocket, ToFeedWebsocket, FromShardWebsocket, ToShardWebsocket }; +use warp::Filter; const VERSION: &str = env!("CARGO_PKG_VERSION"); const AUTHORS: &str = env!("CARGO_PKG_AUTHORS"); @@ -27,26 +29,15 @@ struct Opts { /// This is the socket address that Telemetryis listening to. This is restricted to /// localhost (127.0.0.1) by default and should be fine for most use cases. If /// you are using Telemetry in a container, you likely want to set this to '0.0.0.0:8000' - #[structopt( - short = "l", - long = "listen", - default_value = "127.0.0.1:8000", - )] + #[structopt(short = "l", long = "listen", default_value = "127.0.0.1:8000")] socket: std::net::SocketAddr, /// The desired log level; one of 'error', 'warn', 'info', 'debug' or 'trace', where /// 'error' only logs errors and 'trace' logs everything. - #[structopt( - required = false, - long = "log", - default_value = "info", - )] + #[structopt(required = false, long = "log", default_value = "info")] log_level: LogLevel, /// Space delimited list of the names of chains that are not allowed to connect to /// telemetry. Case sensitive. - #[structopt( - required = false, - long = "denylist", - )] + #[structopt(required = false, long = "denylist")] denylist: Vec, } @@ -60,10 +51,7 @@ async fn main() { .init() .expect("Must be able to start a logger"); - log::info!( - "Starting Telemetry Core version: {}", - VERSION - ); + log::info!("Starting Telemetry Core version: {}", VERSION); if let Err(e) = start_server(opts).await { log::error!("Error starting server: {}", e); @@ -72,42 +60,41 @@ async fn main() { /// Declare our routes and start the server. async fn start_server(opts: Opts) -> anyhow::Result<()> { - let shard_aggregator = Aggregator::spawn(opts.denylist).await?; let feed_aggregator = shard_aggregator.clone(); // Handle requests to /health by returning OK. - let health_route = - warp::path("health") - .map(|| "OK"); + let health_route = warp::path("health").map(|| "OK"); // Handle websocket requests from shards. - let ws_shard_submit_route = - warp::path("shard_submit") + let ws_shard_submit_route = warp::path("shard_submit") .and(warp::ws()) .and(warp::filters::addr::remote()) .map(move |ws: ws::Ws, addr: Option| { let tx_to_aggregator = shard_aggregator.subscribe_shard(); log::info!("Opening /shard_submit connection from {:?}", addr); ws.on_upgrade(move |websocket| async move { - let (mut tx_to_aggregator, websocket) = handle_shard_websocket_connection(websocket, tx_to_aggregator).await; + let (mut tx_to_aggregator, websocket) = + handle_shard_websocket_connection(websocket, tx_to_aggregator).await; log::info!("Closing /shard_submit connection from {:?}", addr); // Tell the aggregator that this connection has closed, so it can tidy up. - let _ = tx_to_aggregator.send(FromShardWebsocket::Disconnected).await; + let _ = tx_to_aggregator + .send(FromShardWebsocket::Disconnected) + .await; let _ = websocket.close().await; }) }); // Handle websocket requests from frontends. - let ws_feed_route = - warp::path("feed") + let ws_feed_route = warp::path("feed") .and(warp::ws()) .and(warp::filters::addr::remote()) .map(move |ws: ws::Ws, addr: Option| { let tx_to_aggregator = feed_aggregator.subscribe_feed(); log::info!("Opening /feed connection from {:?}", addr); ws.on_upgrade(move |websocket| async move { - let (mut tx_to_aggregator, websocket) = handle_feed_websocket_connection(websocket, tx_to_aggregator).await; + let (mut tx_to_aggregator, websocket) = + handle_feed_websocket_connection(websocket, tx_to_aggregator).await; log::info!("Closing /feed connection from {:?}", addr); // Tell the aggregator that this connection has closed, so it can tidy up. let _ = tx_to_aggregator.send(FromFeedWebsocket::Disconnected).await; @@ -116,22 +103,24 @@ async fn start_server(opts: Opts) -> anyhow::Result<()> { }); // Merge the routes and start our server: - let routes = ws_shard_submit_route - .or(ws_feed_route) - .or(health_route); + let routes = ws_shard_submit_route.or(ws_feed_route).or(health_route); warp::serve(routes).run(opts.socket).await; Ok(()) } /// This handles messages coming to/from a shard connection -async fn handle_shard_websocket_connection(mut websocket: ws::WebSocket, mut tx_to_aggregator: S) -> (S, ws::WebSocket) - where S: futures::Sink + Unpin +async fn handle_shard_websocket_connection( + mut websocket: ws::WebSocket, + mut tx_to_aggregator: S, +) -> (S, ws::WebSocket) +where + S: futures::Sink + Unpin, { let (tx_to_shard_conn, mut rx_from_aggregator) = mpsc::channel(10); // Tell the aggregator about this new connection, and give it a way to send messages to us: let init_msg = FromShardWebsocket::Initialize { - channel: tx_to_shard_conn + channel: tx_to_shard_conn, }; if let Err(e) = tx_to_aggregator.send(init_msg).await { log::error!("Error sending message to aggregator: {}", e); @@ -220,15 +209,19 @@ async fn handle_shard_websocket_connection(mut websocket: ws::WebSocket, mut } /// This handles messages coming from a feed connection -async fn handle_feed_websocket_connection(mut websocket: ws::WebSocket, mut tx_to_aggregator: S) -> (S, ws::WebSocket) - where S: futures::Sink + Unpin +async fn handle_feed_websocket_connection( + mut websocket: ws::WebSocket, + mut tx_to_aggregator: S, +) -> (S, ws::WebSocket) +where + S: futures::Sink + Unpin, { // unbounded channel so that slow feeds don't block aggregator progress: let (tx_to_feed_conn, mut rx_from_aggregator) = mpsc::unbounded(); // Tell the aggregator about this new connection, and give it a way to send messages to us: let init_msg = FromFeedWebsocket::Initialize { - channel: tx_to_feed_conn + channel: tx_to_feed_conn, }; if let Err(e) = tx_to_aggregator.send(init_msg).await { log::error!("Error sending message to aggregator: {}", e); diff --git a/backend/telemetry/src/state/chain.rs b/backend/telemetry/src/state/chain.rs index 0b9fca9..5a00ee7 100644 --- a/backend/telemetry/src/state/chain.rs +++ b/backend/telemetry/src/state/chain.rs @@ -1,16 +1,16 @@ -use std::collections::{ HashSet }; -use common::node_types::{ BlockHash, BlockNumber }; -use common::node_types::{Block, Timestamp}; use common::node_message::Payload; -use common::{time, id_type, DenseMap, MostSeen, NumStats}; +use common::node_types::{Block, Timestamp}; +use common::node_types::{BlockHash, BlockNumber}; +use common::{id_type, time, DenseMap, MostSeen, NumStats}; use once_cell::sync::Lazy; +use std::collections::HashSet; use crate::feed_message::{self, FeedMessageSerializer}; use crate::find_location; use super::node::Node; -id_type!{ +id_type! { /// A Node ID that is unique to the chain it's in. pub ChainNodeId(usize) } @@ -36,19 +36,19 @@ pub struct Chain { /// When the best block first arrived timestamp: Option, /// Genesis hash of this chain - genesis_hash: BlockHash + genesis_hash: BlockHash, } pub enum AddNodeResult { Overquota, Added { id: ChainNodeId, - chain_renamed: bool - } + chain_renamed: bool, + }, } pub struct RemoveNodeResult { - pub chain_renamed: bool + pub chain_renamed: bool, } /// Labels of chains we consider "first party". These chains allow any @@ -76,7 +76,7 @@ impl Chain { block_times: NumStats::new(50), average_block_time: None, timestamp: None, - genesis_hash + genesis_hash, } } @@ -90,7 +90,7 @@ impl Chain { /// Assign a node to this chain. pub fn add_node(&mut self, node: Node) -> AddNodeResult { if !self.can_add_node() { - return AddNodeResult::Overquota + return AddNodeResult::Overquota; } let node_chain_label = &node.details().chain; @@ -99,7 +99,7 @@ impl Chain { AddNodeResult::Added { id: node_id, - chain_renamed: label_result.has_changed() + chain_renamed: label_result.has_changed(), } } @@ -107,21 +107,29 @@ impl Chain { pub fn remove_node(&mut self, node_id: ChainNodeId) -> RemoveNodeResult { let node = match self.nodes.remove(node_id) { Some(node) => node, - None => return RemoveNodeResult { chain_renamed: false } + None => { + return RemoveNodeResult { + chain_renamed: false, + } + } }; let node_chain_label = &node.details().chain; let label_result = self.labels.remove(node_chain_label); RemoveNodeResult { - chain_renamed: label_result.has_changed() + chain_renamed: label_result.has_changed(), } } /// Attempt to update the best block seen in this chain. /// Returns a boolean which denotes whether the output is for finalization feeds (true) or not (false). - pub fn update_node(&mut self, nid: ChainNodeId, payload: Payload, feed: &mut FeedMessageSerializer) -> bool { - + pub fn update_node( + &mut self, + nid: ChainNodeId, + payload: Payload, + feed: &mut FeedMessageSerializer, + ) -> bool { if let Some(block) = payload.best_block() { self.handle_block(block, nid, feed); } @@ -159,9 +167,7 @@ impl Chain { return true; } Payload::AfgReceivedPrecommit(precommit) => { - if let Ok(finalized_number) = - precommit.target_number.parse::() - { + if let Ok(finalized_number) = precommit.target_number.parse::() { if let Some(addr) = node.details().validator.clone() { let voter = precommit.voter.clone(); feed.push(feed_message::AfgReceivedPrecommit( @@ -175,9 +181,7 @@ impl Chain { return true; } Payload::AfgReceivedPrevote(prevote) => { - if let Ok(finalized_number) = - prevote.target_number.parse::() - { + if let Ok(finalized_number) = prevote.target_number.parse::() { if let Some(addr) = node.details().validator.clone() { let voter = prevote.voter.clone(); feed.push(feed_message::AfgReceivedPrevote( @@ -204,7 +208,10 @@ impl Chain { if finalized.height > self.finalized.height { self.finalized = *finalized; - feed.push(feed_message::BestFinalized(finalized.height, finalized.hash)); + feed.push(feed_message::BestFinalized( + finalized.height, + finalized.hash, + )); } } } @@ -261,7 +268,6 @@ impl Chain { /// Check if the chain is stale (has not received a new best block in a while). /// If so, find a new best block, ignoring any stale nodes and marking them as such. fn update_stale_nodes(&mut self, now: u64, feed: &mut FeedMessageSerializer) { - let threshold = now - STALE_TIMEOUT; let timestamp = match self.timestamp { Some(ts) => ts, @@ -303,11 +309,18 @@ impl Chain { timestamp.unwrap_or(now), None, )); - feed.push(feed_message::BestFinalized(finalized.height, finalized.hash)); + feed.push(feed_message::BestFinalized( + finalized.height, + finalized.hash, + )); } } - pub fn update_node_location(&mut self, node_id: ChainNodeId, location: find_location::Location) -> bool { + pub fn update_node_location( + &mut self, + node_id: ChainNodeId, + location: find_location::Location, + ) -> bool { if let Some(node) = self.nodes.get_mut(node_id) { node.update_location(location); true @@ -319,7 +332,7 @@ impl Chain { pub fn get_node(&self, id: ChainNodeId) -> Option<&Node> { self.nodes.get(id) } - pub fn iter_nodes(&self) -> impl Iterator { + pub fn iter_nodes(&self) -> impl Iterator { self.nodes.iter() } pub fn label(&self) -> &str { @@ -354,4 +367,4 @@ fn max_nodes(label: &str) -> usize { } else { THIRD_PARTY_NETWORKS_MAX_NODES } -} \ No newline at end of file +} diff --git a/backend/telemetry/src/state/mod.rs b/backend/telemetry/src/state/mod.rs index 713f123..df12d23 100644 --- a/backend/telemetry/src/state/mod.rs +++ b/backend/telemetry/src/state/mod.rs @@ -1,7 +1,7 @@ -mod node; mod chain; +mod node; mod state; pub use node::Node; -pub use state::*; \ No newline at end of file +pub use state::*; diff --git a/backend/telemetry/src/state/node.rs b/backend/telemetry/src/state/node.rs index befb931..665103c 100644 --- a/backend/telemetry/src/state/node.rs +++ b/backend/telemetry/src/state/node.rs @@ -1,10 +1,9 @@ +use crate::find_location; +use common::node_message::SystemInterval; use common::node_types::{ - Block, BlockDetails, NodeDetails, NodeHardware, NodeIO, NodeLocation, NodeStats, - Timestamp, + Block, BlockDetails, NodeDetails, NodeHardware, NodeIO, NodeLocation, NodeStats, Timestamp, }; use common::time; -use common::node_message::SystemInterval; -use crate::find_location; /// Minimum time between block below broadcasting updates to the browser gets throttled, in ms. const THROTTLE_THRESHOLD: u64 = 100; diff --git a/backend/telemetry/src/state/state.rs b/backend/telemetry/src/state/state.rs index b8cc49d..5ae06c7 100644 --- a/backend/telemetry/src/state/state.rs +++ b/backend/telemetry/src/state/state.rs @@ -1,22 +1,22 @@ -use std::collections::{ HashSet, HashMap }; use super::node::Node; -use common::node_types::{Block, BlockHash, NodeDetails, Timestamp}; -use common::node_message::Payload; -use common::{ id_type, DenseMap }; -use std::iter::IntoIterator; use crate::feed_message::FeedMessageSerializer; use crate::find_location; +use common::node_message::Payload; +use common::node_types::{Block, BlockHash, NodeDetails, Timestamp}; +use common::{id_type, DenseMap}; +use std::collections::{HashMap, HashSet}; +use std::iter::IntoIterator; -use super::chain::{ self, Chain, ChainNodeId }; +use super::chain::{self, Chain, ChainNodeId}; -id_type!{ +id_type! { /// A globally unique Chain ID. pub ChainId(usize) } /// A "global" Node ID is a composite of the ID of the chain it's /// on, and it's chain local ID. -#[derive(Debug,Clone,Copy,Hash,PartialEq,Eq)] +#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)] pub struct NodeId(ChainId, ChainNodeId); impl NodeId { @@ -44,15 +44,15 @@ pub enum AddNodeResult<'a> { /// The chain is over quota (too many nodes connected), so can't add the node ChainOverQuota, /// The node was added to the chain - NodeAddedToChain(NodeAddedToChain<'a>) + NodeAddedToChain(NodeAddedToChain<'a>), } #[cfg(test)] -impl <'a> AddNodeResult<'a> { +impl<'a> AddNodeResult<'a> { pub fn unwrap_id(&self) -> NodeId { match &self { AddNodeResult::NodeAddedToChain(d) => d.id, - _ => panic!("Attempt to unwrap_id on AddNodeResult that did not succeed") + _ => panic!("Attempt to unwrap_id on AddNodeResult that did not succeed"), } } } @@ -69,7 +69,7 @@ pub struct NodeAddedToChain<'a> { /// Number of nodes in the chain. If 1, the chain was just added. pub chain_node_count: usize, /// Has the chain label been updated? - pub has_chain_label_changed: bool + pub has_chain_label_changed: bool, } /// if removing a node is successful, we get this information back. @@ -85,7 +85,7 @@ pub struct RemovedNode { } impl State { - pub fn new>(denylist: T) -> State { + pub fn new>(denylist: T) -> State { State { chains: DenseMap::new(), chains_by_genesis_hash: HashMap::new(), @@ -94,16 +94,14 @@ impl State { } } - pub fn iter_chains(&self) -> impl Iterator> { + pub fn iter_chains(&self) -> impl Iterator> { self.chains .iter() - .map(move |(_,chain)| StateChain { chain }) + .map(move |(_, chain)| StateChain { chain }) } pub fn get_chain_by_node_id(&self, node_id: NodeId) -> Option> { - self.chains - .get(node_id.0) - .map(|chain| StateChain { chain }) + self.chains.get(node_id.0).map(|chain| StateChain { chain }) } pub fn get_chain_by_genesis_hash(&self, genesis_hash: &BlockHash) -> Option> { @@ -120,7 +118,11 @@ impl State { .map(|chain| StateChain { chain }) } - pub fn add_node(&mut self, genesis_hash: BlockHash, node_details: NodeDetails) -> AddNodeResult<'_> { + pub fn add_node( + &mut self, + genesis_hash: BlockHash, + node_details: NodeDetails, + ) -> AddNodeResult<'_> { if self.denylist.contains(&*node_details.chain) { return AddNodeResult::ChainOnDenyList; } @@ -139,16 +141,15 @@ impl State { }; // Get the chain. - let chain = self.chains.get_mut(chain_id) - .expect("should be known to exist after the above (unless chains_by_genesis_hash out of sync)"); + let chain = self.chains.get_mut(chain_id).expect( + "should be known to exist after the above (unless chains_by_genesis_hash out of sync)", + ); let node = Node::new(node_details); let old_chain_label = chain.label().into(); match chain.add_node(node) { - chain::AddNodeResult::Overquota => { - AddNodeResult::ChainOverQuota - }, + chain::AddNodeResult::Overquota => AddNodeResult::ChainOverQuota, chain::AddNodeResult::Added { id, chain_renamed } => { let chain = &*chain; @@ -165,7 +166,7 @@ impl State { old_chain_label: old_chain_label, new_chain_label: chain.label(), chain_node_count: chain.node_count(), - has_chain_label_changed: chain_renamed + has_chain_label_changed: chain_renamed, }) } } @@ -194,30 +195,43 @@ impl State { // Make sure chains always referenced by their most common label: if remove_result.chain_renamed { self.chains_by_label.remove(&old_chain_label); - self.chains_by_label.insert(new_chain_label.clone(), chain_id); + self.chains_by_label + .insert(new_chain_label.clone(), chain_id); } Some(RemovedNode { old_chain_label, new_chain_label, chain_node_count: chain_node_count, - has_chain_label_changed: remove_result.chain_renamed + has_chain_label_changed: remove_result.chain_renamed, }) } /// Attempt to update the best block seen, given a node and block. /// Returns a boolean which denotes whether the output is for finalization feeds (true) or not (false). - pub fn update_node(&mut self, NodeId(chain_id, chain_node_id): NodeId, payload: Payload, feed: &mut FeedMessageSerializer) -> bool { + pub fn update_node( + &mut self, + NodeId(chain_id, chain_node_id): NodeId, + payload: Payload, + feed: &mut FeedMessageSerializer, + ) -> bool { let chain = match self.chains.get_mut(chain_id) { Some(chain) => chain, - None => { log::error!("Cannot find chain for node with ID {:?}", chain_id); return false } + None => { + log::error!("Cannot find chain for node with ID {:?}", chain_id); + return false; + } }; chain.update_node(chain_node_id, payload, feed) } /// Update the location for a node. Return `false` if the node was not found. - pub fn update_node_location(&mut self, NodeId(chain_id, chain_node_id): NodeId, location: find_location::Location) -> bool { + pub fn update_node_location( + &mut self, + NodeId(chain_id, chain_node_id): NodeId, + location: find_location::Location, + ) -> bool { if let Some(chain) = self.chains.get_mut(chain_id) { chain.update_node_location(chain_node_id, location) } else { @@ -226,16 +240,15 @@ impl State { } } - /// When we ask for a chain, we get this struct back. This ensures that we have /// a consistent public interface, and don't expose methods on [`Chain`] that /// aren't really intended for use outside of [`State`] methods. Any modification /// of a chain needs to go through [`State`]. pub struct StateChain<'a> { - chain: &'a Chain + chain: &'a Chain, } -impl <'a> StateChain<'a> { +impl<'a> StateChain<'a> { pub fn label(&self) -> &'a str { self.chain.label() } @@ -257,7 +270,7 @@ impl <'a> StateChain<'a> { pub fn finalized_block(&self) -> &'a Block { self.chain.finalized_block() } - pub fn iter_nodes(&self) -> impl Iterator + 'a { + pub fn iter_nodes(&self) -> impl Iterator + 'a { self.chain.iter_nodes() } } @@ -274,7 +287,7 @@ mod test { version: "0.1".into(), validator: None, network_id: None, - startup_time: None + startup_time: None, } } @@ -284,15 +297,12 @@ mod test { let chain1_genesis = BlockHash::from_low_u64_be(1); - let add_result = state.add_node( - chain1_genesis, - node("A", "Chain One") - ); + let add_result = state.add_node(chain1_genesis, node("A", "Chain One")); let add_node_result = match add_result { AddNodeResult::ChainOnDenyList => panic!("Chain not on deny list"), AddNodeResult::ChainOverQuota => panic!("Chain not Overquota"), - AddNodeResult::NodeAddedToChain(details) => details + AddNodeResult::NodeAddedToChain(details) => details, }; assert_eq!(add_node_result.id, NodeId(0.into(), 0.into())); @@ -301,15 +311,12 @@ mod test { assert_eq!(add_node_result.chain_node_count, 1); assert_eq!(add_node_result.has_chain_label_changed, true); - let add_result = state.add_node( - chain1_genesis, - node("A", "Chain One") - ); + let add_result = state.add_node(chain1_genesis, node("A", "Chain One")); let add_node_result = match add_result { AddNodeResult::ChainOnDenyList => panic!("Chain not on deny list"), AddNodeResult::ChainOverQuota => panic!("Chain not Overquota"), - AddNodeResult::NodeAddedToChain(details) => details + AddNodeResult::NodeAddedToChain(details) => details, }; assert_eq!(add_node_result.id, NodeId(0.into(), 1.into())); @@ -328,7 +335,13 @@ mod test { .add_node(chain1_genesis, node("A", "Chain One")) // 0 .unwrap_id(); - assert_eq!(state.get_chain_by_node_id(node_id0).expect("Chain should exist").label(), "Chain One"); + assert_eq!( + state + .get_chain_by_node_id(node_id0) + .expect("Chain should exist") + .label(), + "Chain One" + ); assert!(state.get_chain_by_label("Chain One").is_some()); assert!(state.get_chain_by_genesis_hash(&chain1_genesis).is_some()); @@ -337,7 +350,13 @@ mod test { .unwrap_id(); // Chain name hasn't changed yet; "Chain One" as common as "Chain Two".. - assert_eq!(state.get_chain_by_node_id(node_id0).expect("Chain should exist").label(), "Chain One"); + assert_eq!( + state + .get_chain_by_node_id(node_id0) + .expect("Chain should exist") + .label(), + "Chain One" + ); assert!(state.get_chain_by_label("Chain One").is_some()); assert!(state.get_chain_by_genesis_hash(&chain1_genesis).is_some()); @@ -346,7 +365,13 @@ mod test { .unwrap_id(); // 2 // Chain name has changed; "Chain Two" the winner now.. - assert_eq!(state.get_chain_by_node_id(node_id0).expect("Chain should exist").label(), "Chain Two"); + assert_eq!( + state + .get_chain_by_node_id(node_id0) + .expect("Chain should exist") + .label(), + "Chain Two" + ); assert!(state.get_chain_by_label("Chain One").is_none()); assert!(state.get_chain_by_label("Chain Two").is_some()); assert!(state.get_chain_by_genesis_hash(&chain1_genesis).is_some()); @@ -355,7 +380,13 @@ mod test { state.remove_node(node_id2).expect("Removal OK (id: 2)"); // Removed both "Chain Two" nodes; dominant name now "Chain One" again.. - assert_eq!(state.get_chain_by_node_id(node_id0).expect("Chain should exist").label(), "Chain One"); + assert_eq!( + state + .get_chain_by_node_id(node_id0) + .expect("Chain should exist") + .label(), + "Chain One" + ); assert!(state.get_chain_by_label("Chain One").is_some()); assert!(state.get_chain_by_label("Chain Two").is_none()); assert!(state.get_chain_by_genesis_hash(&chain1_genesis).is_some()); @@ -380,4 +411,4 @@ mod test { assert!(state.get_chain_by_genesis_hash(&chain1_genesis).is_none()); assert_eq!(state.iter_chains().count(), 0); } -} \ No newline at end of file +}