metered mpsc channels (#2235)

This commit is contained in:
Bernhard Schuster
2021-01-13 17:40:27 +01:00
committed by GitHub
parent c644c39f3d
commit d7adf8f201
11 changed files with 744 additions and 56 deletions
+11
View File
@@ -3475,6 +3475,15 @@ dependencies = [
"zeroize",
]
[[package]]
name = "metered-channel"
version = "0.1.0"
dependencies = [
"assert_matches",
"futures 0.3.8",
"futures-timer 3.0.2",
]
[[package]]
name = "mick-jaeger"
version = "0.1.4"
@@ -5013,6 +5022,7 @@ dependencies = [
"polkadot-node-network-protocol",
"polkadot-node-subsystem",
"polkadot-node-subsystem-test-helpers",
"polkadot-node-subsystem-util",
"polkadot-primitives",
"sc-authority-discovery",
"sc-network",
@@ -5315,6 +5325,7 @@ dependencies = [
"futures 0.3.8",
"futures-timer 3.0.2",
"log",
"metered-channel",
"parity-scale-codec",
"parking_lot 0.11.1",
"pin-project 1.0.4",
+1
View File
@@ -66,6 +66,7 @@ members = [
"node/subsystem-test-helpers",
"node/subsystem-util",
"node/jaeger",
"node/metered-channel",
"node/test/client",
"node/test/service",
"parachain/test-parachains",
+14
View File
@@ -0,0 +1,14 @@
[package]
name = "metered-channel"
version = "0.1.0"
authors = ["Parity Technologies <admin@parity.io>"]
edition = "2018"
description = "Channels with attached Meters"
[dependencies]
futures = "0.3.8"
futures-timer = "3.0.2"
[dev-dependencies]
assert_matches = "1.4.0"
futures = { version = "0.3.8", features = ["thread-pool"] }
@@ -0,0 +1,178 @@
// Copyright 2017-2021 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Metered variant of bounded mpsc channels to be able to extract metrics.
use super::*;
/// Create a wrapped `mpsc::channel` pair of `MeteredSender` and `MeteredReceiver`.
pub fn channel<T>(capacity: usize, name: &'static str) -> (MeteredSender<T>, MeteredReceiver<T>) {
let (tx, rx) = mpsc::channel(capacity);
let mut shared_meter = Meter::default();
shared_meter.name = name;
let tx = MeteredSender { meter: shared_meter.clone(), inner: tx };
let rx = MeteredReceiver { meter: shared_meter, inner: rx };
(tx, rx)
}
/// A receiver tracking the messages consumed by itself.
#[derive(Debug)]
pub struct MeteredReceiver<T> {
// count currently contained messages
meter: Meter,
inner: mpsc::Receiver<T>,
}
impl<T> std::ops::Deref for MeteredReceiver<T> {
type Target = mpsc::Receiver<T>;
fn deref(&self) -> &Self::Target {
&self.inner
}
}
impl<T> std::ops::DerefMut for MeteredReceiver<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.inner
}
}
impl<T> Stream for MeteredReceiver<T> {
type Item = T;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match mpsc::Receiver::poll_next(Pin::new(&mut self.inner), cx) {
Poll::Ready(x) => {
// always use Ordering::SeqCst to avoid underflows
self.meter.fill.fetch_sub(1, Ordering::SeqCst);
Poll::Ready(x)
}
other => other,
}
}
/// Don't rely on the unreliable size hint.
fn size_hint(&self) -> (usize, Option<usize>) {
self.inner.size_hint()
}
}
impl<T> MeteredReceiver<T> {
/// Get an updated accessor object for all metrics collected.
pub fn meter(&self) -> &Meter {
&self.meter
}
/// Attempt to receive the next item.
pub fn try_next(&mut self) -> Result<Option<T>, mpsc::TryRecvError> {
match self.inner.try_next()? {
Some(x) => {
self.meter.fill.fetch_sub(1, Ordering::SeqCst);
Ok(Some(x))
}
None => Ok(None),
}
}
}
impl<T> futures::stream::FusedStream for MeteredReceiver<T> {
fn is_terminated(&self) -> bool {
self.inner.is_terminated()
}
}
/// The sender component, tracking the number of items
/// sent across it.
#[derive(Debug)]
pub struct MeteredSender<T> {
meter: Meter,
inner: mpsc::Sender<T>,
}
impl<T> Clone for MeteredSender<T> {
fn clone(&self) -> Self {
Self { meter: self.meter.clone(), inner: self.inner.clone() }
}
}
impl<T> std::ops::Deref for MeteredSender<T> {
type Target = mpsc::Sender<T>;
fn deref(&self) -> &Self::Target {
&self.inner
}
}
impl<T> std::ops::DerefMut for MeteredSender<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.inner
}
}
impl<T> MeteredSender<T> {
/// Get an updated accessor object for all metrics collected.
pub fn meter(&self) -> &Meter {
&self.meter
}
/// Send message, wait until capacity is available.
pub async fn send(&mut self, item: T) -> result::Result<(), mpsc::SendError>
where
Self: Unpin,
{
self.meter.fill.fetch_add(1, Ordering::SeqCst);
let fut = self.inner.send(item);
futures::pin_mut!(fut);
fut.await
}
/// Attempt to send message or fail immediately.
pub fn try_send(&mut self, msg: T) -> result::Result<(), mpsc::TrySendError<T>> {
self.inner.try_send(msg)?;
self.meter.fill.fetch_add(1, Ordering::SeqCst);
Ok(())
}
}
impl<T> futures::sink::Sink<T> for MeteredSender<T> {
type Error = mpsc::SendError;
fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
Pin::new(&mut self.inner).start_send(item)
}
fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Pin::new(&mut self.inner).poll_ready(cx)
}
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match Pin::new(&mut self.inner).poll_close(cx) {
val @ Poll::Ready(_)=> {
self.meter.fill.store(0, Ordering::SeqCst);
val
}
other => other,
}
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match Pin::new(&mut self.inner).poll_flush(cx) {
val @ Poll::Ready(_)=> {
self.meter.fill.fetch_add(1, Ordering::SeqCst);
val
}
other => other,
}
}
}
+150
View File
@@ -0,0 +1,150 @@
// Copyright 2017-2021 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Metered variant of mpsc channels to be able to extract metrics.
use std::sync::atomic::{AtomicUsize, Ordering};
use futures::{channel::mpsc, task::Poll, task::Context, sink::SinkExt, stream::Stream};
use std::result;
use std::sync::Arc;
use std::pin::Pin;
mod bounded;
mod unbounded;
pub use self::bounded::*;
pub use self::unbounded::*;
/// A peek into the inner state of a meter.
#[derive(Debug, Clone, Default)]
pub struct Meter {
/// Name of the receiver and sender pair.
name: &'static str,
// fill state of the channel
fill: Arc<AtomicUsize>,
}
impl Meter {
/// Count the number of items queued up inside the channel.
pub fn queue_count(&self) -> usize {
// when obtaining we don't care much about off by one
// accuracy
self.fill.load(Ordering::Relaxed)
}
/// Obtain the name of the channel `Sender` and `Receiver` pair.
pub fn name(&self) -> &'static str {
self.name
}
}
#[cfg(test)]
mod tests {
use super::*;
use futures::executor::block_on;
use futures::StreamExt;
#[derive(Clone, Copy, Debug, Default)]
struct Msg {
val: u8,
}
#[test]
fn try_send_try_next() {
block_on(async move {
let (mut tx, mut rx) = channel::<Msg>(5, "goofy");
let msg = Msg::default();
assert_eq!(rx.meter().queue_count(), 0);
tx.try_send(msg).unwrap();
assert_eq!(tx.meter().queue_count(), 1);
tx.try_send(msg).unwrap();
tx.try_send(msg).unwrap();
tx.try_send(msg).unwrap();
assert_eq!(tx.meter().queue_count(), 4);
rx.try_next().unwrap();
assert_eq!(rx.meter().queue_count(), 3);
rx.try_next().unwrap();
rx.try_next().unwrap();
assert_eq!(tx.meter().queue_count(), 1);
rx.try_next().unwrap();
assert_eq!(rx.meter().queue_count(), 0);
assert!(rx.try_next().is_err());
});
}
#[test]
fn with_tasks() {
let (ready, go) = futures::channel::oneshot::channel();
let (mut tx, mut rx) = channel::<Msg>(5, "goofy");
block_on(async move {
futures::join!(
async move {
let msg = Msg::default();
assert_eq!(tx.meter().queue_count(), 0);
tx.try_send(msg).unwrap();
assert_eq!(tx.meter().queue_count(), 1);
tx.try_send(msg).unwrap();
tx.try_send(msg).unwrap();
tx.try_send(msg).unwrap();
ready.send(()).expect("Helper oneshot channel must work. qed");
},
async move {
go.await.expect("Helper oneshot channel must work. qed");
assert_eq!(rx.meter().queue_count(), 4);
rx.try_next().unwrap();
assert_eq!(rx.meter().queue_count(), 3);
rx.try_next().unwrap();
rx.try_next().unwrap();
assert_eq!(rx.meter().queue_count(), 1);
rx.try_next().unwrap();
assert_eq!(dbg!(rx.meter().queue_count()), 0);
}
)
});
}
use std::time::Duration;
use futures_timer::Delay;
#[test]
fn stream_and_sink() {
let (mut tx, mut rx) = channel::<Msg>(5, "goofy");
block_on(async move {
futures::join!(
async move {
for i in 0..15 {
println!("Sent #{} with a backlog of {} items", i + 1, tx.meter().queue_count());
let msg = Msg { val: i as u8 + 1u8 };
tx.send(msg).await.unwrap();
assert!(tx.meter().queue_count() > 0usize);
Delay::new(Duration::from_millis(20)).await;
}
()
},
async move {
while let Some(msg) = rx.next().await {
println!("rx'd one {} with {} backlogged", msg.val, rx.meter().queue_count());
Delay::new(Duration::from_millis(29)).await;
}
}
)
});
}
}
@@ -0,0 +1,179 @@
// Copyright 2017-2021 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Metered variant of unbounded mpsc channels to be able to extract metrics.
use super::*;
/// Create a wrapped `mpsc::channel` pair of `MeteredSender` and `MeteredReceiver`.
pub fn unbounded<T>(name: &'static str) -> (UnboundedMeteredSender<T>, UnboundedMeteredReceiver<T>) {
let (tx, rx) = mpsc::unbounded();
let mut shared_meter = Meter::default();
shared_meter.name = name;
let tx = UnboundedMeteredSender { meter: shared_meter.clone(), inner: tx };
let rx = UnboundedMeteredReceiver { meter: shared_meter, inner: rx };
(tx, rx)
}
/// A receiver tracking the messages consumed by itself.
#[derive(Debug)]
pub struct UnboundedMeteredReceiver<T> {
// count currently contained messages
meter: Meter,
inner: mpsc::UnboundedReceiver<T>,
}
impl<T> std::ops::Deref for UnboundedMeteredReceiver<T> {
type Target = mpsc::UnboundedReceiver<T>;
fn deref(&self) -> &Self::Target {
&self.inner
}
}
impl<T> std::ops::DerefMut for UnboundedMeteredReceiver<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.inner
}
}
impl<T> Stream for UnboundedMeteredReceiver<T> {
type Item = T;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match mpsc::UnboundedReceiver::poll_next(Pin::new(&mut self.inner), cx) {
Poll::Ready(x) => {
// always use Ordering::SeqCst to avoid underflows
self.meter.fill.fetch_sub(1, Ordering::SeqCst);
Poll::Ready(x)
}
other => other,
}
}
/// Don't rely on the unreliable size hint.
fn size_hint(&self) -> (usize, Option<usize>) {
self.inner.size_hint()
}
}
impl<T> UnboundedMeteredReceiver<T> {
/// Get an updated accessor object for all metrics collected.
pub fn meter(&self) -> &Meter {
&self.meter
}
/// Attempt to receive the next item.
pub fn try_next(&mut self) -> Result<Option<T>, mpsc::TryRecvError> {
match self.inner.try_next()? {
Some(x) => {
self.meter.fill.fetch_sub(1, Ordering::SeqCst);
Ok(Some(x))
}
None => Ok(None),
}
}
}
impl<T> futures::stream::FusedStream for UnboundedMeteredReceiver<T> {
fn is_terminated(&self) -> bool {
self.inner.is_terminated()
}
}
/// The sender component, tracking the number of items
/// sent across it.
#[derive(Debug)]
pub struct UnboundedMeteredSender<T> {
meter: Meter,
inner: mpsc::UnboundedSender<T>,
}
impl<T> Clone for UnboundedMeteredSender<T> {
fn clone(&self) -> Self {
Self { meter: self.meter.clone(), inner: self.inner.clone() }
}
}
impl<T> std::ops::Deref for UnboundedMeteredSender<T> {
type Target = mpsc::UnboundedSender<T>;
fn deref(&self) -> &Self::Target {
&self.inner
}
}
impl<T> std::ops::DerefMut for UnboundedMeteredSender<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.inner
}
}
impl<T> UnboundedMeteredSender<T> {
/// Get an updated accessor object for all metrics collected.
pub fn meter(&self) -> &Meter {
&self.meter
}
/// Send message, wait until capacity is available.
pub async fn send(&mut self, item: T) -> result::Result<(), mpsc::SendError>
where
Self: Unpin,
{
self.meter.fill.fetch_add(1, Ordering::SeqCst);
let fut = self.inner.send(item);
futures::pin_mut!(fut);
fut.await
}
/// Attempt to send message or fail immediately.
pub fn unbounded_send(&mut self, msg: T) -> result::Result<(), mpsc::TrySendError<T>> {
self.inner.unbounded_send(msg).expect("Unbounded send never fails. qed");
self.meter.fill.fetch_add(1, Ordering::SeqCst);
Ok(())
}
}
impl<T> futures::sink::Sink<T> for UnboundedMeteredSender<T> {
type Error = <futures::channel::mpsc::UnboundedSender<T> as futures::sink::Sink<T>>::Error;
fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
Pin::new(&mut self.inner).start_send(item)
}
fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Pin::new(&mut self.inner).poll_ready(cx)
}
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match Pin::new(&mut self.inner).poll_ready(cx) {
val @ Poll::Ready(_)=> {
self.meter.fill.store(0, Ordering::SeqCst);
val
}
other => other,
}
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match Pin::new(&mut self.inner).poll_ready(cx) {
val @ Poll::Ready(_)=> {
self.meter.fill.fetch_add(1, Ordering::SeqCst);
val
}
other => other,
}
}
}
+1
View File
@@ -20,5 +20,6 @@ polkadot-node-network-protocol = { path = "../protocol" }
assert_matches = "1.4.0"
parking_lot = "0.11.1"
polkadot-node-subsystem-test-helpers = { path = "../../subsystem-test-helpers" }
polkadot-node-subsystem-util = { path = "../../subsystem-util"}
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master" }
+4 -4
View File
@@ -791,7 +791,6 @@ where
#[cfg(test)]
mod tests {
use super::*;
use futures::channel::mpsc;
use futures::executor;
use std::borrow::Cow;
@@ -805,6 +804,7 @@ mod tests {
use polkadot_node_subsystem_test_helpers::{
SingleItemSink, SingleItemStream, TestSubsystemContextHandle,
};
use polkadot_node_subsystem_util::metered;
use polkadot_node_network_protocol::view;
use sc_network::Multiaddr;
use sp_keyring::Sr25519Keyring;
@@ -812,7 +812,7 @@ mod tests {
// The subsystem's view of the network - only supports a single call to `event_stream`.
struct TestNetwork {
net_events: Arc<Mutex<Option<SingleItemStream<NetworkEvent>>>>,
action_tx: mpsc::UnboundedSender<NetworkAction>,
action_tx: metered::UnboundedMeteredSender<NetworkAction>,
}
struct TestAuthorityDiscovery;
@@ -820,7 +820,7 @@ mod tests {
// The test's view of the network. This receives updates from the subsystem in the form
// of `NetworkAction`s.
struct TestNetworkHandle {
action_rx: mpsc::UnboundedReceiver<NetworkAction>,
action_rx: metered::UnboundedMeteredReceiver<NetworkAction>,
net_tx: SingleItemSink<NetworkEvent>,
}
@@ -830,7 +830,7 @@ mod tests {
TestAuthorityDiscovery,
) {
let (net_tx, net_rx) = polkadot_node_subsystem_test_helpers::single_item_sink();
let (action_tx, action_rx) = mpsc::unbounded();
let (action_tx, action_rx) = metered::unbounded("test_action");
(
TestNetwork {
+106 -51
View File
@@ -66,12 +66,12 @@ use std::task::Poll;
use std::time::Duration;
use std::collections::{hash_map, HashMap};
use futures::channel::{mpsc, oneshot};
use futures::channel::{oneshot, mpsc};
use futures::{
poll, select,
future::BoxFuture,
stream::{FuturesUnordered, Fuse},
Future, FutureExt, SinkExt, StreamExt,
Future, FutureExt, StreamExt,
};
use futures_timer::Delay;
use oorandom::Rand32;
@@ -90,7 +90,7 @@ pub use polkadot_subsystem::{
Subsystem, SubsystemContext, OverseerSignal, FromOverseer, SubsystemError, SubsystemResult,
SpawnedSubsystem, ActiveLeavesUpdate, DummySubsystem, JaegerSpan, jaeger,
};
use polkadot_node_subsystem_util::{TimeoutExt, metrics::{self, prometheus}};
use polkadot_node_subsystem_util::{TimeoutExt, metrics::{self, prometheus}, metered, Metronome};
use polkadot_node_primitives::SpawnNamed;
// A capacity of bounded channels inside the overseer.
@@ -102,6 +102,8 @@ const LOG_TARGET: &'static str = "overseer";
// Rate at which messages are timed.
const MESSAGE_TIMER_METRIC_CAPTURE_RATE: f64 = 0.005;
/// A type of messages that are sent from [`Subsystem`] to [`Overseer`].
///
/// It wraps a system-wide [`AllMessages`] type that represents all possible
@@ -188,7 +190,7 @@ enum ExternalRequest {
/// [`Overseer`]: struct.Overseer.html
#[derive(Clone)]
pub struct OverseerHandler {
events_tx: mpsc::Sender<Event>,
events_tx: metered::MeteredSender<Event>,
}
impl OverseerHandler {
@@ -289,7 +291,7 @@ impl Debug for ToOverseer {
///
/// [`Subsystem`]: trait.Subsystem.html
struct SubsystemInstance<M> {
tx: mpsc::Sender<FromOverseer<M>>,
tx: metered::MeteredSender<FromOverseer<M>>,
name: &'static str,
}
@@ -322,8 +324,8 @@ impl<T> From<T> for MaybeTimed<T> {
/// [`SubsystemJob`]: trait.SubsystemJob.html
#[derive(Debug)]
pub struct OverseerSubsystemContext<M>{
rx: mpsc::Receiver<FromOverseer<M>>,
tx: mpsc::UnboundedSender<MaybeTimed<ToOverseer>>,
rx: metered::MeteredReceiver<FromOverseer<M>>,
tx: metered::UnboundedMeteredSender<MaybeTimed<ToOverseer>>,
metrics: Metrics,
rng: Rand32,
threshold: u32,
@@ -338,8 +340,8 @@ impl<M> OverseerSubsystemContext<M> {
/// `capture_rate` determines what fraction of messages are timed. Its value is clamped
/// to the range `0.0..=1.0`.
fn new(
rx: mpsc::Receiver<FromOverseer<M>>,
tx: mpsc::UnboundedSender<MaybeTimed<ToOverseer>>,
rx: metered::MeteredReceiver<FromOverseer<M>>,
tx: metered::UnboundedMeteredSender<MaybeTimed<ToOverseer>>,
metrics: Metrics,
increment: u64,
mut capture_rate: f64,
@@ -361,8 +363,8 @@ impl<M> OverseerSubsystemContext<M> {
/// Intended for tests.
#[allow(unused)]
fn new_unmetered(
rx: mpsc::Receiver<FromOverseer<M>>,
tx: mpsc::UnboundedSender<MaybeTimed<ToOverseer>>,
rx: metered::MeteredReceiver<FromOverseer<M>>,
tx: metered::UnboundedMeteredSender<MaybeTimed<ToOverseer>>,
) -> Self {
let metrics = Metrics::default();
OverseerSubsystemContext::new(rx, tx, metrics, 0, 0.0)
@@ -559,10 +561,10 @@ pub struct Overseer<S> {
running_subsystems: FuturesUnordered<BoxFuture<'static, SubsystemResult<()>>>,
/// Gather running subsystems' outbound streams into one.
to_overseer_rx: Fuse<mpsc::UnboundedReceiver<MaybeTimed<ToOverseer>>>,
to_overseer_rx: Fuse<metered::UnboundedMeteredReceiver<MaybeTimed<ToOverseer>>>,
/// Events that are sent to the overseer from the outside world
events_rx: mpsc::Receiver<Event>,
events_rx: metered::MeteredReceiver<Event>,
/// External listeners waiting for a hash to be in the active-leave set.
activation_external_listeners: HashMap<Hash, Vec<oneshot::Sender<SubsystemResult<()>>>>,
@@ -1045,6 +1047,8 @@ struct MetricsInner {
deactivated_heads_total: prometheus::Counter<prometheus::U64>,
messages_relayed_total: prometheus::Counter<prometheus::U64>,
message_relay_timing: prometheus::Histogram,
channel_fill_level_to_overseer: prometheus::Histogram,
channel_fill_level_from_overseer: prometheus::Histogram,
}
#[derive(Default, Clone)]
@@ -1073,6 +1077,11 @@ impl Metrics {
fn time_message_hold(&self) -> MaybeTimer {
self.0.as_ref().map(|metrics| metrics.message_relay_timing.start_timer())
}
fn channel_fill_level_snapshot(&self, from_overseer: usize, to_overseer: usize) {
self.0.as_ref().map(|metrics| metrics.channel_fill_level_to_overseer.observe(to_overseer as f64));
self.0.as_ref().map(|metrics| metrics.channel_fill_level_from_overseer.observe(from_overseer as f64));
}
}
impl metrics::Metrics for Metrics {
@@ -1121,6 +1130,30 @@ impl metrics::Metrics for Metrics {
)?,
registry,
)?,
channel_fill_level_from_overseer: prometheus::register(
prometheus::Histogram::with_opts(
prometheus::HistogramOpts {
common_opts: prometheus::Opts::new(
"overseer_channel_fill_level_from_overseer",
"Number of elements sitting in the channel waiting to be processed.",
),
buckets: prometheus::exponential_buckets(0.00001_f64, 2_f64, (CHANNEL_CAPACITY as f64).log2().ceil() as usize).expect("inputs are within documented range; qed"),
}
)?,
registry,
)?,
channel_fill_level_to_overseer: prometheus::register(
prometheus::Histogram::with_opts(
prometheus::HistogramOpts {
common_opts: prometheus::Opts::new(
"overseer_channel_fill_level_to_overseer",
"Number of elements sitting in the channel waiting to be processed.",
),
buckets: prometheus::exponential_buckets(0.00001_f64, 2_f64, (CHANNEL_CAPACITY as f64).log2().ceil() as usize).expect("inputs are within documented range; qed"),
}
)?,
registry,
)?,
};
Ok(Metrics(Some(metrics)))
}
@@ -1242,7 +1275,7 @@ where
CG: Subsystem<OverseerSubsystemContext<CollationGenerationMessage>> + Send,
CP: Subsystem<OverseerSubsystemContext<CollatorProtocolMessage>> + Send,
{
let (events_tx, events_rx) = mpsc::channel(CHANNEL_CAPACITY);
let (events_tx, events_rx) = metered::channel(CHANNEL_CAPACITY, "overseer_events");
let handler = OverseerHandler {
events_tx: events_tx.clone(),
@@ -1250,7 +1283,23 @@ where
let metrics = <Metrics as metrics::Metrics>::register(prometheus_registry)?;
let (to_overseer_tx, to_overseer_rx) = mpsc::unbounded();
let (to_overseer_tx, to_overseer_rx) = metered::unbounded("to_overseer");
{
let meter_from_overseer = events_rx.meter().clone();
let meter_to_overseer = to_overseer_rx.meter().clone();
let metronome_metrics = metrics.clone();
let metronome = Metronome::new(std::time::Duration::from_millis(137))
.for_each(move |_| {
metronome_metrics.channel_fill_level_snapshot(meter_from_overseer.queue_count(), meter_to_overseer.queue_count());
async move {
()
}
});
s.spawn("metrics_metronome", Box::pin(metronome));
}
let mut running_subsystems = FuturesUnordered::new();
let mut seed = 0x533d; // arbitrary
@@ -1258,7 +1307,7 @@ where
let candidate_validation_subsystem = spawn(
&mut s,
&mut running_subsystems,
to_overseer_tx.clone(),
metered::UnboundedMeteredSender::<_>::clone(&to_overseer_tx),
all_subsystems.candidate_validation,
&metrics,
&mut seed,
@@ -1267,7 +1316,7 @@ where
let candidate_backing_subsystem = spawn(
&mut s,
&mut running_subsystems,
to_overseer_tx.clone(),
metered::UnboundedMeteredSender::<_>::clone(&to_overseer_tx),
all_subsystems.candidate_backing,
&metrics,
&mut seed,
@@ -1276,7 +1325,7 @@ where
let candidate_selection_subsystem = spawn(
&mut s,
&mut running_subsystems,
to_overseer_tx.clone(),
metered::UnboundedMeteredSender::<_>::clone(&to_overseer_tx),
all_subsystems.candidate_selection,
&metrics,
&mut seed,
@@ -1285,7 +1334,7 @@ where
let statement_distribution_subsystem = spawn(
&mut s,
&mut running_subsystems,
to_overseer_tx.clone(),
metered::UnboundedMeteredSender::<_>::clone(&to_overseer_tx),
all_subsystems.statement_distribution,
&metrics,
&mut seed,
@@ -1294,7 +1343,7 @@ where
let availability_distribution_subsystem = spawn(
&mut s,
&mut running_subsystems,
to_overseer_tx.clone(),
metered::UnboundedMeteredSender::<_>::clone(&to_overseer_tx),
all_subsystems.availability_distribution,
&metrics,
&mut seed,
@@ -1303,7 +1352,7 @@ where
let bitfield_signing_subsystem = spawn(
&mut s,
&mut running_subsystems,
to_overseer_tx.clone(),
metered::UnboundedMeteredSender::<_>::clone(&to_overseer_tx),
all_subsystems.bitfield_signing,
&metrics,
&mut seed,
@@ -1312,7 +1361,7 @@ where
let bitfield_distribution_subsystem = spawn(
&mut s,
&mut running_subsystems,
to_overseer_tx.clone(),
metered::UnboundedMeteredSender::<_>::clone(&to_overseer_tx),
all_subsystems.bitfield_distribution,
&metrics,
&mut seed,
@@ -1321,7 +1370,7 @@ where
let provisioner_subsystem = spawn(
&mut s,
&mut running_subsystems,
to_overseer_tx.clone(),
metered::UnboundedMeteredSender::<_>::clone(&to_overseer_tx),
all_subsystems.provisioner,
&metrics,
&mut seed,
@@ -1330,7 +1379,7 @@ where
let pov_distribution_subsystem = spawn(
&mut s,
&mut running_subsystems,
to_overseer_tx.clone(),
metered::UnboundedMeteredSender::<_>::clone(&to_overseer_tx),
all_subsystems.pov_distribution,
&metrics,
&mut seed,
@@ -1339,7 +1388,7 @@ where
let runtime_api_subsystem = spawn(
&mut s,
&mut running_subsystems,
to_overseer_tx.clone(),
metered::UnboundedMeteredSender::<_>::clone(&to_overseer_tx),
all_subsystems.runtime_api,
&metrics,
&mut seed,
@@ -1348,7 +1397,7 @@ where
let availability_store_subsystem = spawn(
&mut s,
&mut running_subsystems,
to_overseer_tx.clone(),
metered::UnboundedMeteredSender::<_>::clone(&to_overseer_tx),
all_subsystems.availability_store,
&metrics,
&mut seed,
@@ -1357,7 +1406,7 @@ where
let network_bridge_subsystem = spawn(
&mut s,
&mut running_subsystems,
to_overseer_tx.clone(),
metered::UnboundedMeteredSender::<_>::clone(&to_overseer_tx),
all_subsystems.network_bridge,
&metrics,
&mut seed,
@@ -1366,7 +1415,7 @@ where
let chain_api_subsystem = spawn(
&mut s,
&mut running_subsystems,
to_overseer_tx.clone(),
metered::UnboundedMeteredSender::<_>::clone(&to_overseer_tx),
all_subsystems.chain_api,
&metrics,
&mut seed,
@@ -1375,7 +1424,7 @@ where
let collation_generation_subsystem = spawn(
&mut s,
&mut running_subsystems,
to_overseer_tx.clone(),
metered::UnboundedMeteredSender::<_>::clone(&to_overseer_tx),
all_subsystems.collation_generation,
&metrics,
&mut seed,
@@ -1385,7 +1434,7 @@ where
let collator_protocol_subsystem = spawn(
&mut s,
&mut running_subsystems,
to_overseer_tx.clone(),
metered::UnboundedMeteredSender::<_>::clone(&to_overseer_tx),
all_subsystems.collator_protocol,
&metrics,
&mut seed,
@@ -1735,12 +1784,12 @@ where
fn spawn<S: SpawnNamed, M: Send + 'static>(
spawner: &mut S,
futures: &mut FuturesUnordered<BoxFuture<'static, SubsystemResult<()>>>,
to_overseer: mpsc::UnboundedSender<MaybeTimed<ToOverseer>>,
to_overseer: metered::UnboundedMeteredSender<MaybeTimed<ToOverseer>>,
s: impl Subsystem<OverseerSubsystemContext<M>>,
metrics: &Metrics,
seed: &mut u64,
) -> SubsystemResult<OverseenSubsystem<M>> {
let (to_tx, to_rx) = mpsc::channel(CHANNEL_CAPACITY);
let (to_tx, to_rx) = metered::channel(CHANNEL_CAPACITY, "subsystem_spawn");
let ctx = OverseerSubsystemContext::new(
to_rx,
to_overseer,
@@ -1783,18 +1832,19 @@ fn spawn<S: SpawnNamed, M: Send + 'static>(
mod tests {
use std::sync::atomic;
use std::collections::HashMap;
use futures::{executor, pin_mut, select, channel::mpsc, FutureExt, pending};
use futures::{executor, pin_mut, select, FutureExt, pending};
use polkadot_primitives::v1::{BlockData, CollatorPair, PoV, CandidateHash};
use polkadot_subsystem::{messages::RuntimeApiRequest, JaegerSpan};
use polkadot_node_primitives::{Collation, CollationGenerationConfig};
use polkadot_node_network_protocol::{PeerId, ReputationChange, NetworkBridgeEvent};
use polkadot_node_subsystem_util::metered;
use sp_core::crypto::Pair as _;
use super::*;
struct TestSubsystem1(mpsc::Sender<usize>);
struct TestSubsystem1(metered::MeteredSender<usize>);
impl<C> Subsystem<C> for TestSubsystem1
where C: SubsystemContext<Message=CandidateValidationMessage>
@@ -1822,7 +1872,7 @@ mod tests {
}
}
struct TestSubsystem2(mpsc::Sender<usize>);
struct TestSubsystem2(metered::MeteredSender<usize>);
impl<C> Subsystem<C> for TestSubsystem2
where C: SubsystemContext<Message=CandidateBackingMessage>
@@ -1893,8 +1943,11 @@ mod tests {
let spawner = sp_core::testing::TaskExecutor::new();
executor::block_on(async move {
let (s1_tx, mut s1_rx) = mpsc::channel::<usize>(64);
let (s2_tx, mut s2_rx) = mpsc::channel::<usize>(64);
let (s1_tx, s1_rx) = metered::channel::<usize>(64, "overseer_test");
let (s2_tx, s2_rx) = metered::channel::<usize>(64, "overseer_test");
let mut s1_rx = s1_rx.fuse();
let mut s2_rx = s2_rx.fuse();
let all_subsystems = AllSubsystems::<()>::dummy()
.replace_candidate_validation(TestSubsystem1(s1_tx))
@@ -1999,13 +2052,15 @@ mod tests {
fn extract_metrics(registry: &prometheus::Registry) -> HashMap<&'static str, u64> {
let gather = registry.gather();
assert_eq!(gather[0].get_name(), "overseer_messages_relay_timing");
assert_eq!(gather[1].get_name(), "parachain_activated_heads_total");
assert_eq!(gather[2].get_name(), "parachain_deactivated_heads_total");
assert_eq!(gather[3].get_name(), "parachain_messages_relayed_total");
let activated = gather[1].get_metric()[0].get_counter().get_value() as u64;
let deactivated = gather[2].get_metric()[0].get_counter().get_value() as u64;
let relayed = gather[3].get_metric()[0].get_counter().get_value() as u64;
assert_eq!(gather[0].get_name(), "overseer_channel_fill_level_from_overseer");
assert_eq!(gather[1].get_name(), "overseer_channel_fill_level_to_overseer");
assert_eq!(gather[2].get_name(), "overseer_messages_relay_timing");
assert_eq!(gather[3].get_name(), "parachain_activated_heads_total");
assert_eq!(gather[4].get_name(), "parachain_deactivated_heads_total");
assert_eq!(gather[5].get_name(), "parachain_messages_relayed_total");
let activated = gather[3].get_metric()[0].get_counter().get_value() as u64;
let deactivated = gather[4].get_metric()[0].get_counter().get_value() as u64;
let relayed = gather[5].get_metric()[0].get_counter().get_value() as u64;
let mut result = HashMap::new();
result.insert("activated", activated);
result.insert("deactivated", deactivated);
@@ -2034,7 +2089,7 @@ mod tests {
})
}
struct TestSubsystem5(mpsc::Sender<OverseerSignal>);
struct TestSubsystem5(metered::MeteredSender<OverseerSignal>);
impl<C> Subsystem<C> for TestSubsystem5
where C: SubsystemContext<Message=CandidateValidationMessage>
@@ -2065,7 +2120,7 @@ mod tests {
}
}
struct TestSubsystem6(mpsc::Sender<OverseerSignal>);
struct TestSubsystem6(metered::MeteredSender<OverseerSignal>);
impl<C> Subsystem<C> for TestSubsystem6
where C: SubsystemContext<Message=CandidateBackingMessage>
@@ -2123,8 +2178,8 @@ mod tests {
number: 3,
};
let (tx_5, mut rx_5) = mpsc::channel(64);
let (tx_6, mut rx_6) = mpsc::channel(64);
let (tx_5, mut rx_5) = metered::channel(64, "overseer_test");
let (tx_6, mut rx_6) = metered::channel(64, "overseer_test");
let all_subsystems = AllSubsystems::<()>::dummy()
.replace_candidate_validation(TestSubsystem5(tx_5))
.replace_candidate_backing(TestSubsystem6(tx_6));
@@ -2216,8 +2271,8 @@ mod tests {
number: 3,
};
let (tx_5, mut rx_5) = mpsc::channel(64);
let (tx_6, mut rx_6) = mpsc::channel(64);
let (tx_5, mut rx_5) = metered::channel(64, "overseer_test");
let (tx_6, mut rx_6) = metered::channel(64, "overseer_test");
let all_subsystems = AllSubsystems::<()>::dummy()
.replace_candidate_validation(TestSubsystem5(tx_5))
@@ -2308,7 +2363,7 @@ mod tests {
number: 1,
};
let (tx_5, mut rx_5) = mpsc::channel(64);
let (tx_5, mut rx_5) = metered::channel(64, "overseer_test");
let all_subsystems = AllSubsystems::<()>::dummy()
.replace_candidate_backing(TestSubsystem6(tx_5));
+1
View File
@@ -21,6 +21,7 @@ polkadot-node-primitives = { path = "../primitives" }
polkadot-node-subsystem = { path = "../subsystem" }
polkadot-node-jaeger = { path = "../jaeger" }
polkadot-primitives = { path = "../../primitives" }
metered-channel = { path = "../metered-channel"}
sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
+99 -1
View File
@@ -50,6 +50,7 @@ use streamunordered::{StreamUnordered, StreamYield};
use thiserror::Error;
pub mod validator_discovery;
pub use metered_channel as metered;
/// These reexports are required so that external crates can use the `delegated_subsystem` macro properly.
pub mod reexports {
@@ -987,9 +988,64 @@ impl<F: Future> Future for Timeout<F> {
}
}
#[derive(Copy, Clone)]
enum MetronomeState {
Snooze,
SetAlarm,
}
/// Create a stream of ticks with a defined cycle duration.
pub struct Metronome {
delay: Delay,
period: Duration,
state: MetronomeState,
}
impl Metronome
{
/// Create a new metronome source with a defined cycle duration.
pub fn new(cycle: Duration) -> Self {
let period = cycle.into();
Self {
period,
delay: Delay::new(period),
state: MetronomeState::Snooze,
}
}
}
impl futures::Stream for Metronome
{
type Item = ();
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>
) -> Poll<Option<Self::Item>> {
loop {
match self.state {
MetronomeState::SetAlarm => {
let val = self.period.clone();
self.delay.reset(val);
self.state = MetronomeState::Snooze;
}
MetronomeState::Snooze => {
if !Pin::new(&mut self.delay).poll(cx).is_ready() {
break
}
self.state = MetronomeState::SetAlarm;
return Poll::Ready(Some(()));
}
}
}
Poll::Pending
}
}
#[cfg(test)]
mod tests {
use super::*;
use executor::block_on;
use thiserror::Error;
use polkadot_node_subsystem::{
messages::{AllMessages, CandidateSelectionMessage}, ActiveLeavesUpdate, FromOverseer, OverseerSignal,
@@ -999,7 +1055,7 @@ mod tests {
use futures::{channel::mpsc, executor, StreamExt, future, Future, FutureExt, SinkExt};
use polkadot_primitives::v1::Hash;
use polkadot_node_subsystem_test_helpers::{self as test_helpers, make_subsystem_context};
use std::{pin::Pin, time::Duration, sync::Arc};
use std::{pin::Pin, sync::{Arc, atomic::{AtomicUsize, Ordering}}, time::Duration};
// basic usage: in a nutshell, when you want to define a subsystem, just focus on what its jobs do;
// you can leave the subsystem itself to the job manager.
@@ -1183,4 +1239,46 @@ mod tests {
FakeCandidateSelectionSubsystem::new(pool, false, ()).start(context);
assert_eq!(name, "FakeCandidateSelection");
}
#[test]
fn tick_tack_metronome() {
let n = Arc::new(AtomicUsize::default());
let (tick, mut block) = mpsc::unbounded();
let metronome = {
let n = n.clone();
let stream = Metronome::new(Duration::from_millis(137_u64));
stream.for_each(move |_res| {
let _ = n.fetch_add(1, Ordering::Relaxed);
let mut tick = tick.clone();
async move {
tick.send(()).await.expect("Test helper channel works. qed");
}
}).fuse()
};
let f2 = async move {
block.next().await;
assert_eq!(n.load(Ordering::Relaxed), 1_usize);
block.next().await;
assert_eq!(n.load(Ordering::Relaxed), 2_usize);
block.next().await;
assert_eq!(n.load(Ordering::Relaxed), 3_usize);
block.next().await;
assert_eq!(n.load(Ordering::Relaxed), 4_usize);
}.fuse();
futures::pin_mut!(f2);
futures::pin_mut!(metronome);
block_on(async move {
// futures::join!(metronome, f2)
futures::select!(
_ = metronome => unreachable!("Metronome never stops. qed"),
_ = f2 => (),
)
});
}
}