mirror of
https://github.com/pezkuwichain/pezkuwi-telemetry.git
synced 2026-06-12 15:51:00 +00:00
fmt, clean warnings, tidy aggregator opts and add queue length limit
This commit is contained in:
@@ -1,6 +1,8 @@
|
||||
use futures::channel::mpsc::{ SendError, TrySendError, UnboundedSender, UnboundedReceiver, unbounded };
|
||||
use futures::{ Sink, Stream, SinkExt, StreamExt };
|
||||
use std::sync::atomic::{ AtomicUsize, Ordering };
|
||||
use futures::channel::mpsc::{
|
||||
unbounded, SendError, TrySendError, UnboundedReceiver, UnboundedSender,
|
||||
};
|
||||
use futures::{Sink, SinkExt, Stream, StreamExt};
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use std::sync::Arc;
|
||||
use std::task::Poll;
|
||||
|
||||
@@ -12,11 +14,11 @@ pub fn metered_unbounded<T>() -> (MeteredUnboundedSender<T>, MeteredUnboundedRec
|
||||
|
||||
let tx = MeteredUnboundedSender {
|
||||
inner: tx,
|
||||
len: len
|
||||
len: len,
|
||||
};
|
||||
let rx = MeteredUnboundedReceiver {
|
||||
inner: rx,
|
||||
len: len2
|
||||
len: len2,
|
||||
};
|
||||
|
||||
(tx, rx)
|
||||
@@ -30,7 +32,7 @@ pub struct MeteredUnboundedSender<T> {
|
||||
len: Arc<AtomicUsize>,
|
||||
}
|
||||
|
||||
impl <T> MeteredUnboundedSender<T> {
|
||||
impl<T> MeteredUnboundedSender<T> {
|
||||
/// The current number of messages in the queue.
|
||||
pub fn len(&self) -> usize {
|
||||
self.len.load(Ordering::Relaxed)
|
||||
@@ -43,10 +45,13 @@ impl <T> MeteredUnboundedSender<T> {
|
||||
}
|
||||
}
|
||||
|
||||
impl <T> Sink<T> for MeteredUnboundedSender<T> {
|
||||
impl<T> Sink<T> for MeteredUnboundedSender<T> {
|
||||
type Error = SendError;
|
||||
|
||||
fn poll_ready(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
fn poll_ready(
|
||||
self: std::pin::Pin<&mut Self>,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
) -> Poll<Result<(), Self::Error>> {
|
||||
self.inner.poll_ready(cx)
|
||||
}
|
||||
|
||||
@@ -54,19 +59,28 @@ impl <T> Sink<T> for MeteredUnboundedSender<T> {
|
||||
self.unbounded_send(item).map_err(|e| e.into_send_error())
|
||||
}
|
||||
|
||||
fn poll_flush(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
fn poll_flush(
|
||||
mut self: std::pin::Pin<&mut Self>,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
) -> Poll<Result<(), Self::Error>> {
|
||||
self.inner.poll_flush_unpin(cx)
|
||||
}
|
||||
|
||||
fn poll_close(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
fn poll_close(
|
||||
mut self: std::pin::Pin<&mut Self>,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
) -> Poll<Result<(), Self::Error>> {
|
||||
self.inner.poll_close_unpin(cx)
|
||||
}
|
||||
}
|
||||
|
||||
impl <T> Stream for MeteredUnboundedReceiver<T> {
|
||||
impl<T> Stream for MeteredUnboundedReceiver<T> {
|
||||
type Item = T;
|
||||
|
||||
fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Option<Self::Item>> {
|
||||
fn poll_next(
|
||||
mut self: std::pin::Pin<&mut Self>,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
) -> std::task::Poll<Option<Self::Item>> {
|
||||
let res = self.inner.poll_next_unpin(cx);
|
||||
if matches!(res, Poll::Ready(Some(..))) {
|
||||
self.len.fetch_sub(1, Ordering::Relaxed);
|
||||
@@ -83,7 +97,7 @@ pub struct MeteredUnboundedReceiver<T> {
|
||||
len: Arc<AtomicUsize>,
|
||||
}
|
||||
|
||||
impl <T> MeteredUnboundedReceiver<T> {
|
||||
impl<T> MeteredUnboundedReceiver<T> {
|
||||
/// The current number of messages in the queue.
|
||||
pub fn len(&self) -> usize {
|
||||
self.len.load(Ordering::Relaxed)
|
||||
@@ -137,7 +151,7 @@ mod test {
|
||||
|
||||
#[tokio::test]
|
||||
async fn channel_len_consistent_when_send_parallelised() {
|
||||
let (mut tx, mut rx) = metered_unbounded::<usize>();
|
||||
let (tx, _rx) = metered_unbounded::<usize>();
|
||||
|
||||
// Send lots of messages on a bunch of real threads:
|
||||
let mut join_handles = vec![];
|
||||
@@ -156,12 +170,11 @@ mod test {
|
||||
handle.join().unwrap();
|
||||
}
|
||||
assert_eq!(tx.len(), 50 * 10_000);
|
||||
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn channel_len_consistent_when_send_and_recv_parallelised() {
|
||||
let (mut tx, mut rx) = metered_unbounded::<usize>();
|
||||
let (tx, mut rx) = metered_unbounded::<usize>();
|
||||
|
||||
// Send lots of messages on a bunch of real threads:
|
||||
let mut join_handles = vec![];
|
||||
@@ -185,7 +198,5 @@ mod test {
|
||||
handle.join().unwrap();
|
||||
}
|
||||
assert_eq!(tx.len(), 0);
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,3 +1,3 @@
|
||||
mod metered_unbounded;
|
||||
|
||||
pub use metered_unbounded::*;
|
||||
pub use metered_unbounded::*;
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
// along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
|
||||
pub mod byte_size;
|
||||
pub mod channel;
|
||||
pub mod http_utils;
|
||||
pub mod id_type;
|
||||
pub mod internal_messages;
|
||||
@@ -24,7 +25,6 @@ pub mod ready_chunks_all;
|
||||
pub mod rolling_total;
|
||||
pub mod time;
|
||||
pub mod ws_client;
|
||||
pub mod channel;
|
||||
|
||||
mod assign_id;
|
||||
mod dense_map;
|
||||
|
||||
@@ -34,6 +34,16 @@ id_type! {
|
||||
#[derive(Clone)]
|
||||
pub struct Aggregator(Arc<AggregatorInternal>);
|
||||
|
||||
/// Options to configure the aggregator loop(s)
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct AggregatorOpts {
|
||||
/// Any node from these chains is muted
|
||||
pub denylist: Vec<String>,
|
||||
/// If our incoming message queue exceeds this length, we start
|
||||
/// dropping non-essential messages.
|
||||
pub max_queue_len: usize,
|
||||
}
|
||||
|
||||
struct AggregatorInternal {
|
||||
/// Shards that connect are each assigned a unique connection ID.
|
||||
/// This helps us know who to send messages back to (especially in
|
||||
@@ -49,7 +59,7 @@ struct AggregatorInternal {
|
||||
|
||||
impl Aggregator {
|
||||
/// Spawn a new Aggregator. This connects to the telemetry backend
|
||||
pub async fn spawn(denylist: Vec<String>) -> anyhow::Result<Aggregator> {
|
||||
pub async fn spawn(opts: AggregatorOpts) -> anyhow::Result<Aggregator> {
|
||||
let (tx_to_aggregator, rx_from_external) = mpsc::unbounded();
|
||||
|
||||
// Kick off a locator task to locate nodes, which hands back a channel to make location requests
|
||||
@@ -63,7 +73,8 @@ impl Aggregator {
|
||||
tokio::spawn(Aggregator::handle_messages(
|
||||
rx_from_external,
|
||||
tx_to_locator,
|
||||
denylist,
|
||||
opts.max_queue_len,
|
||||
opts.denylist,
|
||||
));
|
||||
|
||||
// Return a handle to our aggregator:
|
||||
@@ -80,9 +91,10 @@ impl Aggregator {
|
||||
async fn handle_messages(
|
||||
rx_from_external: mpsc::UnboundedReceiver<inner_loop::ToAggregator>,
|
||||
tx_to_aggregator: mpsc::UnboundedSender<(NodeId, Ipv4Addr)>,
|
||||
max_queue_len: usize,
|
||||
denylist: Vec<String>,
|
||||
) {
|
||||
inner_loop::InnerLoop::new(tx_to_aggregator, denylist)
|
||||
inner_loop::InnerLoop::new(tx_to_aggregator, denylist, max_queue_len)
|
||||
.handle(rx_from_external)
|
||||
.await;
|
||||
}
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use super::aggregator::Aggregator;
|
||||
use super::aggregator::{Aggregator, AggregatorOpts};
|
||||
use super::inner_loop;
|
||||
use common::EitherSink;
|
||||
use futures::{Sink, SinkExt, StreamExt};
|
||||
@@ -18,12 +18,12 @@ impl AggregatorSet {
|
||||
/// Spawn the number of aggregators we're asked to.
|
||||
pub async fn spawn(
|
||||
num_aggregators: usize,
|
||||
denylist: Vec<String>,
|
||||
opts: AggregatorOpts,
|
||||
) -> anyhow::Result<AggregatorSet> {
|
||||
assert_ne!(num_aggregators, 0, "You must have 1 or more aggregator");
|
||||
|
||||
let aggregators = futures::future::try_join_all(
|
||||
(0..num_aggregators).map(|_| Aggregator::spawn(denylist.clone())),
|
||||
(0..num_aggregators).map(|_| Aggregator::spawn(opts.clone())),
|
||||
)
|
||||
.await?;
|
||||
|
||||
|
||||
@@ -20,10 +20,10 @@ use crate::find_location;
|
||||
use crate::state::{self, NodeId, State};
|
||||
use bimap::BiMap;
|
||||
use common::{
|
||||
channel::metered_unbounded,
|
||||
internal_messages::{self, MuteReason, ShardNodeId},
|
||||
node_message,
|
||||
node_types::BlockHash,
|
||||
channel::metered_unbounded,
|
||||
time,
|
||||
};
|
||||
use futures::channel::mpsc;
|
||||
@@ -151,6 +151,10 @@ pub struct InnerLoop {
|
||||
|
||||
/// Send messages here to make geographical location requests.
|
||||
tx_to_locator: mpsc::UnboundedSender<(NodeId, Ipv4Addr)>,
|
||||
|
||||
/// How big can the queue of messages coming in to the aggregator get before messages
|
||||
/// are prioritised and dropped to try and get back on track.
|
||||
max_queue_len: usize,
|
||||
}
|
||||
|
||||
impl InnerLoop {
|
||||
@@ -158,6 +162,7 @@ impl InnerLoop {
|
||||
pub fn new(
|
||||
tx_to_locator: mpsc::UnboundedSender<(NodeId, Ipv4Addr)>,
|
||||
denylist: Vec<String>,
|
||||
max_queue_len: usize,
|
||||
) -> Self {
|
||||
InnerLoop {
|
||||
node_state: State::new(denylist),
|
||||
@@ -168,6 +173,7 @@ impl InnerLoop {
|
||||
chain_to_feed_conn_ids: HashMap::new(),
|
||||
feed_conn_id_finality: HashSet::new(),
|
||||
tx_to_locator,
|
||||
max_queue_len,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -175,7 +181,7 @@ impl InnerLoop {
|
||||
/// only have a single `.await` (in this function). This helps to make it clear that the aggregator loop
|
||||
/// will be able to make progress quickly without any potential yield points.
|
||||
pub async fn handle(mut self, mut rx_from_external: mpsc::UnboundedReceiver<ToAggregator>) {
|
||||
|
||||
let max_queue_len = self.max_queue_len;
|
||||
let (metered_tx, mut metered_rx) = metered_unbounded();
|
||||
|
||||
tokio::spawn(async move {
|
||||
@@ -197,17 +203,30 @@ impl InnerLoop {
|
||||
// TEMP: let's monitor message queue len out of interest
|
||||
let tx_len = metered_tx.clone();
|
||||
std::thread::spawn(move || {
|
||||
tokio::runtime::Runtime::new().unwrap().block_on(async move {
|
||||
let mut n = 0;
|
||||
loop {
|
||||
println!("#{} Queue len: {}", n, tx_len.len());
|
||||
n += 1;
|
||||
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
|
||||
}
|
||||
});
|
||||
tokio::runtime::Runtime::new()
|
||||
.unwrap()
|
||||
.block_on(async move {
|
||||
let mut n = 0;
|
||||
loop {
|
||||
println!("#{} Queue len: {}", n, tx_len.len());
|
||||
n += 1;
|
||||
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
while let Some(msg) = rx_from_external.next().await {
|
||||
// ignore node updates if we have too many messages to handle, in an attempt
|
||||
// to reduce the queue length back to something reasonable.
|
||||
if metered_tx.len() > max_queue_len {
|
||||
if matches!(
|
||||
msg,
|
||||
ToAggregator::FromShardWebsocket(.., FromShardWebsocket::Update { .. })
|
||||
) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
if let Err(e) = metered_tx.unbounded_send(msg) {
|
||||
log::error!("Cannot send message into aggregator: {}", e);
|
||||
break;
|
||||
|
||||
@@ -19,6 +19,7 @@ mod aggregator_set;
|
||||
mod inner_loop;
|
||||
|
||||
// Expose the various message types that can be worked with externally:
|
||||
pub use aggregator::AggregatorOpts;
|
||||
pub use inner_loop::{FromFeedWebsocket, FromShardWebsocket, ToFeedWebsocket, ToShardWebsocket};
|
||||
|
||||
pub use aggregator_set::*;
|
||||
|
||||
@@ -22,7 +22,8 @@ use std::str::FromStr;
|
||||
use tokio::time::{Duration, Instant};
|
||||
|
||||
use aggregator::{
|
||||
AggregatorSet, FromFeedWebsocket, FromShardWebsocket, ToFeedWebsocket, ToShardWebsocket,
|
||||
AggregatorOpts, AggregatorSet, FromFeedWebsocket, FromShardWebsocket, ToFeedWebsocket,
|
||||
ToShardWebsocket,
|
||||
};
|
||||
use bincode::Options;
|
||||
use common::http_utils;
|
||||
@@ -67,6 +68,10 @@ struct Opts {
|
||||
/// aggregators.
|
||||
#[structopt(long)]
|
||||
num_aggregators: Option<usize>,
|
||||
/// How big can the message queue for each aggregator grow before we start dropping non-essential
|
||||
/// messages in an attempt to let it reduce?
|
||||
#[structopt(long)]
|
||||
aggregator_queue_len: Option<usize>,
|
||||
}
|
||||
|
||||
fn main() {
|
||||
@@ -110,7 +115,15 @@ fn main() {
|
||||
|
||||
/// Declare our routes and start the server.
|
||||
async fn start_server(num_aggregators: usize, opts: Opts) -> anyhow::Result<()> {
|
||||
let aggregator = AggregatorSet::spawn(num_aggregators, opts.denylist).await?;
|
||||
let aggregator_queue_len = opts.aggregator_queue_len.unwrap_or(10_000);
|
||||
let aggregator = AggregatorSet::spawn(
|
||||
num_aggregators,
|
||||
AggregatorOpts {
|
||||
max_queue_len: aggregator_queue_len,
|
||||
denylist: opts.denylist,
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
let socket_addr = opts.socket;
|
||||
let feed_timeout = opts.feed_timeout;
|
||||
|
||||
|
||||
Reference in New Issue
Block a user