Add geoip db (#477)

* Use geoip for ipv4 city lookup

Signed-off-by: i1i1 <vanyarybin1@live.ru>

* Add support for ipv6

Signed-off-by: i1i1 <vanyarybin1@live.ru>

* Wrap locator into a blocking task

* Dummy change to try and trigger CI pipeline.

Co-authored-by: James Wilson <james@jsdw.me>
This commit is contained in:
Ivan
2022-07-20 00:06:07 +03:00
committed by GitHub
parent a93a6cadfc
commit 5bff41d0c1
6 changed files with 70 additions and 147 deletions
+22
View File
@@ -750,6 +750,15 @@ version = "2.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "68f2d64f2edebec4ce84ad108148e67e1064789bee435edc5b60ad398714a3a9"
[[package]]
name = "ipnetwork"
version = "0.18.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4088d739b183546b239688ddbc79891831df421773df95e236daf7867866d355"
dependencies = [
"serde",
]
[[package]]
name = "itertools"
version = "0.10.1"
@@ -831,6 +840,18 @@ version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7ffc5c5338469d4d3ea17d269fa8ea3512ad247247c30bd2df69e68309ed0a08"
[[package]]
name = "maxminddb"
version = "0.23.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fe2ba61113f9f7a9f0e87c519682d39c43a6f3f79c2cc42c3ba3dda83b1fa334"
dependencies = [
"ipnetwork",
"log",
"memchr",
"serde",
]
[[package]]
name = "memchr"
version = "2.4.0"
@@ -1613,6 +1634,7 @@ dependencies = [
"hyper",
"jemallocator",
"log",
"maxminddb",
"num_cpus",
"once_cell",
"parking_lot",
+1
View File
@@ -17,6 +17,7 @@ hex = "0.4.3"
http = "0.2.4"
hyper = "0.14.11"
log = "0.4.14"
maxminddb = "0.23.0"
num_cpus = "1.13.0"
once_cell = "1.8.0"
parking_lot = "0.11.1"
Binary file not shown.

After

Width:  |  Height:  |  Size: 66 MiB

@@ -19,7 +19,7 @@ use crate::find_location::find_location;
use crate::state::NodeId;
use common::id_type;
use futures::{future, Sink, SinkExt};
use std::net::Ipv4Addr;
use std::net::IpAddr;
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
@@ -94,7 +94,7 @@ impl Aggregator {
/// any more, this task will gracefully end.
async fn handle_messages(
rx_from_external: flume::Receiver<inner_loop::ToAggregator>,
tx_to_aggregator: flume::Sender<(NodeId, Ipv4Addr)>,
tx_to_aggregator: flume::Sender<(NodeId, IpAddr)>,
max_queue_len: usize,
denylist: Vec<String>,
max_third_party_nodes: usize,
@@ -30,10 +30,7 @@ use std::sync::{
atomic::{AtomicU64, Ordering},
Arc,
};
use std::{
net::{IpAddr, Ipv4Addr},
str::FromStr,
};
use std::{net::IpAddr, str::FromStr};
/// Incoming messages come via subscriptions, and end up looking like this.
#[derive(Clone, Debug)]
@@ -171,7 +168,7 @@ pub struct InnerLoop {
chain_to_feed_conn_ids: MultiMapUnique<BlockHash, ConnId>,
/// Send messages here to make geographical location requests.
tx_to_locator: flume::Sender<(NodeId, Ipv4Addr)>,
tx_to_locator: flume::Sender<(NodeId, IpAddr)>,
/// How big can the queue of messages coming in to the aggregator get before messages
/// are prioritised and dropped to try and get back on track.
@@ -181,7 +178,7 @@ pub struct InnerLoop {
impl InnerLoop {
/// Create a new inner loop handler with the various state it needs.
pub fn new(
tx_to_locator: flume::Sender<(NodeId, Ipv4Addr)>,
tx_to_locator: flume::Sender<(NodeId, IpAddr)>,
denylist: Vec<String>,
max_queue_len: usize,
max_third_party_nodes: usize,
@@ -380,10 +377,7 @@ impl InnerLoop {
self.finalize_and_broadcast_to_all_feeds(feed_messages_for_all);
// Ask for the grographical location of the node.
// Currently we only geographically locate IPV4 addresses so ignore IPV6.
if let IpAddr::V4(ip_v4) = ip {
let _ = self.tx_to_locator.send((node_id, ip_v4));
}
let _ = self.tx_to_locator.send((node_id, ip));
}
}
}
+41 -135
View File
@@ -14,24 +14,22 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use std::net::Ipv4Addr;
use std::net::{IpAddr, Ipv4Addr};
use std::sync::Arc;
use futures::{Sink, SinkExt};
use maxminddb::{geoip2::City, Reader as GeoIpReader};
use parking_lot::RwLock;
use rustc_hash::FxHashMap;
use serde::Deserialize;
use anyhow::Context;
use common::node_types::NodeLocation;
use tokio::sync::Semaphore;
/// The returned location is optional; it may be None if not found.
pub type Location = Option<Arc<NodeLocation>>;
/// This is responsible for taking an IP address and attempting
/// to find a geographical location from this
pub fn find_location<Id, R>(response_chan: R) -> flume::Sender<(Id, Ipv4Addr)>
pub fn find_location<Id, R>(response_chan: R) -> flume::Sender<(Id, IpAddr)>
where
R: Sink<(Id, Option<Arc<NodeLocation>>)> + Unpin + Send + Clone + 'static,
Id: Clone + Send + 'static,
@@ -39,11 +37,11 @@ where
let (tx, rx) = flume::unbounded();
// cache entries
let mut cache: FxHashMap<Ipv4Addr, Arc<NodeLocation>> = FxHashMap::default();
let mut cache: FxHashMap<IpAddr, Arc<NodeLocation>> = FxHashMap::default();
// Default entry for localhost
cache.insert(
Ipv4Addr::new(127, 0, 0, 1),
Ipv4Addr::new(127, 0, 0, 1).into(),
Arc::new(NodeLocation {
latitude: 52.516_6667,
longitude: 13.4,
@@ -56,24 +54,16 @@ where
// Spawn a loop to handle location requests
tokio::spawn(async move {
// Allow 4 requests at a time. acquiring a token will block while the
// number of concurrent location requests is more than this.
let semaphore = Arc::new(Semaphore::new(4));
loop {
while let Ok((id, ip_address)) = rx.recv_async().await {
let permit = semaphore.clone().acquire_owned().await.unwrap();
let mut response_chan = response_chan.clone();
let locator = locator.clone();
// Once we have acquired our permit, spawn a task to avoid
// blocking this loop so that we can handle concurrent requests.
tokio::spawn(async move {
let location = locator.locate(ip_address).await;
let location = tokio::task::spawn_blocking(move || locator.locate(ip_address))
.await
.expect("Locate never panics");
let _ = response_chan.send((id, location)).await;
// ensure permit is moved into task by dropping it explicitly:
drop(permit);
});
}
}
@@ -84,23 +74,26 @@ where
/// This struct can be used to make location requests, given
/// an IPV4 address.
#[derive(Clone)]
#[derive(Debug, Clone)]
struct Locator {
client: reqwest::Client,
cache: Arc<RwLock<FxHashMap<Ipv4Addr, Arc<NodeLocation>>>>,
city: Arc<maxminddb::Reader<&'static [u8]>>,
cache: Arc<RwLock<FxHashMap<IpAddr, Arc<NodeLocation>>>>,
}
impl Locator {
pub fn new(cache: FxHashMap<Ipv4Addr, Arc<NodeLocation>>) -> Self {
let client = reqwest::Client::new();
/// Taken from here: https://github.com/P3TERX/GeoLite.mmdb/releases/tag/2022.06.07
const CITY_DATA: &'static [u8] = include_bytes!("GeoLite2-City.mmdb");
Locator {
client,
pub fn new(cache: FxHashMap<IpAddr, Arc<NodeLocation>>) -> Self {
Self {
city: GeoIpReader::from_source(Self::CITY_DATA)
.map(Arc::new)
.expect("City data is always valid"),
cache: Arc::new(RwLock::new(cache)),
}
}
pub async fn locate(&self, ip: Ipv4Addr) -> Option<Arc<NodeLocation>> {
pub fn locate(&self, ip: IpAddr) -> Option<Arc<NodeLocation>> {
// Return location quickly if it's cached:
let cached_loc = {
let cache_reader = self.cache.read();
@@ -110,98 +103,25 @@ impl Locator {
return cached_loc;
}
// Look it up via ipapi.co:
let mut location = self.iplocate_ipapi_co(ip).await;
let City { city, location, .. } = self.city.lookup(ip.into()).ok()?;
let city = city
.as_ref()?
.names
.as_ref()?
.get("en")?
.to_string()
.into_boxed_str();
let latitude = location.as_ref()?.latitude? as f32;
let longitude = location?.longitude? as f32;
// If that fails, try looking it up via ipinfo.co instead:
if let Err(e) = &location {
log::warn!(
"Couldn't obtain location information for {} from ipapi.co: {}",
ip,
e
);
location = self.iplocate_ipinfo_io(ip).await
}
// If both fail, we've logged the errors and we'll return None.
if let Err(e) = &location {
log::warn!(
"Couldn't obtain location information for {} from ipinfo.co: {}",
ip,
e
);
}
// If we successfully obtained a location, cache it
if let Ok(location) = &location {
self.cache.write().insert(ip, location.clone());
}
// Discard the error; we've logged information above.
location.ok()
}
async fn iplocate_ipapi_co(&self, ip: Ipv4Addr) -> Result<Arc<NodeLocation>, anyhow::Error> {
let location = self.query(&format!("https://ipapi.co/{}/json", ip)).await?;
Ok(Arc::new(location))
}
async fn iplocate_ipinfo_io(&self, ip: Ipv4Addr) -> Result<Arc<NodeLocation>, anyhow::Error> {
let location = self
.query::<IPApiLocate>(&format!("https://ipinfo.io/{}/json", ip))
.await?
.into_node_location()
.with_context(|| "Could not convert response into node location")?;
Ok(Arc::new(location))
}
async fn query<T>(&self, url: &str) -> Result<T, anyhow::Error>
where
for<'de> T: Deserialize<'de>,
{
let res = self
.client
.get(url)
.send()
.await?
.bytes()
.await
.with_context(|| "Failed to obtain response body")?;
serde_json::from_slice(&res)
.with_context(|| format!{"Failed to decode '{}'", std::str::from_utf8(&res).unwrap_or("INVALID_UTF8")})
}
}
/// This is the format returned from ipinfo.co, so we do
/// a little conversion to get it into the shape we want.
#[derive(Deserialize, Debug, Clone)]
struct IPApiLocate {
city: Box<str>,
loc: Box<str>,
}
impl IPApiLocate {
fn into_node_location(self) -> Option<NodeLocation> {
let IPApiLocate { city, loc } = self;
let mut loc = loc.split(',').map(|n| n.parse());
let latitude = loc.next()?.ok()?;
let longitude = loc.next()?.ok()?;
// Guarantee that the iterator has been exhausted
if loc.next().is_some() {
return None;
}
Some(NodeLocation {
let location = Arc::new(NodeLocation {
city,
latitude,
longitude,
city,
})
});
self.cache.write().insert(ip, Arc::clone(&location));
Some(location)
}
}
@@ -210,28 +130,14 @@ mod tests {
use super::*;
#[test]
fn ipapi_locate_to_node_location() {
let ipapi = IPApiLocate {
loc: "12.5,56.25".into(),
city: "Foobar".into(),
};
let location = ipapi.into_node_location().unwrap();
assert_eq!(location.latitude, 12.5);
assert_eq!(location.longitude, 56.25);
assert_eq!(&*location.city, "Foobar");
fn locator_construction() {
Locator::new(Default::default());
}
#[test]
fn ipapi_locate_to_node_location_too_many() {
let ipapi = IPApiLocate {
loc: "12.5,56.25,1.0".into(),
city: "Foobar".into(),
};
let location = ipapi.into_node_location();
assert!(location.is_none());
fn locate_random_ip() {
let ip = "12.5.56.25".parse().unwrap();
let node_location = Locator::new(Default::default()).locate(ip).unwrap();
assert_eq!(&*node_location.city, "El Paso");
}
}