From 03e39cf5bcbb4405132f526c66ccc8f6306e7745 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Fri, 8 Jan 2021 16:08:59 -0500 Subject: [PATCH] subsystems have an unbounded channel to the overseer (#2236) * subsystems have an unbounded channel to the overseer * Update node/overseer/src/lib.rs Co-authored-by: Bernhard Schuster * bump Cargo.lock Co-authored-by: Bernhard Schuster --- polkadot/Cargo.lock | 1 - polkadot/node/overseer/Cargo.toml | 1 - polkadot/node/overseer/src/lib.rs | 138 +++++++++++------------------- 3 files changed, 52 insertions(+), 88 deletions(-) diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock index 9800da4d0d..1d76f17a99 100644 --- a/polkadot/Cargo.lock +++ b/polkadot/Cargo.lock @@ -5348,7 +5348,6 @@ dependencies = [ "polkadot-primitives", "sc-client-api", "sp-core", - "streamunordered", "tracing", "tracing-futures", ] diff --git a/polkadot/node/overseer/Cargo.toml b/polkadot/node/overseer/Cargo.toml index 41c114f15d..0887dd3e3b 100644 --- a/polkadot/node/overseer/Cargo.toml +++ b/polkadot/node/overseer/Cargo.toml @@ -14,7 +14,6 @@ polkadot-node-primitives = { package = "polkadot-node-primitives", path = "../pr polkadot-node-subsystem-util = { path = "../subsystem-util" } polkadot-primitives = { path = "../../primitives" } polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../subsystem" } -streamunordered = "0.5.1" tracing = "0.1.22" tracing-futures = "0.2.4" diff --git a/polkadot/node/overseer/src/lib.rs b/polkadot/node/overseer/src/lib.rs index 9c4f88a9ef..598f4121c7 100644 --- a/polkadot/node/overseer/src/lib.rs +++ b/polkadot/node/overseer/src/lib.rs @@ -70,12 +70,11 @@ use futures::channel::{mpsc, oneshot}; use futures::{ poll, select, future::BoxFuture, - stream::{self, FuturesUnordered}, + stream::{FuturesUnordered, Fuse}, Future, FutureExt, SinkExt, StreamExt, }; use futures_timer::Delay; use oorandom::Rand32; -use streamunordered::{StreamYield, StreamUnordered}; use polkadot_primitives::v1::{Block, BlockNumber, Hash}; use client::{BlockImportNotification, BlockchainEvents, FinalityNotification}; @@ -324,7 +323,7 @@ impl From for MaybeTimed { #[derive(Debug)] pub struct OverseerSubsystemContext{ rx: mpsc::Receiver>, - tx: mpsc::Sender>, + tx: mpsc::UnboundedSender>, metrics: Metrics, rng: Rand32, threshold: u32, @@ -340,7 +339,7 @@ impl OverseerSubsystemContext { /// to the range `0.0..=1.0`. fn new( rx: mpsc::Receiver>, - tx: mpsc::Sender>, + tx: mpsc::UnboundedSender>, metrics: Metrics, increment: u64, mut capture_rate: f64, @@ -361,7 +360,10 @@ impl OverseerSubsystemContext { /// /// Intended for tests. #[allow(unused)] - fn new_unmetered(rx: mpsc::Receiver>, tx: mpsc::Sender>) -> Self { + fn new_unmetered( + rx: mpsc::Receiver>, + tx: mpsc::UnboundedSender>, + ) -> Self { let metrics = Metrics::default(); OverseerSubsystemContext::new(rx, tx, metrics, 0, 0.0) } @@ -375,31 +377,6 @@ impl OverseerSubsystemContext { MaybeTimed { timer, t } } - - /// Make a standalone function which can construct a `MaybeTimed` wrapper around some `T` - /// without borrowing `self`. - /// - /// This is somewhat more expensive than `self.maybe_timed` because it must clone some stuff. - fn make_maybe_timed(&mut self) -> impl FnMut(T) -> MaybeTimed { - // We don't want to simply clone this RNG because we don't want to duplicate its state. - // It's not ever going to be used for cryptographic purposes, but it's still better to - // keep good habits. - let (seed, increment) = self.rng.state(); - let mut rng = Rand32::new_inc(seed, increment + 1); - - let metrics = self.metrics.clone(); - let threshold = self.threshold; - - move |t| { - let timer = if rng.rand_u32() <= threshold { - metrics.time_message_hold() - } else { - None - }; - - MaybeTimed { timer, t } - } - } } #[async_trait::async_trait] @@ -428,7 +405,7 @@ impl SubsystemContext for OverseerSubsystemContext { self.send_timed(ToOverseer::SpawnJob { name, s, - }).await.map_err(Into::into) + }).map_err(|s| s.into_send_error().into()) } async fn spawn_blocking(&mut self, name: &'static str, s: Pin + Send>>) @@ -437,23 +414,25 @@ impl SubsystemContext for OverseerSubsystemContext { self.send_timed(ToOverseer::SpawnBlockingJob { name, s, - }).await.map_err(Into::into) + }).map_err(|s| s.into_send_error().into()) } async fn send_message(&mut self, msg: AllMessages) { - self.send_and_log_error(ToOverseer::SubsystemMessage(msg)).await + self.send_and_log_error(ToOverseer::SubsystemMessage(msg)) } async fn send_messages(&mut self, msgs: T) where T: IntoIterator + Send, T::IntoIter: Send { - self.send_all_timed_or_log(msgs).await + for msg in msgs { + self.send_and_log_error(ToOverseer::SubsystemMessage(msg)); + } } } impl OverseerSubsystemContext { - async fn send_and_log_error(&mut self, msg: ToOverseer) { - if self.send_timed(msg).await.is_err() { + fn send_and_log_error(&mut self, msg: ToOverseer) { + if self.send_timed(msg).is_err() { tracing::debug!( target: LOG_TARGET, msg_type = std::any::type_name::(), @@ -462,33 +441,13 @@ impl OverseerSubsystemContext { } } - async fn send_timed(&mut self, msg: ToOverseer) -> Result< + fn send_timed(&mut self, msg: ToOverseer) -> Result< (), - > as futures::Sink>>::Error + mpsc::TrySendError>, > { let msg = self.maybe_timed(msg); - self.tx.send(msg).await - } - - async fn send_all_timed_or_log(&mut self, msgs: Msgs) - where - Msgs: IntoIterator + Send, - Msgs::IntoIter: Send, - Msg: Into + Send, - { - let mut maybe_timed = self.make_maybe_timed(); - let mut msgs = stream::iter( - msgs.into_iter() - .map(move |msg| Ok(maybe_timed(ToOverseer::SubsystemMessage(msg.into())))) - ); - if self.tx.send_all(&mut msgs).await.is_err() { - tracing::debug!( - target: LOG_TARGET, - msg_type = std::any::type_name::(), - "Failed to send messages to Overseer", - ); - } + self.tx.unbounded_send(msg) } } @@ -600,7 +559,7 @@ pub struct Overseer { running_subsystems: FuturesUnordered>>, /// Gather running subsystems' outbound streams into one. - running_subsystems_rx: StreamUnordered>>, + to_overseer_rx: Fuse>>, /// Events that are sent to the overseer from the outside world events_rx: mpsc::Receiver, @@ -1291,7 +1250,7 @@ where let metrics = ::register(prometheus_registry)?; - let mut running_subsystems_rx = StreamUnordered::new(); + let (to_overseer_tx, to_overseer_rx) = mpsc::unbounded(); let mut running_subsystems = FuturesUnordered::new(); let mut seed = 0x533d; // arbitrary @@ -1299,7 +1258,7 @@ where let candidate_validation_subsystem = spawn( &mut s, &mut running_subsystems, - &mut running_subsystems_rx, + to_overseer_tx.clone(), all_subsystems.candidate_validation, &metrics, &mut seed, @@ -1308,7 +1267,7 @@ where let candidate_backing_subsystem = spawn( &mut s, &mut running_subsystems, - &mut running_subsystems_rx, + to_overseer_tx.clone(), all_subsystems.candidate_backing, &metrics, &mut seed, @@ -1317,7 +1276,7 @@ where let candidate_selection_subsystem = spawn( &mut s, &mut running_subsystems, - &mut running_subsystems_rx, + to_overseer_tx.clone(), all_subsystems.candidate_selection, &metrics, &mut seed, @@ -1326,7 +1285,7 @@ where let statement_distribution_subsystem = spawn( &mut s, &mut running_subsystems, - &mut running_subsystems_rx, + to_overseer_tx.clone(), all_subsystems.statement_distribution, &metrics, &mut seed, @@ -1335,7 +1294,7 @@ where let availability_distribution_subsystem = spawn( &mut s, &mut running_subsystems, - &mut running_subsystems_rx, + to_overseer_tx.clone(), all_subsystems.availability_distribution, &metrics, &mut seed, @@ -1344,7 +1303,7 @@ where let bitfield_signing_subsystem = spawn( &mut s, &mut running_subsystems, - &mut running_subsystems_rx, + to_overseer_tx.clone(), all_subsystems.bitfield_signing, &metrics, &mut seed, @@ -1353,7 +1312,7 @@ where let bitfield_distribution_subsystem = spawn( &mut s, &mut running_subsystems, - &mut running_subsystems_rx, + to_overseer_tx.clone(), all_subsystems.bitfield_distribution, &metrics, &mut seed, @@ -1362,7 +1321,7 @@ where let provisioner_subsystem = spawn( &mut s, &mut running_subsystems, - &mut running_subsystems_rx, + to_overseer_tx.clone(), all_subsystems.provisioner, &metrics, &mut seed, @@ -1371,7 +1330,7 @@ where let pov_distribution_subsystem = spawn( &mut s, &mut running_subsystems, - &mut running_subsystems_rx, + to_overseer_tx.clone(), all_subsystems.pov_distribution, &metrics, &mut seed, @@ -1380,7 +1339,7 @@ where let runtime_api_subsystem = spawn( &mut s, &mut running_subsystems, - &mut running_subsystems_rx, + to_overseer_tx.clone(), all_subsystems.runtime_api, &metrics, &mut seed, @@ -1389,7 +1348,7 @@ where let availability_store_subsystem = spawn( &mut s, &mut running_subsystems, - &mut running_subsystems_rx, + to_overseer_tx.clone(), all_subsystems.availability_store, &metrics, &mut seed, @@ -1398,7 +1357,7 @@ where let network_bridge_subsystem = spawn( &mut s, &mut running_subsystems, - &mut running_subsystems_rx, + to_overseer_tx.clone(), all_subsystems.network_bridge, &metrics, &mut seed, @@ -1407,7 +1366,7 @@ where let chain_api_subsystem = spawn( &mut s, &mut running_subsystems, - &mut running_subsystems_rx, + to_overseer_tx.clone(), all_subsystems.chain_api, &metrics, &mut seed, @@ -1416,7 +1375,7 @@ where let collation_generation_subsystem = spawn( &mut s, &mut running_subsystems, - &mut running_subsystems_rx, + to_overseer_tx.clone(), all_subsystems.collation_generation, &metrics, &mut seed, @@ -1426,7 +1385,7 @@ where let collator_protocol_subsystem = spawn( &mut s, &mut running_subsystems, - &mut running_subsystems_rx, + to_overseer_tx.clone(), all_subsystems.collator_protocol, &metrics, &mut seed, @@ -1458,7 +1417,7 @@ where collator_protocol_subsystem, s, running_subsystems, - running_subsystems_rx, + to_overseer_rx: to_overseer_rx.fuse(), events_rx, activation_external_listeners, leaves, @@ -1546,11 +1505,14 @@ where } } }, - msg = self.running_subsystems_rx.next().fuse() => { - let MaybeTimed { timer, t: msg } = if let Some((StreamYield::Item(msg), _)) = msg { - msg - } else { - continue + msg = self.to_overseer_rx.next() => { + let MaybeTimed { timer, t: msg } = match msg { + Some(m) => m, + None => { + // This is a fused stream so we will shut down after receiving all + // shutdown notifications. + continue + } }; match msg { @@ -1773,14 +1735,19 @@ where fn spawn( spawner: &mut S, futures: &mut FuturesUnordered>>, - streams: &mut StreamUnordered>>, + to_overseer: mpsc::UnboundedSender>, s: impl Subsystem>, metrics: &Metrics, seed: &mut u64, ) -> SubsystemResult> { let (to_tx, to_rx) = mpsc::channel(CHANNEL_CAPACITY); - let (from_tx, from_rx) = mpsc::channel(CHANNEL_CAPACITY); - let ctx = OverseerSubsystemContext::new(to_rx, from_tx, metrics.clone(), *seed, MESSAGE_TIMER_METRIC_CAPTURE_RATE); + let ctx = OverseerSubsystemContext::new( + to_rx, + to_overseer, + metrics.clone(), + *seed, + MESSAGE_TIMER_METRIC_CAPTURE_RATE, + ); let SpawnedSubsystem { future, name } = s.start(ctx); // increment the seed now that it's been used, so the next context will have its own distinct RNG @@ -1799,7 +1766,6 @@ fn spawn( spawner.spawn(name, fut); - let _ = streams.push(from_rx); futures.push(Box::pin(rx.map(|e| { tracing::warn!(err = ?e, "dropping error"); Ok(()) }))); let instance = Some(SubsystemInstance {