diff --git a/polkadot/node/metered-channel/src/bounded.rs b/polkadot/node/metered-channel/src/bounded.rs index 0543ece928..4d3abffe72 100644 --- a/polkadot/node/metered-channel/src/bounded.rs +++ b/polkadot/node/metered-channel/src/bounded.rs @@ -158,13 +158,23 @@ impl MeteredSender { where Self: Unpin, { - let msg = self.prepare_with_tof(msg); - let fut = self.inner.send(msg); - futures::pin_mut!(fut); - fut.await.map_err(|e| { - self.meter.retract_sent(); - e - }) + match self.try_send(msg) { + Err(send_err) => { + if !send_err.is_full() { + return Err(send_err.into_send_error()) + } + + let msg = send_err.into_inner(); + self.meter.note_sent(); + let fut = self.inner.send(msg); + futures::pin_mut!(fut); + fut.await.map_err(|e| { + self.meter.retract_sent(); + e + }) + }, + _ => Ok(()), + } } /// Attempt to send message or fail immediately. @@ -174,6 +184,10 @@ impl MeteredSender { ) -> result::Result<(), mpsc::TrySendError>> { let msg = self.prepare_with_tof(msg); self.inner.try_send(msg).map_err(|e| { + if e.is_full() { + // Count bounded channel sends that block. + self.meter.note_blocked(); + } self.meter.retract_sent(); e }) diff --git a/polkadot/node/metered-channel/src/lib.rs b/polkadot/node/metered-channel/src/lib.rs index ee276583cd..726d716327 100644 --- a/polkadot/node/metered-channel/src/lib.rs +++ b/polkadot/node/metered-channel/src/lib.rs @@ -42,6 +42,8 @@ pub struct Meter { sent: Arc, // Number of receives on this channel. received: Arc, + // Number of times senders blocked while sending messages to a subsystem. + blocked: Arc, // Atomic ringbuffer of the last 50 time of flight values tof: Arc>, } @@ -51,6 +53,7 @@ impl std::default::Default for Meter { Self { sent: Arc::new(AtomicUsize::new(0)), received: Arc::new(AtomicUsize::new(0)), + blocked: Arc::new(AtomicUsize::new(0)), tof: Arc::new(crossbeam_queue::ArrayQueue::new(100)), } } @@ -65,6 +68,8 @@ pub struct Readout { pub sent: usize, /// The amount of messages received on the channel, in aggregate. pub received: usize, + /// How many times the caller blocked when sending messages. + pub blocked: usize, /// Time of flight in micro seconds (us) pub tof: Vec, } @@ -77,6 +82,7 @@ impl Meter { Readout { sent: self.sent.load(Ordering::Relaxed), received: self.received.load(Ordering::Relaxed), + blocked: self.blocked.load(Ordering::Relaxed), tof: { let mut acc = Vec::with_capacity(self.tof.len()); while let Some(value) = self.tof.pop() { @@ -99,6 +105,10 @@ impl Meter { self.received.fetch_add(1, Ordering::Relaxed); } + fn note_blocked(&self) { + self.blocked.fetch_add(1, Ordering::Relaxed); + } + fn note_time_of_flight(&self, tof: CoarseDuration) { let _ = self.tof.force_push(tof); } diff --git a/polkadot/node/metered-channel/src/tests.rs b/polkadot/node/metered-channel/src/tests.rs index 4eecea453a..6ea947eaa5 100644 --- a/polkadot/node/metered-channel/src/tests.rs +++ b/polkadot/node/metered-channel/src/tests.rs @@ -39,12 +39,12 @@ fn try_send_try_next() { assert_matches!(rx.meter().read(), Readout { sent: 4, received: 1, .. }); rx.try_next().unwrap(); rx.try_next().unwrap(); - assert_matches!(tx.meter().read(), Readout { sent: 4, received: 3, tof } => { + assert_matches!(tx.meter().read(), Readout { sent: 4, received: 3, blocked: 0, tof } => { // every second in test, consumed before assert_eq!(dbg!(tof).len(), 1); }); rx.try_next().unwrap(); - assert_matches!(rx.meter().read(), Readout { sent: 4, received: 4, tof } => { + assert_matches!(rx.meter().read(), Readout { sent: 4, received: 4, blocked: 0, tof } => { // every second in test, consumed before assert_eq!(dbg!(tof).len(), 0); }); @@ -127,3 +127,24 @@ fn failed_send_does_not_inc_sent() { assert_matches!(unbounded.meter().read(), Readout { sent: 0, received: 0, .. }); }); } + +#[test] +fn blocked_send_is_metered() { + let (mut bounded_sender, mut bounded_receiver) = channel::(1); + + block_on(async move { + assert!(bounded_sender.send(Msg::default()).await.is_ok()); + assert!(bounded_sender.send(Msg::default()).await.is_ok()); + assert!(bounded_sender.try_send(Msg::default()).is_err()); + + assert_matches!( + bounded_sender.meter().read(), + Readout { sent: 2, received: 0, blocked: 1, .. } + ); + bounded_receiver.try_next().unwrap(); + assert_matches!( + bounded_receiver.meter().read(), + Readout { sent: 2, received: 1, blocked: 1, .. } + ); + }); +} diff --git a/polkadot/node/overseer/src/metrics.rs b/polkadot/node/overseer/src/metrics.rs index 58a826f52f..71295dd223 100644 --- a/polkadot/node/overseer/src/metrics.rs +++ b/polkadot/node/overseer/src/metrics.rs @@ -31,6 +31,7 @@ struct MetricsInner { to_subsystem_bounded_tof: prometheus::HistogramVec, to_subsystem_bounded_sent: prometheus::GaugeVec, to_subsystem_bounded_received: prometheus::GaugeVec, + to_subsystem_bounded_blocked: prometheus::GaugeVec, to_subsystem_unbounded_tof: prometheus::HistogramVec, to_subsystem_unbounded_sent: prometheus::GaugeVec, @@ -91,6 +92,11 @@ impl Metrics { .with_label_values(&[name]) .set(readouts.bounded.received as u64); + metrics + .to_subsystem_bounded_blocked + .with_label_values(&[name]) + .set(readouts.bounded.blocked as u64); + metrics .to_subsystem_unbounded_sent .with_label_values(&[name]) @@ -180,6 +186,16 @@ impl MetricsTrait for Metrics { )?, registry, )?, + to_subsystem_bounded_blocked: prometheus::register( + prometheus::GaugeVec::::new( + prometheus::Opts::new( + "polkadot_parachain_subsystem_bounded_blocked", + "Number of times senders blocked while sending messages to a subsystem", + ), + &["subsystem_name"], + )?, + registry, + )?, to_subsystem_unbounded_tof: prometheus::register( prometheus::HistogramVec::new( prometheus::HistogramOpts::new(