Update node locations when they come in, and get the real IP addr of nodes

This commit is contained in:
James Wilson
2021-06-24 18:34:32 +01:00
parent e383866322
commit fb80edba47
11 changed files with 201 additions and 17 deletions
+1 -1
View File
@@ -153,7 +153,7 @@ impl Aggregator {
},
ToAggregator::FromWebsocket(conn_id, FromWebsocket::Add { message_id, ip, node, genesis_hash }) => {
// Don't bother doing anything else if we're disconnected, since we'll force the
// ndoe to reconnect anyway when the backend does:
// node to reconnect anyway when the backend does:
if !connected_to_telemetry_core { continue }
// Generate a new "local ID" for messages from this connection:
+7 -5
View File
@@ -1,7 +1,8 @@
mod aggregator;
mod connection;
mod real_ip;
use std::net::SocketAddr;
use std::net::IpAddr;
use structopt::StructOpt;
use http::Uri;
@@ -11,6 +12,7 @@ use warp::Filter;
use warp::filters::ws;
use common::{json, node, log_level::LogLevel};
use aggregator::{ Aggregator, FromWebsocket };
use real_ip::real_ip;
const VERSION: &str = env!("CARGO_PKG_VERSION");
const AUTHORS: &str = env!("CARGO_PKG_AUTHORS");
@@ -82,8 +84,8 @@ async fn start_server(opts: Opts) -> anyhow::Result<()> {
let ws_route =
warp::path("submit")
.and(warp::ws())
.and(warp::filters::addr::remote())
.map(move |ws: ws::Ws, addr: Option<SocketAddr>| {
.and(real_ip())
.map(move |ws: ws::Ws, addr: Option<IpAddr>| {
// Send messages from the websocket connection to this sink
// to have them pass to the aggregator.
let tx_to_aggregator = aggregator.subscribe_node();
@@ -106,7 +108,7 @@ async fn start_server(opts: Opts) -> anyhow::Result<()> {
}
/// This takes care of handling messages from an established socket connection.
async fn handle_websocket_connection<S>(mut websocket: ws::WebSocket, mut tx_to_aggregator: S, addr: Option<SocketAddr>) -> (S, ws::WebSocket)
async fn handle_websocket_connection<S>(mut websocket: ws::WebSocket, mut tx_to_aggregator: S, addr: Option<IpAddr>) -> (S, ws::WebSocket)
where S: futures::Sink<FromWebsocket, Error = anyhow::Error> + Unpin
{
// This could be a oneshot channel, but it's useful to be able to clone
@@ -155,7 +157,7 @@ async fn handle_websocket_connection<S>(mut websocket: ws::WebSocket, mut tx_to_
if let node::Payload::SystemConnected(info) = payload {
let _ = tx_to_aggregator.send(FromWebsocket::Add {
message_id,
ip: addr.map(|a| a.ip()),
ip: addr,
node: info.node,
genesis_hash: info.genesis_hash,
}).await;
+117
View File
@@ -0,0 +1,117 @@
use std::net::{ SocketAddr, IpAddr };
use warp::filters::header;
use warp::filters::addr;
use warp::Filter;
/**
A warp filter to extract the "real" IP address of the connection by looking at headers
set by proxies (this is inspired by Actix Web's implementation of the feature).
First, check for the standardised "Forwarded" header. This looks something like:
"Forwarded: for=12.34.56.78;host=example.com;proto=https, for=23.45.67.89"
Each proxy can append to this comma separated list of forwarded-details. We'll look for
the first "for" address and try to decode that.
If this doesn't yield a result, look for the non-standard but common X-Forwarded-For header,
which contains a comma separated list of addresses; each proxy in the potential chain possibly
appending one to the end. So, take the first of these if it exists.
If still no luck, look for the X-Real-IP header, which we expect to contain a single IP address.
If that _still_ doesn't work, fall back to the socket address of the connection.
Return `None` if all of this fails to yield an address.
*/
pub fn real_ip() -> impl warp::Filter<Extract = (Option<IpAddr>,), Error = warp::Rejection> + Clone {
header::optional("forwarded")
.and(header::optional("x-forwarded-for"))
.and(header::optional("x-real-ip"))
.and(addr::remote())
.map(|forwarded: Option<String>, forwarded_for: Option<String>, real_ip: Option<String>, addr: Option<SocketAddr>| {
let realip = forwarded.as_ref().and_then(|val| get_first_addr_from_forwarded_header(val))
.or_else(|| {
// fall back to X-Forwarded-For
forwarded_for.as_ref().and_then(|val| get_first_addr_from_x_forwarded_for_header(val))
})
.or_else(|| {
// fall back to X-Real-IP
real_ip.as_ref().map(|val| val.trim())
})
.and_then(|ip| {
// Trim the port if it exists
ip.split(":").next()
})
.and_then(|ip| {
// Attempt to parse to a socket address
ip.parse::<IpAddr>().ok()
})
// Fall back to local IP address if the above fails
.or(addr.map(|a| a.ip()));
realip
})
}
/// Follow https://datatracker.ietf.org/doc/html/rfc7239 to decode the Forwarded header value.
/// Roughly, proxies can add new sets of values by appending a comma to the existing list
/// (so we have something like "values1, values2, values3" from proxy1, proxy2 and proxy3 for
/// instance) and then the valeus themselves are ';' separated name=value pairs. The value in each
/// pair may or may not be surrounded in double quotes.
///
/// Examples from the RFC:
///
/// Forwarded: for="_gazonk"
/// Forwarded: For="[2001:db8:cafe::17]:4711"
/// Forwarded: for=192.0.2.60;proto=http;by=203.0.113.43
/// Forwarded: for=192.0.2.43, for=198.51.100.17
fn get_first_addr_from_forwarded_header(value: &str) -> Option<&str> {
let first_values = value.split(',').next()?;
for pair in first_values.split(';') {
let mut parts = pair.trim().splitn(2, '=');
let key = parts.next()?;
let value = parts.next()?;
if key.to_lowercase() == "for" {
// trim double quotes if they surround the value:
let value = if value.starts_with('"') && value.ends_with('"') {
&value[1..value.len() - 1]
} else {
value
};
return Some(value);
}
}
None
}
fn get_first_addr_from_x_forwarded_for_header(value: &str) -> Option<&str> {
value.split(",").map(|val| val.trim()).next()
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn get_addr_from_forwarded_rfc_examples() {
let examples = vec![
(r#"for="_gazonk""#, "_gazonk"),
(r#"For="[2001:db8:cafe::17]:4711""#, "[2001:db8:cafe::17]:4711"),
(r#"for=192.0.2.60;proto=http;by=203.0.113.43"#, "192.0.2.60"),
(r#"for=192.0.2.43, for=198.51.100.17"#, "192.0.2.43"),
];
for (value, expected) in examples {
assert_eq!(
get_first_addr_from_forwarded_header(value),
Some(expected),
"Header value: {}", value
);
}
}
}
@@ -3,7 +3,7 @@ use std::sync::atomic::AtomicU64;
use futures::channel::mpsc;
use futures::{ future, Sink, SinkExt };
use super::inner_loop;
use super::find_location::{ self, find_location };
use crate::find_location::find_location;
use crate::state::NodeId;
use std::net::Ipv4Addr;
+33 -5
View File
@@ -8,13 +8,13 @@ use common::{
util::now
};
use bimap::BiMap;
use std::{iter::FromIterator, net::Ipv4Addr, str::FromStr};
use std::{net::{IpAddr, Ipv4Addr}, str::FromStr};
use futures::channel::{ mpsc };
use futures::{ SinkExt, StreamExt };
use std::collections::{ HashMap, HashSet };
use crate::state::{ self, State, NodeId };
use crate::feed_message::{ self, FeedMessageSerializer };
use super::find_location;
use crate::find_location;
/// A unique Id is assigned per websocket connection (or more accurately,
/// per feed socket and per shard socket). This can be combined with the
@@ -143,7 +143,7 @@ pub struct InnerLoop {
/// These feeds want finality info, too.
feed_conn_id_finality: HashSet<ConnId>,
/// Send messages here to make location requests, which are sent back into the loop.
/// Send messages here to make geographical location requests.
tx_to_locator: mpsc::UnboundedSender<(NodeId, Ipv4Addr)>
}
@@ -184,8 +184,32 @@ impl InnerLoop {
}
}
/// Handle messages that come from the node geographical locator.
async fn handle_from_find_location(&mut self, node_id: NodeId, location: find_location::Location) {
// TODO: Update node location here
self.node_state.update_node_location(node_id, location.clone());
if let Some(loc) = location {
let mut feed_message_serializer = FeedMessageSerializer::new();
feed_message_serializer.push(feed_message::LocatedNode(
node_id,
loc.latitude,
loc.longitude,
&loc.city
));
if let Some(bytes) = feed_message_serializer.into_finalized() {
let chain_label = self.node_state
.get_node_chain(node_id)
.map(|chain| chain.label());
if let Some(chain_label) = chain_label {
// Don't hold onto lifetime from self because we call a mut fn next:
let label = chain_label.to_owned();
// Update location for any feeds subscribed to the node's chain.
self.broadcast_to_chain_feeds(&label, ToFeedWebsocket::Bytes(bytes)).await;
}
}
}
}
/// Handle messages coming from shards.
@@ -228,7 +252,10 @@ impl InnerLoop {
).await
}
// TODO: The node has been added. use it's IP to find a location.
// Currently we only geographically locate IPV4 addresses so ignore IPV6;
if let Some(IpAddr::V4(ip_v4)) = ip {
let _ = self.tx_to_locator.send((node_id, ip_v4)).await;
}
},
}
},
@@ -409,6 +436,7 @@ impl InnerLoop {
if let Some(feeds) = self.chain_to_feed_conn_ids.get(chain) {
for &feed_id in feeds {
// How much faster would it be if we processed these in parallel?
// Is it practical to do so given lifetimes and such?
if let Some(chan) = self.feed_channels.get_mut(&feed_id) {
chan.send(message.clone()).await;
}
-1
View File
@@ -1,6 +1,5 @@
mod aggregator;
mod inner_loop;
mod find_location;
// Expose the various message types that can be worked with externally:
pub use inner_loop::{ FromFeedWebsocket, FromShardWebsocket, ToFeedWebsocket, ToShardWebsocket };
+1
View File
@@ -1,6 +1,7 @@
mod aggregator;
mod state;
mod feed_message;
mod find_location;
use std::net::SocketAddr;
use std::str::FromStr;
+3
View File
@@ -90,6 +90,9 @@ impl Chain {
pub fn get_node(&self, node_id: NodeId) -> Option<&Node> {
self.nodes.get(&node_id)
}
pub fn get_node_mut(&mut self, node_id: NodeId) -> Option<&mut Node> {
self.nodes.get_mut(&node_id)
}
pub fn label(&self) -> &str {
&self.labels.best()
}
+4 -3
View File
@@ -6,6 +6,7 @@ use common::types::{
};
use common::util::now;
use common::node::SystemInterval;
use crate::find_location;
/// Minimum time between block below broadcasting updates to the browser gets throttled, in ms.
const THROTTLE_THRESHOLD: u64 = 100;
@@ -28,7 +29,7 @@ pub struct Node {
/// Hardware stats over time
hardware: NodeHardware,
/// Physical location details
location: Option<Arc<NodeLocation>>,
location: find_location::Location,
/// Flag marking if the node is stale (not syncing or producing blocks)
stale: bool,
/// Unix timestamp for when node started up (falls back to connection time)
@@ -88,8 +89,8 @@ impl Node {
self.location.as_deref()
}
pub fn update_location(&mut self, location: Arc<NodeLocation>) {
self.location = Some(location);
pub fn update_location(&mut self, location: find_location::Location) {
self.location = location;
}
pub fn block_details(&self) -> &BlockDetails {
+34 -1
View File
@@ -6,6 +6,7 @@ use common::types::{Block, NodeDetails, NodeLocation, Timestamp};
use common::util::{now, DenseMap, NumStats};
use common::node::Payload;
use std::iter::IntoIterator;
use crate::find_location;
use super::chain::{ self, Chain };
@@ -16,7 +17,7 @@ pub type Label = Arc<str>;
pub struct State {
next_id: NodeId,
chains: HashMap<BlockHash, Chain>,
chains_by_label: HashMap<Label, BlockHash>,
chains_by_label: HashMap<String, BlockHash>,
chains_by_node: HashMap<NodeId, BlockHash>,
/// Denylist for networks we do not want to allow connecting.
denylist: HashSet<String>,
@@ -94,6 +95,14 @@ impl State {
AddNodeResult::ChainOverQuota
},
chain::AddNodeResult::Added { chain_renamed } => {
// Update the label we use to reference the chain if
// it changes (it'll always change first time a node's added):
if chain_renamed {
let label = chain.label().to_owned();
self.chains_by_label.remove(&label);
self.chains_by_label.insert(label, genesis_hash);
}
let node = chain.get_node(node_id).unwrap();
AddNodeResult::NodeAddedToChain(NodeAddedToChain {
id: node_id,
@@ -106,6 +115,30 @@ impl State {
}
}
/// Update the location for a node. Return `false` if the node was not found.
pub fn update_node_location(&mut self, node_id: NodeId, location: find_location::Location) -> bool {
if let Some(node) = self.get_node_mut(node_id) {
node.update_location(location);
true
} else {
false
}
}
/// Get the chain that a node belongs to.
pub fn get_node_chain(&self, node_id: NodeId) -> Option<&Chain> {
self.chains_by_node
.get(&node_id)
.and_then(|chain_id| self.chains.get(chain_id))
}
/// Obtain mutable access to a node, if it's found.
fn get_node_mut(&mut self, node_id: NodeId) -> Option<&mut Node> {
let chain_id = *self.chains_by_node.get(&node_id)?;
let chain = self.chains.get_mut(&chain_id)?;
chain.get_node_mut(node_id)
}
// /// Add a new node to our state.
// pub fn add_node(&mut self, id: GlobalId, genesis_hash: BlockHash, node: &NodeDetails) -> AddNodeResult {
// if self.denylist.contains(&*node.chain) {