Merge pull request #367 from paritytech/jsdw-sharding-gatekeeper

Flume, Metrics and Message control when overloaded
This commit is contained in:
James Wilson
2021-08-13 14:35:43 +01:00
committed by GitHub
20 changed files with 528 additions and 447 deletions
+57
View File
@@ -202,6 +202,7 @@ dependencies = [
"bimap",
"bincode",
"bytes",
"flume",
"fnv",
"futures",
"hex",
@@ -392,6 +393,19 @@ dependencies = [
"static_assertions",
]
[[package]]
name = "flume"
version = "0.10.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2e90cc80fad5bb391b38127896b0fa27d97e7fef74742797f4da518d67e1292f"
dependencies = [
"futures-core",
"futures-sink",
"nanorand",
"pin-project",
"spinning_top",
]
[[package]]
name = "fnv"
version = "1.0.7"
@@ -540,8 +554,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7fcd999463524c52659517fe2cea98493cfe485d10565e7b0fb07dbba7ad2753"
dependencies = [
"cfg-if",
"js-sys",
"libc",
"wasi",
"wasm-bindgen",
]
[[package]]
@@ -827,6 +843,15 @@ dependencies = [
"winapi",
]
[[package]]
name = "nanorand"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "729eb334247daa1803e0a094d0a5c55711b85571179f5ec6e53eccfdf7008958"
dependencies = [
"getrandom",
]
[[package]]
name = "native-tls"
version = "0.2.7"
@@ -977,6 +1002,26 @@ version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e"
[[package]]
name = "pin-project"
version = "1.0.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "576bc800220cc65dac09e99e97b08b358cfab6e17078de8dc5fee223bd2d0c08"
dependencies = [
"pin-project-internal",
]
[[package]]
name = "pin-project-internal"
version = "1.0.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e8fe8163d14ce7f0cdac2e040116f22eac817edabff0be91e8aff7e9accf389"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "pin-project-lite"
version = "0.2.7"
@@ -1458,6 +1503,15 @@ dependencies = [
"sha-1",
]
[[package]]
name = "spinning_top"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "75adad84ee84b521fb2cca2d4fd0f1dab1d8d026bda3c5bea4ca63b5f9f9293c"
dependencies = [
"lock_api",
]
[[package]]
name = "static_assertions"
version = "1.1.0"
@@ -1521,6 +1575,7 @@ dependencies = [
"bytes",
"common",
"criterion",
"flume",
"futures",
"hex",
"http",
@@ -1553,6 +1608,7 @@ dependencies = [
"anyhow",
"bincode",
"common",
"flume",
"futures",
"hex",
"http",
@@ -1590,6 +1646,7 @@ version = "0.1.0"
dependencies = [
"anyhow",
"common",
"flume",
"futures",
"http",
"log",
+1
View File
@@ -10,6 +10,7 @@ anyhow = "1.0.42"
base64 = { default-features = false, features = ["alloc"], version = "0.13" }
bimap = "0.6.1"
bytes = "1.0.1"
flume = "0.10.8"
fnv = "1.0.7"
futures = "0.3.15"
hex = "0.4.3"
+32
View File
@@ -131,3 +131,35 @@ where
}
}
}
#[cfg(test)]
mod test {
    use super::*;

    /// Regression test: `len()` must keep reporting the right count (and, as the
    /// test name says, not panic) when every entry has been removed and an
    /// already-removed ID is then removed several more times.
    #[test]
    fn len_doesnt_panic_if_lots_of_ids_are_retired() {
        let mut map = DenseMap::<usize, usize>::new();

        // Fill the map with three entries, remembering the ID handed back for each.
        let first = map.add(1);
        let second = map.add(2);
        let third = map.add(3);
        assert_eq!(map.len(), 3);

        // Remove the first two; exactly one entry should remain.
        map.remove(first);
        map.remove(second);
        assert_eq!(map.len(), 1);

        // Remove the final entry; the map is now empty.
        map.remove(third);
        assert_eq!(map.len(), 0);

        // Removing an ID that has already been removed, repeatedly, must not
        // disturb the reported length.
        for _ in 0..3 {
            map.remove(first);
        }
        assert_eq!(map.len(), 0);
    }
}
+7 -9
View File
@@ -14,8 +14,6 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use super::on_close::OnClose;
use futures::channel::mpsc;
use futures::{SinkExt, StreamExt};
use soketto::handshake::{Client, ServerResponse};
use std::sync::Arc;
use tokio::net::TcpStream;
@@ -73,7 +71,7 @@ impl Connection {
let mut rx_closed2 = tx_closed1.subscribe();
// Receive messages from the socket:
let (mut tx_to_external, rx_from_ws) = mpsc::unbounded();
let (tx_to_external, rx_from_ws) = flume::unbounded();
tokio::spawn(async move {
let mut send_to_external = true;
loop {
@@ -112,7 +110,7 @@ impl Connection {
.map_err(|e| e.into()),
};
if let Err(e) = tx_to_external.send(msg).await {
if let Err(e) = tx_to_external.send_async(msg).await {
// Our external channel may have closed or errored, but the socket hasn't
// been closed, so keep receiving in order to allow the socket to continue to
// function properly (we may be happy just sending messages to it), but stop
@@ -124,12 +122,12 @@ impl Connection {
});
// Send messages to the socket:
let (tx_to_ws, mut rx_from_external) = mpsc::unbounded();
let (tx_to_ws, rx_from_external) = flume::unbounded::<SentMessage>();
tokio::spawn(async move {
loop {
// Wait for messages, or bail entirely if asked to close.
let msg = tokio::select! {
msg = rx_from_external.next() => { msg },
msg = rx_from_external.recv_async() => { msg },
_ = rx_closed2.recv() => {
// attempt to gracefully end the connection.
let _ = ws_to_connection.close().await;
@@ -141,8 +139,8 @@ impl Connection {
// needs to keep receiving data for the WS connection to stay open, there's no
// reason to keep this side of the loop open if our channel is closed.
let msg = match msg {
None => break,
Some(msg) => msg,
Ok(msg) => msg,
_ => break,
};
// We don't explicitly shut down the channel if we hit send errors. Why? Because the
@@ -207,7 +205,7 @@ impl Connection {
closer: Arc::clone(&on_close),
},
Receiver {
inner: rx_from_ws,
inner: rx_from_ws.into_stream(),
closer: on_close,
},
)
+1 -2
View File
@@ -15,13 +15,12 @@
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use super::on_close::OnClose;
use futures::channel::mpsc;
use futures::{Stream, StreamExt};
use std::sync::Arc;
/// Receive messages out of a connection
pub struct Receiver {
pub(super) inner: mpsc::UnboundedReceiver<Result<RecvMessage, RecvError>>,
pub(super) inner: flume::r#async::RecvStream<'static, Result<RecvMessage, RecvError>>,
pub(super) closer: Arc<OnClose>,
}
+13 -39
View File
@@ -15,8 +15,6 @@
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use super::on_close::OnClose;
use futures::channel::mpsc;
use futures::{Sink, SinkExt};
use std::sync::Arc;
/// A message that can be sent into the channel interface
@@ -41,62 +39,38 @@ pub enum SentMessage {
/// Send messages into the connection
#[derive(Clone)]
pub struct Sender {
pub(super) inner: mpsc::UnboundedSender<SentMessage>,
pub(super) inner: flume::Sender<SentMessage>,
pub(super) closer: Arc<OnClose>,
}
impl Sender {
/// Ask the underlying Websocket connection to close.
pub async fn close(&mut self) -> Result<(), SendError> {
pub async fn close(&mut self) -> Result<(), SendError<SentMessage>> {
self.closer.0.send(()).map_err(|_| SendError::CloseError)?;
Ok(())
}
/// Returns whether this channel is closed.
pub fn is_closed(&self) -> bool {
self.inner.is_closed()
self.inner.is_disconnected()
}
/// Unbounded send will always queue the message and doesn't
/// need to be awaited.
pub fn unbounded_send(&self, msg: SentMessage) -> Result<(), SendError> {
self.inner
.unbounded_send(msg)
.map_err(|e| e.into_send_error())?;
pub fn unbounded_send(&self, msg: SentMessage) -> Result<(), flume::SendError<SentMessage>> {
self.inner.send(msg)?;
Ok(())
}
/// Convert this sender into a Sink
pub fn into_sink(
self,
) -> impl futures::Sink<SentMessage> + std::marker::Unpin + Clone + 'static {
self.inner.into_sink()
}
}
#[derive(thiserror::Error, Debug, Clone)]
pub enum SendError {
pub enum SendError<T: std::fmt::Debug + 'static> {
#[error("Failed to send message: {0}")]
ChannelError(#[from] mpsc::SendError),
ChannelError(#[from] flume::SendError<T>),
#[error("Failed to send close message")]
CloseError,
}
impl Sink<SentMessage> for Sender {
type Error = SendError;
fn poll_ready(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), Self::Error>> {
self.inner.poll_ready_unpin(cx).map_err(|e| e.into())
}
fn start_send(
mut self: std::pin::Pin<&mut Self>,
item: SentMessage,
) -> Result<(), Self::Error> {
self.inner.start_send_unpin(item).map_err(|e| e.into())
}
fn poll_flush(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), Self::Error>> {
self.inner.poll_flush_unpin(cx).map_err(|e| e.into())
}
fn poll_close(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), Self::Error>> {
self.inner.poll_close_unpin(cx).map_err(|e| e.into())
}
}
+1
View File
@@ -11,6 +11,7 @@ bimap = "0.6.1"
bincode = "1.3.3"
bytes = "1.0.1"
common = { path = "../common" }
flume = "0.10.8"
futures = "0.3.15"
hex = "0.4.3"
http = "0.2.4"
@@ -18,7 +18,6 @@ use super::inner_loop;
use crate::find_location::find_location;
use crate::state::NodeId;
use common::id_type;
use futures::channel::mpsc;
use futures::{future, Sink, SinkExt};
use std::net::Ipv4Addr;
use std::sync::atomic::AtomicU64;
@@ -34,6 +33,16 @@ id_type! {
#[derive(Clone)]
pub struct Aggregator(Arc<AggregatorInternal>);
/// Settings used to configure each aggregator loop when it is spawned.
#[derive(Clone, Debug)]
pub struct AggregatorOpts {
    /// Chains whose nodes should all be muted.
    pub denylist: Vec<String>,
    /// Threshold for the incoming message queue length; once the queue grows
    /// beyond this, non-essential messages start being dropped.
    pub max_queue_len: usize,
}
struct AggregatorInternal {
/// Shards that connect are each assigned a unique connection ID.
/// This helps us know who to send messages back to (especially in
@@ -44,26 +53,28 @@ struct AggregatorInternal {
/// Send messages in to the aggregator from the outside via this. This is
/// stored here so that anybody holding an `Aggregator` handle can
/// make use of it.
tx_to_aggregator: mpsc::UnboundedSender<inner_loop::ToAggregator>,
tx_to_aggregator: flume::Sender<inner_loop::ToAggregator>,
}
impl Aggregator {
/// Spawn a new Aggregator. This connects to the telemetry backend
pub async fn spawn(denylist: Vec<String>) -> anyhow::Result<Aggregator> {
let (tx_to_aggregator, rx_from_external) = mpsc::unbounded();
pub async fn spawn(opts: AggregatorOpts) -> anyhow::Result<Aggregator> {
let (tx_to_aggregator, rx_from_external) = flume::unbounded();
// Kick off a locator task to locate nodes, which hands back a channel to make location requests
let tx_to_locator = find_location(tx_to_aggregator.clone().with(|(node_id, msg)| {
future::ok::<_, mpsc::SendError>(inner_loop::ToAggregator::FromFindLocation(
node_id, msg,
))
}));
let tx_to_locator =
find_location(tx_to_aggregator.clone().into_sink().with(|(node_id, msg)| {
future::ok::<_, flume::SendError<_>>(inner_loop::ToAggregator::FromFindLocation(
node_id, msg,
))
}));
// Handle any incoming messages in our handler loop:
tokio::spawn(Aggregator::handle_messages(
rx_from_external,
tx_to_locator,
denylist,
opts.max_queue_len,
opts.denylist,
));
// Return a handle to our aggregator:
@@ -74,19 +85,31 @@ impl Aggregator {
})))
}
// This is spawned into a separate task and handles any messages coming
// in to the aggregator. If nobody is tolding the tx side of the channel
// any more, this task will gracefully end.
/// This is spawned into a separate task and handles any messages coming
/// in to the aggregator. If nobody is holding the tx side of the channel
/// any more, this task will gracefully end.
async fn handle_messages(
rx_from_external: mpsc::UnboundedReceiver<inner_loop::ToAggregator>,
tx_to_aggregator: mpsc::UnboundedSender<(NodeId, Ipv4Addr)>,
rx_from_external: flume::Receiver<inner_loop::ToAggregator>,
tx_to_aggregator: flume::Sender<(NodeId, Ipv4Addr)>,
max_queue_len: usize,
denylist: Vec<String>,
) {
inner_loop::InnerLoop::new(rx_from_external, tx_to_aggregator, denylist)
.handle()
inner_loop::InnerLoop::new(tx_to_aggregator, denylist, max_queue_len)
.handle(rx_from_external)
.await;
}
/// Ask the aggregator loop for its current metrics and wait for the reply.
///
/// A fresh reply channel is created for each request; its sending half travels
/// to the loop inside a `ToAggregator::GatherMetrics` message and the receiving
/// half is awaited here. An error is returned if either the request cannot be
/// sent or the reply cannot be received.
pub async fn gather_metrics(&self) -> anyhow::Result<inner_loop::Metrics> {
    let (reply_tx, reply_rx) = flume::unbounded();

    self.0
        .tx_to_aggregator
        .send_async(inner_loop::ToAggregator::GatherMetrics(reply_tx))
        .await?;

    Ok(reply_rx.recv_async().await?)
}
/// Return a sink that a shard can send messages into to be handled by the aggregator.
pub fn subscribe_shard(
&self,
@@ -102,7 +125,7 @@ impl Aggregator {
// Calling `send` on this Sink requires Unpin. There may be a nicer way than this,
// but pinning by boxing is the easy solution for now:
Box::pin(tx_to_aggregator.with(move |msg| async move {
Box::pin(tx_to_aggregator.into_sink().with(move |msg| async move {
Ok(inner_loop::ToAggregator::FromShardWebsocket(
shard_conn_id.into(),
msg,
@@ -129,7 +152,7 @@ impl Aggregator {
// but pinning by boxing is the easy solution for now:
(
feed_conn_id,
Box::pin(tx_to_aggregator.with(move |msg| async move {
Box::pin(tx_to_aggregator.into_sink().with(move |msg| async move {
Ok(inner_loop::ToAggregator::FromFeedWebsocket(
feed_conn_id.into(),
msg,
@@ -1,10 +1,10 @@
use super::aggregator::Aggregator;
use super::aggregator::{Aggregator, AggregatorOpts};
use super::inner_loop;
use common::EitherSink;
use futures::{Sink, SinkExt, StreamExt};
use inner_loop::FromShardWebsocket;
use futures::{Sink, SinkExt};
use inner_loop::{FromShardWebsocket, Metrics};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::sync::{Arc, Mutex};
#[derive(Clone)]
pub struct AggregatorSet(Arc<AggregatorSetInner>);
@@ -12,25 +12,74 @@ pub struct AggregatorSet(Arc<AggregatorSetInner>);
pub struct AggregatorSetInner {
aggregators: Vec<Aggregator>,
next_idx: AtomicUsize,
metrics: Mutex<Vec<Metrics>>,
}
impl AggregatorSet {
/// Spawn the number of aggregators we're asked to.
pub async fn spawn(
num_aggregators: usize,
denylist: Vec<String>,
opts: AggregatorOpts,
) -> anyhow::Result<AggregatorSet> {
assert_ne!(num_aggregators, 0, "You must have 1 or more aggregator");
let aggregators = futures::future::try_join_all(
(0..num_aggregators).map(|_| Aggregator::spawn(denylist.clone())),
(0..num_aggregators).map(|_| Aggregator::spawn(opts.clone())),
)
.await?;
Ok(AggregatorSet(Arc::new(AggregatorSetInner {
let initial_metrics = (0..num_aggregators).map(|_| Metrics::default()).collect();
let this = AggregatorSet(Arc::new(AggregatorSetInner {
aggregators,
next_idx: AtomicUsize::new(0),
})))
metrics: Mutex::new(initial_metrics),
}));
// Start asking for metrics:
this.spawn_metrics_loops();
Ok(this)
}
/// Start one background task per internal aggregator which repeatedly asks that
/// aggregator for its metrics and stores the answer in the shared `metrics` slot
/// at the aggregator's index.
///
/// Each task polls independently, so (depending on how busy the aggregators are)
/// the stored metrics are not necessarily in sync with one another.
fn spawn_metrics_loops(&self) {
    for (idx, aggregator) in self.0.aggregators.iter().cloned().enumerate() {
        let shared = Arc::clone(&self.0);
        tokio::spawn(async move {
            loop {
                let started_at = tokio::time::Instant::now();

                match aggregator.gather_metrics().await {
                    Ok(latest) => {
                        // The lock guard is a temporary, so it is released as soon as
                        // this assignment finishes. `unwrap` panics this task if the
                        // mutex has been poisoned.
                        shared.metrics.lock().unwrap()[idx] = latest;
                    }
                    // Any error here is unlikely and probably means that the
                    // aggregator loop has failed completely, so stop polling it.
                    Err(e) => {
                        log::error!("Error obtaining metrics (bailing): {}", e);
                        return;
                    }
                }

                // Leave *at least* 10 seconds between the start of consecutive
                // requests; a slow reply simply stretches the gap.
                let next_poll = started_at + tokio::time::Duration::from_secs(10);
                tokio::time::sleep_until(next_poll).await;
            }
        });
    }
}
/// Hand back a copy of the most recently stored metrics, one entry per internal
/// aggregator. The lock is only held while the copy is made.
pub fn latest_metrics(&self) -> Vec<Metrics> {
    let stored = self.0.metrics.lock().unwrap();
    stored.to_vec()
}
/// Return a sink that a shard can send messages into to be handled by all aggregators.
@@ -52,10 +101,11 @@ impl AggregatorSet {
.map(|a| a.subscribe_shard())
.collect();
let (tx, rx) = flume::unbounded::<FromShardWebsocket>();
// Send every incoming message to all aggregators.
let (tx, mut rx) = futures::channel::mpsc::unbounded::<FromShardWebsocket>();
tokio::spawn(async move {
while let Some(msg) = rx.next().await {
while let Ok(msg) = rx.recv_async().await {
for conn in &mut conns {
// Unbounded channel under the hood, so this await
// shouldn't ever need to yield.
@@ -67,7 +117,7 @@ impl AggregatorSet {
}
});
EitherSink::b(tx.sink_map_err(|e| anyhow::anyhow!("{}", e)))
EitherSink::b(tx.into_sink().sink_map_err(|e| anyhow::anyhow!("{}", e)))
}
/// Return a sink that a feed can send messages into to be handled by a single aggregator.
@@ -25,9 +25,11 @@ use common::{
node_types::BlockHash,
time,
};
use futures::channel::mpsc;
use futures::StreamExt;
use std::collections::{HashMap, HashSet};
use std::sync::{
atomic::{AtomicU64, Ordering},
Arc,
};
use std::{
net::{IpAddr, Ipv4Addr},
str::FromStr,
@@ -39,6 +41,9 @@ pub enum ToAggregator {
FromShardWebsocket(ConnId, FromShardWebsocket),
FromFeedWebsocket(ConnId, FromFeedWebsocket),
FromFindLocation(NodeId, find_location::Location),
/// Hand back some metrics. The provided sender is expected not to block when
/// a message is sent into it.
GatherMetrics(flume::Sender<Metrics>),
}
/// An incoming shard connection can send these messages to the aggregator.
@@ -47,7 +52,7 @@ pub enum FromShardWebsocket {
/// When the socket is opened, it'll send this first
/// so that we have a way to communicate back to it.
Initialize {
channel: mpsc::UnboundedSender<ToShardWebsocket>,
channel: flume::Sender<ToShardWebsocket>,
},
/// Tell the aggregator about a new node.
Add {
@@ -85,7 +90,7 @@ pub enum FromFeedWebsocket {
/// Unbounded so that slow feeds don't block aggregator
/// progress.
Initialize {
channel: mpsc::UnboundedSender<ToFeedWebsocket>,
channel: flume::Sender<ToFeedWebsocket>,
},
/// The feed can subscribe to a chain to receive
/// messages relating to it.
@@ -100,6 +105,32 @@ pub enum FromFeedWebsocket {
Disconnected,
}
/// A snapshot of figures describing an aggregator, handed back in response to a
/// metrics request.
#[derive(Debug, Default, Clone)]
pub struct Metrics {
    /// Unix timestamp (milliseconds since the epoch) at which this snapshot was taken.
    pub timestamp_unix_ms: u64,
    /// Number of chains that feeds are currently subscribed to.
    pub chains_subscribed_to: usize,
    /// Number of feeds that currently hold a subscription.
    pub subscribed_feeds: usize,
    /// Number of subscribed feeds which have also asked for finality information.
    pub subscribed_finality_feeds: usize,
    /// Number of messages currently sitting in internal channels, waiting to be
    /// sent out to feeds.
    pub total_messages_to_feeds: usize,
    /// Number of messages queued up and waiting to be handled by this aggregator.
    pub total_messages_to_aggregator: usize,
    /// Number of (non-critical) messages the aggregator has dropped because it
    /// was overwhelmed.
    pub dropped_messages_to_aggregator: u64,
    /// Number of nodes currently known to this aggregator.
    pub connected_nodes: usize,
    /// Number of feeds currently connected to this aggregator.
    pub connected_feeds: usize,
    /// Number of shards currently connected to this aggregator.
    pub connected_shards: usize,
}
// The frontend sends text based commands; parse them into these messages:
impl FromStr for FromFeedWebsocket {
type Err = anyhow::Error;
@@ -127,9 +158,6 @@ pub enum ToFeedWebsocket {
/// Instances of this are responsible for handling incoming and
/// outgoing messages in the main aggregator loop.
pub struct InnerLoop {
/// Messages from the outside world come into this:
rx_from_external: mpsc::UnboundedReceiver<ToAggregator>,
/// The state of our chains and nodes lives here:
node_state: State,
/// We maintain a mapping between NodeId and ConnId+LocalId, so that we know
@@ -137,9 +165,9 @@ pub struct InnerLoop {
node_ids: BiMap<NodeId, (ConnId, ShardNodeId)>,
/// Keep track of how to send messages out to feeds.
feed_channels: HashMap<ConnId, mpsc::UnboundedSender<ToFeedWebsocket>>,
feed_channels: HashMap<ConnId, flume::Sender<ToFeedWebsocket>>,
/// Keep track of how to send messages out to shards.
shard_channels: HashMap<ConnId, mpsc::UnboundedSender<ToShardWebsocket>>,
shard_channels: HashMap<ConnId, flume::Sender<ToShardWebsocket>>,
/// Which chain is a feed subscribed to?
/// Feed Connection ID -> Chain Genesis Hash
@@ -152,18 +180,21 @@ pub struct InnerLoop {
feed_conn_id_finality: HashSet<ConnId>,
/// Send messages here to make geographical location requests.
tx_to_locator: mpsc::UnboundedSender<(NodeId, Ipv4Addr)>,
tx_to_locator: flume::Sender<(NodeId, Ipv4Addr)>,
/// How big can the queue of messages coming in to the aggregator get before messages
/// are prioritised and dropped to try and get back on track.
max_queue_len: usize,
}
impl InnerLoop {
/// Create a new inner loop handler with the various state it needs.
pub fn new(
rx_from_external: mpsc::UnboundedReceiver<ToAggregator>,
tx_to_locator: mpsc::UnboundedSender<(NodeId, Ipv4Addr)>,
tx_to_locator: flume::Sender<(NodeId, Ipv4Addr)>,
denylist: Vec<String>,
max_queue_len: usize,
) -> Self {
InnerLoop {
rx_from_external,
node_state: State::new(denylist),
node_ids: BiMap::new(),
feed_channels: HashMap::new(),
@@ -172,28 +203,97 @@ impl InnerLoop {
chain_to_feed_conn_ids: HashMap::new(),
feed_conn_id_finality: HashSet::new(),
tx_to_locator,
max_queue_len,
}
}
/// Start handling and responding to incoming messages. Owing to unbounded channels, we actually
/// only have a single `.await` (in this function). This helps to make it clear that the aggregator loop
/// will be able to make progress quickly without any potential yield points.
pub async fn handle(mut self) {
while let Some(msg) = self.rx_from_external.next().await {
match msg {
ToAggregator::FromFeedWebsocket(feed_conn_id, msg) => {
self.handle_from_feed(feed_conn_id, msg)
}
ToAggregator::FromShardWebsocket(shard_conn_id, msg) => {
self.handle_from_shard(shard_conn_id, msg)
}
ToAggregator::FromFindLocation(node_id, location) => {
self.handle_from_find_location(node_id, location)
/// Start handling and responding to incoming messages.
///
/// Work is split in two:
///
/// - A spawned task takes ownership of `self` and processes every message that
///   arrives on an internal ("metered") unbounded channel.
/// - This function's own loop reads from `rx_from_external` and forwards each
///   message into that internal channel. Before forwarding, it checks the
///   internal queue length: while it exceeds `max_queue_len`, node `Update`
///   messages from shards are dropped (and counted) rather than forwarded.
///
/// The forwarding loop stops when receiving from `rx_from_external` fails or
/// when a message can no longer be sent into the internal channel.
pub async fn handle(mut self, rx_from_external: flume::Receiver<ToAggregator>) {
    let max_queue_len = self.max_queue_len;
    let (metered_tx, metered_rx) = flume::unbounded();

    // Shared counter of dropped messages: incremented by the forwarding loop
    // below, read by the processing task when metrics are requested.
    let dropped_messages = Arc::new(AtomicU64::new(0));
    let dropped_for_metrics = Arc::clone(&dropped_messages);

    // The processing task: this is where every forwarded message is actually handled.
    tokio::spawn(async move {
        while let Ok(msg) = metered_rx.recv_async().await {
            match msg {
                ToAggregator::FromFeedWebsocket(feed_conn_id, msg) => {
                    self.handle_from_feed(feed_conn_id, msg)
                }
                ToAggregator::FromShardWebsocket(shard_conn_id, msg) => {
                    self.handle_from_shard(shard_conn_id, msg)
                }
                ToAggregator::FromFindLocation(node_id, location) => {
                    self.handle_from_find_location(node_id, location)
                }
                ToAggregator::GatherMetrics(tx) => {
                    let queued = metered_rx.len();
                    let dropped = dropped_for_metrics.load(Ordering::Relaxed);
                    self.handle_gather_metrics(tx, queued, dropped)
                }
            }
        }
    });

    // The forwarding loop: decide, per message, whether to pass it on or drop it.
    while let Ok(msg) = rx_from_external.recv_async().await {
        // Ignore node updates if we have too many messages to handle, in an attempt
        // to reduce the queue length back to something reasonable, lest it get out
        // of control and start consuming a load of memory.
        let overloaded = metered_tx.len() > max_queue_len;
        let is_node_update = matches!(
            msg,
            ToAggregator::FromShardWebsocket(.., FromShardWebsocket::Update { .. })
        );
        if overloaded && is_node_update {
            // Note: this wraps on overflow (which is probably the best
            // behaviour for graphing it anyway)
            dropped_messages.fetch_add(1, Ordering::Relaxed);
            continue;
        }

        if let Err(e) = metered_tx.send(msg) {
            log::error!("Cannot send message into aggregator: {}", e);
            break;
        }
    }
}
/// Build a [`Metrics`] snapshot from the loop's current state and send it back
/// to whoever asked.
///
/// Note that, despite its name, `rx` is the *sending* half of the reply channel.
/// The two counters are supplied by the caller; every other figure is read from
/// the collections held on `self`.
fn handle_gather_metrics(
    &mut self,
    rx: flume::Sender<Metrics>,
    total_messages_to_aggregator: usize,
    dropped_messages_to_aggregator: u64,
) {
    let timestamp_unix_ms = time::now();

    // Add up the messages still waiting in every feed's outgoing channel.
    let total_messages_to_feeds = self
        .feed_channels
        .values()
        .fold(0usize, |queued, chan| queued + chan.len());

    let snapshot = Metrics {
        timestamp_unix_ms,
        chains_subscribed_to: self.chain_to_feed_conn_ids.len(),
        subscribed_feeds: self.feed_conn_id_to_chain.len(),
        subscribed_finality_feeds: self.feed_conn_id_finality.len(),
        total_messages_to_feeds,
        total_messages_to_aggregator,
        dropped_messages_to_aggregator,
        connected_nodes: self.node_ids.len(),
        connected_feeds: self.feed_channels.len(),
        connected_shards: self.shard_channels.len(),
    };

    // Ignore error sending; assume the receiver stopped caring and dropped the channel:
    let _ = rx.send(snapshot);
}
/// Handle messages that come from the node geographical locator.
fn handle_from_find_location(&mut self, node_id: NodeId, location: find_location::Location) {
self.node_state
@@ -237,7 +337,7 @@ impl InnerLoop {
match self.node_state.add_node(genesis_hash, node) {
state::AddNodeResult::ChainOnDenyList => {
if let Some(shard_conn) = self.shard_channels.get_mut(&shard_conn_id) {
let _ = shard_conn.unbounded_send(ToShardWebsocket::Mute {
let _ = shard_conn.send(ToShardWebsocket::Mute {
local_id,
reason: MuteReason::ChainNotAllowed,
});
@@ -245,7 +345,7 @@ impl InnerLoop {
}
state::AddNodeResult::ChainOverQuota => {
if let Some(shard_conn) = self.shard_channels.get_mut(&shard_conn_id) {
let _ = shard_conn.unbounded_send(ToShardWebsocket::Mute {
let _ = shard_conn.send(ToShardWebsocket::Mute {
local_id,
reason: MuteReason::Overquota,
});
@@ -286,7 +386,7 @@ impl InnerLoop {
// Ask for the geographical location of the node.
// Currently we only geographically locate IPV4 addresses so ignore IPV6.
if let IpAddr::V4(ip_v4) = ip {
let _ = self.tx_to_locator.unbounded_send((node_id, ip_v4));
let _ = self.tx_to_locator.send((node_id, ip_v4));
}
}
}
@@ -369,7 +469,7 @@ impl InnerLoop {
// Send this to the channel that subscribed:
if let Some(bytes) = feed_serializer.into_finalized() {
let _ = channel.unbounded_send(ToFeedWebsocket::Bytes(bytes));
let _ = channel.send(ToFeedWebsocket::Bytes(bytes));
}
}
FromFeedWebsocket::Ping { value } => {
@@ -382,7 +482,7 @@ impl InnerLoop {
let mut feed_serializer = FeedMessageSerializer::new();
feed_serializer.push(feed_message::Pong(&value));
if let Some(bytes) = feed_serializer.into_finalized() {
let _ = feed_channel.unbounded_send(ToFeedWebsocket::Bytes(bytes));
let _ = feed_channel.send(ToFeedWebsocket::Bytes(bytes));
}
}
FromFeedWebsocket::Subscribe { chain } => {
@@ -430,7 +530,7 @@ impl InnerLoop {
new_chain.finalized_block().hash,
));
if let Some(bytes) = feed_serializer.into_finalized() {
let _ = feed_channel.unbounded_send(ToFeedWebsocket::Bytes(bytes));
let _ = feed_channel.send(ToFeedWebsocket::Bytes(bytes));
}
// If many (eg 10k) nodes are connected, serializing all of their info takes time.
@@ -465,7 +565,7 @@ impl InnerLoop {
})
.collect();
for bytes in all_feed_messages {
let _ = feed_channel.unbounded_send(ToFeedWebsocket::Bytes(bytes));
let _ = feed_channel.send(ToFeedWebsocket::Bytes(bytes));
}
// Actually make a note of the new chain subscription:
@@ -580,7 +680,7 @@ impl InnerLoop {
if let Some(feeds) = self.chain_to_feed_conn_ids.get(genesis_hash) {
for &feed_id in feeds {
if let Some(chan) = self.feed_channels.get_mut(&feed_id) {
let _ = chan.unbounded_send(message.clone());
let _ = chan.send(message.clone());
}
}
}
@@ -596,7 +696,7 @@ impl InnerLoop {
/// Send a message to everybody.
fn broadcast_to_all_feeds(&mut self, message: ToFeedWebsocket) {
for chan in self.feed_channels.values_mut() {
let _ = chan.unbounded_send(message.clone());
let _ = chan.send(message.clone());
}
}
@@ -622,7 +722,7 @@ impl InnerLoop {
// are also subscribed to receive finality updates.
for &feed_id in feeds.union(&self.feed_conn_id_finality) {
if let Some(chan) = self.feed_channels.get_mut(&feed_id) {
let _ = chan.unbounded_send(message.clone());
let _ = chan.send(message.clone());
}
}
}
@@ -19,6 +19,7 @@ mod aggregator_set;
mod inner_loop;
// Expose the various message types that can be worked with externally:
pub use aggregator::AggregatorOpts;
pub use inner_loop::{FromFeedWebsocket, FromShardWebsocket, ToFeedWebsocket, ToShardWebsocket};
pub use aggregator_set::*;
+4 -5
View File
@@ -17,8 +17,7 @@
use std::net::Ipv4Addr;
use std::sync::Arc;
use futures::channel::mpsc;
use futures::{Sink, SinkExt, StreamExt};
use futures::{Sink, SinkExt};
use parking_lot::RwLock;
use rustc_hash::FxHashMap;
use serde::Deserialize;
@@ -31,12 +30,12 @@ pub type Location = Option<Arc<NodeLocation>>;
/// This is responsible for taking an IP address and attempting
/// to find a geographical location from this
pub fn find_location<Id, R>(response_chan: R) -> mpsc::UnboundedSender<(Id, Ipv4Addr)>
pub fn find_location<Id, R>(response_chan: R) -> flume::Sender<(Id, Ipv4Addr)>
where
R: Sink<(Id, Option<Arc<NodeLocation>>)> + Unpin + Send + Clone + 'static,
Id: Clone + Send + 'static,
{
let (tx, mut rx) = mpsc::unbounded();
let (tx, rx) = flume::unbounded();
// cache entries
let mut cache: FxHashMap<Ipv4Addr, Option<Arc<NodeLocation>>> = FxHashMap::default();
@@ -61,7 +60,7 @@ where
let semaphore = Arc::new(Semaphore::new(4));
loop {
while let Some((id, ip_address)) = rx.next().await {
while let Ok((id, ip_address)) = rx.recv_async().await {
let permit = semaphore.clone().acquire_owned().await.unwrap();
let mut response_chan = response_chan.clone();
let locator = locator.clone();
+82 -7
View File
@@ -22,13 +22,14 @@ use std::str::FromStr;
use tokio::time::{Duration, Instant};
use aggregator::{
AggregatorSet, FromFeedWebsocket, FromShardWebsocket, ToFeedWebsocket, ToShardWebsocket,
AggregatorOpts, AggregatorSet, FromFeedWebsocket, FromShardWebsocket, ToFeedWebsocket,
ToShardWebsocket,
};
use bincode::Options;
use common::http_utils;
use common::internal_messages;
use common::ready_chunks_all::ReadyChunksAll;
use futures::{channel::mpsc, SinkExt, StreamExt};
use futures::{SinkExt, StreamExt};
use hyper::{Method, Response};
use simple_logger::SimpleLogger;
use structopt::StructOpt;
@@ -67,6 +68,10 @@ struct Opts {
/// aggregators.
#[structopt(long)]
num_aggregators: Option<usize>,
/// How big can the message queue for each aggregator grow before we start dropping non-essential
/// messages in an attempt to let it reduce?
#[structopt(long)]
aggregator_queue_len: Option<usize>,
}
fn main() {
@@ -110,7 +115,15 @@ fn main() {
/// Declare our routes and start the server.
async fn start_server(num_aggregators: usize, opts: Opts) -> anyhow::Result<()> {
let aggregator = AggregatorSet::spawn(num_aggregators, opts.denylist).await?;
let aggregator_queue_len = opts.aggregator_queue_len.unwrap_or(10_000);
let aggregator = AggregatorSet::spawn(
num_aggregators,
AggregatorOpts {
max_queue_len: aggregator_queue_len,
denylist: opts.denylist,
},
)
.await?;
let socket_addr = opts.socket;
let feed_timeout = opts.feed_timeout;
@@ -166,6 +179,8 @@ async fn start_server(num_aggregators: usize, opts: Opts) -> anyhow::Result<()>
},
))
}
// Return metrics in a prometheus-friendly text based format:
(&Method::GET, "/metrics") => Ok(return_prometheus_metrics(aggregator).await),
// 404 for anything else:
_ => Ok(Response::builder()
.status(404)
@@ -188,7 +203,8 @@ async fn handle_shard_websocket_connection<S>(
where
S: futures::Sink<FromShardWebsocket, Error = anyhow::Error> + Unpin + Send + 'static,
{
let (tx_to_shard_conn, mut rx_from_aggregator) = mpsc::unbounded();
let (tx_to_shard_conn, rx_from_aggregator) = flume::unbounded();
let mut rx_from_aggregator = rx_from_aggregator.into_stream();
// Tell the aggregator about this new connection, and give it a way to send messages to us:
let init_msg = FromShardWebsocket::Initialize {
@@ -330,8 +346,8 @@ where
S: futures::Sink<FromFeedWebsocket, Error = anyhow::Error> + Unpin + Send + 'static,
{
// unbounded channel so that slow feeds don't block aggregator progress:
let (tx_to_feed_conn, rx_from_aggregator) = mpsc::unbounded();
let mut rx_from_aggregator_chunks = ReadyChunksAll::new(rx_from_aggregator);
let (tx_to_feed_conn, rx_from_aggregator) = flume::unbounded();
let mut rx_from_aggregator_chunks = ReadyChunksAll::new(rx_from_aggregator.into_stream());
// Tell the aggregator about this new connection, and give it a way to send messages to us:
let init_msg = FromFeedWebsocket::Initialize {
@@ -350,7 +366,6 @@ where
let recv_handle = tokio::spawn(async move {
loop {
let mut bytes = Vec::new();
// Receive a message, or bail if closer called. We don't care about cancel safety;
// if we're halfway through receiving a message, no biggie since we're closing the
// connection anyway.
@@ -466,3 +481,63 @@ where
// loop ended; give socket back to parent:
(tx_to_aggregator, ws_send)
}
/// Build an HTTP response containing the latest metrics from every aggregator,
/// rendered in the Prometheus text exposition format.
///
/// One sample line is written per metric per aggregator. Each line carries an
/// `aggregator="<index>"` label (the position of the aggregator in the list
/// returned by `latest_metrics()`), the metric value, and the timestamp stored
/// alongside those metrics. The samples for one aggregator are kept together and
/// followed by a single blank line.
///
/// The response is served as `text/plain; version=0.0.4`.
async fn return_prometheus_metrics(aggregator: AggregatorSet) -> Response<hyper::Body> {
    let metrics = aggregator.latest_metrics();

    // Instead of using the rust prometheus library (which is optimised around global variables
    // updated across a codebase), we just spit out the text format that prometheus expects
    // ourselves, and use the latest metrics that we've captured so far from the aggregators. See:
    //
    // https://github.com/prometheus/docs/blob/master/content/docs/instrumenting/exposition_formats.md#text-format-details
    //
    // for an example and explanation of this text based format. The minimal output we produce
    // here seems to be handled correctly when pointing a current version of prometheus at it.
    let mut s = String::new();
    for (idx, m) in metrics.iter().enumerate() {
        // Every sample shares the same shape, so format it in exactly one place; this keeps
        // the line terminator consistent across all metrics.
        //
        // Note: '{{' and '}}' are just escaped versions of '{' and '}' in Rust fmt strings.
        let mut push_metric = |name: &str, value: &dyn std::fmt::Display| {
            s.push_str(&format!(
                "{}{{aggregator=\"{}\"}} {} {}\n",
                name, idx, value, m.timestamp_unix_ms
            ));
        };

        push_metric("telemetry_connected_feeds", &m.connected_feeds);
        push_metric("telemetry_connected_nodes", &m.connected_nodes);
        push_metric("telemetry_connected_shards", &m.connected_shards);
        push_metric("telemetry_chains_subscribed_to", &m.chains_subscribed_to);
        push_metric("telemetry_subscribed_feeds", &m.subscribed_feeds);
        push_metric(
            "telemetry_subscribed_finality_feeds",
            &m.subscribed_finality_feeds,
        );
        push_metric(
            "telemetry_total_messages_to_feeds",
            &m.total_messages_to_feeds,
        );
        push_metric(
            "telemetry_total_messages_to_aggregator",
            &m.total_messages_to_aggregator,
        );
        push_metric(
            "telemetry_dropped_messages_to_aggregator",
            &m.dropped_messages_to_aggregator,
        );

        // A single blank line separates one aggregator's samples from the next.
        s.push('\n');
    }

    Response::builder()
        // The version number here tells prometheus which version of the text format we're using:
        .header(http::header::CONTENT_TYPE, "text/plain; version=0.0.4")
        .body(s.into())
        // Cannot fail: the header name and value are both static and valid.
        .unwrap()
}
+43 -216
View File
@@ -34,19 +34,17 @@ In general, if you run into issues, it may be better to run this on a linux
box; MacOS seems to hit limits quicker in general.
*/
use common::node_types::BlockHash;
use common::ws_client::SentMessage;
use futures::{future, StreamExt};
use serde_json::json;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
use structopt::StructOpt;
use test_utils::workspace::{start_server, CoreOpts, ServerOpts, ShardOpts};
/// A configurable soak_test runner. Configure by providing the expected args as
/// an environment variable. One example to run this test is:
/// A test runner which sends realistic(ish) messages from fake nodes to a telemetry server.
///
/// To start up 4 telemetry_shards and 1 telemetry_core with 10 feeds and 100 nodes:
/// ```sh
/// SOAK_TEST_ARGS='--feeds 10 --nodes 100 --shards 4' cargo test --release -- soak_test --ignored --nocapture
/// ```
@@ -56,211 +54,30 @@ use test_utils::workspace::{start_server, CoreOpts, ServerOpts, ShardOpts};
/// TELEMETRY_BIN=~/old_telemetry_binary SOAK_TEST_ARGS='--feeds 100 --nodes 100 --shards 4' cargo test --release -- soak_test --ignored --nocapture
/// ```
///
/// Or, you can run it against existing processes with something like this:
/// Or, you can run it against existing processes on the network with something like this:
/// ```sh
/// TELEMETRY_SUBMIT_HOSTS='127.0.0.1:8001' TELEMETRY_FEED_HOST='127.0.0.1:8000' SOAK_TEST_ARGS='--feeds 100 --nodes 100 --shards 4' cargo test --release -- soak_test --ignored --nocapture
/// ```
///
/// Each will establish the same total number of connections and send the same messages.
#[ignore]
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
pub async fn soak_test() {
#[test]
pub fn soak_test() {
let opts = get_soak_test_opts();
run_soak_test(opts).await;
}
/// A general soak test runner.
///
/// This test sends the same `system.interval` message over and over, and so
/// the results should be pretty reproducible.
///
/// Steps, in order:
/// 1. Start a server via `start_server`, passing the logging and worker-thread
///    settings from `opts`.
/// 2. Add `opts.shards` shards and connect `opts.nodes` nodes to *each* shard.
/// 3. Have every node announce itself with a `system.connected` message.
/// 4. Connect `opts.feeds` feeds to the core and subscribe each to "Polkadot".
/// 5. Spawn tasks that send updates from the nodes, drain the feeds, and print
///    throughput once per second.
///
/// This function never returns: it ends by awaiting a future that is always pending.
///
/// # Panics
///
/// Panics if a shard cannot be added, if node or feed connections fail, or if
/// any of the initial sends fail (see the `expect`/`unwrap` calls below).
async fn run_soak_test(opts: SoakTestOpts) {
let mut server = start_server(
ServerOpts {
release_mode: true,
log_output: opts.log_output,
..Default::default()
},
CoreOpts {
worker_threads: opts.core_worker_threads,
..Default::default()
},
ShardOpts {
worker_threads: opts.shard_worker_threads,
..Default::default()
},
)
.await;
println!("Telemetry core running at {}", server.get_core().host());
// Start up the shards we requested:
let mut shard_ids = vec![];
for _ in 0..opts.shards {
let shard_id = server.add_shard().await.expect("shard can't be added");
shard_ids.push(shard_id);
}
// Connect nodes to each shard (so the total is `opts.shards * opts.nodes` connections):
let mut nodes = vec![];
for &shard_id in &shard_ids {
let mut conns = server
.get_shard(shard_id)
.unwrap()
.connect_multiple_nodes(opts.nodes)
.await
.expect("node connections failed");
nodes.append(&mut conns);
}
// Each node tells the shard about itself. Only the node name differs between nodes;
// everything else (including the genesis hash) is identical:
for (idx, (node_tx, _)) in nodes.iter_mut().enumerate() {
node_tx
.send_json_binary(json!({
"id":1, // Only needs to be unique per node
"ts":"2021-07-12T10:37:47.714666+01:00",
"payload": {
"authority":true,
"chain": "Polkadot", // <- so that we don't go over quota with lots of nodes.
"config":"",
"genesis_hash": BlockHash::from_low_u64_ne(1),
"implementation":"Substrate Node",
"msg":"system.connected",
"name": format!("Node #{}", idx),
"network_id":"12D3KooWEyoppNCUx8Yx66oV9fJnriXwCcXwDDUA2kj6vnc6iDEp",
"startup_time":"1625565542717",
"version":"2.0.0-07a1af348-aarch64-macos"
},
}))
.unwrap();
}
// Connect feeds to the core:
let mut feeds = server
.get_core()
.connect_multiple_feeds(opts.feeds)
.await
.expect("feed connections failed");
// Every feed subscribes to the chain above to recv messages about it:
for (feed_tx, _) in &mut feeds {
feed_tx.send_command("subscribe", "Polkadot").unwrap();
}
// Start sending "update" messages from nodes at time intervals.
// `bytes_in` counts the bytes handed to node senders; the clone is moved into the sender task.
let bytes_in = Arc::new(AtomicUsize::new(0));
let bytes_in2 = Arc::clone(&bytes_in);
tokio::task::spawn(async move {
let msg = json!({
"id":1,
"payload":{
"bandwidth_download":576,
"bandwidth_upload":576,
"msg":"system.interval",
"peers":1
},
"ts":"2021-07-12T10:37:48.330433+01:00"
});
// Serialize once and deliberately leak the buffer to get a `'static` slice that every
// send can share without copying.
let msg_bytes: &'static [u8] = Box::new(serde_json::to_vec(&msg).unwrap()).leak();
loop {
// every ~1second we aim to have sent messages from all of the nodes. So we cycle through
// the node IDs and send a message from each at roughly 1s / number_of_nodes.
// NOTE(review): if no nodes were connected (`--shards 0` or `--nodes 0`), `nodes.len()`
// is 0, the division yields infinity, and `Duration::from_secs_f64` panics inside this task.
let mut interval =
tokio::time::interval(Duration::from_secs_f64(1.0 / nodes.len() as f64));
// `cycle()` never finishes for a non-empty range, so in practice the enclosing `loop`
// only runs its body once.
for node_id in (0..nodes.len()).cycle() {
interval.tick().await;
let node_tx = &mut nodes[node_id].0;
node_tx
.unbounded_send(SentMessage::StaticBinary(msg_bytes))
.unwrap();
bytes_in2.fetch_add(msg_bytes.len(), Ordering::Relaxed);
}
}
});
// Also start receiving messages, counting the bytes received so far.
// One task is spawned per feed; all of them add to the same two shared counters.
let bytes_out = Arc::new(AtomicUsize::new(0));
let msgs_out = Arc::new(AtomicUsize::new(0));
for (_, mut feed_rx) in feeds {
let bytes_out = Arc::clone(&bytes_out);
let msgs_out = Arc::clone(&msgs_out);
tokio::task::spawn(async move {
while let Some(msg) = feed_rx.next().await {
let msg = msg.expect("message could be received");
let num_bytes = msg.len();
bytes_out.fetch_add(num_bytes, Ordering::Relaxed);
msgs_out.fetch_add(1, Ordering::Relaxed);
}
// Reaching this point means the feed stream yielded `None`.
eprintln!("Error: feed has been closed unexpectedly");
});
}
// Periodically report on bytes out
// Each report prints the delta since the previous report as well as the running totals.
tokio::task::spawn(async move {
let one_mb = 1024.0 * 1024.0;
let mut last_bytes_in = 0;
let mut last_bytes_out = 0;
let mut last_msgs_out = 0;
let mut n = 1;
loop {
tokio::time::sleep(Duration::from_secs(1)).await;
let bytes_in_val = bytes_in.load(Ordering::Relaxed);
let bytes_out_val = bytes_out.load(Ordering::Relaxed);
let msgs_out_val = msgs_out.load(Ordering::Relaxed);
println!(
"#{}: MB in/out per measurement: {:.4} / {:.4}, total bytes in/out: {} / {}, msgs out: {}, total msgs out: {})",
n,
(bytes_in_val - last_bytes_in) as f64 / one_mb,
(bytes_out_val - last_bytes_out) as f64 / one_mb,
bytes_in_val,
bytes_out_val,
(msgs_out_val - last_msgs_out),
msgs_out_val
);
n += 1;
last_bytes_in = bytes_in_val;
last_bytes_out = bytes_out_val;
last_msgs_out = msgs_out_val;
}
});
// Wait forever; the spawned tasks above keep running until the test process is killed.
future::pending().await
}
/// Identical to `soak_test`, except that we try to send realistic messages from fake nodes.
/// This means it's potentially less reproducible, but presents a more accurate picture of
/// the load, and lets us see the UI working more or less.
///
/// We can provide the same arguments as we would to `soak_test`:
///
/// ```sh
/// SOAK_TEST_ARGS='--feeds 10 --nodes 100 --shards 4' cargo test --release -- realistic_soak_test --ignored --nocapture
/// ```
///
/// You can also run this test against the pre-sharding actix binary with something like this:
/// ```sh
/// TELEMETRY_BIN=~/old_telemetry_binary SOAK_TEST_ARGS='--feeds 100 --nodes 100 --shards 4' cargo test --release -- realistic_soak_test --ignored --nocapture
/// ```
///
/// Or, you can run it against existing processes with something like this:
/// ```sh
/// TELEMETRY_SUBMIT_HOSTS='127.0.0.1:8001' TELEMETRY_FEED_HOST='127.0.0.1:8000' SOAK_TEST_ARGS='--feeds 100 --nodes 100 --shards 4' cargo test --release -- realistic_soak_test --ignored --nocapture
/// ```
///
#[ignore]
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
pub async fn realistic_soak_test() {
let opts = get_soak_test_opts();
run_realistic_soak_test(opts).await;
tokio::runtime::Builder::new_multi_thread()
.worker_threads(opts.test_worker_threads)
.enable_all()
.thread_name("telemetry_test_runner")
.build()
.unwrap()
.block_on(run_soak_test(opts));
}
/// A general soak test runner.
/// This test sends realistic messages from connected nodes
/// so that we can see how things react under more normal
/// circumstances
async fn run_realistic_soak_test(opts: SoakTestOpts) {
async fn run_soak_test(opts: SoakTestOpts) {
let mut server = start_server(
ServerOpts {
release_mode: true,
@@ -300,30 +117,34 @@ async fn run_realistic_soak_test(opts: SoakTestOpts) {
// Start nodes talking to the shards:
let bytes_in = Arc::new(AtomicUsize::new(0));
let ids_per_node = opts.ids_per_node;
for node in nodes.into_iter().enumerate() {
let bytes_in = Arc::clone(&bytes_in);
tokio::spawn(async move {
let (idx, (tx, _)) = node;
let (idx, (tx, _)) = node;
for id in 0..ids_per_node {
let bytes_in = Arc::clone(&bytes_in);
let tx = tx.clone();
let telemetry = test_utils::fake_telemetry::FakeTelemetry::new(
Duration::from_secs(3),
format!("Node {}", idx + 1),
"Polkadot".to_owned(),
idx + 1,
);
tokio::spawn(async move {
let telemetry = test_utils::fake_telemetry::FakeTelemetry::new(
Duration::from_secs(3),
format!("Node {}", idx + 1),
"Polkadot".to_owned(),
id + 1,
);
let res = telemetry
.start(|msg| async {
bytes_in.fetch_add(msg.len(), Ordering::Relaxed);
tx.unbounded_send(SentMessage::Binary(msg))?;
Ok::<_, anyhow::Error>(())
})
.await;
let res = telemetry
.start(|msg| async {
bytes_in.fetch_add(msg.len(), Ordering::Relaxed);
tx.unbounded_send(SentMessage::Binary(msg))?;
Ok::<_, anyhow::Error>(())
})
.await;
if let Err(e) = res {
log::error!("Telemetry Node #{} has died with error: {}", idx, e);
}
});
if let Err(e) = res {
log::error!("Telemetry Node #{} has died with error: {}", idx, e);
}
});
}
}
// Connect feeds to the core:
@@ -404,6 +225,9 @@ struct SoakTestOpts {
/// The number of nodes to connect to each feed
#[structopt(long)]
nodes: usize,
/// The number of different virtual nodes to connect per actual node socket connection
#[structopt(long, default_value = "1")]
ids_per_node: usize,
/// Number of aggregator loops to use in the core
#[structopt(long)]
core_num_aggregators: Option<usize>,
@@ -416,6 +240,9 @@ struct SoakTestOpts {
/// Should we log output from the core/shards to stdout?
#[structopt(long)]
log_output: bool,
/// How many worker threads should the soak test runner use?
#[structopt(long, default_value = "4")]
test_worker_threads: usize,
}
/// Get soak test args from an envvar and parse them via structopt.
+1
View File
@@ -9,6 +9,7 @@ license = "GPL-3.0"
anyhow = "1.0.41"
bincode = "1.3.3"
common = { path = "../common" }
flume = "0.10.8"
futures = "0.3.15"
hex = "0.4.3"
http = "0.2.4"
+18 -18
View File
@@ -21,8 +21,7 @@ use common::{
node_types::BlockHash,
AssignId,
};
use futures::channel::mpsc;
use futures::{Sink, SinkExt, StreamExt};
use futures::{Sink, SinkExt};
use std::collections::{HashMap, HashSet};
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
@@ -60,7 +59,7 @@ pub enum FromWebsocket {
/// the websocket connection and force the node to reconnect
/// so that it sends its system info again in case the telemetry
/// core has restarted.
close_connection: mpsc::Sender<()>,
close_connection: flume::Sender<()>,
},
/// Tell the aggregator about a new node.
Add {
@@ -94,28 +93,28 @@ struct AggregatorInternal {
/// Send messages to the aggregator from websockets via this. This is
/// stored here so that anybody holding an `Aggregator` handle can
/// make use of it.
tx_to_aggregator: mpsc::Sender<ToAggregator>,
tx_to_aggregator: flume::Sender<ToAggregator>,
}
impl Aggregator {
/// Spawn a new Aggregator. This connects to the telemetry backend
pub async fn spawn(telemetry_uri: http::Uri) -> anyhow::Result<Aggregator> {
let (tx_to_aggregator, rx_from_external) = mpsc::channel(10);
let (tx_to_aggregator, rx_from_external) = flume::bounded(10);
// Establish a resilient connection to the core (this retries as needed):
let (tx_to_telemetry_core, mut rx_from_telemetry_core) =
let (tx_to_telemetry_core, rx_from_telemetry_core) =
create_ws_connection_to_core(telemetry_uri).await;
// Forward messages from the telemetry core into the aggregator:
let mut tx_to_aggregator2 = tx_to_aggregator.clone();
let tx_to_aggregator2 = tx_to_aggregator.clone();
tokio::spawn(async move {
while let Some(msg) = rx_from_telemetry_core.next().await {
while let Ok(msg) = rx_from_telemetry_core.recv_async().await {
let msg_to_aggregator = match msg {
Message::Connected => ToAggregator::ConnectedToTelemetryCore,
Message::Disconnected => ToAggregator::DisconnectedFromTelemetryCore,
Message::Data(data) => ToAggregator::FromTelemetryCore(data),
};
if let Err(_) = tx_to_aggregator2.send(msg_to_aggregator).await {
if let Err(_) = tx_to_aggregator2.send_async(msg_to_aggregator).await {
// This will close the ws channels, which themselves log messages.
break;
}
@@ -139,8 +138,8 @@ impl Aggregator {
// in to the aggregator. If nobody is holding the tx side of the channel
// any more, this task will gracefully end.
async fn handle_messages(
mut rx_from_external: mpsc::Receiver<ToAggregator>,
mut tx_to_telemetry_core: mpsc::Sender<FromAggregator>,
rx_from_external: flume::Receiver<ToAggregator>,
tx_to_telemetry_core: flume::Sender<FromAggregator>,
) {
use internal_messages::{FromShardAggregator, FromTelemetryCore};
@@ -150,7 +149,7 @@ impl Aggregator {
// A list of close channels for the currently connected substrate nodes. Send an empty
// tuple to these to ask the connections to be closed.
let mut close_connections: HashMap<ConnId, mpsc::Sender<()>> = HashMap::new();
let mut close_connections: HashMap<ConnId, flume::Sender<()>> = HashMap::new();
// Maintain mappings from the connection ID and node message ID to the "local ID" which we
// broadcast to the telemetry core.
@@ -160,15 +159,15 @@ impl Aggregator {
let mut muted: HashSet<ShardNodeId> = HashSet::new();
// Now, loop and receive messages to handle.
while let Some(msg) = rx_from_external.next().await {
while let Ok(msg) = rx_from_external.recv_async().await {
match msg {
ToAggregator::ConnectedToTelemetryCore => {
// Take hold of the connection closers and run them all.
let closers = close_connections;
for (_, mut closer) in closers {
for (_, closer) in closers {
// if this fails, it probably means the connection has died already anyway.
let _ = closer.send(()).await;
let _ = closer.send_async(()).await;
}
// We've told everything to disconnect. Now, reset our state:
@@ -212,7 +211,7 @@ impl Aggregator {
// Send the message to the telemetry core with this local ID:
let _ = tx_to_telemetry_core
.send(FromShardAggregator::AddNode {
.send_async(FromShardAggregator::AddNode {
ip,
node,
genesis_hash,
@@ -245,7 +244,7 @@ impl Aggregator {
// Send the message to the telemetry core with this local ID:
let _ = tx_to_telemetry_core
.send(FromShardAggregator::UpdateNode { local_id, payload })
.send_async(FromShardAggregator::UpdateNode { local_id, payload })
.await;
}
ToAggregator::FromWebsocket(disconnected_conn_id, FromWebsocket::Disconnected) => {
@@ -264,7 +263,7 @@ impl Aggregator {
to_local_id.remove_by_id(local_id);
muted.remove(&local_id);
let _ = tx_to_telemetry_core
.send(FromShardAggregator::RemoveNode { local_id })
.send_async(FromShardAggregator::RemoveNode { local_id })
.await;
}
}
@@ -293,6 +292,7 @@ impl Aggregator {
// but pinning by boxing is the easy solution for now:
Box::pin(
tx_to_aggregator
.into_sink()
.with(move |msg| async move { Ok(ToAggregator::FromWebsocket(conn_id, msg)) }),
)
}
+16 -21
View File
@@ -16,8 +16,7 @@
use bincode::Options;
use common::ws_client;
use futures::channel::mpsc;
use futures::{SinkExt, StreamExt};
use futures::StreamExt;
#[derive(Clone, Debug)]
pub enum Message<Out> {
@@ -36,13 +35,13 @@ pub enum Message<Out> {
/// between aggregator and core.
pub async fn create_ws_connection_to_core<In, Out>(
telemetry_uri: http::Uri,
) -> (mpsc::Sender<In>, mpsc::Receiver<Message<Out>>)
) -> (flume::Sender<In>, flume::Receiver<Message<Out>>)
where
In: serde::Serialize + Send + 'static,
Out: serde::de::DeserializeOwned + Send + 'static,
{
let (tx_in, mut rx_in) = mpsc::channel(10);
let (mut tx_out, rx_out) = mpsc::channel(10);
let (tx_in, rx_in) = flume::bounded::<In>(10);
let (tx_out, rx_out) = flume::bounded(10);
let mut is_connected = false;
@@ -51,7 +50,7 @@ where
// Throw away any pending messages from the incoming channel so that it
// doesn't get filled up and begin blocking while we're looping and waiting
// for a reconnection.
while let Ok(Some(_)) = rx_in.try_next() {}
while let Ok(_) = rx_in.try_recv() {}
// Try to connect. If connection established, we serialize and forward messages
// to/from the core. If the external channels break, we end for good. If the internal
@@ -60,9 +59,9 @@ where
Ok(connection) => {
let (tx_to_core, mut rx_from_core) = connection.into_channels();
is_connected = true;
let mut tx_out = tx_out.clone();
let tx_out = tx_out.clone();
if let Err(e) = tx_out.send(Message::Connected).await {
if let Err(e) = tx_out.send_async(Message::Connected).await {
// If receiving end is closed, bail now.
log::warn!("Aggregator is no longer receiving messages from core; disconnecting (permanently): {}", e);
return;
@@ -73,35 +72,31 @@ where
tokio::select! {
msg = rx_from_core.next() => {
let msg = match msg {
Some(msg) => msg,
Some(Ok(msg)) => msg,
// No more messages from core? core WS is disconnected.
None => {
_ => {
log::warn!("No more messages from core: shutting down connection (will reconnect)");
break
}
};
let bytes = match msg {
Ok(ws_client::RecvMessage::Binary(bytes)) => bytes,
Ok(ws_client::RecvMessage::Text(s)) => s.into_bytes(),
Err(e) => {
log::warn!("Unable to receive message from core: shutting down connection (will reconnect): {}", e);
break;
}
ws_client::RecvMessage::Binary(bytes) => bytes,
ws_client::RecvMessage::Text(s) => s.into_bytes()
};
let msg = bincode::options()
.deserialize(&bytes)
.expect("internal messages must be deserializable");
if let Err(e) = tx_out.send(Message::Data(msg)).await {
if let Err(e) = tx_out.send_async(Message::Data(msg)).await {
log::error!("Aggregator is no longer receiving messages from core; disconnecting (permanently): {}", e);
return;
}
},
msg = rx_in.next() => {
msg = rx_in.recv_async() => {
let msg = match msg {
Some(msg) => msg,
None => {
Ok(msg) => msg,
Err(flume::RecvError::Disconnected) => {
log::error!("Aggregator is no longer sending messages to core; disconnecting (permanently)");
return
}
@@ -131,7 +126,7 @@ where
if is_connected {
is_connected = false;
if let Err(e) = tx_out.send(Message::Disconnected).await {
if let Err(e) = tx_out.send_async(Message::Disconnected).await {
log::error!("Aggregator is no longer receiving messages from core; disconnecting (permanently): {}", e);
return;
}
+3 -3
View File
@@ -29,7 +29,7 @@ use common::byte_size::ByteSize;
use common::http_utils;
use common::node_message;
use common::rolling_total::RollingTotalBuilder;
use futures::{channel::mpsc, SinkExt, StreamExt};
use futures::SinkExt;
use http::Uri;
use hyper::{Method, Response};
use simple_logger::SimpleLogger;
@@ -203,7 +203,7 @@ where
// This could be a oneshot channel, but it's useful to be able to clone
// messages, and we can't clone oneshot channel senders.
let (close_connection_tx, mut close_connection_rx) = mpsc::channel(0);
let (close_connection_tx, close_connection_rx) = flume::bounded(1);
// Tell the aggregator about this new connection, and give it a way to close this connection:
let init_msg = FromWebsocket::Initialize {
@@ -223,7 +223,7 @@ where
tokio::select! {
// The close channel has fired, so end the loop. `ws_recv.receive_data` is
// *not* cancel safe, but since we're closing the connection we don't care.
_ = close_connection_rx.next() => {
_ = close_connection_rx.recv_async() => {
log::info!("connection to {:?} being closed by aggregator", real_addr);
break
},
+1
View File
@@ -18,3 +18,4 @@ tokio = { version = "1.7.1", features = ["full"] }
tokio-util = { version = "0.6.7", features = ["full"] }
common = { path = "../common" }
time = { version = "0.3.0", features = ["formatting"] }
flume = "0.10.8"
+7 -60
View File
@@ -21,7 +21,7 @@ use std::{
use crate::feed_message_de::FeedMessage;
use common::ws_client;
use futures::{Sink, SinkExt, Stream, StreamExt};
use futures::{Stream, StreamExt};
/// Wrap a `ws_client::Sender` with convenient utility methods for shard connections
pub struct ShardSender(ws_client::Sender);
@@ -32,45 +32,20 @@ impl From<ws_client::Sender> for ShardSender {
}
}
// Lets a `ShardSender` be used directly as a `Sink` of websocket messages.
// Every method simply forwards to the wrapped `ws_client::Sender` (field `.0`),
// and errors are reported as `ws_client::SendError`.
impl Sink<ws_client::SentMessage> for ShardSender {
type Error = ws_client::SendError;
// Forward the readiness check to the wrapped sender.
fn poll_ready(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), Self::Error>> {
self.0.poll_ready_unpin(cx)
}
// Hand `item` to the wrapped sender.
fn start_send(
mut self: std::pin::Pin<&mut Self>,
item: ws_client::SentMessage,
) -> Result<(), Self::Error> {
self.0.start_send_unpin(item)
}
// Forward the flush request to the wrapped sender.
fn poll_flush(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), Self::Error>> {
self.0.poll_flush_unpin(cx)
}
// Forward the close request to the wrapped sender.
fn poll_close(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), Self::Error>> {
self.0.poll_close_unpin(cx)
}
}
impl ShardSender {
/// Send JSON as a binary websocket message
pub fn send_json_binary(
&mut self,
json: serde_json::Value,
) -> Result<(), ws_client::SendError> {
) -> Result<(), flume::SendError<ws_client::SentMessage>> {
let bytes = serde_json::to_vec(&json).expect("valid bytes");
self.unbounded_send(ws_client::SentMessage::Binary(bytes))
}
/// Send JSON as a textual websocket message
pub fn send_json_text(&mut self, json: serde_json::Value) -> Result<(), ws_client::SendError> {
pub fn send_json_text(
&mut self,
json: serde_json::Value,
) -> Result<(), flume::SendError<ws_client::SentMessage>> {
let s = serde_json::to_string(&json).expect("valid string");
self.unbounded_send(ws_client::SentMessage::Text(s))
}
@@ -128,34 +103,6 @@ impl From<ws_client::Sender> for FeedSender {
}
}
// Lets a `FeedSender` be used directly as a `Sink` of websocket messages.
// Every method simply forwards to the wrapped `ws_client::Sender` (field `.0`),
// and errors are reported as `ws_client::SendError`.
impl Sink<ws_client::SentMessage> for FeedSender {
type Error = ws_client::SendError;
// Forward the readiness check to the wrapped sender.
fn poll_ready(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), Self::Error>> {
self.0.poll_ready_unpin(cx)
}
// Hand `item` to the wrapped sender.
fn start_send(
mut self: std::pin::Pin<&mut Self>,
item: ws_client::SentMessage,
) -> Result<(), Self::Error> {
self.0.start_send_unpin(item)
}
// Forward the flush request to the wrapped sender.
fn poll_flush(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), Self::Error>> {
self.0.poll_flush_unpin(cx)
}
// Forward the close request to the wrapped sender.
fn poll_close(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), Self::Error>> {
self.0.poll_close_unpin(cx)
}
}
impl Deref for FeedSender {
type Target = ws_client::Sender;
fn deref(&self) -> &Self::Target {
@@ -176,7 +123,7 @@ impl FeedSender {
&self,
command: S,
param: S,
) -> Result<(), ws_client::SendError> {
) -> Result<(), flume::SendError<ws_client::SentMessage>> {
self.unbounded_send(ws_client::SentMessage::Text(format!(
"{}:{}",
command.as_ref(),