Improve logging and error reporting around IP and location info (#386)

* Beef up error reporting of IP and location info

* Tidy up error reporting after some manual testing of it

* Don't cache erroneous locations; try again when asked again

* cargo fmt
This commit is contained in:
James Wilson
2021-08-27 16:16:26 +01:00
committed by GitHub
parent fc73a2f27a
commit 87866b2d42
3 changed files with 113 additions and 66 deletions
+63 -52
View File
@@ -22,6 +22,7 @@ use parking_lot::RwLock;
use rustc_hash::FxHashMap;
use serde::Deserialize;
use anyhow::Context;
use common::node_types::NodeLocation;
use tokio::sync::Semaphore;
@@ -38,16 +39,16 @@ where
let (tx, rx) = flume::unbounded();
// cache entries
let mut cache: FxHashMap<Ipv4Addr, Option<Arc<NodeLocation>>> = FxHashMap::default();
let mut cache: FxHashMap<Ipv4Addr, Arc<NodeLocation>> = FxHashMap::default();
// Default entry for localhost
cache.insert(
Ipv4Addr::new(127, 0, 0, 1),
Some(Arc::new(NodeLocation {
Arc::new(NodeLocation {
latitude: 52.516_6667,
longitude: 13.4,
city: "Berlin".into(),
})),
}),
);
// Create a locator with our cache. This is used to obtain locations.
@@ -68,14 +69,8 @@ where
// Once we have acquired our permit, spawn a task to avoid
// blocking this loop so that we can handle concurrent requests.
tokio::spawn(async move {
match locator.locate(ip_address).await {
Ok(loc) => {
let _ = response_chan.send((id, loc)).await;
}
Err(e) => {
log::debug!("GET error for ip location: {:?}", e);
}
};
let location = locator.locate(ip_address).await;
let _ = response_chan.send((id, location)).await;
// ensure permit is moved into task by dropping it explicitly:
drop(permit);
@@ -92,11 +87,11 @@ where
#[derive(Clone)]
struct Locator {
client: reqwest::Client,
cache: Arc<RwLock<FxHashMap<Ipv4Addr, Option<Arc<NodeLocation>>>>>,
cache: Arc<RwLock<FxHashMap<Ipv4Addr, Arc<NodeLocation>>>>,
}
impl Locator {
pub fn new(cache: FxHashMap<Ipv4Addr, Option<Arc<NodeLocation>>>) -> Self {
pub fn new(cache: FxHashMap<Ipv4Addr, Arc<NodeLocation>>) -> Self {
let client = reqwest::Client::new();
Locator {
@@ -105,68 +100,84 @@ impl Locator {
}
}
pub async fn locate(&self, ip: Ipv4Addr) -> Result<Option<Arc<NodeLocation>>, reqwest::Error> {
pub async fn locate(&self, ip: Ipv4Addr) -> Option<Arc<NodeLocation>> {
// Return location quickly if it's cached:
let cached_loc = {
let cache_reader = self.cache.read();
cache_reader.get(&ip).cloned()
};
if let Some(loc) = cached_loc {
return Ok(loc);
if cached_loc.is_some() {
return cached_loc;
}
// Look it up via the location services if not cached:
let location = self.iplocate_ipapi_co(ip).await?;
let location = match location {
Some(location) => Ok(Some(location)),
None => self.iplocate_ipinfo_io(ip).await,
}?;
// Look it up via ipapi.co:
let mut location = self.iplocate_ipapi_co(ip).await;
self.cache.write().insert(ip, location.clone());
Ok(location)
// If that fails, try looking it up via ipinfo.co instead:
if let Err(e) = &location {
log::warn!(
"Couldn't obtain location information for {} from ipapi.co: {}",
ip,
e
);
location = self.iplocate_ipinfo_io(ip).await
}
// If both fail, we've logged the errors and we'll return None.
if let Err(e) = &location {
log::warn!(
"Couldn't obtain location information for {} from ipinfo.co: {}",
ip,
e
);
}
// If we successfully obtained a location, cache it
if let Ok(location) = &location {
self.cache.write().insert(ip, location.clone());
}
// Discard the error; we've logged information above.
location.ok()
}
async fn iplocate_ipapi_co(
&self,
ip: Ipv4Addr,
) -> Result<Option<Arc<NodeLocation>>, reqwest::Error> {
async fn iplocate_ipapi_co(&self, ip: Ipv4Addr) -> Result<Arc<NodeLocation>, anyhow::Error> {
let location = self.query(&format!("https://ipapi.co/{}/json", ip)).await?;
Ok(Arc::new(location))
}
async fn iplocate_ipinfo_io(&self, ip: Ipv4Addr) -> Result<Arc<NodeLocation>, anyhow::Error> {
let location = self
.query(&format!("https://ipapi.co/{}/json", ip))
.query::<IPApiLocate>(&format!("https://ipinfo.io/{}/json", ip))
.await?
.map(Arc::new);
.into_node_location()
.with_context(|| "Could not convert response into node location")?;
Ok(location)
Ok(Arc::new(location))
}
async fn iplocate_ipinfo_io(
&self,
ip: Ipv4Addr,
) -> Result<Option<Arc<NodeLocation>>, reqwest::Error> {
let location = self
.query(&format!("https://ipinfo.io/{}/json", ip))
.await?
.and_then(|loc: IPApiLocate| loc.into_node_location().map(Arc::new));
Ok(location)
}
async fn query<T>(&self, url: &str) -> Result<Option<T>, reqwest::Error>
async fn query<T>(&self, url: &str) -> Result<T, anyhow::Error>
where
for<'de> T: Deserialize<'de>,
{
match self.client.get(url).send().await?.json::<T>().await {
Ok(result) => Ok(Some(result)),
Err(err) => {
log::debug!("JSON error for ip location: {:?}", err);
Ok(None)
}
}
let res = self
.client
.get(url)
.send()
.await?
.bytes()
.await
.with_context(|| "Failed to obtain response body")?;
serde_json::from_slice(&res)
.with_context(|| format!{"Failed to decode '{}'", std::str::from_utf8(&res).unwrap_or("INVALID_UTF8")})
}
}
/// This is the format returned from ipinfo.co, so we do
/// a little conversion to get it into the shape we want.
#[derive(Deserialize)]
#[derive(Deserialize, Debug, Clone)]
struct IPApiLocate {
city: Box<str>,
loc: Box<str>,
+11 -3
View File
@@ -134,7 +134,7 @@ async fn start_server(opts: Opts) -> anyhow::Result<()> {
(&Method::GET, "/health") => Ok(Response::new("OK".into())),
// Nodes send messages here:
(&Method::GET, "/submit") => {
let real_addr = real_ip::real_ip(addr, req.headers());
let (real_addr, real_addr_source) = real_ip::real_ip(addr, req.headers());
if let Some(reason) = block_list.blocked_reason(&real_addr) {
return Ok(Response::builder().status(403).body(reason.into()).unwrap());
@@ -143,7 +143,11 @@ async fn start_server(opts: Opts) -> anyhow::Result<()> {
Ok(http_utils::upgrade_to_websocket(
req,
move |ws_send, ws_recv| async move {
log::info!("Opening /submit connection from {:?}", addr);
log::info!(
"Opening /submit connection from {:?} (address source: {})",
real_addr,
real_addr_source
);
let tx_to_aggregator = aggregator.subscribe_node();
let (mut tx_to_aggregator, mut ws_send) =
handle_node_websocket_connection(
@@ -156,7 +160,11 @@ async fn start_server(opts: Opts) -> anyhow::Result<()> {
block_list,
)
.await;
log::info!("Closing /submit connection from {:?}", addr);
log::info!(
"Closing /submit connection from {:?} (address source: {})",
real_addr,
real_addr_source
);
// Tell the aggregator that this connection has closed, so it can tidy up.
let _ = tx_to_aggregator.send(FromWebsocket::Disconnected).await;
let _ = ws_send.close().await;
+39 -11
View File
@@ -35,13 +35,32 @@ If still no luck, look for the X-Real-IP header, which we expect to contain a si
If that _still_ doesn't work, fall back to the socket address of the connection.
*/
pub fn real_ip(addr: SocketAddr, headers: &hyper::HeaderMap) -> IpAddr {
pub fn real_ip(addr: SocketAddr, headers: &hyper::HeaderMap) -> (IpAddr, Source) {
let forwarded = headers.get("forwarded").and_then(header_as_str);
let forwarded_for = headers.get("x-forwarded-for").and_then(header_as_str);
let real_ip = headers.get("x-real-ip").and_then(header_as_str);
pick_best_ip_from_options(forwarded, forwarded_for, real_ip, addr)
}
/// The source of the address returned
pub enum Source {
ForwardedHeader,
XForwardedForHeader,
XRealIpHeader,
SocketAddr,
}
impl std::fmt::Display for Source {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Source::ForwardedHeader => write!(f, "'Forwarded' header"),
Source::XForwardedForHeader => write!(f, "'X-Forwarded-For' header"),
Source::XRealIpHeader => write!(f, "'X-Real-Ip' header"),
Source::SocketAddr => write!(f, "Socket address"),
}
}
}
fn header_as_str(value: &hyper::header::HeaderValue) -> Option<&str> {
std::str::from_utf8(value.as_bytes()).ok()
}
@@ -55,30 +74,39 @@ fn pick_best_ip_from_options(
real_ip: Option<&str>,
// socket address (if known)
addr: SocketAddr,
) -> IpAddr {
) -> (IpAddr, Source) {
let realip = forwarded
.as_ref()
.and_then(|val| get_first_addr_from_forwarded_header(val))
.and_then(|val| {
let addr = get_first_addr_from_forwarded_header(val)?;
Some((addr, Source::ForwardedHeader))
})
.or_else(|| {
// fall back to X-Forwarded-For
forwarded_for
.as_ref()
.and_then(|val| get_first_addr_from_x_forwarded_for_header(val))
forwarded_for.as_ref().and_then(|val| {
let addr = get_first_addr_from_x_forwarded_for_header(val)?;
Some((addr, Source::XForwardedForHeader))
})
})
.or_else(|| {
// fall back to X-Real-IP
real_ip.as_ref().map(|val| val.trim())
real_ip.as_ref().and_then(|val| {
let addr = val.trim();
Some((addr, Source::XRealIpHeader))
})
})
.and_then(|ip| {
.and_then(|(ip, source)| {
// Try parsing assuming it may have a port first,
// and then assuming it doesn't.
ip.parse::<SocketAddr>()
let addr = ip
.parse::<SocketAddr>()
.map(|s| s.ip())
.or_else(|_| ip.parse::<IpAddr>())
.ok()
.ok()?;
Some((addr, source))
})
// Fall back to local IP address if the above fails
.unwrap_or(addr.ip());
.unwrap_or((addr.ip(), Source::SocketAddr));
realip
}