diff --git a/backend/Cargo.lock b/backend/Cargo.lock index bcc187d..4136dbf 100644 --- a/backend/Cargo.lock +++ b/backend/Cargo.lock @@ -107,16 +107,6 @@ dependencies = [ "serde", ] -[[package]] -name = "buf_redux" -version = "0.8.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b953a6887648bb07a535631f2bc00fbdb2a2216f135552cb3f534ed136b9c07f" -dependencies = [ - "memchr", - "safemem", -] - [[package]] name = "bumpalo" version = "3.7.0" @@ -395,7 +385,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cfcf0ed7fe52a17a03854ec54a9f76d6d84508d1c0e66bc1793301c73fc8493c" dependencies = [ "byteorder", - "rand 0.8.4", + "rand", "rustc-hex", "static_assertions", ] @@ -541,17 +531,6 @@ dependencies = [ "version_check", ] -[[package]] -name = "getrandom" -version = "0.1.16" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8fc3cb4d91f53b50155bdcfd23f6a4c39ae1969c2ae85982b135750cccaf5fce" -dependencies = [ - "cfg-if", - "libc", - "wasi 0.9.0+wasi-snapshot-preview1", -] - [[package]] name = "getrandom" version = "0.2.3" @@ -560,7 +539,7 @@ checksum = "7fcd999463524c52659517fe2cea98493cfe485d10565e7b0fb07dbba7ad2753" dependencies = [ "cfg-if", "libc", - "wasi 0.10.2+wasi-snapshot-preview1", + "wasi", ] [[package]] @@ -594,31 +573,6 @@ version = "0.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d7afe4a420e3fe79967a00898cc1f4db7c8a49a9333a29f8a4bd76a253d5cd04" -[[package]] -name = "headers" -version = "0.3.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f0b7591fb62902706ae8e7aaff416b1b0fa2c0fd0878b46dc13baa3712d8a855" -dependencies = [ - "base64", - "bitflags", - "bytes", - "headers-core", - "http", - "mime", - "sha-1", - "time", -] - -[[package]] -name = "headers-core" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e7f66481bfee273957b1f20485a4ff3362987f85b2c236580d81b4eb7a326429" -dependencies = [ - "http", -] - [[package]] name = "heck" version = "0.3.3" @@ -753,15 +707,6 @@ dependencies = [ "hashbrown", ] -[[package]] -name = "input_buffer" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f97967975f448f1a7ddb12b0bc41069d09ed6a1c161a92687e057325db35d413" -dependencies = [ - "bytes", -] - [[package]] name = "instant" version = "0.1.9" @@ -867,16 +812,6 @@ version = "0.3.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d" -[[package]] -name = "mime_guess" -version = "2.0.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2684d4c2e97d99848d30b324b00c8fcc7e5c897b7cbb5819b09e7c90e8baf212" -dependencies = [ - "mime", - "unicase", -] - [[package]] name = "mio" version = "0.7.13" @@ -899,24 +834,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "multipart" -version = "0.17.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d050aeedc89243f5347c3e237e3e13dc76fbe4ae3742a57b94dc14f69acf76d4" -dependencies = [ - "buf_redux", - "httparse", - "log", - "mime", - "mime_guess", - "quick-error", - "rand 0.7.3", - "safemem", - "tempfile", - "twoway", -] - [[package]] name = "native-tls" version = "0.2.7" @@ -1067,26 +984,6 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e" -[[package]] -name = "pin-project" -version = "1.0.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c7509cc106041c40a4518d2af7a61530e1eed0e6285296a3d8c5472806ccc4a4" -dependencies = [ - "pin-project-internal", -] - -[[package]] -name = "pin-project-internal" -version = "1.0.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "48c950132583b500556b1efd71d45b319029f2b71518d979fcc208e16b42426f" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "pin-project-lite" version = "0.2.7" @@ -1196,12 +1093,6 @@ dependencies = [ "unicode-xid", ] -[[package]] -name = "quick-error" -version = "1.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0" - [[package]] name = "quote" version = "1.0.9" @@ -1217,19 +1108,6 @@ version = "0.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "643f8f41a8ebc4c5dc4515c82bb8abd397b527fc20fd681b7c011c2aee5d44fb" -[[package]] -name = "rand" -version = "0.7.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6a6b1679d49b24bbfe0c803429aa1874472f50d9b363131f0e89fc356b544d03" -dependencies = [ - "getrandom 0.1.16", - "libc", - "rand_chacha 0.2.2", - "rand_core 0.5.1", - "rand_hc 0.2.0", -] - [[package]] name = "rand" version = "0.8.4" @@ -1237,19 +1115,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2e7573632e6454cf6b99d7aac4ccca54be06da05aca2ef7423d22d27d4d4bcd8" dependencies = [ "libc", - "rand_chacha 0.3.1", - "rand_core 0.6.3", - "rand_hc 0.3.1", -] - -[[package]] -name = "rand_chacha" -version = "0.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f4c8ed856279c9737206bf725bf36935d8666ead7aa69b52be55af369d193402" -dependencies = [ - "ppv-lite86", - "rand_core 0.5.1", + "rand_chacha", + "rand_core", + "rand_hc", ] [[package]] @@ -1259,16 +1127,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" dependencies = [ "ppv-lite86", - "rand_core 0.6.3", -] - -[[package]] -name = "rand_core" -version = "0.5.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "90bde5296fc891b0cef12a6d03ddccc162ce7b2aff54160af9338f8d40df6d19" -dependencies = [ - "getrandom 0.1.16", + "rand_core", ] [[package]] @@ -1277,16 +1136,7 @@ version = "0.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d34f1408f55294453790c48b2f1ebbb1c5b4b7563eb1f418bcfcfdbb06ebb4e7" dependencies = [ - "getrandom 0.2.3", -] - -[[package]] -name = "rand_hc" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ca3129af7b92a17112d59ad498c6f81eaf463253766b90396d39ea7a39d6613c" -dependencies = [ - "rand_core 0.5.1", + "getrandom", ] [[package]] @@ -1295,7 +1145,7 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d51e9f596de227fda2ea6c84607f5558e196eeaf43c986b724ba4fb8fdf497e7" dependencies = [ - "rand_core 0.6.3", + "rand_core", ] [[package]] @@ -1426,12 +1276,6 @@ version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "71d301d4193d031abdd79ff7e3dd721168a9572ef3fe51a1517aba235bd8f86e" -[[package]] -name = "safemem" -version = "0.3.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ef703b7cb59335eae2eb93ceb664c0eb7ea6bf567079d843e09420219668e072" - [[package]] name = "same-file" version = "1.0.6" @@ -1451,12 +1295,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "scoped-tls" -version = "1.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ea6a9290e3c9cf0f18145ef7ffa62d68ee0bf5fcd651017e586dc7fd5da448c2" - [[package]] name = "scopeguard" version = "1.1.0" @@ -1623,7 +1461,7 @@ dependencies = [ "futures", "httparse", "log", - "rand 0.8.4", + "rand", "sha-1", ] @@ -1711,7 +1549,6 @@ dependencies = [ "thiserror", "tokio", "tokio-util", - "warp", ] [[package]] @@ -1724,6 +1561,7 @@ dependencies = [ "futures", "hex", "http", + "hyper", "log", "primitive-types", "serde", @@ -1734,7 +1572,6 @@ dependencies = [ "thiserror", "tokio", "tokio-util", - "warp", ] [[package]] @@ -1745,7 +1582,7 @@ checksum = "dac1c663cfc93810f88aed9b8941d48cabf856a1b111c29a40439018d870eb22" dependencies = [ "cfg-if", "libc", - "rand 0.8.4", + "rand", "redox_syscall", "remove_dir_all", "winapi", @@ -1872,30 +1709,6 @@ dependencies = [ "tokio", ] -[[package]] -name = "tokio-stream" -version = "0.1.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f8864d706fdb3cc0843a49647ac892720dac98a6eeb818b77190592cf4994066" -dependencies = [ - "futures-core", - "pin-project-lite", - "tokio", -] - -[[package]] -name = "tokio-tungstenite" -version = "0.13.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e1a5f475f1b9d077ea1017ecbc60890fda8e54942d680ca0b1d2b47cfa2d861b" -dependencies = [ - "futures-util", - "log", - "pin-project", - "tokio", - "tungstenite", -] - [[package]] name = "tokio-util" version = "0.6.7" @@ -1925,7 +1738,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09adeb8c97449311ccd28a427f96fb563e7fd31aabf994189879d9da2394b89d" dependencies = [ "cfg-if", - "log", "pin-project-lite", "tracing-core", ] @@ -1945,34 +1757,6 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642" -[[package]] -name = "tungstenite" -version = "0.12.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ada8297e8d70872fa9a551d93250a9f407beb9f37ef86494eb20012a2ff7c24" -dependencies = [ - "base64", - "byteorder", - "bytes", - "http", - "httparse", - "input_buffer", - "log", - "rand 0.8.4", - "sha-1", - "url", - "utf-8", -] - -[[package]] -name = "twoway" -version = "0.1.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "59b11b2b5241ba34be09c3cc85a36e56e48f9888862e19cedf23336d35316ed1" -dependencies = [ - "memchr", -] - [[package]] name = "typenum" version = "1.13.0" @@ -1991,15 +1775,6 @@ dependencies = [ "static_assertions", ] -[[package]] -name = "unicase" -version = "2.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "50f37be617794602aabbeee0be4f259dc1778fabe05e2d67ee8f79326d5cb4f6" -dependencies = [ - "version_check", -] - [[package]] name = "unicode-bidi" version = "0.3.5" @@ -2048,12 +1823,6 @@ dependencies = [ "percent-encoding", ] -[[package]] -name = "utf-8" -version = "0.7.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" - [[package]] name = "vcpkg" version = "0.2.15" @@ -2093,41 +1862,6 @@ dependencies = [ "try-lock", ] -[[package]] -name = "warp" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "332d47745e9a0c38636dbd454729b147d16bd1ed08ae67b3ab281c4506771054" -dependencies = [ - "bytes", - "futures", - "headers", - "http", - "hyper", - "log", - "mime", - "mime_guess", - "multipart", - "percent-encoding", - "pin-project", - "scoped-tls", - "serde", - "serde_json", - "serde_urlencoded", - "tokio", - "tokio-stream", - "tokio-tungstenite", - "tokio-util", - "tower-service", - "tracing", -] - -[[package]] -name = "wasi" -version = "0.9.0+wasi-snapshot-preview1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519" - [[package]] name = "wasi" version = "0.10.2+wasi-snapshot-preview1" diff --git a/backend/common/src/internal_messages.rs b/backend/common/src/internal_messages.rs index 941dfeb..8a0ecd8 100644 --- a/backend/common/src/internal_messages.rs +++ b/backend/common/src/internal_messages.rs @@ -20,7 +20,7 @@ pub enum FromShardAggregator { /// Get information about a new node, including it's IP /// address and chain genesis hash. AddNode { - ip: Option, + ip: IpAddr, node: NodeDetails, local_id: ShardNodeId, genesis_hash: BlockHash, diff --git a/backend/telemetry_core/Cargo.toml b/backend/telemetry_core/Cargo.toml index 5aca032..6015a79 100644 --- a/backend/telemetry_core/Cargo.toml +++ b/backend/telemetry_core/Cargo.toml @@ -30,7 +30,6 @@ structopt = "0.3.21" thiserror = "1.0.25" tokio = { version = "1.7.0", features = ["full"] } tokio-util = { version = "0.6", features = ["compat"] } -warp = "0.3.1" [dev-dependencies] criterion = { version = "0.3.4", features = ["async", "async_tokio"] } diff --git a/backend/telemetry_core/src/aggregator/inner_loop.rs b/backend/telemetry_core/src/aggregator/inner_loop.rs index da20e2a..d3dc61a 100644 --- a/backend/telemetry_core/src/aggregator/inner_loop.rs +++ b/backend/telemetry_core/src/aggregator/inner_loop.rs @@ -36,7 +36,7 @@ pub enum FromShardWebsocket { /// Tell the aggregator about a new node. Add { local_id: ShardNodeId, - ip: Option, + ip: std::net::IpAddr, node: common::node_types::NodeDetails, genesis_hash: common::node_types::BlockHash, }, @@ -277,7 +277,7 @@ impl InnerLoop { // Ask for the grographical location of the node. // Currently we only geographically locate IPV4 addresses so ignore IPV6. - if let Some(IpAddr::V4(ip_v4)) = ip { + if let IpAddr::V4(ip_v4) = ip { let _ = self.tx_to_locator.unbounded_send((node_id, ip_v4)); } } diff --git a/backend/telemetry_shard/Cargo.toml b/backend/telemetry_shard/Cargo.toml index ffc28f7..182568a 100644 --- a/backend/telemetry_shard/Cargo.toml +++ b/backend/telemetry_shard/Cargo.toml @@ -12,6 +12,7 @@ common = { path = "../common" } futures = "0.3.15" hex = "0.4.3" http = "0.2.4" +hyper = "0.14.11" log = "0.4.14" primitive-types = { version = "0.9.0", features = ["serde"] } serde = { version = "1.0.126", features = ["derive"] } @@ -22,4 +23,3 @@ structopt = "0.3.21" thiserror = "1.0.25" tokio = { version = "1.7.0", features = ["full"] } tokio-util = { version = "0.6", features = ["compat"] } -warp = "0.3.1" diff --git a/backend/telemetry_shard/src/aggregator.rs b/backend/telemetry_shard/src/aggregator.rs index 0f0e04c..2dc7e56 100644 --- a/backend/telemetry_shard/src/aggregator.rs +++ b/backend/telemetry_shard/src/aggregator.rs @@ -49,7 +49,7 @@ pub enum FromWebsocket { /// Tell the aggregator about a new node. Add { message_id: node_message::NodeMessageId, - ip: Option, + ip: std::net::IpAddr, node: common::node_types::NodeDetails, genesis_hash: BlockHash, }, diff --git a/backend/telemetry_shard/src/main.rs b/backend/telemetry_shard/src/main.rs index 6572966..711a9f2 100644 --- a/backend/telemetry_shard/src/main.rs +++ b/backend/telemetry_shard/src/main.rs @@ -11,11 +11,10 @@ use aggregator::{Aggregator, FromWebsocket}; use common::node_message; use futures::{channel::mpsc, SinkExt, StreamExt}; use http::Uri; -use real_ip::real_ip; use simple_logger::SimpleLogger; use structopt::StructOpt; -use warp::filters::ws; -use warp::Filter; +use hyper::{ Response, Method }; +use common::http_utils; const VERSION: &str = env!("CARGO_PKG_VERSION"); const AUTHORS: &str = env!("CARGO_PKG_AUTHORS"); @@ -63,44 +62,51 @@ async fn main() { /// Declare our routes and start the server. async fn start_server(opts: Opts) -> anyhow::Result<()> { let aggregator = Aggregator::spawn(opts.core_url).await?; + let server = http_utils::start_server(opts.socket, move |addr, req| { + let aggregator = aggregator.clone(); + async move { + match (req.method(), req.uri().path().trim_end_matches('/')) { + // Check that the server is up and running: + (&Method::GET, "/health") => { + Ok(Response::new("OK".into())) + }, + // Nodes send messages here: + (&Method::GET, "/submit") => { + let real_addr = real_ip::real_ip(addr, req.headers()); + Ok(http_utils::upgrade_to_websocket(req, move |ws_send, ws_recv| async move { + let tx_to_aggregator = aggregator.subscribe_node(); + let (mut tx_to_aggregator, mut ws_send) + = handle_node_websocket_connection(real_addr, ws_send, ws_recv, tx_to_aggregator).await; + log::info!("Closing /submit connection from {:?}", addr); + // Tell the aggregator that this connection has closed, so it can tidy up. + let _ = tx_to_aggregator.send(FromWebsocket::Disconnected).await; + let _ = ws_send.close().await; + })) + }, + // 404 for anything else: + _ => { + Ok(Response::builder() + .status(404) + .body("Not found".into()) + .unwrap()) + } + } + } + }); - // Handle requests to /health by returning OK. - let health_route = warp::path("health").map(|| "OK"); - - // Handle websocket requests to /submit. - let ws_route = warp::path("submit").and(warp::ws()).and(real_ip()).map( - move |ws: ws::Ws, addr: Option| { - // Send messages from the websocket connection to this sink - // to have them pass to the aggregator. - let tx_to_aggregator = aggregator.subscribe_node(); - log::info!("Opening /submit connection from {:?}", addr); - ws.on_upgrade(move |websocket| async move { - let (mut tx_to_aggregator, websocket) = - handle_node_websocket_connection(websocket, tx_to_aggregator, addr).await; - log::info!("Closing /submit connection from {:?}", addr); - // Tell the aggregator that this connection has closed, so it can tidy up. - let _ = tx_to_aggregator.send(FromWebsocket::Disconnected).await; - // Note: IF we want to close with a status code and reason, we need to construct - // a ws::Message using `ws::Message::close_with`, rather than using this method: - let _ = websocket.close().await; - }) - }, - ); - - // Merge the routes and start our server: - let routes = ws_route.or(health_route); - warp::serve(routes).run(opts.socket).await; + server.await?; Ok(()) } /// This takes care of handling messages from an established socket connection. async fn handle_node_websocket_connection( - mut websocket: ws::WebSocket, + real_addr: IpAddr, + ws_send: http_utils::WsSender, + mut ws_recv: http_utils::WsReceiver, mut tx_to_aggregator: S, - addr: Option, -) -> (S, ws::WebSocket) +) -> (S, http_utils::WsSender) where - S: futures::Sink + Unpin, + S: futures::Sink + Unpin + Send + 'static, { // This could be a oneshot channel, but it's useful to be able to clone // messages, and we can't clone oneshot channel senders. @@ -112,7 +118,7 @@ where }; if let Err(e) = tx_to_aggregator.send(init_msg).await { log::error!("Error sending message to aggregator: {}", e); - return (tx_to_aggregator, websocket); + return (tx_to_aggregator, ws_send); } // Now we've "initialized", wait for messages from the node. Messages will @@ -120,38 +126,27 @@ where // of messages with some message ID will be sent (a node could have more // than one of these), or updates linked to a specific message_id. loop { + let mut bytes = Vec::new(); tokio::select! { - // The close channel has fired, so end the loop: + // The close channel has fired, so end the loop. `ws_recv.receive_data` is + // *not* cancel safe, but since we're closing the connection we don't care. _ = close_connection_rx.next() => { - log::info!("connection to {:?} being closed by aggregator", addr); + log::info!("connection to {:?} being closed by aggregator", real_addr); break }, // A message was received; handle it: - msg = websocket.next() => { - let msg = match msg { - Some(msg) => msg, - None => { log::warn!("Websocket connection from {:?} closed", addr); break } - }; - - // If we see any errors, log them and end our loop: - let msg = match msg { - Err(e) => { log::error!("Error in node websocket connection: {}", e); break }, - Ok(msg) => msg, - }; - - // Close message? Break to close connection. - if msg.is_close() { + msg_info = ws_recv.receive_data(&mut bytes) => { + // Handle the socket closing, or errors receiving the message. + if let Err(soketto::connection::Error::Closed) = msg_info { + break; + } + if let Err(e) = msg_info { + log::error!("Shutting down websocket connection: Failed to receive data: {}", e); break; } - // If the message isn't something we want to handle, just ignore it. - // This includes system messages like "pings" and such, so don't log anything. - if !msg.is_binary() && !msg.is_text() { - continue; - } // Deserialize from JSON, warning in debug mode if deserialization fails: - let bytes = msg.as_bytes(); - let node_message: json_message::NodeMessage = match serde_json::from_slice(bytes) { + let node_message: json_message::NodeMessage = match serde_json::from_slice(&bytes) { Ok(node_message) => node_message, #[cfg(debug)] Err(e) => { @@ -177,7 +172,7 @@ where if let node_message::Payload::SystemConnected(info) = payload { let _ = tx_to_aggregator.send(FromWebsocket::Add { message_id, - ip: addr, + ip: real_addr, node: info.node, genesis_hash: info.genesis_hash, }).await; @@ -193,5 +188,5 @@ where } // Return what we need to close the connection gracefully: - (tx_to_aggregator, websocket) + (tx_to_aggregator, ws_send) } diff --git a/backend/telemetry_shard/src/real_ip.rs b/backend/telemetry_shard/src/real_ip.rs index 604a7ff..5f2aa3f 100644 --- a/backend/telemetry_shard/src/real_ip.rs +++ b/backend/telemetry_shard/src/real_ip.rs @@ -1,10 +1,7 @@ use std::net::{IpAddr, SocketAddr}; -use warp::filters::addr; -use warp::filters::header; -use warp::Filter; /** -A warp filter to extract the "real" IP address of the connection by looking at headers +Extract the "real" IP address of the connection by looking at headers set by proxies (this is inspired by Actix Web's implementation of the feature). First, check for the standardised "Forwarded" header. This looks something like: @@ -21,28 +18,28 @@ appending one to the end. So, take the first of these if it exists. If still no luck, look for the X-Real-IP header, which we expect to contain a single IP address. If that _still_ doesn't work, fall back to the socket address of the connection. - -Return `None` if all of this fails to yield an address. */ -pub fn real_ip() -> impl warp::Filter,), Error = warp::Rejection> + Clone -{ - header::optional("forwarded") - .and(header::optional("x-forwarded-for")) - .and(header::optional("x-real-ip")) - .and(addr::remote()) - .map(pick_best_ip_from_options) +pub fn real_ip(addr: SocketAddr, headers: &hyper::HeaderMap) -> IpAddr { + let forwarded = headers.get("forwarded").and_then(header_as_str); + let forwarded_for = headers.get("x-forwarded-for").and_then(header_as_str); + let real_ip = headers.get("x-real-ip").and_then(header_as_str); + pick_best_ip_from_options(forwarded, forwarded_for, real_ip, addr) +} + +fn header_as_str(value: &hyper::header::HeaderValue) -> Option<&str> { + std::str::from_utf8(value.as_bytes()).ok() } fn pick_best_ip_from_options( // Forwarded header value (if present) - forwarded: Option, + forwarded: Option<&str>, // X-Forwarded-For header value (if present) - forwarded_for: Option, + forwarded_for: Option<&str>, // X-Real-IP header value (if present) - real_ip: Option, + real_ip: Option<&str>, // socket address (if known) - addr: Option, -) -> Option { + addr: SocketAddr, +) -> IpAddr { let realip = forwarded .as_ref() .and_then(|val| get_first_addr_from_forwarded_header(val)) @@ -65,7 +62,7 @@ fn pick_best_ip_from_options( .ok() }) // Fall back to local IP address if the above fails - .or(addr.map(|a| a.ip())); + .unwrap_or(addr.ip()); realip }