From d7adf8f20187adc6d522d25d2995fb095bd50702 Mon Sep 17 00:00:00 2001 From: Bernhard Schuster Date: Wed, 13 Jan 2021 17:40:27 +0100 Subject: [PATCH] metered mpsc channels (#2235) --- polkadot/Cargo.lock | 11 ++ polkadot/Cargo.toml | 1 + polkadot/node/metered-channel/Cargo.toml | 14 ++ polkadot/node/metered-channel/src/bounded.rs | 178 +++++++++++++++++ polkadot/node/metered-channel/src/lib.rs | 150 +++++++++++++++ .../node/metered-channel/src/unbounded.rs | 179 ++++++++++++++++++ polkadot/node/network/bridge/Cargo.toml | 1 + polkadot/node/network/bridge/src/lib.rs | 8 +- polkadot/node/overseer/src/lib.rs | 157 ++++++++++----- polkadot/node/subsystem-util/Cargo.toml | 1 + polkadot/node/subsystem-util/src/lib.rs | 100 +++++++++- 11 files changed, 744 insertions(+), 56 deletions(-) create mode 100644 polkadot/node/metered-channel/Cargo.toml create mode 100644 polkadot/node/metered-channel/src/bounded.rs create mode 100644 polkadot/node/metered-channel/src/lib.rs create mode 100644 polkadot/node/metered-channel/src/unbounded.rs diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock index 8ba8bad1f6..776ab22e4c 100644 --- a/polkadot/Cargo.lock +++ b/polkadot/Cargo.lock @@ -3475,6 +3475,15 @@ dependencies = [ "zeroize", ] +[[package]] +name = "metered-channel" +version = "0.1.0" +dependencies = [ + "assert_matches", + "futures 0.3.8", + "futures-timer 3.0.2", +] + [[package]] name = "mick-jaeger" version = "0.1.4" @@ -5013,6 +5022,7 @@ dependencies = [ "polkadot-node-network-protocol", "polkadot-node-subsystem", "polkadot-node-subsystem-test-helpers", + "polkadot-node-subsystem-util", "polkadot-primitives", "sc-authority-discovery", "sc-network", @@ -5315,6 +5325,7 @@ dependencies = [ "futures 0.3.8", "futures-timer 3.0.2", "log", + "metered-channel", "parity-scale-codec", "parking_lot 0.11.1", "pin-project 1.0.4", diff --git a/polkadot/Cargo.toml b/polkadot/Cargo.toml index 972a07ba58..a69f8afab4 100644 --- a/polkadot/Cargo.toml +++ b/polkadot/Cargo.toml @@ -66,6 +66,7 @@ members = [ "node/subsystem-test-helpers", "node/subsystem-util", "node/jaeger", + "node/metered-channel", "node/test/client", "node/test/service", "parachain/test-parachains", diff --git a/polkadot/node/metered-channel/Cargo.toml b/polkadot/node/metered-channel/Cargo.toml new file mode 100644 index 0000000000..0194c31e07 --- /dev/null +++ b/polkadot/node/metered-channel/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "metered-channel" +version = "0.1.0" +authors = ["Parity Technologies "] +edition = "2018" +description = "Channels with attached Meters" + +[dependencies] +futures = "0.3.8" +futures-timer = "3.0.2" + +[dev-dependencies] +assert_matches = "1.4.0" +futures = { version = "0.3.8", features = ["thread-pool"] } diff --git a/polkadot/node/metered-channel/src/bounded.rs b/polkadot/node/metered-channel/src/bounded.rs new file mode 100644 index 0000000000..82740266a8 --- /dev/null +++ b/polkadot/node/metered-channel/src/bounded.rs @@ -0,0 +1,178 @@ +// Copyright 2017-2021 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! Metered variant of bounded mpsc channels to be able to extract metrics. + +use super::*; + +/// Create a wrapped `mpsc::channel` pair of `MeteredSender` and `MeteredReceiver`. +pub fn channel(capacity: usize, name: &'static str) -> (MeteredSender, MeteredReceiver) { + let (tx, rx) = mpsc::channel(capacity); + let mut shared_meter = Meter::default(); + shared_meter.name = name; + let tx = MeteredSender { meter: shared_meter.clone(), inner: tx }; + let rx = MeteredReceiver { meter: shared_meter, inner: rx }; + (tx, rx) +} + +/// A receiver tracking the messages consumed by itself. +#[derive(Debug)] +pub struct MeteredReceiver { + // count currently contained messages + meter: Meter, + inner: mpsc::Receiver, +} + +impl std::ops::Deref for MeteredReceiver { + type Target = mpsc::Receiver; + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +impl std::ops::DerefMut for MeteredReceiver { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.inner + } +} + +impl Stream for MeteredReceiver { + type Item = T; + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match mpsc::Receiver::poll_next(Pin::new(&mut self.inner), cx) { + Poll::Ready(x) => { + // always use Ordering::SeqCst to avoid underflows + self.meter.fill.fetch_sub(1, Ordering::SeqCst); + Poll::Ready(x) + } + other => other, + } + } + + /// Don't rely on the unreliable size hint. + fn size_hint(&self) -> (usize, Option) { + self.inner.size_hint() + } +} + +impl MeteredReceiver { + /// Get an updated accessor object for all metrics collected. + pub fn meter(&self) -> &Meter { + &self.meter + } + + /// Attempt to receive the next item. + pub fn try_next(&mut self) -> Result, mpsc::TryRecvError> { + match self.inner.try_next()? { + Some(x) => { + self.meter.fill.fetch_sub(1, Ordering::SeqCst); + Ok(Some(x)) + } + None => Ok(None), + } + } +} + +impl futures::stream::FusedStream for MeteredReceiver { + fn is_terminated(&self) -> bool { + self.inner.is_terminated() + } +} + + +/// The sender component, tracking the number of items +/// sent across it. +#[derive(Debug)] +pub struct MeteredSender { + meter: Meter, + inner: mpsc::Sender, +} + +impl Clone for MeteredSender { + fn clone(&self) -> Self { + Self { meter: self.meter.clone(), inner: self.inner.clone() } + } +} + +impl std::ops::Deref for MeteredSender { + type Target = mpsc::Sender; + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +impl std::ops::DerefMut for MeteredSender { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.inner + } +} + +impl MeteredSender { + /// Get an updated accessor object for all metrics collected. + pub fn meter(&self) -> &Meter { + &self.meter + } + + /// Send message, wait until capacity is available. + pub async fn send(&mut self, item: T) -> result::Result<(), mpsc::SendError> + where + Self: Unpin, + { + self.meter.fill.fetch_add(1, Ordering::SeqCst); + let fut = self.inner.send(item); + futures::pin_mut!(fut); + fut.await + } + + /// Attempt to send message or fail immediately. + pub fn try_send(&mut self, msg: T) -> result::Result<(), mpsc::TrySendError> { + self.inner.try_send(msg)?; + self.meter.fill.fetch_add(1, Ordering::SeqCst); + Ok(()) + } +} + +impl futures::sink::Sink for MeteredSender { + type Error = mpsc::SendError; + + fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> { + Pin::new(&mut self.inner).start_send(item) + } + + fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.inner).poll_ready(cx) + } + + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match Pin::new(&mut self.inner).poll_close(cx) { + val @ Poll::Ready(_)=> { + self.meter.fill.store(0, Ordering::SeqCst); + val + } + other => other, + } + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match Pin::new(&mut self.inner).poll_flush(cx) { + val @ Poll::Ready(_)=> { + self.meter.fill.fetch_add(1, Ordering::SeqCst); + val + } + other => other, + } + } +} diff --git a/polkadot/node/metered-channel/src/lib.rs b/polkadot/node/metered-channel/src/lib.rs new file mode 100644 index 0000000000..b7188689b0 --- /dev/null +++ b/polkadot/node/metered-channel/src/lib.rs @@ -0,0 +1,150 @@ +// Copyright 2017-2021 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! Metered variant of mpsc channels to be able to extract metrics. + +use std::sync::atomic::{AtomicUsize, Ordering}; + +use futures::{channel::mpsc, task::Poll, task::Context, sink::SinkExt, stream::Stream}; + +use std::result; +use std::sync::Arc; +use std::pin::Pin; + +mod bounded; +mod unbounded; + +pub use self::bounded::*; +pub use self::unbounded::*; + +/// A peek into the inner state of a meter. +#[derive(Debug, Clone, Default)] +pub struct Meter { + /// Name of the receiver and sender pair. + name: &'static str, + // fill state of the channel + fill: Arc, +} + +impl Meter { + /// Count the number of items queued up inside the channel. + pub fn queue_count(&self) -> usize { + // when obtaining we don't care much about off by one + // accuracy + self.fill.load(Ordering::Relaxed) + } + + /// Obtain the name of the channel `Sender` and `Receiver` pair. + pub fn name(&self) -> &'static str { + self.name + } +} + +#[cfg(test)] +mod tests { + use super::*; + use futures::executor::block_on; + use futures::StreamExt; + + #[derive(Clone, Copy, Debug, Default)] + struct Msg { + val: u8, + } + + #[test] + fn try_send_try_next() { + block_on(async move { + let (mut tx, mut rx) = channel::(5, "goofy"); + let msg = Msg::default(); + assert_eq!(rx.meter().queue_count(), 0); + tx.try_send(msg).unwrap(); + assert_eq!(tx.meter().queue_count(), 1); + tx.try_send(msg).unwrap(); + tx.try_send(msg).unwrap(); + tx.try_send(msg).unwrap(); + assert_eq!(tx.meter().queue_count(), 4); + rx.try_next().unwrap(); + assert_eq!(rx.meter().queue_count(), 3); + rx.try_next().unwrap(); + rx.try_next().unwrap(); + assert_eq!(tx.meter().queue_count(), 1); + rx.try_next().unwrap(); + assert_eq!(rx.meter().queue_count(), 0); + assert!(rx.try_next().is_err()); + }); + } + + #[test] + fn with_tasks() { + let (ready, go) = futures::channel::oneshot::channel(); + + let (mut tx, mut rx) = channel::(5, "goofy"); + block_on(async move { + futures::join!( + async move { + let msg = Msg::default(); + assert_eq!(tx.meter().queue_count(), 0); + tx.try_send(msg).unwrap(); + assert_eq!(tx.meter().queue_count(), 1); + tx.try_send(msg).unwrap(); + tx.try_send(msg).unwrap(); + tx.try_send(msg).unwrap(); + ready.send(()).expect("Helper oneshot channel must work. qed"); + }, + async move { + go.await.expect("Helper oneshot channel must work. qed"); + assert_eq!(rx.meter().queue_count(), 4); + rx.try_next().unwrap(); + assert_eq!(rx.meter().queue_count(), 3); + rx.try_next().unwrap(); + rx.try_next().unwrap(); + assert_eq!(rx.meter().queue_count(), 1); + rx.try_next().unwrap(); + assert_eq!(dbg!(rx.meter().queue_count()), 0); + } + ) + }); + } + + use std::time::Duration; + use futures_timer::Delay; + + #[test] + fn stream_and_sink() { + let (mut tx, mut rx) = channel::(5, "goofy"); + + block_on(async move { + futures::join!( + async move { + for i in 0..15 { + println!("Sent #{} with a backlog of {} items", i + 1, tx.meter().queue_count()); + let msg = Msg { val: i as u8 + 1u8 }; + tx.send(msg).await.unwrap(); + assert!(tx.meter().queue_count() > 0usize); + Delay::new(Duration::from_millis(20)).await; + } + () + }, + async move { + while let Some(msg) = rx.next().await { + println!("rx'd one {} with {} backlogged", msg.val, rx.meter().queue_count()); + Delay::new(Duration::from_millis(29)).await; + } + } + ) + }); + } +} diff --git a/polkadot/node/metered-channel/src/unbounded.rs b/polkadot/node/metered-channel/src/unbounded.rs new file mode 100644 index 0000000000..1a58ac6f1c --- /dev/null +++ b/polkadot/node/metered-channel/src/unbounded.rs @@ -0,0 +1,179 @@ +// Copyright 2017-2021 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! Metered variant of unbounded mpsc channels to be able to extract metrics. + +use super::*; + +/// Create a wrapped `mpsc::channel` pair of `MeteredSender` and `MeteredReceiver`. +pub fn unbounded(name: &'static str) -> (UnboundedMeteredSender, UnboundedMeteredReceiver) { + let (tx, rx) = mpsc::unbounded(); + let mut shared_meter = Meter::default(); + shared_meter.name = name; + let tx = UnboundedMeteredSender { meter: shared_meter.clone(), inner: tx }; + let rx = UnboundedMeteredReceiver { meter: shared_meter, inner: rx }; + (tx, rx) +} + +/// A receiver tracking the messages consumed by itself. +#[derive(Debug)] +pub struct UnboundedMeteredReceiver { + // count currently contained messages + meter: Meter, + inner: mpsc::UnboundedReceiver, +} + +impl std::ops::Deref for UnboundedMeteredReceiver { + type Target = mpsc::UnboundedReceiver; + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +impl std::ops::DerefMut for UnboundedMeteredReceiver { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.inner + } +} + +impl Stream for UnboundedMeteredReceiver { + type Item = T; + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match mpsc::UnboundedReceiver::poll_next(Pin::new(&mut self.inner), cx) { + Poll::Ready(x) => { + // always use Ordering::SeqCst to avoid underflows + self.meter.fill.fetch_sub(1, Ordering::SeqCst); + Poll::Ready(x) + } + other => other, + } + } + + /// Don't rely on the unreliable size hint. + fn size_hint(&self) -> (usize, Option) { + self.inner.size_hint() + } +} + +impl UnboundedMeteredReceiver { + /// Get an updated accessor object for all metrics collected. + pub fn meter(&self) -> &Meter { + &self.meter + } + + /// Attempt to receive the next item. + pub fn try_next(&mut self) -> Result, mpsc::TryRecvError> { + match self.inner.try_next()? { + Some(x) => { + self.meter.fill.fetch_sub(1, Ordering::SeqCst); + Ok(Some(x)) + } + None => Ok(None), + } + } +} + +impl futures::stream::FusedStream for UnboundedMeteredReceiver { + fn is_terminated(&self) -> bool { + self.inner.is_terminated() + } +} + + +/// The sender component, tracking the number of items +/// sent across it. +#[derive(Debug)] +pub struct UnboundedMeteredSender { + meter: Meter, + inner: mpsc::UnboundedSender, +} + +impl Clone for UnboundedMeteredSender { + fn clone(&self) -> Self { + Self { meter: self.meter.clone(), inner: self.inner.clone() } + } +} + +impl std::ops::Deref for UnboundedMeteredSender { + type Target = mpsc::UnboundedSender; + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +impl std::ops::DerefMut for UnboundedMeteredSender { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.inner + } +} + +impl UnboundedMeteredSender { + /// Get an updated accessor object for all metrics collected. + pub fn meter(&self) -> &Meter { + &self.meter + } + + /// Send message, wait until capacity is available. + pub async fn send(&mut self, item: T) -> result::Result<(), mpsc::SendError> + where + Self: Unpin, + { + self.meter.fill.fetch_add(1, Ordering::SeqCst); + let fut = self.inner.send(item); + futures::pin_mut!(fut); + fut.await + } + + + /// Attempt to send message or fail immediately. + pub fn unbounded_send(&mut self, msg: T) -> result::Result<(), mpsc::TrySendError> { + self.inner.unbounded_send(msg).expect("Unbounded send never fails. qed"); + self.meter.fill.fetch_add(1, Ordering::SeqCst); + Ok(()) + } +} + +impl futures::sink::Sink for UnboundedMeteredSender { + type Error = as futures::sink::Sink>::Error; + + fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> { + Pin::new(&mut self.inner).start_send(item) + } + + fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.inner).poll_ready(cx) + } + + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match Pin::new(&mut self.inner).poll_ready(cx) { + val @ Poll::Ready(_)=> { + self.meter.fill.store(0, Ordering::SeqCst); + val + } + other => other, + } + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match Pin::new(&mut self.inner).poll_ready(cx) { + val @ Poll::Ready(_)=> { + self.meter.fill.fetch_add(1, Ordering::SeqCst); + val + } + other => other, + } + } +} diff --git a/polkadot/node/network/bridge/Cargo.toml b/polkadot/node/network/bridge/Cargo.toml index 2d3d214f02..0e6d299420 100644 --- a/polkadot/node/network/bridge/Cargo.toml +++ b/polkadot/node/network/bridge/Cargo.toml @@ -20,5 +20,6 @@ polkadot-node-network-protocol = { path = "../protocol" } assert_matches = "1.4.0" parking_lot = "0.11.1" polkadot-node-subsystem-test-helpers = { path = "../../subsystem-test-helpers" } +polkadot-node-subsystem-util = { path = "../../subsystem-util"} sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master" } diff --git a/polkadot/node/network/bridge/src/lib.rs b/polkadot/node/network/bridge/src/lib.rs index a015947912..3512ee48ee 100644 --- a/polkadot/node/network/bridge/src/lib.rs +++ b/polkadot/node/network/bridge/src/lib.rs @@ -791,7 +791,6 @@ where #[cfg(test)] mod tests { use super::*; - use futures::channel::mpsc; use futures::executor; use std::borrow::Cow; @@ -805,6 +804,7 @@ mod tests { use polkadot_node_subsystem_test_helpers::{ SingleItemSink, SingleItemStream, TestSubsystemContextHandle, }; + use polkadot_node_subsystem_util::metered; use polkadot_node_network_protocol::view; use sc_network::Multiaddr; use sp_keyring::Sr25519Keyring; @@ -812,7 +812,7 @@ mod tests { // The subsystem's view of the network - only supports a single call to `event_stream`. struct TestNetwork { net_events: Arc>>>, - action_tx: mpsc::UnboundedSender, + action_tx: metered::UnboundedMeteredSender, } struct TestAuthorityDiscovery; @@ -820,7 +820,7 @@ mod tests { // The test's view of the network. This receives updates from the subsystem in the form // of `NetworkAction`s. struct TestNetworkHandle { - action_rx: mpsc::UnboundedReceiver, + action_rx: metered::UnboundedMeteredReceiver, net_tx: SingleItemSink, } @@ -830,7 +830,7 @@ mod tests { TestAuthorityDiscovery, ) { let (net_tx, net_rx) = polkadot_node_subsystem_test_helpers::single_item_sink(); - let (action_tx, action_rx) = mpsc::unbounded(); + let (action_tx, action_rx) = metered::unbounded("test_action"); ( TestNetwork { diff --git a/polkadot/node/overseer/src/lib.rs b/polkadot/node/overseer/src/lib.rs index 598f4121c7..d61b883386 100644 --- a/polkadot/node/overseer/src/lib.rs +++ b/polkadot/node/overseer/src/lib.rs @@ -66,12 +66,12 @@ use std::task::Poll; use std::time::Duration; use std::collections::{hash_map, HashMap}; -use futures::channel::{mpsc, oneshot}; +use futures::channel::{oneshot, mpsc}; use futures::{ poll, select, future::BoxFuture, stream::{FuturesUnordered, Fuse}, - Future, FutureExt, SinkExt, StreamExt, + Future, FutureExt, StreamExt, }; use futures_timer::Delay; use oorandom::Rand32; @@ -90,7 +90,7 @@ pub use polkadot_subsystem::{ Subsystem, SubsystemContext, OverseerSignal, FromOverseer, SubsystemError, SubsystemResult, SpawnedSubsystem, ActiveLeavesUpdate, DummySubsystem, JaegerSpan, jaeger, }; -use polkadot_node_subsystem_util::{TimeoutExt, metrics::{self, prometheus}}; +use polkadot_node_subsystem_util::{TimeoutExt, metrics::{self, prometheus}, metered, Metronome}; use polkadot_node_primitives::SpawnNamed; // A capacity of bounded channels inside the overseer. @@ -102,6 +102,8 @@ const LOG_TARGET: &'static str = "overseer"; // Rate at which messages are timed. const MESSAGE_TIMER_METRIC_CAPTURE_RATE: f64 = 0.005; + + /// A type of messages that are sent from [`Subsystem`] to [`Overseer`]. /// /// It wraps a system-wide [`AllMessages`] type that represents all possible @@ -188,7 +190,7 @@ enum ExternalRequest { /// [`Overseer`]: struct.Overseer.html #[derive(Clone)] pub struct OverseerHandler { - events_tx: mpsc::Sender, + events_tx: metered::MeteredSender, } impl OverseerHandler { @@ -289,7 +291,7 @@ impl Debug for ToOverseer { /// /// [`Subsystem`]: trait.Subsystem.html struct SubsystemInstance { - tx: mpsc::Sender>, + tx: metered::MeteredSender>, name: &'static str, } @@ -322,8 +324,8 @@ impl From for MaybeTimed { /// [`SubsystemJob`]: trait.SubsystemJob.html #[derive(Debug)] pub struct OverseerSubsystemContext{ - rx: mpsc::Receiver>, - tx: mpsc::UnboundedSender>, + rx: metered::MeteredReceiver>, + tx: metered::UnboundedMeteredSender>, metrics: Metrics, rng: Rand32, threshold: u32, @@ -338,8 +340,8 @@ impl OverseerSubsystemContext { /// `capture_rate` determines what fraction of messages are timed. Its value is clamped /// to the range `0.0..=1.0`. fn new( - rx: mpsc::Receiver>, - tx: mpsc::UnboundedSender>, + rx: metered::MeteredReceiver>, + tx: metered::UnboundedMeteredSender>, metrics: Metrics, increment: u64, mut capture_rate: f64, @@ -361,8 +363,8 @@ impl OverseerSubsystemContext { /// Intended for tests. #[allow(unused)] fn new_unmetered( - rx: mpsc::Receiver>, - tx: mpsc::UnboundedSender>, + rx: metered::MeteredReceiver>, + tx: metered::UnboundedMeteredSender>, ) -> Self { let metrics = Metrics::default(); OverseerSubsystemContext::new(rx, tx, metrics, 0, 0.0) @@ -559,10 +561,10 @@ pub struct Overseer { running_subsystems: FuturesUnordered>>, /// Gather running subsystems' outbound streams into one. - to_overseer_rx: Fuse>>, + to_overseer_rx: Fuse>>, /// Events that are sent to the overseer from the outside world - events_rx: mpsc::Receiver, + events_rx: metered::MeteredReceiver, /// External listeners waiting for a hash to be in the active-leave set. activation_external_listeners: HashMap>>>, @@ -1045,6 +1047,8 @@ struct MetricsInner { deactivated_heads_total: prometheus::Counter, messages_relayed_total: prometheus::Counter, message_relay_timing: prometheus::Histogram, + channel_fill_level_to_overseer: prometheus::Histogram, + channel_fill_level_from_overseer: prometheus::Histogram, } #[derive(Default, Clone)] @@ -1073,6 +1077,11 @@ impl Metrics { fn time_message_hold(&self) -> MaybeTimer { self.0.as_ref().map(|metrics| metrics.message_relay_timing.start_timer()) } + + fn channel_fill_level_snapshot(&self, from_overseer: usize, to_overseer: usize) { + self.0.as_ref().map(|metrics| metrics.channel_fill_level_to_overseer.observe(to_overseer as f64)); + self.0.as_ref().map(|metrics| metrics.channel_fill_level_from_overseer.observe(from_overseer as f64)); + } } impl metrics::Metrics for Metrics { @@ -1121,6 +1130,30 @@ impl metrics::Metrics for Metrics { )?, registry, )?, + channel_fill_level_from_overseer: prometheus::register( + prometheus::Histogram::with_opts( + prometheus::HistogramOpts { + common_opts: prometheus::Opts::new( + "overseer_channel_fill_level_from_overseer", + "Number of elements sitting in the channel waiting to be processed.", + ), + buckets: prometheus::exponential_buckets(0.00001_f64, 2_f64, (CHANNEL_CAPACITY as f64).log2().ceil() as usize).expect("inputs are within documented range; qed"), + } + )?, + registry, + )?, + channel_fill_level_to_overseer: prometheus::register( + prometheus::Histogram::with_opts( + prometheus::HistogramOpts { + common_opts: prometheus::Opts::new( + "overseer_channel_fill_level_to_overseer", + "Number of elements sitting in the channel waiting to be processed.", + ), + buckets: prometheus::exponential_buckets(0.00001_f64, 2_f64, (CHANNEL_CAPACITY as f64).log2().ceil() as usize).expect("inputs are within documented range; qed"), + } + )?, + registry, + )?, }; Ok(Metrics(Some(metrics))) } @@ -1242,7 +1275,7 @@ where CG: Subsystem> + Send, CP: Subsystem> + Send, { - let (events_tx, events_rx) = mpsc::channel(CHANNEL_CAPACITY); + let (events_tx, events_rx) = metered::channel(CHANNEL_CAPACITY, "overseer_events"); let handler = OverseerHandler { events_tx: events_tx.clone(), @@ -1250,7 +1283,23 @@ where let metrics = ::register(prometheus_registry)?; - let (to_overseer_tx, to_overseer_rx) = mpsc::unbounded(); + let (to_overseer_tx, to_overseer_rx) = metered::unbounded("to_overseer"); + + { + let meter_from_overseer = events_rx.meter().clone(); + let meter_to_overseer = to_overseer_rx.meter().clone(); + let metronome_metrics = metrics.clone(); + let metronome = Metronome::new(std::time::Duration::from_millis(137)) + .for_each(move |_| { + metronome_metrics.channel_fill_level_snapshot(meter_from_overseer.queue_count(), meter_to_overseer.queue_count()); + + async move { + () + } + }); + s.spawn("metrics_metronome", Box::pin(metronome)); + } + let mut running_subsystems = FuturesUnordered::new(); let mut seed = 0x533d; // arbitrary @@ -1258,7 +1307,7 @@ where let candidate_validation_subsystem = spawn( &mut s, &mut running_subsystems, - to_overseer_tx.clone(), + metered::UnboundedMeteredSender::<_>::clone(&to_overseer_tx), all_subsystems.candidate_validation, &metrics, &mut seed, @@ -1267,7 +1316,7 @@ where let candidate_backing_subsystem = spawn( &mut s, &mut running_subsystems, - to_overseer_tx.clone(), + metered::UnboundedMeteredSender::<_>::clone(&to_overseer_tx), all_subsystems.candidate_backing, &metrics, &mut seed, @@ -1276,7 +1325,7 @@ where let candidate_selection_subsystem = spawn( &mut s, &mut running_subsystems, - to_overseer_tx.clone(), + metered::UnboundedMeteredSender::<_>::clone(&to_overseer_tx), all_subsystems.candidate_selection, &metrics, &mut seed, @@ -1285,7 +1334,7 @@ where let statement_distribution_subsystem = spawn( &mut s, &mut running_subsystems, - to_overseer_tx.clone(), + metered::UnboundedMeteredSender::<_>::clone(&to_overseer_tx), all_subsystems.statement_distribution, &metrics, &mut seed, @@ -1294,7 +1343,7 @@ where let availability_distribution_subsystem = spawn( &mut s, &mut running_subsystems, - to_overseer_tx.clone(), + metered::UnboundedMeteredSender::<_>::clone(&to_overseer_tx), all_subsystems.availability_distribution, &metrics, &mut seed, @@ -1303,7 +1352,7 @@ where let bitfield_signing_subsystem = spawn( &mut s, &mut running_subsystems, - to_overseer_tx.clone(), + metered::UnboundedMeteredSender::<_>::clone(&to_overseer_tx), all_subsystems.bitfield_signing, &metrics, &mut seed, @@ -1312,7 +1361,7 @@ where let bitfield_distribution_subsystem = spawn( &mut s, &mut running_subsystems, - to_overseer_tx.clone(), + metered::UnboundedMeteredSender::<_>::clone(&to_overseer_tx), all_subsystems.bitfield_distribution, &metrics, &mut seed, @@ -1321,7 +1370,7 @@ where let provisioner_subsystem = spawn( &mut s, &mut running_subsystems, - to_overseer_tx.clone(), + metered::UnboundedMeteredSender::<_>::clone(&to_overseer_tx), all_subsystems.provisioner, &metrics, &mut seed, @@ -1330,7 +1379,7 @@ where let pov_distribution_subsystem = spawn( &mut s, &mut running_subsystems, - to_overseer_tx.clone(), + metered::UnboundedMeteredSender::<_>::clone(&to_overseer_tx), all_subsystems.pov_distribution, &metrics, &mut seed, @@ -1339,7 +1388,7 @@ where let runtime_api_subsystem = spawn( &mut s, &mut running_subsystems, - to_overseer_tx.clone(), + metered::UnboundedMeteredSender::<_>::clone(&to_overseer_tx), all_subsystems.runtime_api, &metrics, &mut seed, @@ -1348,7 +1397,7 @@ where let availability_store_subsystem = spawn( &mut s, &mut running_subsystems, - to_overseer_tx.clone(), + metered::UnboundedMeteredSender::<_>::clone(&to_overseer_tx), all_subsystems.availability_store, &metrics, &mut seed, @@ -1357,7 +1406,7 @@ where let network_bridge_subsystem = spawn( &mut s, &mut running_subsystems, - to_overseer_tx.clone(), + metered::UnboundedMeteredSender::<_>::clone(&to_overseer_tx), all_subsystems.network_bridge, &metrics, &mut seed, @@ -1366,7 +1415,7 @@ where let chain_api_subsystem = spawn( &mut s, &mut running_subsystems, - to_overseer_tx.clone(), + metered::UnboundedMeteredSender::<_>::clone(&to_overseer_tx), all_subsystems.chain_api, &metrics, &mut seed, @@ -1375,7 +1424,7 @@ where let collation_generation_subsystem = spawn( &mut s, &mut running_subsystems, - to_overseer_tx.clone(), + metered::UnboundedMeteredSender::<_>::clone(&to_overseer_tx), all_subsystems.collation_generation, &metrics, &mut seed, @@ -1385,7 +1434,7 @@ where let collator_protocol_subsystem = spawn( &mut s, &mut running_subsystems, - to_overseer_tx.clone(), + metered::UnboundedMeteredSender::<_>::clone(&to_overseer_tx), all_subsystems.collator_protocol, &metrics, &mut seed, @@ -1735,12 +1784,12 @@ where fn spawn( spawner: &mut S, futures: &mut FuturesUnordered>>, - to_overseer: mpsc::UnboundedSender>, + to_overseer: metered::UnboundedMeteredSender>, s: impl Subsystem>, metrics: &Metrics, seed: &mut u64, ) -> SubsystemResult> { - let (to_tx, to_rx) = mpsc::channel(CHANNEL_CAPACITY); + let (to_tx, to_rx) = metered::channel(CHANNEL_CAPACITY, "subsystem_spawn"); let ctx = OverseerSubsystemContext::new( to_rx, to_overseer, @@ -1783,18 +1832,19 @@ fn spawn( mod tests { use std::sync::atomic; use std::collections::HashMap; - use futures::{executor, pin_mut, select, channel::mpsc, FutureExt, pending}; + use futures::{executor, pin_mut, select, FutureExt, pending}; use polkadot_primitives::v1::{BlockData, CollatorPair, PoV, CandidateHash}; use polkadot_subsystem::{messages::RuntimeApiRequest, JaegerSpan}; use polkadot_node_primitives::{Collation, CollationGenerationConfig}; use polkadot_node_network_protocol::{PeerId, ReputationChange, NetworkBridgeEvent}; + use polkadot_node_subsystem_util::metered; use sp_core::crypto::Pair as _; use super::*; - struct TestSubsystem1(mpsc::Sender); + struct TestSubsystem1(metered::MeteredSender); impl Subsystem for TestSubsystem1 where C: SubsystemContext @@ -1822,7 +1872,7 @@ mod tests { } } - struct TestSubsystem2(mpsc::Sender); + struct TestSubsystem2(metered::MeteredSender); impl Subsystem for TestSubsystem2 where C: SubsystemContext @@ -1893,8 +1943,11 @@ mod tests { let spawner = sp_core::testing::TaskExecutor::new(); executor::block_on(async move { - let (s1_tx, mut s1_rx) = mpsc::channel::(64); - let (s2_tx, mut s2_rx) = mpsc::channel::(64); + let (s1_tx, s1_rx) = metered::channel::(64, "overseer_test"); + let (s2_tx, s2_rx) = metered::channel::(64, "overseer_test"); + + let mut s1_rx = s1_rx.fuse(); + let mut s2_rx = s2_rx.fuse(); let all_subsystems = AllSubsystems::<()>::dummy() .replace_candidate_validation(TestSubsystem1(s1_tx)) @@ -1999,13 +2052,15 @@ mod tests { fn extract_metrics(registry: &prometheus::Registry) -> HashMap<&'static str, u64> { let gather = registry.gather(); - assert_eq!(gather[0].get_name(), "overseer_messages_relay_timing"); - assert_eq!(gather[1].get_name(), "parachain_activated_heads_total"); - assert_eq!(gather[2].get_name(), "parachain_deactivated_heads_total"); - assert_eq!(gather[3].get_name(), "parachain_messages_relayed_total"); - let activated = gather[1].get_metric()[0].get_counter().get_value() as u64; - let deactivated = gather[2].get_metric()[0].get_counter().get_value() as u64; - let relayed = gather[3].get_metric()[0].get_counter().get_value() as u64; + assert_eq!(gather[0].get_name(), "overseer_channel_fill_level_from_overseer"); + assert_eq!(gather[1].get_name(), "overseer_channel_fill_level_to_overseer"); + assert_eq!(gather[2].get_name(), "overseer_messages_relay_timing"); + assert_eq!(gather[3].get_name(), "parachain_activated_heads_total"); + assert_eq!(gather[4].get_name(), "parachain_deactivated_heads_total"); + assert_eq!(gather[5].get_name(), "parachain_messages_relayed_total"); + let activated = gather[3].get_metric()[0].get_counter().get_value() as u64; + let deactivated = gather[4].get_metric()[0].get_counter().get_value() as u64; + let relayed = gather[5].get_metric()[0].get_counter().get_value() as u64; let mut result = HashMap::new(); result.insert("activated", activated); result.insert("deactivated", deactivated); @@ -2034,7 +2089,7 @@ mod tests { }) } - struct TestSubsystem5(mpsc::Sender); + struct TestSubsystem5(metered::MeteredSender); impl Subsystem for TestSubsystem5 where C: SubsystemContext @@ -2065,7 +2120,7 @@ mod tests { } } - struct TestSubsystem6(mpsc::Sender); + struct TestSubsystem6(metered::MeteredSender); impl Subsystem for TestSubsystem6 where C: SubsystemContext @@ -2123,8 +2178,8 @@ mod tests { number: 3, }; - let (tx_5, mut rx_5) = mpsc::channel(64); - let (tx_6, mut rx_6) = mpsc::channel(64); + let (tx_5, mut rx_5) = metered::channel(64, "overseer_test"); + let (tx_6, mut rx_6) = metered::channel(64, "overseer_test"); let all_subsystems = AllSubsystems::<()>::dummy() .replace_candidate_validation(TestSubsystem5(tx_5)) .replace_candidate_backing(TestSubsystem6(tx_6)); @@ -2216,8 +2271,8 @@ mod tests { number: 3, }; - let (tx_5, mut rx_5) = mpsc::channel(64); - let (tx_6, mut rx_6) = mpsc::channel(64); + let (tx_5, mut rx_5) = metered::channel(64, "overseer_test"); + let (tx_6, mut rx_6) = metered::channel(64, "overseer_test"); let all_subsystems = AllSubsystems::<()>::dummy() .replace_candidate_validation(TestSubsystem5(tx_5)) @@ -2308,7 +2363,7 @@ mod tests { number: 1, }; - let (tx_5, mut rx_5) = mpsc::channel(64); + let (tx_5, mut rx_5) = metered::channel(64, "overseer_test"); let all_subsystems = AllSubsystems::<()>::dummy() .replace_candidate_backing(TestSubsystem6(tx_5)); diff --git a/polkadot/node/subsystem-util/Cargo.toml b/polkadot/node/subsystem-util/Cargo.toml index 57fb5a8b7e..b74c656d98 100644 --- a/polkadot/node/subsystem-util/Cargo.toml +++ b/polkadot/node/subsystem-util/Cargo.toml @@ -21,6 +21,7 @@ polkadot-node-primitives = { path = "../primitives" } polkadot-node-subsystem = { path = "../subsystem" } polkadot-node-jaeger = { path = "../jaeger" } polkadot-primitives = { path = "../../primitives" } +metered-channel = { path = "../metered-channel"} sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" } diff --git a/polkadot/node/subsystem-util/src/lib.rs b/polkadot/node/subsystem-util/src/lib.rs index e9fd475f10..9ce1258e8a 100644 --- a/polkadot/node/subsystem-util/src/lib.rs +++ b/polkadot/node/subsystem-util/src/lib.rs @@ -50,6 +50,7 @@ use streamunordered::{StreamUnordered, StreamYield}; use thiserror::Error; pub mod validator_discovery; +pub use metered_channel as metered; /// These reexports are required so that external crates can use the `delegated_subsystem` macro properly. pub mod reexports { @@ -987,9 +988,64 @@ impl Future for Timeout { } } + +#[derive(Copy, Clone)] +enum MetronomeState { + Snooze, + SetAlarm, +} + +/// Create a stream of ticks with a defined cycle duration. +pub struct Metronome { + delay: Delay, + period: Duration, + state: MetronomeState, +} + +impl Metronome +{ + /// Create a new metronome source with a defined cycle duration. + pub fn new(cycle: Duration) -> Self { + let period = cycle.into(); + Self { + period, + delay: Delay::new(period), + state: MetronomeState::Snooze, + } + } +} + +impl futures::Stream for Metronome +{ + type Item = (); + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_> + ) -> Poll> { + loop { + match self.state { + MetronomeState::SetAlarm => { + let val = self.period.clone(); + self.delay.reset(val); + self.state = MetronomeState::Snooze; + } + MetronomeState::Snooze => { + if !Pin::new(&mut self.delay).poll(cx).is_ready() { + break + } + self.state = MetronomeState::SetAlarm; + return Poll::Ready(Some(())); + } + } + } + Poll::Pending + } +} + #[cfg(test)] mod tests { use super::*; + use executor::block_on; use thiserror::Error; use polkadot_node_subsystem::{ messages::{AllMessages, CandidateSelectionMessage}, ActiveLeavesUpdate, FromOverseer, OverseerSignal, @@ -999,7 +1055,7 @@ mod tests { use futures::{channel::mpsc, executor, StreamExt, future, Future, FutureExt, SinkExt}; use polkadot_primitives::v1::Hash; use polkadot_node_subsystem_test_helpers::{self as test_helpers, make_subsystem_context}; - use std::{pin::Pin, time::Duration, sync::Arc}; + use std::{pin::Pin, sync::{Arc, atomic::{AtomicUsize, Ordering}}, time::Duration}; // basic usage: in a nutshell, when you want to define a subsystem, just focus on what its jobs do; // you can leave the subsystem itself to the job manager. @@ -1183,4 +1239,46 @@ mod tests { FakeCandidateSelectionSubsystem::new(pool, false, ()).start(context); assert_eq!(name, "FakeCandidateSelection"); } + + + #[test] + fn tick_tack_metronome() { + let n = Arc::new(AtomicUsize::default()); + + let (tick, mut block) = mpsc::unbounded(); + + let metronome = { + let n = n.clone(); + let stream = Metronome::new(Duration::from_millis(137_u64)); + stream.for_each(move |_res| { + let _ = n.fetch_add(1, Ordering::Relaxed); + let mut tick = tick.clone(); + async move { + tick.send(()).await.expect("Test helper channel works. qed"); + } + }).fuse() + }; + + let f2 = async move { + block.next().await; + assert_eq!(n.load(Ordering::Relaxed), 1_usize); + block.next().await; + assert_eq!(n.load(Ordering::Relaxed), 2_usize); + block.next().await; + assert_eq!(n.load(Ordering::Relaxed), 3_usize); + block.next().await; + assert_eq!(n.load(Ordering::Relaxed), 4_usize); + }.fuse(); + + futures::pin_mut!(f2); + futures::pin_mut!(metronome); + + block_on(async move { + // futures::join!(metronome, f2) + futures::select!( + _ = metronome => unreachable!("Metronome never stops. qed"), + _ = f2 => (), + ) + }); + } }