Allow multiple aggregator loops in the core to try and spread feed subscription/send cost out

This commit is contained in:
James Wilson
2021-08-07 17:08:58 +01:00
parent 9c001bdcfd
commit c99cbee1e9
6 changed files with 156 additions and 65 deletions
@@ -0,0 +1,75 @@
use super::aggregator::Aggregator;
use super::inner_loop;
use futures::{Sink, SinkExt, StreamExt};
use inner_loop::FromShardWebsocket;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
#[derive(Clone)]
pub struct AggregatorSet(Arc<AggregatorSetInner>);
pub struct AggregatorSetInner {
aggregators: Vec<Aggregator>,
next_idx: AtomicUsize,
}
impl AggregatorSet {
/// Spawn the number of aggregators we're asked to.
pub async fn spawn(
num_aggregators: usize,
denylist: Vec<String>,
) -> anyhow::Result<AggregatorSet> {
let aggregators = futures::future::try_join_all(
(0..num_aggregators).map(|_| Aggregator::spawn(denylist.clone())),
)
.await?;
Ok(AggregatorSet(Arc::new(AggregatorSetInner {
aggregators,
next_idx: AtomicUsize::new(0),
})))
}
/// Return a sink that a shard can send messages into to be handled by all aggregators.
pub fn subscribe_shard(
&self,
) -> impl Sink<inner_loop::FromShardWebsocket, Error = anyhow::Error> + Send + Sync + Unpin + 'static
{
let mut conns: Vec<_> = self
.0
.aggregators
.iter()
.map(|a| a.subscribe_shard())
.collect();
// Send every incoming message to all aggregators.
let (tx, mut rx) = futures::channel::mpsc::unbounded::<FromShardWebsocket>();
tokio::spawn(async move {
while let Some(msg) = rx.next().await {
for conn in &mut conns {
// Unbounded channel under the hood, so this await
// shouldn't ever need to yield.
if let Err(e) = conn.send(msg.clone()).await {
log::error!("Aggregator connection has failed: {}", e);
return;
}
}
}
});
tx.sink_map_err(|e| anyhow::anyhow!("{}", e))
}
/// Return a sink that a feed can send messages into to be handled by a single aggregator.
pub fn subscribe_feed(
&self,
) -> (
u64,
impl Sink<inner_loop::FromFeedWebsocket, Error = anyhow::Error> + Send + Sync + Unpin + 'static,
) {
let last_val = self.0.next_idx.fetch_add(1, Ordering::Relaxed);
let this_idx = (last_val + 1) % self.0.aggregators.len();
self.0.aggregators[this_idx].subscribe_feed()
}
}
+2 -1
View File
@@ -15,9 +15,10 @@
// along with this program. If not, see <https://www.gnu.org/licenses/>.
mod aggregator;
mod aggregator_set;
mod inner_loop;
// Expose the various message types that can be worked with externally:
pub use inner_loop::{FromFeedWebsocket, FromShardWebsocket, ToFeedWebsocket, ToShardWebsocket};
pub use aggregator::*;
pub use aggregator_set::*;
+16 -4
View File
@@ -22,7 +22,7 @@ use std::str::FromStr;
use tokio::time::{Duration, Instant};
use aggregator::{
Aggregator, FromFeedWebsocket, FromShardWebsocket, ToFeedWebsocket, ToShardWebsocket,
AggregatorSet, FromFeedWebsocket, FromShardWebsocket, ToFeedWebsocket, ToShardWebsocket,
};
use bincode::Options;
use common::http_utils;
@@ -63,6 +63,10 @@ struct Opts {
/// on the machine. If no value is given, use an internal default that we have deemed sane.
#[structopt(long)]
worker_threads: Option<usize>,
/// Each aggregator keeps track of the entire node state. Feed subscriptions are split across
/// aggregators.
#[structopt(long)]
num_aggregators: Option<usize>,
}
fn main() {
@@ -83,21 +87,29 @@ fn main() {
None => usize::min(num_cpus::get(), 8),
};
let num_aggregators = match opts.num_aggregators {
Some(0) => num_cpus::get(),
Some(n) => n,
// By default, we'll have half as many aggregator tasks
// running as we do worker threads (minimum 1).
None => usize::max(worker_threads / 2, 1),
};
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.worker_threads(worker_threads)
.build()
.unwrap()
.block_on(async {
if let Err(e) = start_server(opts).await {
if let Err(e) = start_server(num_aggregators, opts).await {
log::error!("Error starting server: {}", e);
}
});
}
/// Declare our routes and start the server.
async fn start_server(opts: Opts) -> anyhow::Result<()> {
let aggregator = Aggregator::spawn(opts.denylist).await?;
async fn start_server(num_aggregators: usize, opts: Opts) -> anyhow::Result<()> {
let aggregator = AggregatorSet::spawn(num_aggregators, opts.denylist).await?;
let socket_addr = opts.socket;
let feed_timeout = opts.feed_timeout;