track tof for inter subsystem messages (#5135)

This commit is contained in:
Bernhard Schuster
2022-03-17 11:19:11 +01:00
committed by GitHub
parent 5c21822e17
commit 16fe04dc79
9 changed files with 404 additions and 177 deletions
+37
View File
@@ -1125,6 +1125,18 @@ dependencies = [
"syn",
]
[[package]]
name = "coarsetime"
version = "0.1.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "441947d9f3582f20b35fdd2bc5ada3a8c74c9ea380d66268607cb399b510ee08"
dependencies = [
"libc",
"once_cell",
"wasi 0.11.0+wasi-snapshot-preview1",
"wasm-bindgen",
]
[[package]]
name = "color-eyre"
version = "0.6.1"
@@ -1340,6 +1352,16 @@ dependencies = [
"scopeguard",
]
[[package]]
name = "crossbeam-queue"
version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1f25d8400f4a7a5778f0e4e52384a48cbd9b5c495d110786187fc750075277a2"
dependencies = [
"cfg-if 1.0.0",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.5"
@@ -4459,11 +4481,14 @@ name = "metered-channel"
version = "0.9.18"
dependencies = [
"assert_matches",
"coarsetime",
"crossbeam-queue",
"derive_more",
"env_logger 0.9.0",
"futures 0.3.21",
"futures-timer",
"log",
"nanorand",
"thiserror",
"tracing",
"tracing-gum",
@@ -4704,6 +4729,12 @@ dependencies = [
"rand 0.8.5",
]
[[package]]
name = "nanorand"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6a51313c5820b0b02bd422f4b44776fbf47961755c74ce64afc73bfad10226c3"
[[package]]
name = "native-tls"
version = "0.2.8"
@@ -11978,6 +12009,12 @@ version = "0.10.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1a143597ca7c7793eff794def352d41792a93c481eb1042423ff7ff72ba2c31f"
[[package]]
name = "wasi"
version = "0.11.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
[[package]]
name = "wasm-bindgen"
version = "0.2.78"
+3
View File
@@ -11,6 +11,9 @@ futures-timer = "3.0.2"
derive_more = "0.99"
gum = { package = "tracing-gum", path = "../gum" }
thiserror = "1.0.30"
crossbeam-queue = "0.3.5"
nanorand = { version = "0.7.0", default-features = false, features = ["wyrand"] }
coarsetime = "0.1.21"
[dev-dependencies]
futures = { version = "0.3.21", features = ["thread-pool"] }
+44 -20
View File
@@ -25,11 +25,11 @@ use futures::{
use std::{pin::Pin, result};
use super::Meter;
use super::{measure_tof_check, CoarseInstant, MaybeTimeOfFlight, Meter};
/// Create a wrapped `mpsc::channel` pair of `MeteredSender` and `MeteredReceiver`.
pub fn channel<T>(capacity: usize) -> (MeteredSender<T>, MeteredReceiver<T>) {
let (tx, rx) = mpsc::channel(capacity);
let (tx, rx) = mpsc::channel::<MaybeTimeOfFlight<T>>(capacity);
let shared_meter = Meter::default();
let tx = MeteredSender { meter: shared_meter.clone(), inner: tx };
let rx = MeteredReceiver { meter: shared_meter, inner: rx };
@@ -41,11 +41,11 @@ pub fn channel<T>(capacity: usize) -> (MeteredSender<T>, MeteredReceiver<T>) {
pub struct MeteredReceiver<T> {
// count currently contained messages
meter: Meter,
inner: mpsc::Receiver<T>,
inner: mpsc::Receiver<MaybeTimeOfFlight<T>>,
}
impl<T> std::ops::Deref for MeteredReceiver<T> {
type Target = mpsc::Receiver<T>;
type Target = mpsc::Receiver<MaybeTimeOfFlight<T>>;
fn deref(&self) -> &Self::Target {
&self.inner
}
@@ -61,11 +61,8 @@ impl<T> Stream for MeteredReceiver<T> {
type Item = T;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match mpsc::Receiver::poll_next(Pin::new(&mut self.inner), cx) {
Poll::Ready(x) => {
self.meter.note_received();
Poll::Ready(x)
},
other => other,
Poll::Ready(maybe_value) => Poll::Ready(self.maybe_meter_tof(maybe_value)),
Poll::Pending => Poll::Pending,
}
}
@@ -76,6 +73,23 @@ impl<T> Stream for MeteredReceiver<T> {
}
impl<T> MeteredReceiver<T> {
fn maybe_meter_tof(&mut self, maybe_value: Option<MaybeTimeOfFlight<T>>) -> Option<T> {
self.meter.note_received();
maybe_value.map(|value| {
match value {
MaybeTimeOfFlight::<T>::WithTimeOfFlight(value, tof_start) => {
// do not use `.elapsed()` of `std::time`, it may panic
// `coarsetime` does a saturating sub for all `CoarseInstant` substractions
let duration = tof_start.elapsed();
self.meter.note_time_of_flight(duration);
value
},
MaybeTimeOfFlight::<T>::Bare(value) => value,
}
.into()
})
}
/// Get an updated accessor object for all metrics collected.
pub fn meter(&self) -> &Meter {
&self.meter
@@ -84,10 +98,7 @@ impl<T> MeteredReceiver<T> {
/// Attempt to receive the next item.
pub fn try_next(&mut self) -> Result<Option<T>, mpsc::TryRecvError> {
match self.inner.try_next()? {
Some(x) => {
self.meter.note_received();
Ok(Some(x))
},
Some(value) => Ok(self.maybe_meter_tof(Some(value))),
None => Ok(None),
}
}
@@ -104,7 +115,7 @@ impl<T> futures::stream::FusedStream for MeteredReceiver<T> {
#[derive(Debug)]
pub struct MeteredSender<T> {
meter: Meter,
inner: mpsc::Sender<T>,
inner: mpsc::Sender<MaybeTimeOfFlight<T>>,
}
impl<T> Clone for MeteredSender<T> {
@@ -114,7 +125,7 @@ impl<T> Clone for MeteredSender<T> {
}
impl<T> std::ops::Deref for MeteredSender<T> {
type Target = mpsc::Sender<T>;
type Target = mpsc::Sender<MaybeTimeOfFlight<T>>;
fn deref(&self) -> &Self::Target {
&self.inner
}
@@ -127,18 +138,28 @@ impl<T> std::ops::DerefMut for MeteredSender<T> {
}
impl<T> MeteredSender<T> {
fn prepare_with_tof(&self, item: T) -> MaybeTimeOfFlight<T> {
let previous = self.meter.note_sent();
let item = if measure_tof_check(previous) {
MaybeTimeOfFlight::WithTimeOfFlight(item, CoarseInstant::now())
} else {
MaybeTimeOfFlight::Bare(item)
};
item
}
/// Get an updated accessor object for all metrics collected.
pub fn meter(&self) -> &Meter {
&self.meter
}
/// Send message, wait until capacity is available.
pub async fn send(&mut self, item: T) -> result::Result<(), mpsc::SendError>
pub async fn send(&mut self, msg: T) -> result::Result<(), mpsc::SendError>
where
Self: Unpin,
{
self.meter.note_sent();
let fut = self.inner.send(item);
let msg = self.prepare_with_tof(msg);
let fut = self.inner.send(msg);
futures::pin_mut!(fut);
fut.await.map_err(|e| {
self.meter.retract_sent();
@@ -147,8 +168,11 @@ impl<T> MeteredSender<T> {
}
/// Attempt to send message or fail immediately.
pub fn try_send(&mut self, msg: T) -> result::Result<(), mpsc::TrySendError<T>> {
self.meter.note_sent();
pub fn try_send(
&mut self,
msg: T,
) -> result::Result<(), mpsc::TrySendError<MaybeTimeOfFlight<T>>> {
let msg = self.prepare_with_tof(msg);
self.inner.try_send(msg).map_err(|e| {
self.meter.retract_sent();
e
+85 -113
View File
@@ -21,7 +21,7 @@ use std::sync::{
Arc,
};
use derive_more::{Add, Display};
use derive_more::Display;
mod bounded;
pub mod oneshot;
@@ -29,24 +29,44 @@ mod unbounded;
pub use self::{bounded::*, unbounded::*};
pub use coarsetime::Duration as CoarseDuration;
use coarsetime::Instant as CoarseInstant;
#[cfg(test)]
mod tests;
/// A peek into the inner state of a meter.
#[derive(Debug, Clone, Default)]
#[derive(Debug, Clone)]
pub struct Meter {
// Number of sends on this channel.
sent: Arc<AtomicUsize>,
// Number of receives on this channel.
received: Arc<AtomicUsize>,
// Atomic ringbuffer of the last 50 time of flight values
tof: Arc<crossbeam_queue::ArrayQueue<CoarseDuration>>,
}
impl std::default::Default for Meter {
fn default() -> Self {
Self {
sent: Arc::new(AtomicUsize::new(0)),
received: Arc::new(AtomicUsize::new(0)),
tof: Arc::new(crossbeam_queue::ArrayQueue::new(100)),
}
}
}
/// A readout of sizes from the meter. Note that it is possible, due to asynchrony, for received
/// to be slightly higher than sent.
#[derive(Debug, Add, Display, Clone, Default, PartialEq)]
#[derive(Debug, Display, Clone, Default, PartialEq)]
#[display(fmt = "(sent={} received={})", sent, received)]
pub struct Readout {
/// The amount of messages sent on the channel, in aggregate.
pub sent: usize,
/// The amount of messages received on the channel, in aggregate.
pub received: usize,
/// Time of flight in micro seconds (us)
pub tof: Vec<CoarseDuration>,
}
impl Meter {
@@ -57,11 +77,18 @@ impl Meter {
Readout {
sent: self.sent.load(Ordering::Relaxed),
received: self.received.load(Ordering::Relaxed),
tof: {
let mut acc = Vec::with_capacity(self.tof.len());
while let Some(value) = self.tof.pop() {
acc.push(value)
}
acc
},
}
}
fn note_sent(&self) {
self.sent.fetch_add(1, Ordering::Relaxed);
fn note_sent(&self) -> usize {
self.sent.fetch_add(1, Ordering::Relaxed)
}
fn retract_sent(&self) {
@@ -71,114 +98,59 @@ impl Meter {
fn note_received(&self) {
self.received.fetch_add(1, Ordering::Relaxed);
}
}
#[cfg(test)]
mod tests {
use super::*;
use futures::{executor::block_on, StreamExt};
#[derive(Clone, Copy, Debug, Default)]
struct Msg {
val: u8,
}
#[test]
fn try_send_try_next() {
block_on(async move {
let (mut tx, mut rx) = channel::<Msg>(5);
let msg = Msg::default();
assert_eq!(rx.meter().read(), Readout { sent: 0, received: 0 });
tx.try_send(msg).unwrap();
assert_eq!(tx.meter().read(), Readout { sent: 1, received: 0 });
tx.try_send(msg).unwrap();
tx.try_send(msg).unwrap();
tx.try_send(msg).unwrap();
assert_eq!(tx.meter().read(), Readout { sent: 4, received: 0 });
rx.try_next().unwrap();
assert_eq!(rx.meter().read(), Readout { sent: 4, received: 1 });
rx.try_next().unwrap();
rx.try_next().unwrap();
assert_eq!(tx.meter().read(), Readout { sent: 4, received: 3 });
rx.try_next().unwrap();
assert_eq!(rx.meter().read(), Readout { sent: 4, received: 4 });
assert!(rx.try_next().is_err());
});
}
#[test]
fn with_tasks() {
let (ready, go) = futures::channel::oneshot::channel();
let (mut tx, mut rx) = channel::<Msg>(5);
block_on(async move {
futures::join!(
async move {
let msg = Msg::default();
assert_eq!(tx.meter().read(), Readout { sent: 0, received: 0 });
tx.try_send(msg).unwrap();
assert_eq!(tx.meter().read(), Readout { sent: 1, received: 0 });
tx.try_send(msg).unwrap();
tx.try_send(msg).unwrap();
tx.try_send(msg).unwrap();
ready.send(()).expect("Helper oneshot channel must work. qed");
},
async move {
go.await.expect("Helper oneshot channel must work. qed");
assert_eq!(rx.meter().read(), Readout { sent: 4, received: 0 });
rx.try_next().unwrap();
assert_eq!(rx.meter().read(), Readout { sent: 4, received: 1 });
rx.try_next().unwrap();
rx.try_next().unwrap();
assert_eq!(rx.meter().read(), Readout { sent: 4, received: 3 });
rx.try_next().unwrap();
assert_eq!(dbg!(rx.meter().read()), Readout { sent: 4, received: 4 });
}
)
});
}
use futures_timer::Delay;
use std::time::Duration;
#[test]
fn stream_and_sink() {
let (mut tx, mut rx) = channel::<Msg>(5);
block_on(async move {
futures::join!(
async move {
for i in 0..15 {
println!("Sent #{} with a backlog of {} items", i + 1, tx.meter().read());
let msg = Msg { val: i as u8 + 1u8 };
tx.send(msg).await.unwrap();
assert!(tx.meter().read().sent > 0usize);
Delay::new(Duration::from_millis(20)).await;
}
()
},
async move {
while let Some(msg) = rx.next().await {
println!("rx'd one {} with {} backlogged", msg.val, rx.meter().read());
Delay::new(Duration::from_millis(29)).await;
}
}
)
});
}
#[test]
fn failed_send_does_not_inc_sent() {
let (mut bounded, _) = channel::<Msg>(5);
let (unbounded, _) = unbounded::<Msg>();
block_on(async move {
assert!(bounded.send(Msg::default()).await.is_err());
assert!(bounded.try_send(Msg::default()).is_err());
assert_eq!(bounded.meter().read(), Readout { sent: 0, received: 0 });
assert!(unbounded.unbounded_send(Msg::default()).is_err());
assert_eq!(unbounded.meter().read(), Readout { sent: 0, received: 0 });
});
fn note_time_of_flight(&self, tof: CoarseDuration) {
let _ = self.tof.force_push(tof);
}
}
/// Determine if this instance shall be measured
#[inline(always)]
fn measure_tof_check(nth: usize) -> bool {
if cfg!(test) {
// for tests, be deterministic and pick every second
nth & 0x01 == 0
} else {
use nanorand::Rng;
let mut rng = nanorand::WyRand::new_seed(nth as u64);
let pick = rng.generate_range(1_usize..=1000);
// measure 5.3%
pick <= 53
}
}
/// Measure the time of flight between insertion and removal
/// of a single type `T`
#[derive(Debug)]
pub enum MaybeTimeOfFlight<T> {
Bare(T),
WithTimeOfFlight(T, CoarseInstant),
}
impl<T> From<T> for MaybeTimeOfFlight<T> {
fn from(value: T) -> Self {
Self::Bare(value)
}
}
// Has some unexplicable conflict with a wildcard impl of std
impl<T> MaybeTimeOfFlight<T> {
/// Extract the inner `T` value.
pub fn into(self) -> T {
match self {
Self::Bare(value) => value,
Self::WithTimeOfFlight(value, _tof_start) => value,
}
}
}
impl<T> std::ops::Deref for MaybeTimeOfFlight<T> {
type Target = T;
fn deref(&self) -> &Self::Target {
match self {
Self::Bare(ref value) => value,
Self::WithTimeOfFlight(ref value, _tof_start) => value,
}
}
}
+27 -25
View File
@@ -20,7 +20,6 @@ use std::{
ops::Deref,
pin::Pin,
task::{Context, Poll},
time::{Duration, Instant},
};
use futures::{
@@ -30,6 +29,8 @@ use futures::{
};
use futures_timer::Delay;
use crate::{CoarseDuration, CoarseInstant};
/// Provides the reason for termination.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
@@ -43,9 +44,9 @@ pub enum Reason {
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Measurements {
/// Duration between first poll and polling termination.
first_poll_till_end: Duration,
first_poll_till_end: CoarseDuration,
/// Duration starting with creation until polling termination.
creation_till_end: Duration,
creation_till_end: CoarseDuration,
/// Reason for resolving the future.
reason: Reason,
}
@@ -53,13 +54,13 @@ pub struct Measurements {
impl Measurements {
/// Obtain the duration of a finished or canceled
/// `oneshot` channel.
pub fn duration_since_first_poll(&self) -> &Duration {
pub fn duration_since_first_poll(&self) -> &CoarseDuration {
&self.first_poll_till_end
}
/// Obtain the duration of a finished or canceled
/// `oneshot` channel.
pub fn duration_since_creation(&self) -> &Duration {
pub fn duration_since_creation(&self) -> &CoarseDuration {
&self.creation_till_end
}
@@ -72,8 +73,8 @@ impl Measurements {
/// Create a new pair of `OneshotMetered{Sender,Receiver}`.
pub fn channel<T>(
name: &'static str,
soft_timeout: Duration,
hard_timeout: Duration,
soft_timeout: CoarseDuration,
hard_timeout: CoarseDuration,
) -> (MeteredSender<T>, MeteredReceiver<T>) {
let (tx, rx) = oneshot::channel();
@@ -87,7 +88,7 @@ pub fn channel<T>(
soft_timeout_fut: None,
hard_timeout_fut: None,
first_poll_timestamp: None,
creation_timestamp: Instant::now(),
creation_timestamp: CoarseInstant::now(),
},
)
}
@@ -97,8 +98,8 @@ pub fn channel<T>(
pub enum Error {
#[error("Oneshot was canceled.")]
Canceled(#[source] Canceled, Measurements),
#[error("Oneshot did not receive a response within {}", Duration::as_secs_f64(.0))]
HardTimeout(Duration, Measurements),
#[error("Oneshot did not receive a response within {}", CoarseDuration::as_f64(.0))]
HardTimeout(CoarseDuration, Measurements),
}
impl Measurable for Error {
@@ -113,14 +114,14 @@ impl Measurable for Error {
/// Oneshot sender, created by [`channel`].
#[derive(Debug)]
pub struct MeteredSender<T> {
inner: oneshot::Sender<(Instant, T)>,
inner: oneshot::Sender<(CoarseInstant, T)>,
}
impl<T> MeteredSender<T> {
/// Send a value.
pub fn send(self, t: T) -> Result<(), T> {
let Self { inner } = self;
inner.send((Instant::now(), t)).map_err(|(_, t)| t)
inner.send((CoarseInstant::now(), t)).map_err(|(_, t)| t)
}
/// Poll if the thing is already canceled.
@@ -129,7 +130,7 @@ impl<T> MeteredSender<T> {
}
/// Access the cancellation object.
pub fn cancellation(&mut self) -> Cancellation<'_, (Instant, T)> {
pub fn cancellation(&mut self) -> Cancellation<'_, (CoarseInstant, T)> {
self.inner.cancellation()
}
@@ -148,16 +149,16 @@ impl<T> MeteredSender<T> {
#[derive(Debug)]
pub struct MeteredReceiver<T> {
name: &'static str,
inner: oneshot::Receiver<(Instant, T)>,
inner: oneshot::Receiver<(CoarseInstant, T)>,
/// Soft timeout, on expire a warning is printed.
soft_timeout_fut: Option<Fuse<Delay>>,
soft_timeout: Duration,
soft_timeout: CoarseDuration,
/// Hard timeout, terminating the sender.
hard_timeout_fut: Option<Delay>,
hard_timeout: Duration,
hard_timeout: CoarseDuration,
/// The first time the receiver was polled.
first_poll_timestamp: Option<Instant>,
creation_timestamp: Instant,
first_poll_timestamp: Option<CoarseInstant>,
creation_timestamp: CoarseInstant,
}
impl<T> MeteredReceiver<T> {
@@ -179,7 +180,7 @@ impl<T> MeteredReceiver<T> {
},
Err(e) => {
let measurements = self.create_measurement(
self.first_poll_timestamp.unwrap_or_else(|| Instant::now()),
self.first_poll_timestamp.unwrap_or_else(|| CoarseInstant::now()),
Reason::Cancellation,
);
Err(Error::Canceled(e, measurements))
@@ -191,8 +192,8 @@ impl<T> MeteredReceiver<T> {
/// Helper to create a measurement.
///
/// `start` determines the first possible time where poll can resolve with `Ready`.
fn create_measurement(&self, start: Instant, reason: Reason) -> Measurements {
let end = Instant::now();
fn create_measurement(&self, start: CoarseInstant, reason: Reason) -> Measurements {
let end = CoarseInstant::now();
Measurements {
// negative values are ok, if `send` was called before we poll for the first time.
first_poll_till_end: end - start,
@@ -216,9 +217,9 @@ impl<T> Future for MeteredReceiver<T> {
ctx: &mut Context<'_>,
) -> Poll<Result<OutputWithMeasurements<T>, Error>> {
let first_poll_timestamp =
self.first_poll_timestamp.get_or_insert_with(|| Instant::now()).clone();
self.first_poll_timestamp.get_or_insert_with(|| CoarseInstant::now()).clone();
let soft_timeout = self.soft_timeout.clone();
let soft_timeout = self.soft_timeout.clone().into();
let soft_timeout = self
.soft_timeout_fut
.get_or_insert_with(move || Delay::new(soft_timeout).fuse());
@@ -227,7 +228,7 @@ impl<T> Future for MeteredReceiver<T> {
gum::warn!("Oneshot `{name}` exceeded the soft threshold", name = &self.name);
}
let hard_timeout = self.hard_timeout.clone();
let hard_timeout = self.hard_timeout.clone().into();
let hard_timeout =
self.hard_timeout_fut.get_or_insert_with(move || Delay::new(hard_timeout));
@@ -311,6 +312,7 @@ impl<T> Deref for OutputWithMeasurements<T> {
mod tests {
use assert_matches::assert_matches;
use futures::{executor::ThreadPool, task::SpawnExt};
use std::time::Duration;
use super::*;
@@ -335,7 +337,7 @@ mod tests {
let _ = env_logger::builder().is_test(true).filter_level(LevelFilter::Trace).try_init();
let pool = ThreadPool::new().unwrap();
let (tx, rx) = channel(name, Duration::from_secs(1), Duration::from_secs(3));
let (tx, rx) = channel(name, CoarseDuration::from_secs(1), CoarseDuration::from_secs(3));
futures::executor::block_on(async move {
let handle_receiver = pool.spawn_with_handle(gen_receiver_test(rx)).unwrap();
let handle_sender = pool.spawn_with_handle(gen_sender_test(tx)).unwrap();
+129
View File
@@ -0,0 +1,129 @@
// Copyright 2021 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use super::*;
use assert_matches::assert_matches;
use futures::{executor::block_on, StreamExt};
#[derive(Clone, Copy, Debug, Default)]
struct Msg {
val: u8,
}
#[test]
fn try_send_try_next() {
block_on(async move {
let (mut tx, mut rx) = channel::<Msg>(5);
let msg = Msg::default();
assert_matches!(rx.meter().read(), Readout { sent: 0, received: 0, .. });
tx.try_send(msg).unwrap();
assert_matches!(tx.meter().read(), Readout { sent: 1, received: 0, .. });
tx.try_send(msg).unwrap();
tx.try_send(msg).unwrap();
tx.try_send(msg).unwrap();
assert_matches!(tx.meter().read(), Readout { sent: 4, received: 0, .. });
rx.try_next().unwrap();
assert_matches!(rx.meter().read(), Readout { sent: 4, received: 1, .. });
rx.try_next().unwrap();
rx.try_next().unwrap();
assert_matches!(tx.meter().read(), Readout { sent: 4, received: 3, tof } => {
// every second in test, consumed before
assert_eq!(dbg!(tof).len(), 1);
});
rx.try_next().unwrap();
assert_matches!(rx.meter().read(), Readout { sent: 4, received: 4, tof } => {
// every second in test, consumed before
assert_eq!(dbg!(tof).len(), 0);
});
assert!(rx.try_next().is_err());
});
}
#[test]
fn with_tasks() {
let (ready, go) = futures::channel::oneshot::channel();
let (mut tx, mut rx) = channel::<Msg>(5);
block_on(async move {
futures::join!(
async move {
let msg = Msg::default();
assert_matches!(tx.meter().read(), Readout { sent: 0, received: 0, .. });
tx.try_send(msg).unwrap();
assert_matches!(tx.meter().read(), Readout { sent: 1, received: 0, .. });
tx.try_send(msg).unwrap();
tx.try_send(msg).unwrap();
tx.try_send(msg).unwrap();
ready.send(()).expect("Helper oneshot channel must work. qed");
},
async move {
go.await.expect("Helper oneshot channel must work. qed");
assert_matches!(rx.meter().read(), Readout { sent: 4, received: 0, .. });
rx.try_next().unwrap();
assert_matches!(rx.meter().read(), Readout { sent: 4, received: 1, .. });
rx.try_next().unwrap();
rx.try_next().unwrap();
assert_matches!(rx.meter().read(), Readout { sent: 4, received: 3, .. });
rx.try_next().unwrap();
assert_matches!(dbg!(rx.meter().read()), Readout { sent: 4, received: 4, .. });
}
)
});
}
use futures_timer::Delay;
use std::time::Duration;
#[test]
fn stream_and_sink() {
let (mut tx, mut rx) = channel::<Msg>(5);
block_on(async move {
futures::join!(
async move {
for i in 0..15 {
println!("Sent #{} with a backlog of {} items", i + 1, tx.meter().read());
let msg = Msg { val: i as u8 + 1u8 };
tx.send(msg).await.unwrap();
assert!(tx.meter().read().sent > 0usize);
Delay::new(Duration::from_millis(20)).await;
}
()
},
async move {
while let Some(msg) = rx.next().await {
println!("rx'd one {} with {} backlogged", msg.val, rx.meter().read());
Delay::new(Duration::from_millis(29)).await;
}
}
)
});
}
#[test]
fn failed_send_does_not_inc_sent() {
let (mut bounded, _) = channel::<Msg>(5);
let (unbounded, _) = unbounded::<Msg>();
block_on(async move {
assert!(bounded.send(Msg::default()).await.is_err());
assert!(bounded.try_send(Msg::default()).is_err());
assert_matches!(bounded.meter().read(), Readout { sent: 0, received: 0, .. });
assert!(unbounded.unbounded_send(Msg::default()).is_err());
assert_matches!(unbounded.meter().read(), Readout { sent: 0, received: 0, .. });
});
}
+41 -17
View File
@@ -24,11 +24,11 @@ use futures::{
use std::{pin::Pin, result};
use super::Meter;
use super::{measure_tof_check, CoarseInstant, MaybeTimeOfFlight, Meter};
/// Create a wrapped `mpsc::channel` pair of `MeteredSender` and `MeteredReceiver`.
pub fn unbounded<T>() -> (UnboundedMeteredSender<T>, UnboundedMeteredReceiver<T>) {
let (tx, rx) = mpsc::unbounded();
let (tx, rx) = mpsc::unbounded::<MaybeTimeOfFlight<T>>();
let shared_meter = Meter::default();
let tx = UnboundedMeteredSender { meter: shared_meter.clone(), inner: tx };
let rx = UnboundedMeteredReceiver { meter: shared_meter, inner: rx };
@@ -40,11 +40,11 @@ pub fn unbounded<T>() -> (UnboundedMeteredSender<T>, UnboundedMeteredReceiver<T>
pub struct UnboundedMeteredReceiver<T> {
// count currently contained messages
meter: Meter,
inner: mpsc::UnboundedReceiver<T>,
inner: mpsc::UnboundedReceiver<MaybeTimeOfFlight<T>>,
}
impl<T> std::ops::Deref for UnboundedMeteredReceiver<T> {
type Target = mpsc::UnboundedReceiver<T>;
type Target = mpsc::UnboundedReceiver<MaybeTimeOfFlight<T>>;
fn deref(&self) -> &Self::Target {
&self.inner
}
@@ -60,11 +60,8 @@ impl<T> Stream for UnboundedMeteredReceiver<T> {
type Item = T;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match mpsc::UnboundedReceiver::poll_next(Pin::new(&mut self.inner), cx) {
Poll::Ready(x) => {
self.meter.note_received();
Poll::Ready(x)
},
other => other,
Poll::Ready(maybe_value) => Poll::Ready(self.maybe_meter_tof(maybe_value)),
Poll::Pending => Poll::Pending,
}
}
@@ -75,6 +72,23 @@ impl<T> Stream for UnboundedMeteredReceiver<T> {
}
impl<T> UnboundedMeteredReceiver<T> {
fn maybe_meter_tof(&mut self, maybe_value: Option<MaybeTimeOfFlight<T>>) -> Option<T> {
self.meter.note_received();
maybe_value.map(|value| {
match value {
MaybeTimeOfFlight::<T>::WithTimeOfFlight(value, tof_start) => {
// do not use `.elapsed()` of `std::time`, it may panic
// `coarsetime` does a saturating substractio for all `CoarseInstant`s
let duration = tof_start.elapsed();
self.meter.note_time_of_flight(duration);
value
},
MaybeTimeOfFlight::<T>::Bare(value) => value,
}
.into()
})
}
/// Get an updated accessor object for all metrics collected.
pub fn meter(&self) -> &Meter {
&self.meter
@@ -83,10 +97,7 @@ impl<T> UnboundedMeteredReceiver<T> {
/// Attempt to receive the next item.
pub fn try_next(&mut self) -> Result<Option<T>, mpsc::TryRecvError> {
match self.inner.try_next()? {
Some(x) => {
self.meter.note_received();
Ok(Some(x))
},
Some(value) => Ok(self.maybe_meter_tof(Some(value))),
None => Ok(None),
}
}
@@ -103,7 +114,7 @@ impl<T> futures::stream::FusedStream for UnboundedMeteredReceiver<T> {
#[derive(Debug)]
pub struct UnboundedMeteredSender<T> {
meter: Meter,
inner: mpsc::UnboundedSender<T>,
inner: mpsc::UnboundedSender<MaybeTimeOfFlight<T>>,
}
impl<T> Clone for UnboundedMeteredSender<T> {
@@ -113,7 +124,7 @@ impl<T> Clone for UnboundedMeteredSender<T> {
}
impl<T> std::ops::Deref for UnboundedMeteredSender<T> {
type Target = mpsc::UnboundedSender<T>;
type Target = mpsc::UnboundedSender<MaybeTimeOfFlight<T>>;
fn deref(&self) -> &Self::Target {
&self.inner
}
@@ -126,14 +137,27 @@ impl<T> std::ops::DerefMut for UnboundedMeteredSender<T> {
}
impl<T> UnboundedMeteredSender<T> {
fn prepare_with_tof(&self, item: T) -> MaybeTimeOfFlight<T> {
let previous = self.meter.note_sent();
let item = if measure_tof_check(previous) {
MaybeTimeOfFlight::WithTimeOfFlight(item, CoarseInstant::now())
} else {
MaybeTimeOfFlight::Bare(item)
};
item
}
/// Get an updated accessor object for all metrics collected.
pub fn meter(&self) -> &Meter {
&self.meter
}
/// Attempt to send message or fail immediately.
pub fn unbounded_send(&self, msg: T) -> result::Result<(), mpsc::TrySendError<T>> {
self.meter.note_sent();
pub fn unbounded_send(
&self,
msg: T,
) -> result::Result<(), mpsc::TrySendError<MaybeTimeOfFlight<T>>> {
let msg = self.prepare_with_tof(msg);
self.inner.unbounded_send(msg).map_err(|e| {
self.meter.retract_sent();
e
+1 -1
View File
@@ -555,7 +555,7 @@ where
// We combine the amount of messages from subsystems to the overseer
// as well as the amount of messages from external sources to the overseer
// into one `to_overseer` value.
metronome_metrics.channel_fill_level_snapshot(
metronome_metrics.channel_metrics_snapshot(
subsystem_meters
.iter()
.cloned()
+37 -1
View File
@@ -27,10 +27,15 @@ struct MetricsInner {
activated_heads_total: prometheus::Counter<prometheus::U64>,
deactivated_heads_total: prometheus::Counter<prometheus::U64>,
messages_relayed_total: prometheus::Counter<prometheus::U64>,
to_subsystem_bounded_tof: prometheus::HistogramVec,
to_subsystem_bounded_sent: prometheus::GaugeVec<prometheus::U64>,
to_subsystem_bounded_received: prometheus::GaugeVec<prometheus::U64>,
to_subsystem_unbounded_tof: prometheus::HistogramVec,
to_subsystem_unbounded_sent: prometheus::GaugeVec<prometheus::U64>,
to_subsystem_unbounded_received: prometheus::GaugeVec<prometheus::U64>,
signals_sent: prometheus::GaugeVec<prometheus::U64>,
signals_received: prometheus::GaugeVec<prometheus::U64>,
@@ -68,7 +73,7 @@ impl Metrics {
}
}
pub(crate) fn channel_fill_level_snapshot(
pub(crate) fn channel_metrics_snapshot(
&self,
collection: impl IntoIterator<Item = (&'static str, SubsystemMeterReadouts)>,
) {
@@ -105,6 +110,17 @@ impl Metrics {
.signals_received
.with_label_values(&[name])
.set(readouts.signals.received as u64);
let hist_bounded = metrics.to_subsystem_bounded_tof.with_label_values(&[name]);
for tof in readouts.bounded.tof {
hist_bounded.observe(tof.as_f64());
}
let hist_unbounded =
metrics.to_subsystem_unbounded_tof.with_label_values(&[name]);
for tof in readouts.unbounded.tof {
hist_unbounded.observe(tof.as_f64());
}
});
}
}
@@ -134,6 +150,16 @@ impl MetricsTrait for Metrics {
)?,
registry,
)?,
to_subsystem_bounded_tof: prometheus::register(
prometheus::HistogramVec::new(
prometheus::HistogramOpts::new(
"polkadot_parachain_subsystem_bounded_tof",
"Duration spent in a particular channel from entrance to removal",
),
&["subsystem_name"],
)?,
registry,
)?,
to_subsystem_bounded_sent: prometheus::register(
prometheus::GaugeVec::<prometheus::U64>::new(
prometheus::Opts::new(
@@ -154,6 +180,16 @@ impl MetricsTrait for Metrics {
)?,
registry,
)?,
to_subsystem_unbounded_tof: prometheus::register(
prometheus::HistogramVec::new(
prometheus::HistogramOpts::new(
"polkadot_parachain_subsystem_unbounded_tof",
"Duration spent in a particular channel from entrance to removal",
),
&["subsystem_name"],
)?,
registry,
)?,
to_subsystem_unbounded_sent: prometheus::register(
prometheus::GaugeVec::<prometheus::U64>::new(
prometheus::Opts::new(