diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock index 8f9d056875..7e0b2ae379 100644 --- a/polkadot/Cargo.lock +++ b/polkadot/Cargo.lock @@ -1125,6 +1125,18 @@ dependencies = [ "syn", ] +[[package]] +name = "coarsetime" +version = "0.1.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "441947d9f3582f20b35fdd2bc5ada3a8c74c9ea380d66268607cb399b510ee08" +dependencies = [ + "libc", + "once_cell", + "wasi 0.11.0+wasi-snapshot-preview1", + "wasm-bindgen", +] + [[package]] name = "color-eyre" version = "0.6.1" @@ -1340,6 +1352,16 @@ dependencies = [ "scopeguard", ] +[[package]] +name = "crossbeam-queue" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f25d8400f4a7a5778f0e4e52384a48cbd9b5c495d110786187fc750075277a2" +dependencies = [ + "cfg-if 1.0.0", + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.5" @@ -4459,11 +4481,14 @@ name = "metered-channel" version = "0.9.18" dependencies = [ "assert_matches", + "coarsetime", + "crossbeam-queue", "derive_more", "env_logger 0.9.0", "futures 0.3.21", "futures-timer", "log", + "nanorand", "thiserror", "tracing", "tracing-gum", @@ -4704,6 +4729,12 @@ dependencies = [ "rand 0.8.5", ] +[[package]] +name = "nanorand" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a51313c5820b0b02bd422f4b44776fbf47961755c74ce64afc73bfad10226c3" + [[package]] name = "native-tls" version = "0.2.8" @@ -11978,6 +12009,12 @@ version = "0.10.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1a143597ca7c7793eff794def352d41792a93c481eb1042423ff7ff72ba2c31f" +[[package]] +name = "wasi" +version = "0.11.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" + [[package]] name = "wasm-bindgen" version = "0.2.78" diff --git a/polkadot/node/metered-channel/Cargo.toml b/polkadot/node/metered-channel/Cargo.toml index 9681345f07..fb4562b77e 100644 --- a/polkadot/node/metered-channel/Cargo.toml +++ b/polkadot/node/metered-channel/Cargo.toml @@ -11,6 +11,9 @@ futures-timer = "3.0.2" derive_more = "0.99" gum = { package = "tracing-gum", path = "../gum" } thiserror = "1.0.30" +crossbeam-queue = "0.3.5" +nanorand = { version = "0.7.0", default-features = false, features = ["wyrand"] } +coarsetime = "0.1.21" [dev-dependencies] futures = { version = "0.3.21", features = ["thread-pool"] } diff --git a/polkadot/node/metered-channel/src/bounded.rs b/polkadot/node/metered-channel/src/bounded.rs index ecfc0d60b4..0543ece928 100644 --- a/polkadot/node/metered-channel/src/bounded.rs +++ b/polkadot/node/metered-channel/src/bounded.rs @@ -25,11 +25,11 @@ use futures::{ use std::{pin::Pin, result}; -use super::Meter; +use super::{measure_tof_check, CoarseInstant, MaybeTimeOfFlight, Meter}; /// Create a wrapped `mpsc::channel` pair of `MeteredSender` and `MeteredReceiver`. pub fn channel(capacity: usize) -> (MeteredSender, MeteredReceiver) { - let (tx, rx) = mpsc::channel(capacity); + let (tx, rx) = mpsc::channel::>(capacity); let shared_meter = Meter::default(); let tx = MeteredSender { meter: shared_meter.clone(), inner: tx }; let rx = MeteredReceiver { meter: shared_meter, inner: rx }; @@ -41,11 +41,11 @@ pub fn channel(capacity: usize) -> (MeteredSender, MeteredReceiver) { pub struct MeteredReceiver { // count currently contained messages meter: Meter, - inner: mpsc::Receiver, + inner: mpsc::Receiver>, } impl std::ops::Deref for MeteredReceiver { - type Target = mpsc::Receiver; + type Target = mpsc::Receiver>; fn deref(&self) -> &Self::Target { &self.inner } @@ -61,11 +61,8 @@ impl Stream for MeteredReceiver { type Item = T; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { match mpsc::Receiver::poll_next(Pin::new(&mut self.inner), cx) { - Poll::Ready(x) => { - self.meter.note_received(); - Poll::Ready(x) - }, - other => other, + Poll::Ready(maybe_value) => Poll::Ready(self.maybe_meter_tof(maybe_value)), + Poll::Pending => Poll::Pending, } } @@ -76,6 +73,23 @@ impl Stream for MeteredReceiver { } impl MeteredReceiver { + fn maybe_meter_tof(&mut self, maybe_value: Option>) -> Option { + self.meter.note_received(); + maybe_value.map(|value| { + match value { + MaybeTimeOfFlight::::WithTimeOfFlight(value, tof_start) => { + // do not use `.elapsed()` of `std::time`, it may panic + // `coarsetime` does a saturating sub for all `CoarseInstant` substractions + let duration = tof_start.elapsed(); + self.meter.note_time_of_flight(duration); + value + }, + MaybeTimeOfFlight::::Bare(value) => value, + } + .into() + }) + } + /// Get an updated accessor object for all metrics collected. pub fn meter(&self) -> &Meter { &self.meter @@ -84,10 +98,7 @@ impl MeteredReceiver { /// Attempt to receive the next item. pub fn try_next(&mut self) -> Result, mpsc::TryRecvError> { match self.inner.try_next()? { - Some(x) => { - self.meter.note_received(); - Ok(Some(x)) - }, + Some(value) => Ok(self.maybe_meter_tof(Some(value))), None => Ok(None), } } @@ -104,7 +115,7 @@ impl futures::stream::FusedStream for MeteredReceiver { #[derive(Debug)] pub struct MeteredSender { meter: Meter, - inner: mpsc::Sender, + inner: mpsc::Sender>, } impl Clone for MeteredSender { @@ -114,7 +125,7 @@ impl Clone for MeteredSender { } impl std::ops::Deref for MeteredSender { - type Target = mpsc::Sender; + type Target = mpsc::Sender>; fn deref(&self) -> &Self::Target { &self.inner } @@ -127,18 +138,28 @@ impl std::ops::DerefMut for MeteredSender { } impl MeteredSender { + fn prepare_with_tof(&self, item: T) -> MaybeTimeOfFlight { + let previous = self.meter.note_sent(); + let item = if measure_tof_check(previous) { + MaybeTimeOfFlight::WithTimeOfFlight(item, CoarseInstant::now()) + } else { + MaybeTimeOfFlight::Bare(item) + }; + item + } + /// Get an updated accessor object for all metrics collected. pub fn meter(&self) -> &Meter { &self.meter } /// Send message, wait until capacity is available. - pub async fn send(&mut self, item: T) -> result::Result<(), mpsc::SendError> + pub async fn send(&mut self, msg: T) -> result::Result<(), mpsc::SendError> where Self: Unpin, { - self.meter.note_sent(); - let fut = self.inner.send(item); + let msg = self.prepare_with_tof(msg); + let fut = self.inner.send(msg); futures::pin_mut!(fut); fut.await.map_err(|e| { self.meter.retract_sent(); @@ -147,8 +168,11 @@ impl MeteredSender { } /// Attempt to send message or fail immediately. - pub fn try_send(&mut self, msg: T) -> result::Result<(), mpsc::TrySendError> { - self.meter.note_sent(); + pub fn try_send( + &mut self, + msg: T, + ) -> result::Result<(), mpsc::TrySendError>> { + let msg = self.prepare_with_tof(msg); self.inner.try_send(msg).map_err(|e| { self.meter.retract_sent(); e diff --git a/polkadot/node/metered-channel/src/lib.rs b/polkadot/node/metered-channel/src/lib.rs index 9646cbaee9..ee276583cd 100644 --- a/polkadot/node/metered-channel/src/lib.rs +++ b/polkadot/node/metered-channel/src/lib.rs @@ -21,7 +21,7 @@ use std::sync::{ Arc, }; -use derive_more::{Add, Display}; +use derive_more::Display; mod bounded; pub mod oneshot; @@ -29,24 +29,44 @@ mod unbounded; pub use self::{bounded::*, unbounded::*}; +pub use coarsetime::Duration as CoarseDuration; +use coarsetime::Instant as CoarseInstant; + +#[cfg(test)] +mod tests; + /// A peek into the inner state of a meter. -#[derive(Debug, Clone, Default)] +#[derive(Debug, Clone)] pub struct Meter { // Number of sends on this channel. sent: Arc, // Number of receives on this channel. received: Arc, + // Atomic ringbuffer of the last 50 time of flight values + tof: Arc>, +} + +impl std::default::Default for Meter { + fn default() -> Self { + Self { + sent: Arc::new(AtomicUsize::new(0)), + received: Arc::new(AtomicUsize::new(0)), + tof: Arc::new(crossbeam_queue::ArrayQueue::new(100)), + } + } } /// A readout of sizes from the meter. Note that it is possible, due to asynchrony, for received /// to be slightly higher than sent. -#[derive(Debug, Add, Display, Clone, Default, PartialEq)] +#[derive(Debug, Display, Clone, Default, PartialEq)] #[display(fmt = "(sent={} received={})", sent, received)] pub struct Readout { /// The amount of messages sent on the channel, in aggregate. pub sent: usize, /// The amount of messages received on the channel, in aggregate. pub received: usize, + /// Time of flight in micro seconds (us) + pub tof: Vec, } impl Meter { @@ -57,11 +77,18 @@ impl Meter { Readout { sent: self.sent.load(Ordering::Relaxed), received: self.received.load(Ordering::Relaxed), + tof: { + let mut acc = Vec::with_capacity(self.tof.len()); + while let Some(value) = self.tof.pop() { + acc.push(value) + } + acc + }, } } - fn note_sent(&self) { - self.sent.fetch_add(1, Ordering::Relaxed); + fn note_sent(&self) -> usize { + self.sent.fetch_add(1, Ordering::Relaxed) } fn retract_sent(&self) { @@ -71,114 +98,59 @@ impl Meter { fn note_received(&self) { self.received.fetch_add(1, Ordering::Relaxed); } -} -#[cfg(test)] -mod tests { - use super::*; - use futures::{executor::block_on, StreamExt}; - - #[derive(Clone, Copy, Debug, Default)] - struct Msg { - val: u8, - } - - #[test] - fn try_send_try_next() { - block_on(async move { - let (mut tx, mut rx) = channel::(5); - let msg = Msg::default(); - assert_eq!(rx.meter().read(), Readout { sent: 0, received: 0 }); - tx.try_send(msg).unwrap(); - assert_eq!(tx.meter().read(), Readout { sent: 1, received: 0 }); - tx.try_send(msg).unwrap(); - tx.try_send(msg).unwrap(); - tx.try_send(msg).unwrap(); - assert_eq!(tx.meter().read(), Readout { sent: 4, received: 0 }); - rx.try_next().unwrap(); - assert_eq!(rx.meter().read(), Readout { sent: 4, received: 1 }); - rx.try_next().unwrap(); - rx.try_next().unwrap(); - assert_eq!(tx.meter().read(), Readout { sent: 4, received: 3 }); - rx.try_next().unwrap(); - assert_eq!(rx.meter().read(), Readout { sent: 4, received: 4 }); - assert!(rx.try_next().is_err()); - }); - } - - #[test] - fn with_tasks() { - let (ready, go) = futures::channel::oneshot::channel(); - - let (mut tx, mut rx) = channel::(5); - block_on(async move { - futures::join!( - async move { - let msg = Msg::default(); - assert_eq!(tx.meter().read(), Readout { sent: 0, received: 0 }); - tx.try_send(msg).unwrap(); - assert_eq!(tx.meter().read(), Readout { sent: 1, received: 0 }); - tx.try_send(msg).unwrap(); - tx.try_send(msg).unwrap(); - tx.try_send(msg).unwrap(); - ready.send(()).expect("Helper oneshot channel must work. qed"); - }, - async move { - go.await.expect("Helper oneshot channel must work. qed"); - assert_eq!(rx.meter().read(), Readout { sent: 4, received: 0 }); - rx.try_next().unwrap(); - assert_eq!(rx.meter().read(), Readout { sent: 4, received: 1 }); - rx.try_next().unwrap(); - rx.try_next().unwrap(); - assert_eq!(rx.meter().read(), Readout { sent: 4, received: 3 }); - rx.try_next().unwrap(); - assert_eq!(dbg!(rx.meter().read()), Readout { sent: 4, received: 4 }); - } - ) - }); - } - - use futures_timer::Delay; - use std::time::Duration; - - #[test] - fn stream_and_sink() { - let (mut tx, mut rx) = channel::(5); - - block_on(async move { - futures::join!( - async move { - for i in 0..15 { - println!("Sent #{} with a backlog of {} items", i + 1, tx.meter().read()); - let msg = Msg { val: i as u8 + 1u8 }; - tx.send(msg).await.unwrap(); - assert!(tx.meter().read().sent > 0usize); - Delay::new(Duration::from_millis(20)).await; - } - () - }, - async move { - while let Some(msg) = rx.next().await { - println!("rx'd one {} with {} backlogged", msg.val, rx.meter().read()); - Delay::new(Duration::from_millis(29)).await; - } - } - ) - }); - } - - #[test] - fn failed_send_does_not_inc_sent() { - let (mut bounded, _) = channel::(5); - let (unbounded, _) = unbounded::(); - - block_on(async move { - assert!(bounded.send(Msg::default()).await.is_err()); - assert!(bounded.try_send(Msg::default()).is_err()); - assert_eq!(bounded.meter().read(), Readout { sent: 0, received: 0 }); - - assert!(unbounded.unbounded_send(Msg::default()).is_err()); - assert_eq!(unbounded.meter().read(), Readout { sent: 0, received: 0 }); - }); + fn note_time_of_flight(&self, tof: CoarseDuration) { + let _ = self.tof.force_push(tof); + } +} + +/// Determine if this instance shall be measured +#[inline(always)] +fn measure_tof_check(nth: usize) -> bool { + if cfg!(test) { + // for tests, be deterministic and pick every second + nth & 0x01 == 0 + } else { + use nanorand::Rng; + let mut rng = nanorand::WyRand::new_seed(nth as u64); + let pick = rng.generate_range(1_usize..=1000); + // measure 5.3% + pick <= 53 + } +} + +/// Measure the time of flight between insertion and removal +/// of a single type `T` + +#[derive(Debug)] +pub enum MaybeTimeOfFlight { + Bare(T), + WithTimeOfFlight(T, CoarseInstant), +} + +impl From for MaybeTimeOfFlight { + fn from(value: T) -> Self { + Self::Bare(value) + } +} + +// Has some unexplicable conflict with a wildcard impl of std +impl MaybeTimeOfFlight { + /// Extract the inner `T` value. + pub fn into(self) -> T { + match self { + Self::Bare(value) => value, + Self::WithTimeOfFlight(value, _tof_start) => value, + } + } +} + +impl std::ops::Deref for MaybeTimeOfFlight { + type Target = T; + fn deref(&self) -> &Self::Target { + match self { + Self::Bare(ref value) => value, + Self::WithTimeOfFlight(ref value, _tof_start) => value, + } } } diff --git a/polkadot/node/metered-channel/src/oneshot.rs b/polkadot/node/metered-channel/src/oneshot.rs index 3ccd8eead2..520531beab 100644 --- a/polkadot/node/metered-channel/src/oneshot.rs +++ b/polkadot/node/metered-channel/src/oneshot.rs @@ -20,7 +20,6 @@ use std::{ ops::Deref, pin::Pin, task::{Context, Poll}, - time::{Duration, Instant}, }; use futures::{ @@ -30,6 +29,8 @@ use futures::{ }; use futures_timer::Delay; +use crate::{CoarseDuration, CoarseInstant}; + /// Provides the reason for termination. #[derive(Debug, Clone, Copy, PartialEq, Eq)] #[repr(u8)] @@ -43,9 +44,9 @@ pub enum Reason { #[derive(Debug, Clone, PartialEq, Eq)] pub struct Measurements { /// Duration between first poll and polling termination. - first_poll_till_end: Duration, + first_poll_till_end: CoarseDuration, /// Duration starting with creation until polling termination. - creation_till_end: Duration, + creation_till_end: CoarseDuration, /// Reason for resolving the future. reason: Reason, } @@ -53,13 +54,13 @@ pub struct Measurements { impl Measurements { /// Obtain the duration of a finished or canceled /// `oneshot` channel. - pub fn duration_since_first_poll(&self) -> &Duration { + pub fn duration_since_first_poll(&self) -> &CoarseDuration { &self.first_poll_till_end } /// Obtain the duration of a finished or canceled /// `oneshot` channel. - pub fn duration_since_creation(&self) -> &Duration { + pub fn duration_since_creation(&self) -> &CoarseDuration { &self.creation_till_end } @@ -72,8 +73,8 @@ impl Measurements { /// Create a new pair of `OneshotMetered{Sender,Receiver}`. pub fn channel( name: &'static str, - soft_timeout: Duration, - hard_timeout: Duration, + soft_timeout: CoarseDuration, + hard_timeout: CoarseDuration, ) -> (MeteredSender, MeteredReceiver) { let (tx, rx) = oneshot::channel(); @@ -87,7 +88,7 @@ pub fn channel( soft_timeout_fut: None, hard_timeout_fut: None, first_poll_timestamp: None, - creation_timestamp: Instant::now(), + creation_timestamp: CoarseInstant::now(), }, ) } @@ -97,8 +98,8 @@ pub fn channel( pub enum Error { #[error("Oneshot was canceled.")] Canceled(#[source] Canceled, Measurements), - #[error("Oneshot did not receive a response within {}", Duration::as_secs_f64(.0))] - HardTimeout(Duration, Measurements), + #[error("Oneshot did not receive a response within {}", CoarseDuration::as_f64(.0))] + HardTimeout(CoarseDuration, Measurements), } impl Measurable for Error { @@ -113,14 +114,14 @@ impl Measurable for Error { /// Oneshot sender, created by [`channel`]. #[derive(Debug)] pub struct MeteredSender { - inner: oneshot::Sender<(Instant, T)>, + inner: oneshot::Sender<(CoarseInstant, T)>, } impl MeteredSender { /// Send a value. pub fn send(self, t: T) -> Result<(), T> { let Self { inner } = self; - inner.send((Instant::now(), t)).map_err(|(_, t)| t) + inner.send((CoarseInstant::now(), t)).map_err(|(_, t)| t) } /// Poll if the thing is already canceled. @@ -129,7 +130,7 @@ impl MeteredSender { } /// Access the cancellation object. - pub fn cancellation(&mut self) -> Cancellation<'_, (Instant, T)> { + pub fn cancellation(&mut self) -> Cancellation<'_, (CoarseInstant, T)> { self.inner.cancellation() } @@ -148,16 +149,16 @@ impl MeteredSender { #[derive(Debug)] pub struct MeteredReceiver { name: &'static str, - inner: oneshot::Receiver<(Instant, T)>, + inner: oneshot::Receiver<(CoarseInstant, T)>, /// Soft timeout, on expire a warning is printed. soft_timeout_fut: Option>, - soft_timeout: Duration, + soft_timeout: CoarseDuration, /// Hard timeout, terminating the sender. hard_timeout_fut: Option, - hard_timeout: Duration, + hard_timeout: CoarseDuration, /// The first time the receiver was polled. - first_poll_timestamp: Option, - creation_timestamp: Instant, + first_poll_timestamp: Option, + creation_timestamp: CoarseInstant, } impl MeteredReceiver { @@ -179,7 +180,7 @@ impl MeteredReceiver { }, Err(e) => { let measurements = self.create_measurement( - self.first_poll_timestamp.unwrap_or_else(|| Instant::now()), + self.first_poll_timestamp.unwrap_or_else(|| CoarseInstant::now()), Reason::Cancellation, ); Err(Error::Canceled(e, measurements)) @@ -191,8 +192,8 @@ impl MeteredReceiver { /// Helper to create a measurement. /// /// `start` determines the first possible time where poll can resolve with `Ready`. - fn create_measurement(&self, start: Instant, reason: Reason) -> Measurements { - let end = Instant::now(); + fn create_measurement(&self, start: CoarseInstant, reason: Reason) -> Measurements { + let end = CoarseInstant::now(); Measurements { // negative values are ok, if `send` was called before we poll for the first time. first_poll_till_end: end - start, @@ -216,9 +217,9 @@ impl Future for MeteredReceiver { ctx: &mut Context<'_>, ) -> Poll, Error>> { let first_poll_timestamp = - self.first_poll_timestamp.get_or_insert_with(|| Instant::now()).clone(); + self.first_poll_timestamp.get_or_insert_with(|| CoarseInstant::now()).clone(); - let soft_timeout = self.soft_timeout.clone(); + let soft_timeout = self.soft_timeout.clone().into(); let soft_timeout = self .soft_timeout_fut .get_or_insert_with(move || Delay::new(soft_timeout).fuse()); @@ -227,7 +228,7 @@ impl Future for MeteredReceiver { gum::warn!("Oneshot `{name}` exceeded the soft threshold", name = &self.name); } - let hard_timeout = self.hard_timeout.clone(); + let hard_timeout = self.hard_timeout.clone().into(); let hard_timeout = self.hard_timeout_fut.get_or_insert_with(move || Delay::new(hard_timeout)); @@ -311,6 +312,7 @@ impl Deref for OutputWithMeasurements { mod tests { use assert_matches::assert_matches; use futures::{executor::ThreadPool, task::SpawnExt}; + use std::time::Duration; use super::*; @@ -335,7 +337,7 @@ mod tests { let _ = env_logger::builder().is_test(true).filter_level(LevelFilter::Trace).try_init(); let pool = ThreadPool::new().unwrap(); - let (tx, rx) = channel(name, Duration::from_secs(1), Duration::from_secs(3)); + let (tx, rx) = channel(name, CoarseDuration::from_secs(1), CoarseDuration::from_secs(3)); futures::executor::block_on(async move { let handle_receiver = pool.spawn_with_handle(gen_receiver_test(rx)).unwrap(); let handle_sender = pool.spawn_with_handle(gen_sender_test(tx)).unwrap(); diff --git a/polkadot/node/metered-channel/src/tests.rs b/polkadot/node/metered-channel/src/tests.rs new file mode 100644 index 0000000000..4eecea453a --- /dev/null +++ b/polkadot/node/metered-channel/src/tests.rs @@ -0,0 +1,129 @@ +// Copyright 2021 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +use super::*; +use assert_matches::assert_matches; +use futures::{executor::block_on, StreamExt}; + +#[derive(Clone, Copy, Debug, Default)] +struct Msg { + val: u8, +} + +#[test] +fn try_send_try_next() { + block_on(async move { + let (mut tx, mut rx) = channel::(5); + let msg = Msg::default(); + assert_matches!(rx.meter().read(), Readout { sent: 0, received: 0, .. }); + tx.try_send(msg).unwrap(); + assert_matches!(tx.meter().read(), Readout { sent: 1, received: 0, .. }); + tx.try_send(msg).unwrap(); + tx.try_send(msg).unwrap(); + tx.try_send(msg).unwrap(); + assert_matches!(tx.meter().read(), Readout { sent: 4, received: 0, .. }); + rx.try_next().unwrap(); + assert_matches!(rx.meter().read(), Readout { sent: 4, received: 1, .. }); + rx.try_next().unwrap(); + rx.try_next().unwrap(); + assert_matches!(tx.meter().read(), Readout { sent: 4, received: 3, tof } => { + // every second in test, consumed before + assert_eq!(dbg!(tof).len(), 1); + }); + rx.try_next().unwrap(); + assert_matches!(rx.meter().read(), Readout { sent: 4, received: 4, tof } => { + // every second in test, consumed before + assert_eq!(dbg!(tof).len(), 0); + }); + assert!(rx.try_next().is_err()); + }); +} + +#[test] +fn with_tasks() { + let (ready, go) = futures::channel::oneshot::channel(); + + let (mut tx, mut rx) = channel::(5); + block_on(async move { + futures::join!( + async move { + let msg = Msg::default(); + assert_matches!(tx.meter().read(), Readout { sent: 0, received: 0, .. }); + tx.try_send(msg).unwrap(); + assert_matches!(tx.meter().read(), Readout { sent: 1, received: 0, .. }); + tx.try_send(msg).unwrap(); + tx.try_send(msg).unwrap(); + tx.try_send(msg).unwrap(); + ready.send(()).expect("Helper oneshot channel must work. qed"); + }, + async move { + go.await.expect("Helper oneshot channel must work. qed"); + assert_matches!(rx.meter().read(), Readout { sent: 4, received: 0, .. }); + rx.try_next().unwrap(); + assert_matches!(rx.meter().read(), Readout { sent: 4, received: 1, .. }); + rx.try_next().unwrap(); + rx.try_next().unwrap(); + assert_matches!(rx.meter().read(), Readout { sent: 4, received: 3, .. }); + rx.try_next().unwrap(); + assert_matches!(dbg!(rx.meter().read()), Readout { sent: 4, received: 4, .. }); + } + ) + }); +} + +use futures_timer::Delay; +use std::time::Duration; + +#[test] +fn stream_and_sink() { + let (mut tx, mut rx) = channel::(5); + + block_on(async move { + futures::join!( + async move { + for i in 0..15 { + println!("Sent #{} with a backlog of {} items", i + 1, tx.meter().read()); + let msg = Msg { val: i as u8 + 1u8 }; + tx.send(msg).await.unwrap(); + assert!(tx.meter().read().sent > 0usize); + Delay::new(Duration::from_millis(20)).await; + } + () + }, + async move { + while let Some(msg) = rx.next().await { + println!("rx'd one {} with {} backlogged", msg.val, rx.meter().read()); + Delay::new(Duration::from_millis(29)).await; + } + } + ) + }); +} + +#[test] +fn failed_send_does_not_inc_sent() { + let (mut bounded, _) = channel::(5); + let (unbounded, _) = unbounded::(); + + block_on(async move { + assert!(bounded.send(Msg::default()).await.is_err()); + assert!(bounded.try_send(Msg::default()).is_err()); + assert_matches!(bounded.meter().read(), Readout { sent: 0, received: 0, .. }); + + assert!(unbounded.unbounded_send(Msg::default()).is_err()); + assert_matches!(unbounded.meter().read(), Readout { sent: 0, received: 0, .. }); + }); +} diff --git a/polkadot/node/metered-channel/src/unbounded.rs b/polkadot/node/metered-channel/src/unbounded.rs index 3b846611fb..8ade941d68 100644 --- a/polkadot/node/metered-channel/src/unbounded.rs +++ b/polkadot/node/metered-channel/src/unbounded.rs @@ -24,11 +24,11 @@ use futures::{ use std::{pin::Pin, result}; -use super::Meter; +use super::{measure_tof_check, CoarseInstant, MaybeTimeOfFlight, Meter}; /// Create a wrapped `mpsc::channel` pair of `MeteredSender` and `MeteredReceiver`. pub fn unbounded() -> (UnboundedMeteredSender, UnboundedMeteredReceiver) { - let (tx, rx) = mpsc::unbounded(); + let (tx, rx) = mpsc::unbounded::>(); let shared_meter = Meter::default(); let tx = UnboundedMeteredSender { meter: shared_meter.clone(), inner: tx }; let rx = UnboundedMeteredReceiver { meter: shared_meter, inner: rx }; @@ -40,11 +40,11 @@ pub fn unbounded() -> (UnboundedMeteredSender, UnboundedMeteredReceiver pub struct UnboundedMeteredReceiver { // count currently contained messages meter: Meter, - inner: mpsc::UnboundedReceiver, + inner: mpsc::UnboundedReceiver>, } impl std::ops::Deref for UnboundedMeteredReceiver { - type Target = mpsc::UnboundedReceiver; + type Target = mpsc::UnboundedReceiver>; fn deref(&self) -> &Self::Target { &self.inner } @@ -60,11 +60,8 @@ impl Stream for UnboundedMeteredReceiver { type Item = T; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { match mpsc::UnboundedReceiver::poll_next(Pin::new(&mut self.inner), cx) { - Poll::Ready(x) => { - self.meter.note_received(); - Poll::Ready(x) - }, - other => other, + Poll::Ready(maybe_value) => Poll::Ready(self.maybe_meter_tof(maybe_value)), + Poll::Pending => Poll::Pending, } } @@ -75,6 +72,23 @@ impl Stream for UnboundedMeteredReceiver { } impl UnboundedMeteredReceiver { + fn maybe_meter_tof(&mut self, maybe_value: Option>) -> Option { + self.meter.note_received(); + maybe_value.map(|value| { + match value { + MaybeTimeOfFlight::::WithTimeOfFlight(value, tof_start) => { + // do not use `.elapsed()` of `std::time`, it may panic + // `coarsetime` does a saturating substractio for all `CoarseInstant`s + let duration = tof_start.elapsed(); + self.meter.note_time_of_flight(duration); + value + }, + MaybeTimeOfFlight::::Bare(value) => value, + } + .into() + }) + } + /// Get an updated accessor object for all metrics collected. pub fn meter(&self) -> &Meter { &self.meter @@ -83,10 +97,7 @@ impl UnboundedMeteredReceiver { /// Attempt to receive the next item. pub fn try_next(&mut self) -> Result, mpsc::TryRecvError> { match self.inner.try_next()? { - Some(x) => { - self.meter.note_received(); - Ok(Some(x)) - }, + Some(value) => Ok(self.maybe_meter_tof(Some(value))), None => Ok(None), } } @@ -103,7 +114,7 @@ impl futures::stream::FusedStream for UnboundedMeteredReceiver { #[derive(Debug)] pub struct UnboundedMeteredSender { meter: Meter, - inner: mpsc::UnboundedSender, + inner: mpsc::UnboundedSender>, } impl Clone for UnboundedMeteredSender { @@ -113,7 +124,7 @@ impl Clone for UnboundedMeteredSender { } impl std::ops::Deref for UnboundedMeteredSender { - type Target = mpsc::UnboundedSender; + type Target = mpsc::UnboundedSender>; fn deref(&self) -> &Self::Target { &self.inner } @@ -126,14 +137,27 @@ impl std::ops::DerefMut for UnboundedMeteredSender { } impl UnboundedMeteredSender { + fn prepare_with_tof(&self, item: T) -> MaybeTimeOfFlight { + let previous = self.meter.note_sent(); + let item = if measure_tof_check(previous) { + MaybeTimeOfFlight::WithTimeOfFlight(item, CoarseInstant::now()) + } else { + MaybeTimeOfFlight::Bare(item) + }; + item + } + /// Get an updated accessor object for all metrics collected. pub fn meter(&self) -> &Meter { &self.meter } /// Attempt to send message or fail immediately. - pub fn unbounded_send(&self, msg: T) -> result::Result<(), mpsc::TrySendError> { - self.meter.note_sent(); + pub fn unbounded_send( + &self, + msg: T, + ) -> result::Result<(), mpsc::TrySendError>> { + let msg = self.prepare_with_tof(msg); self.inner.unbounded_send(msg).map_err(|e| { self.meter.retract_sent(); e diff --git a/polkadot/node/overseer/src/lib.rs b/polkadot/node/overseer/src/lib.rs index dec173747f..73a5e32131 100644 --- a/polkadot/node/overseer/src/lib.rs +++ b/polkadot/node/overseer/src/lib.rs @@ -555,7 +555,7 @@ where // We combine the amount of messages from subsystems to the overseer // as well as the amount of messages from external sources to the overseer // into one `to_overseer` value. - metronome_metrics.channel_fill_level_snapshot( + metronome_metrics.channel_metrics_snapshot( subsystem_meters .iter() .cloned() diff --git a/polkadot/node/overseer/src/metrics.rs b/polkadot/node/overseer/src/metrics.rs index e10020a258..58a826f52f 100644 --- a/polkadot/node/overseer/src/metrics.rs +++ b/polkadot/node/overseer/src/metrics.rs @@ -27,10 +27,15 @@ struct MetricsInner { activated_heads_total: prometheus::Counter, deactivated_heads_total: prometheus::Counter, messages_relayed_total: prometheus::Counter, + + to_subsystem_bounded_tof: prometheus::HistogramVec, to_subsystem_bounded_sent: prometheus::GaugeVec, to_subsystem_bounded_received: prometheus::GaugeVec, + + to_subsystem_unbounded_tof: prometheus::HistogramVec, to_subsystem_unbounded_sent: prometheus::GaugeVec, to_subsystem_unbounded_received: prometheus::GaugeVec, + signals_sent: prometheus::GaugeVec, signals_received: prometheus::GaugeVec, @@ -68,7 +73,7 @@ impl Metrics { } } - pub(crate) fn channel_fill_level_snapshot( + pub(crate) fn channel_metrics_snapshot( &self, collection: impl IntoIterator, ) { @@ -105,6 +110,17 @@ impl Metrics { .signals_received .with_label_values(&[name]) .set(readouts.signals.received as u64); + + let hist_bounded = metrics.to_subsystem_bounded_tof.with_label_values(&[name]); + for tof in readouts.bounded.tof { + hist_bounded.observe(tof.as_f64()); + } + + let hist_unbounded = + metrics.to_subsystem_unbounded_tof.with_label_values(&[name]); + for tof in readouts.unbounded.tof { + hist_unbounded.observe(tof.as_f64()); + } }); } } @@ -134,6 +150,16 @@ impl MetricsTrait for Metrics { )?, registry, )?, + to_subsystem_bounded_tof: prometheus::register( + prometheus::HistogramVec::new( + prometheus::HistogramOpts::new( + "polkadot_parachain_subsystem_bounded_tof", + "Duration spent in a particular channel from entrance to removal", + ), + &["subsystem_name"], + )?, + registry, + )?, to_subsystem_bounded_sent: prometheus::register( prometheus::GaugeVec::::new( prometheus::Opts::new( @@ -154,6 +180,16 @@ impl MetricsTrait for Metrics { )?, registry, )?, + to_subsystem_unbounded_tof: prometheus::register( + prometheus::HistogramVec::new( + prometheus::HistogramOpts::new( + "polkadot_parachain_subsystem_unbounded_tof", + "Duration spent in a particular channel from entrance to removal", + ), + &["subsystem_name"], + )?, + registry, + )?, to_subsystem_unbounded_sent: prometheus::register( prometheus::GaugeVec::::new( prometheus::Opts::new(