diff --git a/backend/Cargo.lock b/backend/Cargo.lock index 73057ee..4673e61 100644 --- a/backend/Cargo.lock +++ b/backend/Cargo.lock @@ -750,6 +750,15 @@ version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "68f2d64f2edebec4ce84ad108148e67e1064789bee435edc5b60ad398714a3a9" +[[package]] +name = "ipnetwork" +version = "0.18.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4088d739b183546b239688ddbc79891831df421773df95e236daf7867866d355" +dependencies = [ + "serde", +] + [[package]] name = "itertools" version = "0.10.1" @@ -831,6 +840,18 @@ version = "0.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ffc5c5338469d4d3ea17d269fa8ea3512ad247247c30bd2df69e68309ed0a08" +[[package]] +name = "maxminddb" +version = "0.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fe2ba61113f9f7a9f0e87c519682d39c43a6f3f79c2cc42c3ba3dda83b1fa334" +dependencies = [ + "ipnetwork", + "log", + "memchr", + "serde", +] + [[package]] name = "memchr" version = "2.4.0" @@ -1613,6 +1634,7 @@ dependencies = [ "hyper", "jemallocator", "log", + "maxminddb", "num_cpus", "once_cell", "parking_lot", diff --git a/backend/telemetry_core/Cargo.toml b/backend/telemetry_core/Cargo.toml index 731b575..bed7d32 100644 --- a/backend/telemetry_core/Cargo.toml +++ b/backend/telemetry_core/Cargo.toml @@ -17,6 +17,7 @@ hex = "0.4.3" http = "0.2.4" hyper = "0.14.11" log = "0.4.14" +maxminddb = "0.23.0" num_cpus = "1.13.0" once_cell = "1.8.0" parking_lot = "0.11.1" diff --git a/backend/telemetry_core/src/GeoLite2-City.mmdb b/backend/telemetry_core/src/GeoLite2-City.mmdb new file mode 100644 index 0000000..c5e7b8c Binary files /dev/null and b/backend/telemetry_core/src/GeoLite2-City.mmdb differ diff --git a/backend/telemetry_core/src/aggregator/aggregator.rs b/backend/telemetry_core/src/aggregator/aggregator.rs index f46bdca..e12599e 100644 --- a/backend/telemetry_core/src/aggregator/aggregator.rs +++ b/backend/telemetry_core/src/aggregator/aggregator.rs @@ -19,7 +19,7 @@ use crate::find_location::find_location; use crate::state::NodeId; use common::id_type; use futures::{future, Sink, SinkExt}; -use std::net::Ipv4Addr; +use std::net::IpAddr; use std::sync::atomic::AtomicU64; use std::sync::Arc; @@ -94,7 +94,7 @@ impl Aggregator { /// any more, this task will gracefully end. async fn handle_messages( rx_from_external: flume::Receiver, - tx_to_aggregator: flume::Sender<(NodeId, Ipv4Addr)>, + tx_to_aggregator: flume::Sender<(NodeId, IpAddr)>, max_queue_len: usize, denylist: Vec, max_third_party_nodes: usize, diff --git a/backend/telemetry_core/src/aggregator/inner_loop.rs b/backend/telemetry_core/src/aggregator/inner_loop.rs index d092150..77f3e73 100644 --- a/backend/telemetry_core/src/aggregator/inner_loop.rs +++ b/backend/telemetry_core/src/aggregator/inner_loop.rs @@ -30,10 +30,7 @@ use std::sync::{ atomic::{AtomicU64, Ordering}, Arc, }; -use std::{ - net::{IpAddr, Ipv4Addr}, - str::FromStr, -}; +use std::{net::IpAddr, str::FromStr}; /// Incoming messages come via subscriptions, and end up looking like this. #[derive(Clone, Debug)] @@ -171,7 +168,7 @@ pub struct InnerLoop { chain_to_feed_conn_ids: MultiMapUnique, /// Send messages here to make geographical location requests. - tx_to_locator: flume::Sender<(NodeId, Ipv4Addr)>, + tx_to_locator: flume::Sender<(NodeId, IpAddr)>, /// How big can the queue of messages coming in to the aggregator get before messages /// are prioritised and dropped to try and get back on track. @@ -181,7 +178,7 @@ pub struct InnerLoop { impl InnerLoop { /// Create a new inner loop handler with the various state it needs. pub fn new( - tx_to_locator: flume::Sender<(NodeId, Ipv4Addr)>, + tx_to_locator: flume::Sender<(NodeId, IpAddr)>, denylist: Vec, max_queue_len: usize, max_third_party_nodes: usize, @@ -380,10 +377,7 @@ impl InnerLoop { self.finalize_and_broadcast_to_all_feeds(feed_messages_for_all); // Ask for the grographical location of the node. - // Currently we only geographically locate IPV4 addresses so ignore IPV6. - if let IpAddr::V4(ip_v4) = ip { - let _ = self.tx_to_locator.send((node_id, ip_v4)); - } + let _ = self.tx_to_locator.send((node_id, ip)); } } } diff --git a/backend/telemetry_core/src/find_location.rs b/backend/telemetry_core/src/find_location.rs index 489f044..2ae6967 100644 --- a/backend/telemetry_core/src/find_location.rs +++ b/backend/telemetry_core/src/find_location.rs @@ -14,24 +14,22 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -use std::net::Ipv4Addr; +use std::net::{IpAddr, Ipv4Addr}; use std::sync::Arc; use futures::{Sink, SinkExt}; +use maxminddb::{geoip2::City, Reader as GeoIpReader}; use parking_lot::RwLock; use rustc_hash::FxHashMap; -use serde::Deserialize; -use anyhow::Context; use common::node_types::NodeLocation; -use tokio::sync::Semaphore; /// The returned location is optional; it may be None if not found. pub type Location = Option>; /// This is responsible for taking an IP address and attempting /// to find a geographical location from this -pub fn find_location(response_chan: R) -> flume::Sender<(Id, Ipv4Addr)> +pub fn find_location(response_chan: R) -> flume::Sender<(Id, IpAddr)> where R: Sink<(Id, Option>)> + Unpin + Send + Clone + 'static, Id: Clone + Send + 'static, @@ -39,11 +37,11 @@ where let (tx, rx) = flume::unbounded(); // cache entries - let mut cache: FxHashMap> = FxHashMap::default(); + let mut cache: FxHashMap> = FxHashMap::default(); // Default entry for localhost cache.insert( - Ipv4Addr::new(127, 0, 0, 1), + Ipv4Addr::new(127, 0, 0, 1).into(), Arc::new(NodeLocation { latitude: 52.516_6667, longitude: 13.4, @@ -56,24 +54,16 @@ where // Spawn a loop to handle location requests tokio::spawn(async move { - // Allow 4 requests at a time. acquiring a token will block while the - // number of concurrent location requests is more than this. - let semaphore = Arc::new(Semaphore::new(4)); - loop { while let Ok((id, ip_address)) = rx.recv_async().await { - let permit = semaphore.clone().acquire_owned().await.unwrap(); let mut response_chan = response_chan.clone(); let locator = locator.clone(); - // Once we have acquired our permit, spawn a task to avoid - // blocking this loop so that we can handle concurrent requests. tokio::spawn(async move { - let location = locator.locate(ip_address).await; + let location = tokio::task::spawn_blocking(move || locator.locate(ip_address)) + .await + .expect("Locate never panics"); let _ = response_chan.send((id, location)).await; - - // ensure permit is moved into task by dropping it explicitly: - drop(permit); }); } } @@ -84,23 +74,26 @@ where /// This struct can be used to make location requests, given /// an IPV4 address. -#[derive(Clone)] +#[derive(Debug, Clone)] struct Locator { - client: reqwest::Client, - cache: Arc>>>, + city: Arc>, + cache: Arc>>>, } impl Locator { - pub fn new(cache: FxHashMap>) -> Self { - let client = reqwest::Client::new(); + /// Taken from here: https://github.com/P3TERX/GeoLite.mmdb/releases/tag/2022.06.07 + const CITY_DATA: &'static [u8] = include_bytes!("GeoLite2-City.mmdb"); - Locator { - client, + pub fn new(cache: FxHashMap>) -> Self { + Self { + city: GeoIpReader::from_source(Self::CITY_DATA) + .map(Arc::new) + .expect("City data is always valid"), cache: Arc::new(RwLock::new(cache)), } } - pub async fn locate(&self, ip: Ipv4Addr) -> Option> { + pub fn locate(&self, ip: IpAddr) -> Option> { // Return location quickly if it's cached: let cached_loc = { let cache_reader = self.cache.read(); @@ -110,98 +103,25 @@ impl Locator { return cached_loc; } - // Look it up via ipapi.co: - let mut location = self.iplocate_ipapi_co(ip).await; + let City { city, location, .. } = self.city.lookup(ip.into()).ok()?; + let city = city + .as_ref()? + .names + .as_ref()? + .get("en")? + .to_string() + .into_boxed_str(); + let latitude = location.as_ref()?.latitude? as f32; + let longitude = location?.longitude? as f32; - // If that fails, try looking it up via ipinfo.co instead: - if let Err(e) = &location { - log::warn!( - "Couldn't obtain location information for {} from ipapi.co: {}", - ip, - e - ); - location = self.iplocate_ipinfo_io(ip).await - } - - // If both fail, we've logged the errors and we'll return None. - if let Err(e) = &location { - log::warn!( - "Couldn't obtain location information for {} from ipinfo.co: {}", - ip, - e - ); - } - - // If we successfully obtained a location, cache it - if let Ok(location) = &location { - self.cache.write().insert(ip, location.clone()); - } - - // Discard the error; we've logged information above. - location.ok() - } - - async fn iplocate_ipapi_co(&self, ip: Ipv4Addr) -> Result, anyhow::Error> { - let location = self.query(&format!("https://ipapi.co/{}/json", ip)).await?; - - Ok(Arc::new(location)) - } - - async fn iplocate_ipinfo_io(&self, ip: Ipv4Addr) -> Result, anyhow::Error> { - let location = self - .query::(&format!("https://ipinfo.io/{}/json", ip)) - .await? - .into_node_location() - .with_context(|| "Could not convert response into node location")?; - - Ok(Arc::new(location)) - } - - async fn query(&self, url: &str) -> Result - where - for<'de> T: Deserialize<'de>, - { - let res = self - .client - .get(url) - .send() - .await? - .bytes() - .await - .with_context(|| "Failed to obtain response body")?; - - serde_json::from_slice(&res) - .with_context(|| format!{"Failed to decode '{}'", std::str::from_utf8(&res).unwrap_or("INVALID_UTF8")}) - } -} - -/// This is the format returned from ipinfo.co, so we do -/// a little conversion to get it into the shape we want. -#[derive(Deserialize, Debug, Clone)] -struct IPApiLocate { - city: Box, - loc: Box, -} - -impl IPApiLocate { - fn into_node_location(self) -> Option { - let IPApiLocate { city, loc } = self; - - let mut loc = loc.split(',').map(|n| n.parse()); - - let latitude = loc.next()?.ok()?; - let longitude = loc.next()?.ok()?; - - // Guarantee that the iterator has been exhausted - if loc.next().is_some() { - return None; - } - - Some(NodeLocation { + let location = Arc::new(NodeLocation { + city, latitude, longitude, - city, - }) + }); + self.cache.write().insert(ip, Arc::clone(&location)); + + Some(location) } } @@ -210,28 +130,14 @@ mod tests { use super::*; #[test] - fn ipapi_locate_to_node_location() { - let ipapi = IPApiLocate { - loc: "12.5,56.25".into(), - city: "Foobar".into(), - }; - - let location = ipapi.into_node_location().unwrap(); - - assert_eq!(location.latitude, 12.5); - assert_eq!(location.longitude, 56.25); - assert_eq!(&*location.city, "Foobar"); + fn locator_construction() { + Locator::new(Default::default()); } #[test] - fn ipapi_locate_to_node_location_too_many() { - let ipapi = IPApiLocate { - loc: "12.5,56.25,1.0".into(), - city: "Foobar".into(), - }; - - let location = ipapi.into_node_location(); - - assert!(location.is_none()); + fn locate_random_ip() { + let ip = "12.5.56.25".parse().unwrap(); + let node_location = Locator::new(Default::default()).locate(ip).unwrap(); + assert_eq!(&*node_location.city, "El Paso"); } }