mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-04-28 00:28:01 +00:00
Report tracing_unbounded channel size to prometheus (#1489)
This commit is contained in:
@@ -24,7 +24,10 @@ use prometheus::{
|
||||
Error as PrometheusError, Registry,
|
||||
};
|
||||
|
||||
use prometheus::{core::GenericCounterVec, Opts};
|
||||
use prometheus::{
|
||||
core::{GenericCounterVec, GenericGaugeVec},
|
||||
Opts,
|
||||
};
|
||||
|
||||
lazy_static! {
|
||||
pub static ref TOKIO_THREADS_TOTAL: GenericCounter<AtomicU64> =
|
||||
@@ -36,18 +39,32 @@ lazy_static! {
|
||||
}
|
||||
|
||||
lazy_static! {
|
||||
pub static ref UNBOUNDED_CHANNELS_COUNTER : GenericCounterVec<AtomicU64> = GenericCounterVec::new(
|
||||
Opts::new("substrate_unbounded_channel_len", "Items in each mpsc::unbounded instance"),
|
||||
&["entity", "action"] // 'name of channel, send|received|dropped
|
||||
pub static ref UNBOUNDED_CHANNELS_COUNTER: GenericCounterVec<AtomicU64> = GenericCounterVec::new(
|
||||
Opts::new(
|
||||
"substrate_unbounded_channel_len",
|
||||
"Items sent/received/dropped on each mpsc::unbounded instance"
|
||||
),
|
||||
&["entity", "action"], // name of channel, send|received|dropped
|
||||
).expect("Creating of statics doesn't fail. qed");
|
||||
pub static ref UNBOUNDED_CHANNELS_SIZE: GenericGaugeVec<AtomicU64> = GenericGaugeVec::new(
|
||||
Opts::new(
|
||||
"substrate_unbounded_channel_size",
|
||||
"Size (number of messages to be processed) of each mpsc::unbounded instance",
|
||||
),
|
||||
&["entity"], // name of channel
|
||||
).expect("Creating of statics doesn't fail. qed");
|
||||
|
||||
}
|
||||
|
||||
pub static SENT_LABEL: &'static str = "send";
|
||||
pub static RECEIVED_LABEL: &'static str = "received";
|
||||
pub static DROPPED_LABEL: &'static str = "dropped";
|
||||
|
||||
/// Register the statics to report to registry
|
||||
pub fn register_globals(registry: &Registry) -> Result<(), PrometheusError> {
|
||||
registry.register(Box::new(TOKIO_THREADS_ALIVE.clone()))?;
|
||||
registry.register(Box::new(TOKIO_THREADS_TOTAL.clone()))?;
|
||||
registry.register(Box::new(UNBOUNDED_CHANNELS_COUNTER.clone()))?;
|
||||
registry.register(Box::new(UNBOUNDED_CHANNELS_SIZE.clone()))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -20,7 +20,9 @@
|
||||
|
||||
pub use async_channel::{TryRecvError, TrySendError};
|
||||
|
||||
use crate::metrics::UNBOUNDED_CHANNELS_COUNTER;
|
||||
use crate::metrics::{
|
||||
DROPPED_LABEL, RECEIVED_LABEL, SENT_LABEL, UNBOUNDED_CHANNELS_COUNTER, UNBOUNDED_CHANNELS_SIZE,
|
||||
};
|
||||
use async_channel::{Receiver, Sender};
|
||||
use futures::{
|
||||
stream::{FusedStream, Stream},
|
||||
@@ -102,7 +104,10 @@ impl<T> TracingUnboundedSender<T> {
|
||||
/// Proxy function to `async_channel::Sender::try_send`.
|
||||
pub fn unbounded_send(&self, msg: T) -> Result<(), TrySendError<T>> {
|
||||
self.inner.try_send(msg).map(|s| {
|
||||
UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[self.name, "send"]).inc();
|
||||
UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[self.name, SENT_LABEL]).inc();
|
||||
UNBOUNDED_CHANNELS_SIZE
|
||||
.with_label_values(&[self.name])
|
||||
.set(self.inner.len().saturated_into());
|
||||
|
||||
if self.inner.len() >= self.queue_size_warning &&
|
||||
self.warning_fired
|
||||
@@ -140,7 +145,10 @@ impl<T> TracingUnboundedReceiver<T> {
|
||||
/// that discounts the messages taken out.
|
||||
pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
|
||||
self.inner.try_recv().map(|s| {
|
||||
UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[self.name, "received"]).inc();
|
||||
UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[self.name, RECEIVED_LABEL]).inc();
|
||||
UNBOUNDED_CHANNELS_SIZE
|
||||
.with_label_values(&[self.name])
|
||||
.set(self.inner.len().saturated_into());
|
||||
s
|
||||
})
|
||||
}
|
||||
@@ -155,14 +163,16 @@ impl<T> Drop for TracingUnboundedReceiver<T> {
|
||||
fn drop(&mut self) {
|
||||
// Close the channel to prevent any further messages to be sent into the channel
|
||||
self.close();
|
||||
// the number of messages about to be dropped
|
||||
// The number of messages about to be dropped
|
||||
let count = self.inner.len();
|
||||
// discount the messages
|
||||
// Discount the messages
|
||||
if count > 0 {
|
||||
UNBOUNDED_CHANNELS_COUNTER
|
||||
.with_label_values(&[self.name, "dropped"])
|
||||
.with_label_values(&[self.name, DROPPED_LABEL])
|
||||
.inc_by(count.saturated_into());
|
||||
}
|
||||
// Reset the size metric to 0
|
||||
UNBOUNDED_CHANNELS_SIZE.with_label_values(&[self.name]).set(0);
|
||||
// Drain all the pending messages in the channel since they can never be accessed,
|
||||
// this can be removed once https://github.com/smol-rs/async-channel/issues/23 is
|
||||
// resolved
|
||||
@@ -180,7 +190,10 @@ impl<T> Stream for TracingUnboundedReceiver<T> {
|
||||
match Pin::new(&mut s.inner).poll_next(cx) {
|
||||
Poll::Ready(msg) => {
|
||||
if msg.is_some() {
|
||||
UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[s.name, "received"]).inc();
|
||||
UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[s.name, RECEIVED_LABEL]).inc();
|
||||
UNBOUNDED_CHANNELS_SIZE
|
||||
.with_label_values(&[s.name])
|
||||
.set(s.inner.len().saturated_into());
|
||||
}
|
||||
Poll::Ready(msg)
|
||||
},
|
||||
|
||||
Reference in New Issue
Block a user