diff --git a/backend/telemetry_core/benches/subscribe.rs b/backend/telemetry_core/benches/subscribe.rs
index 743606a..58be23c 100644
--- a/backend/telemetry_core/benches/subscribe.rs
+++ b/backend/telemetry_core/benches/subscribe.rs
@@ -21,65 +21,65 @@ pub fn benchmark_subscribe_speed(c: &mut Criterion) {
     c.bench_function("subscribe speed: time till pong", move |b| {
         b.to_async(&rt).iter_custom(|iters| async move {
-            // Start a server:
-            let mut server = start_server(
-                ServerOpts {
-                    release_mode: true,
-                    log_output: false,
-                },
-                CoreOpts {
-                    worker_threads: Some(2),
-                    ..Default::default()
-                },
-                ShardOpts {
-                    max_nodes_per_connection: Some(usize::MAX),
-                    max_node_data_per_second: Some(usize::MAX),
-                    worker_threads: Some(2),
-                    ..Default::default()
-                },
-            )
-            .await;
-            let shard_id = server.add_shard().await.unwrap();
-
-            // Connect a shard:
-            let (mut node_tx, _) = server
-                .get_shard(shard_id)
-                .unwrap()
-                .connect_node()
-                .await
-                .expect("node can connect");
-
-            // Add a bunch of actual nodes on the same chain:
-            for n in 0..NUMBER_OF_NODES {
-                node_tx
-                    .send_json_text(json!({
-                        "id":n,
-                        "ts":"2021-07-12T10:37:47.714666+01:00",
-                        "payload": {
-                            "authority":true,
-                            "chain":"Polkadot", // No limit to #nodes on this network.
-                            "config":"",
-                            "genesis_hash": BlockHash::from_low_u64_ne(1),
-                            "implementation":"Substrate Node",
-                            "msg":"system.connected",
-                            "name": format!("Node {}", n),
-                            "network_id":"12D3KooWEyoppNCUx8Yx66oV9fJnriXwCcXwDDUA2kj6vnc6iDEp",
-                            "startup_time":"1625565542717",
-                            "version":"2.0.0-07a1af348-aarch64-macos"
-                        }
-                    }))
-                    .unwrap();
-            }
-
-            // Give those messages a chance to be handled. This, of course,
-            // assumes that those messages _can_ be handled this quickly. If not,
-            // we'll start to skew benchmark results with the "time taklen to add node".
-            tokio::time::sleep(Duration::from_millis(250)).await;
-
             // Now, see how quickly a feed is subscribed. Criterion controls the number of
             // iters performed here, but a lot of the time that number is "1".
             let mut total_time = Duration::ZERO;
             for _n in 0..iters {
+                // Start a server:
+                let mut server = start_server(
+                    ServerOpts {
+                        release_mode: true,
+                        log_output: false,
+                    },
+                    CoreOpts {
+                        worker_threads: Some(16),
+                        ..Default::default()
+                    },
+                    ShardOpts {
+                        max_nodes_per_connection: Some(usize::MAX),
+                        max_node_data_per_second: Some(usize::MAX),
+                        worker_threads: Some(2),
+                        ..Default::default()
+                    },
+                )
+                .await;
+                let shard_id = server.add_shard().await.unwrap();
+
+                // Connect a shard:
+                let (mut node_tx, _) = server
+                    .get_shard(shard_id)
+                    .unwrap()
+                    .connect_node()
+                    .await
+                    .expect("node can connect");
+
+                // Add a bunch of actual nodes on the same chain:
+                for n in 0..NUMBER_OF_NODES {
+                    node_tx
+                        .send_json_text(json!({
+                            "id":n,
+                            "ts":"2021-07-12T10:37:47.714666+01:00",
+                            "payload": {
+                                "authority":true,
+                                "chain":"Polkadot", // No limit to #nodes on this network.
+                                "config":"",
+                                "genesis_hash": BlockHash::from_low_u64_ne(1),
+                                "implementation":"Substrate Node",
+                                "msg":"system.connected",
+                                "name": format!("Node {}", n),
+                                "network_id":"12D3KooWEyoppNCUx8Yx66oV9fJnriXwCcXwDDUA2kj6vnc6iDEp",
+                                "startup_time":"1625565542717",
+                                "version":"2.0.0-07a1af348-aarch64-macos"
+                            }
+                        }))
+                        .unwrap();
+                }
+
+                // Give those messages a chance to be handled. This, of course,
+                // assumes that those messages _can_ be handled this quickly. If not,
+                // we'll start to skew benchmark results with the "time taken to add node".
+                tokio::time::sleep(Duration::from_millis(250)).await;
+
                 // Start a bunch of feeds:
                 let mut feeds = server
                     .get_core()
@@ -107,11 +107,6 @@ pub fn benchmark_subscribe_speed(c: &mut Criterion) {
                     }
                 }
                 total_time += start.elapsed();
-
-                // shut down the feeds
-                for (mut feed_tx, _) in feeds {
-                    feed_tx.close().await.unwrap();
-                }
             }

             // The total time spent waiting for subscribes:
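Note on the benchmark change above: the server and node setup now happens inside the per-iteration loop, and only the subscribe phase is accumulated into the reported time. Below is a minimal, self-contained sketch of that `iter_custom` pattern (assuming criterion with the `async_tokio` feature; the sleep and `yield_now` are stand-ins for the real setup and measured work, and the benchmark name is made up for illustration):

```rust
use criterion::{criterion_group, criterion_main, Criterion};
use std::time::{Duration, Instant};

// Sketch: per-iteration setup is excluded from the measurement by only timing
// the region of interest and returning the accumulated `Duration` to Criterion.
fn bench_with_per_iteration_setup(c: &mut Criterion) {
    let rt = tokio::runtime::Runtime::new().unwrap();
    c.bench_function("example: time only the measured region", move |b| {
        b.to_async(&rt).iter_custom(|iters| async move {
            let mut total = Duration::ZERO;
            for _ in 0..iters {
                // (Hypothetical) expensive setup that should not count towards the result:
                tokio::time::sleep(Duration::from_millis(1)).await;

                let start = Instant::now();
                // The operation actually being measured:
                tokio::task::yield_now().await;
                total += start.elapsed();
            }
            total
        })
    });
}

criterion_group!(benches, bench_with_per_iteration_setup);
criterion_main!(benches);
```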
diff --git a/backend/telemetry_core/src/aggregator/aggregator_set.rs b/backend/telemetry_core/src/aggregator/aggregator_set.rs
new file mode 100644
index 0000000..334cc5d
--- /dev/null
+++ b/backend/telemetry_core/src/aggregator/aggregator_set.rs
@@ -0,0 +1,75 @@
+use super::aggregator::Aggregator;
+use super::inner_loop;
+use futures::{Sink, SinkExt, StreamExt};
+use inner_loop::FromShardWebsocket;
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::Arc;
+
+#[derive(Clone)]
+pub struct AggregatorSet(Arc<AggregatorSetInner>);
+
+pub struct AggregatorSetInner {
+    aggregators: Vec<Aggregator>,
+    next_idx: AtomicUsize,
+}
+
+impl AggregatorSet {
+    /// Spawn the number of aggregators we're asked to.
+    pub async fn spawn(
+        num_aggregators: usize,
+        denylist: Vec<String>,
+    ) -> anyhow::Result<AggregatorSet> {
+        let aggregators = futures::future::try_join_all(
+            (0..num_aggregators).map(|_| Aggregator::spawn(denylist.clone())),
+        )
+        .await?;
+
+        Ok(AggregatorSet(Arc::new(AggregatorSetInner {
+            aggregators,
+            next_idx: AtomicUsize::new(0),
+        })))
+    }
+
+    /// Return a sink that a shard can send messages into to be handled by all aggregators.
+    pub fn subscribe_shard(
+        &self,
+    ) -> impl Sink<FromShardWebsocket, Error = anyhow::Error> + Send + Sync + Unpin + 'static
+    {
+        let mut conns: Vec<_> = self
+            .0
+            .aggregators
+            .iter()
+            .map(|a| a.subscribe_shard())
+            .collect();
+
+        // Send every incoming message to all aggregators.
+        let (tx, mut rx) = futures::channel::mpsc::unbounded::<FromShardWebsocket>();
+        tokio::spawn(async move {
+            while let Some(msg) = rx.next().await {
+                for conn in &mut conns {
+                    // Unbounded channel under the hood, so this await
+                    // shouldn't ever need to yield.
+                    if let Err(e) = conn.send(msg.clone()).await {
+                        log::error!("Aggregator connection has failed: {}", e);
+                        return;
+                    }
+                }
+            }
+        });
+
+        tx.sink_map_err(|e| anyhow::anyhow!("{}", e))
+    }
+
+    /// Return a sink that a feed can send messages into to be handled by a single aggregator.
+    pub fn subscribe_feed(
+        &self,
+    ) -> (
+        u64,
+        impl Sink<inner_loop::FromFeedWebsocket, Error = anyhow::Error> + Send + Sync + Unpin + 'static,
+    ) {
+        let last_val = self.0.next_idx.fetch_add(1, Ordering::Relaxed);
+        let this_idx = (last_val + 1) % self.0.aggregators.len();
+
+        self.0.aggregators[this_idx].subscribe_feed()
+    }
+}
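The new `AggregatorSet` fans every shard message out to all aggregators (so each one holds the full node state), while each feed is handed to a single aggregator chosen round-robin. Here is a standalone sketch of that round-robin selection, using the same `fetch_add` + modulo logic as `subscribe_feed` above (the `next_index` helper is invented purely for illustration):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// A relaxed `fetch_add` hands out a unique counter value to each caller, and the
// modulo maps it onto an aggregator index. Because the index is `(last_val + 1) % len`,
// the very first feed lands on aggregator 1 rather than 0 (or back on 0 when there is
// only one aggregator), but the distribution over feeds is still even.
fn next_index(counter: &AtomicUsize, len: usize) -> usize {
    let last_val = counter.fetch_add(1, Ordering::Relaxed);
    (last_val + 1) % len
}

fn main() {
    let counter = AtomicUsize::new(0);
    let picked: Vec<usize> = (0..6).map(|_| next_index(&counter, 3)).collect();
    assert_eq!(picked, vec![1, 2, 0, 1, 2, 0]);
    println!("{:?}", picked);
}
```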
diff --git a/backend/telemetry_core/src/aggregator/mod.rs b/backend/telemetry_core/src/aggregator/mod.rs
index 7e236eb..2865ed9 100644
--- a/backend/telemetry_core/src/aggregator/mod.rs
+++ b/backend/telemetry_core/src/aggregator/mod.rs
@@ -15,9 +15,10 @@
 // along with this program. If not, see <https://www.gnu.org/licenses/>.
 
 mod aggregator;
+mod aggregator_set;
 mod inner_loop;
 
 // Expose the various message types that can be worked with externally:
 pub use inner_loop::{FromFeedWebsocket, FromShardWebsocket, ToFeedWebsocket, ToShardWebsocket};
 
-pub use aggregator::*;
+pub use aggregator_set::*;
diff --git a/backend/telemetry_core/src/main.rs b/backend/telemetry_core/src/main.rs
index a3beec0..aeee8f6 100644
--- a/backend/telemetry_core/src/main.rs
+++ b/backend/telemetry_core/src/main.rs
@@ -22,7 +22,7 @@ use std::str::FromStr;
 use tokio::time::{Duration, Instant};
 
 use aggregator::{
-    Aggregator, FromFeedWebsocket, FromShardWebsocket, ToFeedWebsocket, ToShardWebsocket,
+    AggregatorSet, FromFeedWebsocket, FromShardWebsocket, ToFeedWebsocket, ToShardWebsocket,
 };
 use bincode::Options;
 use common::http_utils;
@@ -63,6 +63,10 @@ struct Opts {
     /// on the machine. If no value is given, use an internal default that we have deemed sane.
     #[structopt(long)]
     worker_threads: Option<usize>,
+    /// Each aggregator keeps track of the entire node state. Feed subscriptions are split across
+    /// aggregators.
+    #[structopt(long)]
+    num_aggregators: Option<usize>,
 }
 
 fn main() {
@@ -83,21 +87,29 @@ fn main() {
         None => usize::min(num_cpus::get(), 8),
     };
 
+    let num_aggregators = match opts.num_aggregators {
+        Some(0) => num_cpus::get(),
+        Some(n) => n,
+        // By default, we'll have half as many aggregator tasks
+        // running as we do worker threads (minimum 1).
+        None => usize::max(worker_threads / 2, 1),
+    };
+
     tokio::runtime::Builder::new_multi_thread()
         .enable_all()
         .worker_threads(worker_threads)
         .build()
         .unwrap()
         .block_on(async {
-            if let Err(e) = start_server(opts).await {
+            if let Err(e) = start_server(num_aggregators, opts).await {
                 log::error!("Error starting server: {}", e);
             }
         });
 }
 
 /// Declare our routes and start the server.
-async fn start_server(opts: Opts) -> anyhow::Result<()> {
-    let aggregator = Aggregator::spawn(opts.denylist).await?;
+async fn start_server(num_aggregators: usize, opts: Opts) -> anyhow::Result<()> {
+    let aggregator = AggregatorSet::spawn(num_aggregators, opts.denylist).await?;
     let socket_addr = opts.socket;
     let feed_timeout = opts.feed_timeout;
 
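For the new `--num-aggregators` flag, the resolution above boils down to: an explicit value wins, `0` means one aggregator per CPU, and omitting the flag gives half as many aggregators as worker threads (at least one). A small self-contained restatement of that logic, with a few checks (the `resolve_num_aggregators` helper is hypothetical, not part of the codebase):

```rust
// Sketch of the default selection used in main() above.
fn resolve_num_aggregators(flag: Option<usize>, worker_threads: usize, num_cpus: usize) -> usize {
    match flag {
        Some(0) => num_cpus,
        Some(n) => n,
        None => usize::max(worker_threads / 2, 1),
    }
}

fn main() {
    assert_eq!(resolve_num_aggregators(None, 8, 16), 4); // default: half the workers
    assert_eq!(resolve_num_aggregators(None, 1, 16), 1); // but never zero
    assert_eq!(resolve_num_aggregators(Some(0), 8, 16), 16); // 0 means "one per CPU"
    assert_eq!(resolve_num_aggregators(Some(3), 8, 16), 3); // explicit value wins
    println!("ok");
}
```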
diff --git a/backend/telemetry_core/tests/soak_tests.rs b/backend/telemetry_core/tests/soak_tests.rs
index 51faef4..78b3dbe 100644
--- a/backend/telemetry_core/tests/soak_tests.rs
+++ b/backend/telemetry_core/tests/soak_tests.rs
@@ -404,6 +404,9 @@ struct SoakTestOpts {
     /// The number of nodes to connect to each feed
     #[structopt(long)]
     nodes: usize,
+    /// Number of aggregator loops to use in the core
+    #[structopt(long)]
+    core_num_aggregators: Option<usize>,
     /// Number of worker threads the core will use
     #[structopt(long)]
     core_worker_threads: Option<usize>,
diff --git a/backend/test_utils/src/workspace/start_server.rs b/backend/test_utils/src/workspace/start_server.rs
index 10d0f6f..f94d0e2 100644
--- a/backend/test_utils/src/workspace/start_server.rs
+++ b/backend/test_utils/src/workspace/start_server.rs
@@ -36,6 +36,7 @@ impl Default for ServerOpts {
 pub struct CoreOpts {
     pub feed_timeout: Option<u64>,
     pub worker_threads: Option<usize>,
+    pub num_aggregators: Option<usize>,
 }
 
 impl Default for CoreOpts {
@@ -43,6 +44,7 @@ impl Default for CoreOpts {
         Self {
             feed_timeout: None,
             worker_threads: None,
+            num_aggregators: None,
         }
     }
 }
@@ -156,6 +158,9 @@ pub async fn start_server(
     if let Some(val) = core_opts.worker_threads {
         core_command = core_command.arg("--worker-threads").arg(val.to_string());
     }
+    if let Some(val) = core_opts.num_aggregators {
+        core_command = core_command.arg("--num-aggregators").arg(val.to_string());
+    }
 
     // Start the server
     Server::start(server::StartOpts::ShardAndCore {
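With the test-utils plumbing above, a test or benchmark can opt into multiple aggregators through the new `CoreOpts` field. A rough usage sketch follows (the `test_utils` import path and the test name are assumed for illustration; `start_server`, `ServerOpts`, `CoreOpts`, and `ShardOpts` are the helpers touched in this diff):

```rust
// Sketch only: the exact module path inside `test_utils` is assumed here.
use test_utils::workspace::start_server::{start_server, CoreOpts, ServerOpts, ShardOpts};

#[tokio::test]
async fn feeds_are_split_across_aggregators() {
    // Run the core with 8 worker threads and 4 aggregator loops; feeds that
    // subscribe to this server are distributed across the aggregators round-robin.
    let mut server = start_server(
        ServerOpts { release_mode: true, log_output: false },
        CoreOpts {
            worker_threads: Some(8),
            num_aggregators: Some(4),
            ..Default::default()
        },
        ShardOpts::default(),
    )
    .await;

    // ... connect nodes and feeds as in the benchmark above ...
    let _ = server.add_shard().await.unwrap();
}
```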