diff --git a/backend/shard/src/aggregator.rs b/backend/shard/src/aggregator.rs index 3c89a7e..c80df6e 100644 --- a/backend/shard/src/aggregator.rs +++ b/backend/shard/src/aggregator.rs @@ -153,7 +153,7 @@ impl Aggregator { }, ToAggregator::FromWebsocket(conn_id, FromWebsocket::Add { message_id, ip, node, genesis_hash }) => { // Don't bother doing anything else if we're disconnected, since we'll force the - // ndoe to reconnect anyway when the backend does: + // node to reconnect anyway when the backend does: if !connected_to_telemetry_core { continue } // Generate a new "local ID" for messages from this connection: diff --git a/backend/shard/src/main.rs b/backend/shard/src/main.rs index a1ed08e..ed3a434 100644 --- a/backend/shard/src/main.rs +++ b/backend/shard/src/main.rs @@ -1,7 +1,8 @@ mod aggregator; mod connection; +mod real_ip; -use std::net::SocketAddr; +use std::net::IpAddr; use structopt::StructOpt; use http::Uri; @@ -11,6 +12,7 @@ use warp::Filter; use warp::filters::ws; use common::{json, node, log_level::LogLevel}; use aggregator::{ Aggregator, FromWebsocket }; +use real_ip::real_ip; const VERSION: &str = env!("CARGO_PKG_VERSION"); const AUTHORS: &str = env!("CARGO_PKG_AUTHORS"); @@ -82,8 +84,8 @@ async fn start_server(opts: Opts) -> anyhow::Result<()> { let ws_route = warp::path("submit") .and(warp::ws()) - .and(warp::filters::addr::remote()) - .map(move |ws: ws::Ws, addr: Option<SocketAddr>| { + .and(real_ip()) + .map(move |ws: ws::Ws, addr: Option<IpAddr>| { // Send messages from the websocket connection to this sink // to have them pass to the aggregator. let tx_to_aggregator = aggregator.subscribe_node(); @@ -106,7 +108,7 @@ async fn start_server(opts: Opts) -> anyhow::Result<()> { } /// This takes care of handling messages from an established socket connection. 
-async fn handle_websocket_connection<S>(mut websocket: ws::WebSocket, mut tx_to_aggregator: S, addr: Option<SocketAddr>) -> (S, ws::WebSocket) +async fn handle_websocket_connection<S>(mut websocket: ws::WebSocket, mut tx_to_aggregator: S, addr: Option<IpAddr>) -> (S, ws::WebSocket) where S: futures::Sink<FromWebsocket> + Unpin { // This could be a oneshot channel, but it's useful to be able to clone @@ -155,7 +157,7 @@ async fn handle_websocket_connection<S>(mut websocket: ws::WebSocket, mut tx_to_ if let node::Payload::SystemConnected(info) = payload { let _ = tx_to_aggregator.send(FromWebsocket::Add { message_id, - ip: addr.map(|a| a.ip()), + ip: addr, node: info.node, genesis_hash: info.genesis_hash, }).await; diff --git a/backend/shard/src/real_ip.rs b/backend/shard/src/real_ip.rs new file mode 100644 index 0000000..9966df8 --- /dev/null +++ b/backend/shard/src/real_ip.rs @@ -0,0 +1,117 @@ +use std::net::{ SocketAddr, IpAddr }; +use warp::filters::header; +use warp::filters::addr; +use warp::Filter; + +/** +A warp filter to extract the "real" IP address of the connection by looking at headers +set by proxies (this is inspired by Actix Web's implementation of the feature). + +First, check for the standardised "Forwarded" header. This looks something like: + +"Forwarded: for=12.34.56.78;host=example.com;proto=https, for=23.45.67.89" + +Each proxy can append to this comma separated list of forwarded-details. We'll look for +the first "for" address and try to decode that. + +If this doesn't yield a result, look for the non-standard but common X-Forwarded-For header, +which contains a comma separated list of addresses; each proxy in the potential chain possibly +appending one to the end. So, take the first of these if it exists. + +If still no luck, look for the X-Real-IP header, which we expect to contain a single IP address. + +If that _still_ doesn't work, fall back to the socket address of the connection. + +Return `None` if all of this fails to yield an address. 
+*/ +pub fn real_ip() -> impl warp::Filter<Extract = (Option<IpAddr>,), Error = warp::Rejection> + Clone { + header::optional("forwarded") + .and(header::optional("x-forwarded-for")) + .and(header::optional("x-real-ip")) + .and(addr::remote()) + .map(|forwarded: Option<String>, forwarded_for: Option<String>, real_ip: Option<String>, addr: Option<SocketAddr>| { + let realip = forwarded.as_ref().and_then(|val| get_first_addr_from_forwarded_header(val)) + .or_else(|| { + // fall back to X-Forwarded-For + forwarded_for.as_ref().and_then(|val| get_first_addr_from_x_forwarded_for_header(val)) + }) + .or_else(|| { + // fall back to X-Real-IP + real_ip.as_ref().map(|val| val.trim()) + }) + .and_then(|ip| { + // Trim the port if it exists + ip.split(":").next() + }) + .and_then(|ip| { + // Attempt to parse to a socket address + ip.parse::<IpAddr>().ok() + }) + // Fall back to local IP address if the above fails + .or(addr.map(|a| a.ip())); + + realip + }) +} + +/// Follow https://datatracker.ietf.org/doc/html/rfc7239 to decode the Forwarded header value. +/// Roughly, proxies can add new sets of values by appending a comma to the existing list +/// (so we have something like "values1, values2, values3" from proxy1, proxy2 and proxy3 for +/// instance) and then the values themselves are ';' separated name=value pairs. The value in each +/// pair may or may not be surrounded in double quotes. 
+/// +/// Examples from the RFC: +/// +/// Forwarded: for="_gazonk" +/// Forwarded: For="[2001:db8:cafe::17]:4711" +/// Forwarded: for=192.0.2.60;proto=http;by=203.0.113.43 +/// Forwarded: for=192.0.2.43, for=198.51.100.17 +fn get_first_addr_from_forwarded_header(value: &str) -> Option<&str> { + let first_values = value.split(',').next()?; + + for pair in first_values.split(';') { + let mut parts = pair.trim().splitn(2, '='); + let key = parts.next()?; + let value = parts.next()?; + + if key.to_lowercase() == "for" { + // trim double quotes if they surround the value: + let value = if value.starts_with('"') && value.ends_with('"') { + &value[1..value.len() - 1] + } else { + value + }; + return Some(value); + } + } + + None +} + +fn get_first_addr_from_x_forwarded_for_header(value: &str) -> Option<&str> { + value.split(",").map(|val| val.trim()).next() +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn get_addr_from_forwarded_rfc_examples() { + let examples = vec![ + (r#"for="_gazonk""#, "_gazonk"), + (r#"For="[2001:db8:cafe::17]:4711""#, "[2001:db8:cafe::17]:4711"), + (r#"for=192.0.2.60;proto=http;by=203.0.113.43"#, "192.0.2.60"), + (r#"for=192.0.2.43, for=198.51.100.17"#, "192.0.2.43"), + ]; + + for (value, expected) in examples { + assert_eq!( + get_first_addr_from_forwarded_header(value), + Some(expected), + "Header value: {}", value + ); + } + } + +} \ No newline at end of file diff --git a/backend/telemetry/src/aggregator/aggregator.rs b/backend/telemetry/src/aggregator/aggregator.rs index a2deddb..6bc1963 100644 --- a/backend/telemetry/src/aggregator/aggregator.rs +++ b/backend/telemetry/src/aggregator/aggregator.rs @@ -3,7 +3,7 @@ use std::sync::atomic::AtomicU64; use futures::channel::mpsc; use futures::{ future, Sink, SinkExt }; use super::inner_loop; -use super::find_location::{ self, find_location }; +use crate::find_location::find_location; use crate::state::NodeId; use std::net::Ipv4Addr; diff --git 
a/backend/telemetry/src/aggregator/inner_loop.rs b/backend/telemetry/src/aggregator/inner_loop.rs index b5ea7df..4a21640 100644 --- a/backend/telemetry/src/aggregator/inner_loop.rs +++ b/backend/telemetry/src/aggregator/inner_loop.rs @@ -8,13 +8,13 @@ use common::{ util::now }; use bimap::BiMap; -use std::{iter::FromIterator, net::Ipv4Addr, str::FromStr}; +use std::{net::{IpAddr, Ipv4Addr}, str::FromStr}; use futures::channel::{ mpsc }; use futures::{ SinkExt, StreamExt }; use std::collections::{ HashMap, HashSet }; use crate::state::{ self, State, NodeId }; use crate::feed_message::{ self, FeedMessageSerializer }; -use super::find_location; +use crate::find_location; /// A unique Id is assigned per websocket connection (or more accurately, /// per feed socket and per shard socket). This can be combined with the @@ -143,7 +143,7 @@ pub struct InnerLoop { /// These feeds want finality info, too. feed_conn_id_finality: HashSet, - /// Send messages here to make location requests, which are sent back into the loop. + /// Send messages here to make geographical location requests. tx_to_locator: mpsc::UnboundedSender<(NodeId, Ipv4Addr)> } @@ -184,8 +184,32 @@ impl InnerLoop { } } + /// Handle messages that come from the node geographical locator. 
async fn handle_from_find_location(&mut self, node_id: NodeId, location: find_location::Location) { - // TODO: Update node location here + self.node_state.update_node_location(node_id, location.clone()); + + if let Some(loc) = location { + let mut feed_message_serializer = FeedMessageSerializer::new(); + feed_message_serializer.push(feed_message::LocatedNode( + node_id, + loc.latitude, + loc.longitude, + &loc.city + )); + + if let Some(bytes) = feed_message_serializer.into_finalized() { + let chain_label = self.node_state + .get_node_chain(node_id) + .map(|chain| chain.label()); + + if let Some(chain_label) = chain_label { + // Don't hold onto lifetime from self because we call a mut fn next: + let label = chain_label.to_owned(); + // Update location for any feeds subscribed to the node's chain. + self.broadcast_to_chain_feeds(&label, ToFeedWebsocket::Bytes(bytes)).await; + } + } + } } /// Handle messages coming from shards. @@ -228,7 +252,10 @@ impl InnerLoop { ).await } - // TODO: The node has been added. use it's IP to find a location. + // Currently we only geographically locate IPV4 addresses so ignore IPV6; + if let Some(IpAddr::V4(ip_v4)) = ip { + let _ = self.tx_to_locator.send((node_id, ip_v4)).await; + } }, } }, @@ -409,6 +436,7 @@ impl InnerLoop { if let Some(feeds) = self.chain_to_feed_conn_ids.get(chain) { for &feed_id in feeds { // How much faster would it be if we processed these in parallel? + // Is it practical to do so given lifetimes and such? 
if let Some(chan) = self.feed_channels.get_mut(&feed_id) { chan.send(message.clone()).await; } diff --git a/backend/telemetry/src/aggregator/mod.rs b/backend/telemetry/src/aggregator/mod.rs index 30622d3..1ac1108 100644 --- a/backend/telemetry/src/aggregator/mod.rs +++ b/backend/telemetry/src/aggregator/mod.rs @@ -1,6 +1,5 @@ mod aggregator; mod inner_loop; -mod find_location; // Expose the various message types that can be worked with externally: pub use inner_loop::{ FromFeedWebsocket, FromShardWebsocket, ToFeedWebsocket, ToShardWebsocket }; diff --git a/backend/telemetry/src/aggregator/find_location.rs b/backend/telemetry/src/find_location.rs similarity index 100% rename from backend/telemetry/src/aggregator/find_location.rs rename to backend/telemetry/src/find_location.rs diff --git a/backend/telemetry/src/main.rs b/backend/telemetry/src/main.rs index 434f3b1..eb1fbad 100644 --- a/backend/telemetry/src/main.rs +++ b/backend/telemetry/src/main.rs @@ -1,6 +1,7 @@ mod aggregator; mod state; mod feed_message; +mod find_location; use std::net::SocketAddr; use std::str::FromStr; diff --git a/backend/telemetry/src/state/chain.rs b/backend/telemetry/src/state/chain.rs index b4fb611..874d888 100644 --- a/backend/telemetry/src/state/chain.rs +++ b/backend/telemetry/src/state/chain.rs @@ -90,6 +90,9 @@ impl Chain { pub fn get_node(&self, node_id: NodeId) -> Option<&Node> { self.nodes.get(&node_id) } + pub fn get_node_mut(&mut self, node_id: NodeId) -> Option<&mut Node> { + self.nodes.get_mut(&node_id) + } pub fn label(&self) -> &str { &self.labels.best() } diff --git a/backend/telemetry/src/state/node.rs b/backend/telemetry/src/state/node.rs index 613b5ab..cc92af3 100644 --- a/backend/telemetry/src/state/node.rs +++ b/backend/telemetry/src/state/node.rs @@ -6,6 +6,7 @@ use common::types::{ }; use common::util::now; use common::node::SystemInterval; +use crate::find_location; /// Minimum time between block below broadcasting updates to the browser gets throttled, in ms. 
const THROTTLE_THRESHOLD: u64 = 100; @@ -28,7 +29,7 @@ pub struct Node { /// Hardware stats over time hardware: NodeHardware, /// Physical location details - location: Option>, + location: find_location::Location, /// Flag marking if the node is stale (not syncing or producing blocks) stale: bool, /// Unix timestamp for when node started up (falls back to connection time) @@ -88,8 +89,8 @@ impl Node { self.location.as_deref() } - pub fn update_location(&mut self, location: Arc) { - self.location = Some(location); + pub fn update_location(&mut self, location: find_location::Location) { + self.location = location; } pub fn block_details(&self) -> &BlockDetails { diff --git a/backend/telemetry/src/state/state.rs b/backend/telemetry/src/state/state.rs index 6da8c45..65d3c8e 100644 --- a/backend/telemetry/src/state/state.rs +++ b/backend/telemetry/src/state/state.rs @@ -6,6 +6,7 @@ use common::types::{Block, NodeDetails, NodeLocation, Timestamp}; use common::util::{now, DenseMap, NumStats}; use common::node::Payload; use std::iter::IntoIterator; +use crate::find_location; use super::chain::{ self, Chain }; @@ -16,7 +17,7 @@ pub type Label = Arc; pub struct State { next_id: NodeId, chains: HashMap, - chains_by_label: HashMap, + chains_by_label: HashMap, chains_by_node: HashMap, /// Denylist for networks we do not want to allow connecting. denylist: HashSet, @@ -94,6 +95,14 @@ impl State { AddNodeResult::ChainOverQuota }, chain::AddNodeResult::Added { chain_renamed } => { + // Update the label we use to reference the chain if + // it changes (it'll always change first time a node's added): + if chain_renamed { + let label = chain.label().to_owned(); + self.chains_by_label.remove(&label); + self.chains_by_label.insert(label, genesis_hash); + } + let node = chain.get_node(node_id).unwrap(); AddNodeResult::NodeAddedToChain(NodeAddedToChain { id: node_id, @@ -106,6 +115,30 @@ impl State { } } + /// Update the location for a node. Return `false` if the node was not found. 
+ pub fn update_node_location(&mut self, node_id: NodeId, location: find_location::Location) -> bool { + if let Some(node) = self.get_node_mut(node_id) { + node.update_location(location); + true + } else { + false + } + } + + /// Get the chain that a node belongs to. + pub fn get_node_chain(&self, node_id: NodeId) -> Option<&Chain> { + self.chains_by_node + .get(&node_id) + .and_then(|chain_id| self.chains.get(chain_id)) + } + + /// Obtain mutable access to a node, if it's found. + fn get_node_mut(&mut self, node_id: NodeId) -> Option<&mut Node> { + let chain_id = *self.chains_by_node.get(&node_id)?; + let chain = self.chains.get_mut(&chain_id)?; + chain.get_node_mut(node_id) + } + // /// Add a new node to our state. // pub fn add_node(&mut self, id: GlobalId, genesis_hash: BlockHash, node: &NodeDetails) -> AddNodeResult { // if self.denylist.contains(&*node.chain) {