// Copyright 2020 Parity Technologies (UK) Ltd. // This file is part of Substrate. // Substrate is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // Substrate is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // You should have received a copy of the GNU General Public License // along with Substrate. If not, see . //! Features to meter unbounded channels #[cfg(not(features = "metered"))] mod inner { // just aliased, non performance implications use futures::channel::mpsc::{self, UnboundedReceiver, UnboundedSender}; pub type TracingUnboundedSender = UnboundedSender; pub type TracingUnboundedReceiver = UnboundedReceiver; /// Alias `mpsc::unbounded` pub fn tracing_unbounded(_key: &'static str) ->(TracingUnboundedSender, TracingUnboundedReceiver) { mpsc::unbounded() } } #[cfg(features = "metered")] mod inner { //tracing implementation use futures::channel::mpsc::{self, UnboundedReceiver, UnboundedSender, TryRecvError, TrySendError, SendError }; use futures::{sink::Sink, task::{Poll, Context}, stream::Stream}; use std::pin::Pin; use crate::metrics::UNBOUNDED_CHANNELS_COUNTER; /// Wrapper Type around `UnboundedSender` that increases the global /// measure when a message is added #[derive(Debug, Clone)] pub struct TracingUnboundedSender(&'static str, UnboundedSender); /// Wrapper Type around `UnboundedReceiver` that decreases the global /// measure when a message is polled #[derive(Debug)] pub struct TracingUnboundedReceiver(&'static str, UnboundedReceiver); /// Wrapper around `mpsc::unbounded` that tracks the in- and outflow via /// `UNBOUNDED_CHANNELS_COUNTER` pub fn tracing_unbounded(key: &'static str) ->(TracingUnboundedSender, TracingUnboundedReceiver) { let (s, r) = mpsc::unbounded(); (TracingUnboundedSender(key.clone(), s), TracingUnboundedReceiver(key,r)) } impl TracingUnboundedSender { /// Proxy function to mpsc::UnboundedSender pub fn poll_ready(&self, ctx: &mut Context) -> Poll> { self.1.poll_ready(ctx) } /// Proxy function to mpsc::UnboundedSender pub fn is_closed(&self) -> bool { self.1.is_closed() } /// Proxy function to mpsc::UnboundedSender pub fn close_channel(&self) { self.1.close_channel() } /// Proxy function to mpsc::UnboundedSender pub fn disconnect(&mut self) { self.1.disconnect() } /// Proxy function to mpsc::UnboundedSender pub fn start_send(&mut self, msg: T) -> Result<(), SendError> { self.1.start_send(msg) } /// Proxy function to mpsc::UnboundedSender pub fn unbounded_send(&self, msg: T) -> Result<(), TrySendError> { self.1.unbounded_send(msg).map(|s|{ UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[self.0, &"send"]).incr(); s }) } /// Proxy function to mpsc::UnboundedSender pub fn same_receiver(&self, other: &UnboundedSender) -> bool { self.1.same_receiver(other) } } impl TracingUnboundedReceiver { fn consume(&mut self) { // consume all items, make sure to reflect the updated count let mut count = 0; while let Ok(Some(..)) = self.try_next() { count += 1; } // and discount the messages if count > 0 { UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[self.0, &"dropped"]).incr_by(count); } } /// Proxy function to mpsc::UnboundedReceiver /// that consumes all messages first and updates the counter pub fn close(&mut self) { self.consume(); self.1.close() } /// Proxy function to mpsc::UnboundedReceiver /// that discounts the messages taken out pub fn try_next(&mut self) -> Result, TryRecvError> { self.1.try_next().map(|s| { if s.is_some() { UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[self.0, &"received"]).incr(); } s }) } } impl Drop for TracingUnboundedReceiver { fn drop(&mut self) { self.consume(); } } impl Unpin for TracingUnboundedReceiver {} impl Stream for TracingUnboundedReceiver { type Item = T; fn poll_next( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { let s = self.get_mut(); match Pin::new(&mut s.1).poll_next(cx) { Poll::Ready(msg) => { if msg.is_some() { UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[self.0, "received"]).incr(); } Poll::Ready(msg) } Poll::Pending => { Poll::Pending } } } } impl Sink for TracingUnboundedSender { type Error = SendError; fn poll_ready( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { TracingUnboundedSender::poll_ready(&*self, cx) } fn start_send( mut self: Pin<&mut Self>, msg: T, ) -> Result<(), Self::Error> { TracingUnboundedSender::start_send(&mut *self, msg) } fn poll_flush( self: Pin<&mut Self>, _: &mut Context<'_>, ) -> Poll> { Poll::Ready(Ok(())) } fn poll_close( mut self: Pin<&mut Self>, _: &mut Context<'_>, ) -> Poll> { self.disconnect(); Poll::Ready(Ok(())) } } impl Sink for &TracingUnboundedSender { type Error = SendError; fn poll_ready( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { TracingUnboundedSender::poll_ready(*self, cx) } fn start_send(self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> { self.unbounded_send(msg) .map_err(TrySendError::into_send_error) } fn poll_flush( self: Pin<&mut Self>, _: &mut Context<'_>, ) -> Poll> { Poll::Ready(Ok(())) } fn poll_close( self: Pin<&mut Self>, _: &mut Context<'_>, ) -> Poll> { self.close_channel(); Poll::Ready(Ok(())) } } } pub use inner::{tracing_unbounded, TracingUnboundedSender, TracingUnboundedReceiver};