diff --git a/backend/common/src/http_utils.rs b/backend/common/src/http_utils.rs index 4f4ecfd..76812b0 100644 --- a/backend/common/src/http_utils.rs +++ b/backend/common/src/http_utils.rs @@ -76,7 +76,7 @@ where ); } - // Just a little ceremony we need to go to to return the correct response key: + // Just a little ceremony to return the correct response key: let mut accept_key_buf = [0; 32]; let accept_key = generate_websocket_accept_key(key.as_bytes(), &mut accept_key_buf); diff --git a/backend/common/src/node_message.rs b/backend/common/src/node_message.rs index d9c32fc..9b63e8b 100644 --- a/backend/common/src/node_message.rs +++ b/backend/common/src/node_message.rs @@ -14,9 +14,9 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -//! This is the internal represenation of telemetry messages sent from nodes. +//! This is the internal representation of telemetry messages sent from nodes. //! There is a separate JSON representation of these types, because internally we want to be -//! able to serialize these messages to bincode, and various serde attribtues aren't compatible +//! able to serialize these messages to bincode, and various serde attributes aren't compatible //! with this, hence this separate internal representation. use crate::node_types::{Block, BlockHash, BlockNumber, NodeDetails}; @@ -159,6 +159,7 @@ mod tests { network_id: ArrayString::new(), startup_time: None, sysinfo: None, + ip: Some("127.0.0.1".into()), }, }), }); diff --git a/backend/common/src/node_types.rs b/backend/common/src/node_types.rs index 74816d1..1fdc5d1 100644 --- a/backend/common/src/node_types.rs +++ b/backend/common/src/node_types.rs @@ -42,6 +42,7 @@ pub struct NodeDetails { pub target_arch: Option>, pub target_env: Option>, pub sysinfo: Option, + pub ip: Option>, } /// Hardware and software information for the node. diff --git a/backend/telemetry_core/src/aggregator/aggregator.rs b/backend/telemetry_core/src/aggregator/aggregator.rs index e12599e..c4c7941 100644 --- a/backend/telemetry_core/src/aggregator/aggregator.rs +++ b/backend/telemetry_core/src/aggregator/aggregator.rs @@ -44,6 +44,8 @@ pub struct AggregatorOpts { /// How many nodes from third party chains are allowed to connect /// before we prevent connections from them. pub max_third_party_nodes: usize, + /// Flag to expose the IP addresses of all connected nodes to the feed subscribers. + pub expose_node_ips: bool, } struct AggregatorInternal { @@ -76,9 +78,7 @@ impl Aggregator { tokio::spawn(Aggregator::handle_messages( rx_from_external, tx_to_locator, - opts.max_queue_len, - opts.denylist, - opts.max_third_party_nodes, + opts, )); // Return a handle to our aggregator: @@ -95,18 +95,11 @@ impl Aggregator { async fn handle_messages( rx_from_external: flume::Receiver, tx_to_aggregator: flume::Sender<(NodeId, IpAddr)>, - max_queue_len: usize, - denylist: Vec, - max_third_party_nodes: usize, + opts: AggregatorOpts, ) { - inner_loop::InnerLoop::new( - tx_to_aggregator, - denylist, - max_queue_len, - max_third_party_nodes, - ) - .handle(rx_from_external) - .await; + inner_loop::InnerLoop::new(tx_to_aggregator, opts) + .handle(rx_from_external) + .await; } /// Gather metrics from our aggregator loop diff --git a/backend/telemetry_core/src/aggregator/inner_loop.rs b/backend/telemetry_core/src/aggregator/inner_loop.rs index 77f3e73..0b42082 100644 --- a/backend/telemetry_core/src/aggregator/inner_loop.rs +++ b/backend/telemetry_core/src/aggregator/inner_loop.rs @@ -16,8 +16,8 @@ use super::aggregator::ConnId; use crate::feed_message::{self, FeedMessageSerializer}; -use crate::find_location; use crate::state::{self, NodeId, State}; +use crate::{find_location, AggregatorOpts}; use bimap::BiMap; use common::{ internal_messages::{self, MuteReason, ShardNodeId}, @@ -144,7 +144,7 @@ impl FromStr for FromFeedWebsocket { } } -/// The aggregator can these messages back to a feed connection. +/// The aggregator can send these messages back to a feed connection. #[derive(Clone, Debug)] pub enum ToFeedWebsocket { Bytes(bytes::Bytes), @@ -173,24 +173,23 @@ pub struct InnerLoop { /// How big can the queue of messages coming in to the aggregator get before messages /// are prioritised and dropped to try and get back on track. max_queue_len: usize, + + /// Flag to expose the IP addresses of all connected nodes to the feed subscribers. + expose_node_ips: bool, } impl InnerLoop { /// Create a new inner loop handler with the various state it needs. - pub fn new( - tx_to_locator: flume::Sender<(NodeId, IpAddr)>, - denylist: Vec, - max_queue_len: usize, - max_third_party_nodes: usize, - ) -> Self { + pub fn new(tx_to_locator: flume::Sender<(NodeId, IpAddr)>, opts: AggregatorOpts) -> Self { InnerLoop { - node_state: State::new(denylist, max_third_party_nodes), + node_state: State::new(opts.denylist, opts.max_third_party_nodes), node_ids: BiMap::new(), feed_channels: HashMap::new(), shard_channels: HashMap::new(), chain_to_feed_conn_ids: MultiMapUnique::new(), tx_to_locator, - max_queue_len, + max_queue_len: opts.max_queue_len, + expose_node_ips: opts.expose_node_ips, } } @@ -323,9 +322,11 @@ impl InnerLoop { FromShardWebsocket::Add { local_id, ip, - node, + mut node, genesis_hash, } => { + // Conditionally modify the node's details to include the IP address. + node.ip = self.expose_node_ips.then_some(ip.to_string().into()); match self.node_state.add_node(genesis_hash, node) { state::AddNodeResult::ChainOnDenyList => { if let Some(shard_conn) = self.shard_channels.get_mut(&shard_conn_id) { @@ -376,7 +377,7 @@ impl InnerLoop { )); self.finalize_and_broadcast_to_all_feeds(feed_messages_for_all); - // Ask for the grographical location of the node. + // Ask for the geographical location of the node. let _ = self.tx_to_locator.send((node_id, ip)); } } @@ -548,7 +549,7 @@ impl InnerLoop { let _ = feed_channel.send(ToFeedWebsocket::Bytes(bytes)); } - // Actually make a note of the new chain subsciption: + // Actually make a note of the new chain subscription: let new_genesis_hash = new_chain.genesis_hash(); self.chain_to_feed_conn_ids .insert(new_genesis_hash, feed_conn_id); diff --git a/backend/telemetry_core/src/feed_message.rs b/backend/telemetry_core/src/feed_message.rs index fdf417f..dc5fe4d 100644 --- a/backend/telemetry_core/src/feed_message.rs +++ b/backend/telemetry_core/src/feed_message.rs @@ -45,7 +45,7 @@ where } pub struct FeedMessageSerializer { - /// Current buffer, + /// Current buffer. buffer: Vec, } @@ -189,6 +189,7 @@ impl FeedMessageWrite for AddedNode<'_> { &details.version, &details.validator, &details.network_id, + &details.ip, ); ser.write(&( diff --git a/backend/telemetry_core/src/find_location.rs b/backend/telemetry_core/src/find_location.rs index 2ae6967..afc6e8a 100644 --- a/backend/telemetry_core/src/find_location.rs +++ b/backend/telemetry_core/src/find_location.rs @@ -73,7 +73,7 @@ where } /// This struct can be used to make location requests, given -/// an IPV4 address. +/// an IPV4 or IPV6 address. #[derive(Debug, Clone)] struct Locator { city: Arc>, diff --git a/backend/telemetry_core/src/main.rs b/backend/telemetry_core/src/main.rs index b4bd918..93051ac 100644 --- a/backend/telemetry_core/src/main.rs +++ b/backend/telemetry_core/src/main.rs @@ -82,6 +82,9 @@ struct Opts { /// How many nodes from third party chains are allowed to connect before we prevent connections from them. #[structopt(long, default_value = "1000")] max_third_party_nodes: usize, + /// Flag to expose the IP addresses of all connected nodes to the feed subscribers. + #[structopt(long)] + pub expose_node_ips: bool, } fn main() { @@ -132,6 +135,7 @@ async fn start_server(num_aggregators: usize, opts: Opts) -> anyhow::Result<()> max_queue_len: aggregator_queue_len, denylist: opts.denylist, max_third_party_nodes: opts.max_third_party_nodes, + expose_node_ips: opts.expose_node_ips, }, ) .await?; diff --git a/backend/telemetry_core/src/state/state.rs b/backend/telemetry_core/src/state/state.rs index 671c73f..5385e24 100644 --- a/backend/telemetry_core/src/state/state.rs +++ b/backend/telemetry_core/src/state/state.rs @@ -41,11 +41,11 @@ impl NodeId { } } -/// Our state constains node and chain information +/// Our state contains node and chain information pub struct State { chains: DenseMap, - // Find the right chain given various details. + /// Find the right chain given various details. chains_by_genesis_hash: HashMap, /// Chain labels that we do not want to allow connecting. @@ -56,7 +56,7 @@ pub struct State { max_third_party_nodes: usize, } -/// Adding a node to a chain leads to this node_idult +/// Adding a node to a chain leads to this result. pub enum AddNodeResult<'a> { /// The chain is on the "deny list", so we can't add the node ChainOnDenyList, @@ -300,6 +300,7 @@ mod test { network_id: NetworkId::new(), startup_time: None, sysinfo: None, + ip: None, } } diff --git a/backend/telemetry_shard/src/aggregator.rs b/backend/telemetry_shard/src/aggregator.rs index 7b7145d..d40807f 100644 --- a/backend/telemetry_shard/src/aggregator.rs +++ b/backend/telemetry_shard/src/aggregator.rs @@ -105,7 +105,7 @@ impl Aggregator { pub async fn spawn(telemetry_uri: http::Uri) -> anyhow::Result { let (tx_to_aggregator, rx_from_external) = flume::bounded(10); - // Establish a resiliant connection to the core (this retries as needed): + // Establish a resilient connection to the core (this retries as needed): let (tx_to_telemetry_core, rx_from_telemetry_core) = create_ws_connection_to_core(telemetry_uri).await; diff --git a/backend/telemetry_shard/src/json_message/node_message.rs b/backend/telemetry_shard/src/json_message/node_message.rs index 649e9d7..f0abb18 100644 --- a/backend/telemetry_shard/src/json_message/node_message.rs +++ b/backend/telemetry_shard/src/json_message/node_message.rs @@ -248,6 +248,7 @@ pub struct NodeDetails { pub target_arch: Option>, pub target_env: Option>, pub sysinfo: Option, + pub ip: Option>, } impl From for node_types::NodeDetails { @@ -280,6 +281,7 @@ impl From for node_types::NodeDetails { target_arch: details.target_arch, target_env: details.target_env, sysinfo: details.sysinfo.map(|sysinfo| sysinfo.into()), + ip: details.ip, } } } diff --git a/backend/telemetry_shard/src/main.rs b/backend/telemetry_shard/src/main.rs index 778b1d4..a95142a 100644 --- a/backend/telemetry_shard/src/main.rs +++ b/backend/telemetry_shard/src/main.rs @@ -279,7 +279,7 @@ where let mut stale_interval = tokio::time::interval(stale_node_timeout / 2); // Our main select loop atomically receives and handles telemetry messages from the node, - // and periodically checks for stale connections to keep our ndoe state tidy. + // and periodically checks for stale connections to keep our node state tidy. loop { tokio::select! { // We periodically check for stale message IDs and remove nodes associated with diff --git a/backend/test_utils/src/feed_message_de.rs b/backend/test_utils/src/feed_message_de.rs index 9770b1b..b46e614 100644 --- a/backend/test_utils/src/feed_message_de.rs +++ b/backend/test_utils/src/feed_message_de.rs @@ -134,6 +134,7 @@ pub struct NodeDetails { pub version: String, pub validator: Option, pub network_id: Option, + pub ip: Option, } impl FeedMessage { @@ -185,7 +186,7 @@ impl FeedMessage { 3 => { let ( node_id, - (name, implementation, version, validator, network_id), + (name, implementation, version, validator, network_id, ip), stats, io, hardware, @@ -205,6 +206,7 @@ impl FeedMessage { version, validator, network_id, + ip, }, stats, block_details, diff --git a/frontend/src/common/types.ts b/frontend/src/common/types.ts index 1edfb0b..3550e0b 100644 --- a/frontend/src/common/types.ts +++ b/frontend/src/common/types.ts @@ -55,7 +55,8 @@ export type NodeDetails = [ NodeImplementation, NodeVersion, Maybe
, - Maybe + Maybe, + Maybe ]; export type NodeStats = [PeerCount, TransactionCount]; export type NodeIO = [Array];