From 87866b2d42585a0d2c356ae497210d7f30adce8d Mon Sep 17 00:00:00 2001 From: James Wilson Date: Fri, 27 Aug 2021 16:16:26 +0100 Subject: [PATCH] Improve logging and error reporting around IP and location info (#386) * Beef up error reporting of IP and location info * Tidy up error reporting after some manual testing of it * Don't cache erroneous locations; try again when asked again * cargo fmt --- backend/telemetry_core/src/find_location.rs | 115 +++++++++++--------- backend/telemetry_shard/src/main.rs | 14 ++- backend/telemetry_shard/src/real_ip.rs | 50 +++++++-- 3 files changed, 113 insertions(+), 66 deletions(-) diff --git a/backend/telemetry_core/src/find_location.rs b/backend/telemetry_core/src/find_location.rs index f020747..489f044 100644 --- a/backend/telemetry_core/src/find_location.rs +++ b/backend/telemetry_core/src/find_location.rs @@ -22,6 +22,7 @@ use parking_lot::RwLock; use rustc_hash::FxHashMap; use serde::Deserialize; +use anyhow::Context; use common::node_types::NodeLocation; use tokio::sync::Semaphore; @@ -38,16 +39,16 @@ where let (tx, rx) = flume::unbounded(); // cache entries - let mut cache: FxHashMap>> = FxHashMap::default(); + let mut cache: FxHashMap> = FxHashMap::default(); // Default entry for localhost cache.insert( Ipv4Addr::new(127, 0, 0, 1), - Some(Arc::new(NodeLocation { + Arc::new(NodeLocation { latitude: 52.516_6667, longitude: 13.4, city: "Berlin".into(), - })), + }), ); // Create a locator with our cache. This is used to obtain locations. @@ -68,14 +69,8 @@ where // Once we have acquired our permit, spawn a task to avoid // blocking this loop so that we can handle concurrent requests. tokio::spawn(async move { - match locator.locate(ip_address).await { - Ok(loc) => { - let _ = response_chan.send((id, loc)).await; - } - Err(e) => { - log::debug!("GET error for ip location: {:?}", e); - } - }; + let location = locator.locate(ip_address).await; + let _ = response_chan.send((id, location)).await; // ensure permit is moved into task by dropping it explicitly: drop(permit); @@ -92,11 +87,11 @@ where #[derive(Clone)] struct Locator { client: reqwest::Client, - cache: Arc>>>>, + cache: Arc>>>, } impl Locator { - pub fn new(cache: FxHashMap>>) -> Self { + pub fn new(cache: FxHashMap>) -> Self { let client = reqwest::Client::new(); Locator { @@ -105,68 +100,84 @@ impl Locator { } } - pub async fn locate(&self, ip: Ipv4Addr) -> Result>, reqwest::Error> { + pub async fn locate(&self, ip: Ipv4Addr) -> Option> { // Return location quickly if it's cached: let cached_loc = { let cache_reader = self.cache.read(); cache_reader.get(&ip).cloned() }; - if let Some(loc) = cached_loc { - return Ok(loc); + if cached_loc.is_some() { + return cached_loc; } - // Look it up via the location services if not cached: - let location = self.iplocate_ipapi_co(ip).await?; - let location = match location { - Some(location) => Ok(Some(location)), - None => self.iplocate_ipinfo_io(ip).await, - }?; + // Look it up via ipapi.co: + let mut location = self.iplocate_ipapi_co(ip).await; - self.cache.write().insert(ip, location.clone()); - Ok(location) + // If that fails, try looking it up via ipinfo.co instead: + if let Err(e) = &location { + log::warn!( + "Couldn't obtain location information for {} from ipapi.co: {}", + ip, + e + ); + location = self.iplocate_ipinfo_io(ip).await + } + + // If both fail, we've logged the errors and we'll return None. + if let Err(e) = &location { + log::warn!( + "Couldn't obtain location information for {} from ipinfo.co: {}", + ip, + e + ); + } + + // If we successfully obtained a location, cache it + if let Ok(location) = &location { + self.cache.write().insert(ip, location.clone()); + } + + // Discard the error; we've logged information above. + location.ok() } - async fn iplocate_ipapi_co( - &self, - ip: Ipv4Addr, - ) -> Result>, reqwest::Error> { + async fn iplocate_ipapi_co(&self, ip: Ipv4Addr) -> Result, anyhow::Error> { + let location = self.query(&format!("https://ipapi.co/{}/json", ip)).await?; + + Ok(Arc::new(location)) + } + + async fn iplocate_ipinfo_io(&self, ip: Ipv4Addr) -> Result, anyhow::Error> { let location = self - .query(&format!("https://ipapi.co/{}/json", ip)) + .query::(&format!("https://ipinfo.io/{}/json", ip)) .await? - .map(Arc::new); + .into_node_location() + .with_context(|| "Could not convert response into node location")?; - Ok(location) + Ok(Arc::new(location)) } - async fn iplocate_ipinfo_io( - &self, - ip: Ipv4Addr, - ) -> Result>, reqwest::Error> { - let location = self - .query(&format!("https://ipinfo.io/{}/json", ip)) - .await? - .and_then(|loc: IPApiLocate| loc.into_node_location().map(Arc::new)); - - Ok(location) - } - - async fn query(&self, url: &str) -> Result, reqwest::Error> + async fn query(&self, url: &str) -> Result where for<'de> T: Deserialize<'de>, { - match self.client.get(url).send().await?.json::().await { - Ok(result) => Ok(Some(result)), - Err(err) => { - log::debug!("JSON error for ip location: {:?}", err); - Ok(None) - } - } + let res = self + .client + .get(url) + .send() + .await? + .bytes() + .await + .with_context(|| "Failed to obtain response body")?; + + serde_json::from_slice(&res) + .with_context(|| format!{"Failed to decode '{}'", std::str::from_utf8(&res).unwrap_or("INVALID_UTF8")}) } } /// This is the format returned from ipinfo.co, so we do /// a little conversion to get it into the shape we want. -#[derive(Deserialize)] +#[derive(Deserialize, Debug, Clone)] struct IPApiLocate { city: Box, loc: Box, diff --git a/backend/telemetry_shard/src/main.rs b/backend/telemetry_shard/src/main.rs index 86f632c..78e0162 100644 --- a/backend/telemetry_shard/src/main.rs +++ b/backend/telemetry_shard/src/main.rs @@ -134,7 +134,7 @@ async fn start_server(opts: Opts) -> anyhow::Result<()> { (&Method::GET, "/health") => Ok(Response::new("OK".into())), // Nodes send messages here: (&Method::GET, "/submit") => { - let real_addr = real_ip::real_ip(addr, req.headers()); + let (real_addr, real_addr_source) = real_ip::real_ip(addr, req.headers()); if let Some(reason) = block_list.blocked_reason(&real_addr) { return Ok(Response::builder().status(403).body(reason.into()).unwrap()); @@ -143,7 +143,11 @@ async fn start_server(opts: Opts) -> anyhow::Result<()> { Ok(http_utils::upgrade_to_websocket( req, move |ws_send, ws_recv| async move { - log::info!("Opening /submit connection from {:?}", addr); + log::info!( + "Opening /submit connection from {:?} (address source: {})", + real_addr, + real_addr_source + ); let tx_to_aggregator = aggregator.subscribe_node(); let (mut tx_to_aggregator, mut ws_send) = handle_node_websocket_connection( @@ -156,7 +160,11 @@ async fn start_server(opts: Opts) -> anyhow::Result<()> { block_list, ) .await; - log::info!("Closing /submit connection from {:?}", addr); + log::info!( + "Closing /submit connection from {:?} (address source: {})", + real_addr, + real_addr_source + ); // Tell the aggregator that this connection has closed, so it can tidy up. let _ = tx_to_aggregator.send(FromWebsocket::Disconnected).await; let _ = ws_send.close().await; diff --git a/backend/telemetry_shard/src/real_ip.rs b/backend/telemetry_shard/src/real_ip.rs index d0828d9..416673b 100644 --- a/backend/telemetry_shard/src/real_ip.rs +++ b/backend/telemetry_shard/src/real_ip.rs @@ -35,13 +35,32 @@ If still no luck, look for the X-Real-IP header, which we expect to contain a si If that _still_ doesn't work, fall back to the socket address of the connection. */ -pub fn real_ip(addr: SocketAddr, headers: &hyper::HeaderMap) -> IpAddr { +pub fn real_ip(addr: SocketAddr, headers: &hyper::HeaderMap) -> (IpAddr, Source) { let forwarded = headers.get("forwarded").and_then(header_as_str); let forwarded_for = headers.get("x-forwarded-for").and_then(header_as_str); let real_ip = headers.get("x-real-ip").and_then(header_as_str); pick_best_ip_from_options(forwarded, forwarded_for, real_ip, addr) } +/// The source of the address returned +pub enum Source { + ForwardedHeader, + XForwardedForHeader, + XRealIpHeader, + SocketAddr, +} + +impl std::fmt::Display for Source { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Source::ForwardedHeader => write!(f, "'Forwarded' header"), + Source::XForwardedForHeader => write!(f, "'X-Forwarded-For' header"), + Source::XRealIpHeader => write!(f, "'X-Real-Ip' header"), + Source::SocketAddr => write!(f, "Socket address"), + } + } +} + fn header_as_str(value: &hyper::header::HeaderValue) -> Option<&str> { std::str::from_utf8(value.as_bytes()).ok() } @@ -55,30 +74,39 @@ fn pick_best_ip_from_options( real_ip: Option<&str>, // socket address (if known) addr: SocketAddr, -) -> IpAddr { +) -> (IpAddr, Source) { let realip = forwarded .as_ref() - .and_then(|val| get_first_addr_from_forwarded_header(val)) + .and_then(|val| { + let addr = get_first_addr_from_forwarded_header(val)?; + Some((addr, Source::ForwardedHeader)) + }) .or_else(|| { // fall back to X-Forwarded-For - forwarded_for - .as_ref() - .and_then(|val| get_first_addr_from_x_forwarded_for_header(val)) + forwarded_for.as_ref().and_then(|val| { + let addr = get_first_addr_from_x_forwarded_for_header(val)?; + Some((addr, Source::XForwardedForHeader)) + }) }) .or_else(|| { // fall back to X-Real-IP - real_ip.as_ref().map(|val| val.trim()) + real_ip.as_ref().and_then(|val| { + let addr = val.trim(); + Some((addr, Source::XRealIpHeader)) + }) }) - .and_then(|ip| { + .and_then(|(ip, source)| { // Try parsing assuming it may have a port first, // and then assuming it doesn't. - ip.parse::() + let addr = ip + .parse::() .map(|s| s.ip()) .or_else(|_| ip.parse::()) - .ok() + .ok()?; + Some((addr, source)) }) // Fall back to local IP address if the above fails - .unwrap_or(addr.ip()); + .unwrap_or((addr.ip(), Source::SocketAddr)); realip }