diff --git a/substrate/utils/prometheus/Cargo.toml b/substrate/utils/prometheus/Cargo.toml index 7766904a57..16feedb2b5 100644 --- a/substrate/utils/prometheus/Cargo.toml +++ b/substrate/utils/prometheus/Cargo.toml @@ -22,3 +22,7 @@ derive_more = "0.99" async-std = { version = "1.6.5", features = ["unstable"] } tokio = "1.10" hyper = { version = "0.14.11", default-features = false, features = ["http1", "server", "tcp"] } + +[dev-dependencies] +hyper = { version = "0.14.11", features = ["client"] } +tokio = { version = "1.10", features = ["rt-multi-thread"] } diff --git a/substrate/utils/prometheus/src/lib.rs b/substrate/utils/prometheus/src/lib.rs index 9f6f6e8f22..5771b65567 100644 --- a/substrate/utils/prometheus/src/lib.rs +++ b/substrate/utils/prometheus/src/lib.rs @@ -137,12 +137,21 @@ mod known_os { prometheus_addr: SocketAddr, registry: Registry, ) -> Result<(), Error> { - use networking::Incoming; let listener = async_std::net::TcpListener::bind(&prometheus_addr) .await .map_err(|_| Error::PortInUse(prometheus_addr))?; - log::info!("〽️ Prometheus exporter started at {}", prometheus_addr); + init_prometheus_with_listener(listener, registry).await + } + + /// Init prometheus using the given listener. + pub(crate) async fn init_prometheus_with_listener( + listener: async_std::net::TcpListener, + registry: Registry, + ) -> Result<(), Error> { + use networking::Incoming; + + log::info!("〽️ Prometheus exporter started at {}", listener.local_addr()?); let service = make_service_fn(move |_| { let registry = registry.clone(); @@ -162,3 +171,46 @@ mod known_os { result } } + +#[cfg(test)] +mod tests { + use super::*; + use hyper::{Client, Uri}; + use std::convert::TryFrom; + + #[test] + fn prometheus_works() { + const METRIC_NAME: &str = "test_test_metric_name_test_test"; + + let runtime = tokio::runtime::Runtime::new().expect("Creates the runtime"); + + let listener = runtime + .block_on(async_std::net::TcpListener::bind("127.0.0.1:0")) + .expect("Creates listener"); + + let local_addr = listener.local_addr().expect("Returns the local addr"); + + let registry = Registry::default(); + register( + prometheus::Counter::new(METRIC_NAME, "yeah").expect("Creates test counter"), + ®istry, + ) + .expect("Registers the test metric"); + + runtime.spawn(known_os::init_prometheus_with_listener(listener, registry)); + + runtime.block_on(async { + let client = Client::new(); + + let res = client + .get(Uri::try_from(&format!("http://{}/metrics", local_addr)).expect("Parses URI")) + .await + .expect("Requests metrics"); + + let buf = hyper::body::to_bytes(res).await.expect("Converts body to bytes"); + + let body = String::from_utf8(buf.to_vec()).expect("Converts body to String"); + assert!(body.contains(&format!("{} 0", METRIC_NAME))); + }); + } +} diff --git a/substrate/utils/prometheus/src/networking.rs b/substrate/utils/prometheus/src/networking.rs index de1a1c41d6..a24216bd23 100644 --- a/substrate/utils/prometheus/src/networking.rs +++ b/substrate/utils/prometheus/src/networking.rs @@ -47,8 +47,8 @@ impl tokio::io::AsyncRead for TcpStream { buf: &mut tokio::io::ReadBuf<'_>, ) -> Poll> { Pin::new(&mut Pin::into_inner(self).0) - .poll_read(cx, buf.initialized_mut()) - .map_ok(drop) + .poll_read(cx, buf.initialize_unfilled()) + .map_ok(|s| buf.set_filled(s)) } }