diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock index 3ed9d4ca2b..09cdbe1930 100644 --- a/polkadot/Cargo.lock +++ b/polkadot/Cargo.lock @@ -4236,12 +4236,6 @@ dependencies = [ "parking_lot 0.11.1", ] -[[package]] -name = "oorandom" -version = "11.1.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575" - [[package]] name = "opaque-debug" version = "0.2.3" @@ -5886,12 +5880,12 @@ dependencies = [ name = "polkadot-overseer" version = "0.1.0" dependencies = [ + "assert_matches", "async-trait", "femme", "futures 0.3.13", "futures-timer 3.0.2", "kv-log-macro", - "oorandom", "polkadot-node-network-protocol", "polkadot-node-primitives", "polkadot-node-subsystem", diff --git a/polkadot/node/metered-channel/src/bounded.rs b/polkadot/node/metered-channel/src/bounded.rs index 66ecf30202..5ad1fae420 100644 --- a/polkadot/node/metered-channel/src/bounded.rs +++ b/polkadot/node/metered-channel/src/bounded.rs @@ -25,10 +25,9 @@ use super::Meter; /// Create a wrapped `mpsc::channel` pair of `MeteredSender` and `MeteredReceiver`. -pub fn channel(capacity: usize, name: &'static str) -> (MeteredSender, MeteredReceiver) { +pub fn channel(capacity: usize) -> (MeteredSender, MeteredReceiver) { let (tx, rx) = mpsc::channel(capacity); - let mut shared_meter = Meter::default(); - shared_meter.name = name; + let shared_meter = Meter::default(); let tx = MeteredSender { meter: shared_meter.clone(), inner: tx }; let rx = MeteredReceiver { meter: shared_meter, inner: rx }; (tx, rx) diff --git a/polkadot/node/metered-channel/src/lib.rs b/polkadot/node/metered-channel/src/lib.rs index 2329e16482..e2fc0d84b5 100644 --- a/polkadot/node/metered-channel/src/lib.rs +++ b/polkadot/node/metered-channel/src/lib.rs @@ -30,8 +30,6 @@ pub use self::unbounded::*; /// A peek into the inner state of a meter. #[derive(Debug, Clone, Default)] pub struct Meter { - /// Name of the receiver and sender pair. - name: &'static str, // Number of sends on this channel. sent: Arc, // Number of receives on this channel. @@ -60,11 +58,6 @@ impl Meter { } } - /// Obtain the name of the channel `Sender` and `Receiver` pair. - pub fn name(&self) -> &'static str { - self.name - } - fn note_sent(&self) { self.sent.fetch_add(1, Ordering::Relaxed); } @@ -92,7 +85,7 @@ mod tests { #[test] fn try_send_try_next() { block_on(async move { - let (mut tx, mut rx) = channel::(5, "goofy"); + let (mut tx, mut rx) = channel::(5); let msg = Msg::default(); assert_eq!(rx.meter().read(), Readout { sent: 0, received: 0 }); tx.try_send(msg).unwrap(); @@ -116,7 +109,7 @@ mod tests { fn with_tasks() { let (ready, go) = futures::channel::oneshot::channel(); - let (mut tx, mut rx) = channel::(5, "goofy"); + let (mut tx, mut rx) = channel::(5); block_on(async move { futures::join!( async move { @@ -149,7 +142,7 @@ mod tests { #[test] fn stream_and_sink() { - let (mut tx, mut rx) = channel::(5, "goofy"); + let (mut tx, mut rx) = channel::(5); block_on(async move { futures::join!( @@ -175,8 +168,8 @@ mod tests { #[test] fn failed_send_does_not_inc_sent() { - let (mut bounded, _) = channel::(5, "pluto"); - let (mut unbounded, _) = unbounded::("pluto"); + let (mut bounded, _) = channel::(5); + let (mut unbounded, _) = unbounded::(); block_on(async move { assert!(bounded.send(Msg::default()).await.is_err()); diff --git a/polkadot/node/metered-channel/src/unbounded.rs b/polkadot/node/metered-channel/src/unbounded.rs index 1d98b18dbe..242b9198f4 100644 --- a/polkadot/node/metered-channel/src/unbounded.rs +++ b/polkadot/node/metered-channel/src/unbounded.rs @@ -25,10 +25,9 @@ use super::Meter; /// Create a wrapped `mpsc::channel` pair of `MeteredSender` and `MeteredReceiver`. -pub fn unbounded(name: &'static str) -> (UnboundedMeteredSender, UnboundedMeteredReceiver) { +pub fn unbounded() -> (UnboundedMeteredSender, UnboundedMeteredReceiver) { let (tx, rx) = mpsc::unbounded(); - let mut shared_meter = Meter::default(); - shared_meter.name = name; + let shared_meter = Meter::default(); let tx = UnboundedMeteredSender { meter: shared_meter.clone(), inner: tx }; let rx = UnboundedMeteredReceiver { meter: shared_meter, inner: rx }; (tx, rx) @@ -147,7 +146,7 @@ impl UnboundedMeteredSender { /// Attempt to send message or fail immediately. - pub fn unbounded_send(&mut self, msg: T) -> result::Result<(), mpsc::TrySendError> { + pub fn unbounded_send(&self, msg: T) -> result::Result<(), mpsc::TrySendError> { self.meter.note_sent(); self.inner.unbounded_send(msg).map_err(|e| { self.meter.retract_sent(); diff --git a/polkadot/node/network/bridge/src/lib.rs b/polkadot/node/network/bridge/src/lib.rs index 6c79773bc1..62dc5d8172 100644 --- a/polkadot/node/network/bridge/src/lib.rs +++ b/polkadot/node/network/bridge/src/lib.rs @@ -734,7 +734,7 @@ mod tests { TestAuthorityDiscovery, ) { let (net_tx, net_rx) = polkadot_node_subsystem_test_helpers::single_item_sink(); - let (action_tx, action_rx) = metered::unbounded("test_action"); + let (action_tx, action_rx) = metered::unbounded(); ( TestNetwork { diff --git a/polkadot/node/overseer/Cargo.toml b/polkadot/node/overseer/Cargo.toml index 9a3ef00c64..1c0401a536 100644 --- a/polkadot/node/overseer/Cargo.toml +++ b/polkadot/node/overseer/Cargo.toml @@ -9,7 +9,6 @@ async-trait = "0.1.42" client = { package = "sc-client-api", git = "https://github.com/paritytech/substrate", branch = "master" } futures = "0.3.12" futures-timer = "3.0.2" -oorandom = "11.1.3" polkadot-node-primitives = { package = "polkadot-node-primitives", path = "../primitives" } polkadot-node-subsystem-util = { path = "../subsystem-util" } polkadot-primitives = { path = "../../primitives" } @@ -20,6 +19,6 @@ tracing = "0.1.25" sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" } polkadot-node-network-protocol = { path = "../network/protocol" } futures = { version = "0.3.12", features = ["thread-pool"] } -futures-timer = "3.0.2" femme = "2.1.1" kv-log-macro = "1.0.7" +assert_matches = "1.4.0" diff --git a/polkadot/node/overseer/src/lib.rs b/polkadot/node/overseer/src/lib.rs index 1661b6dd3d..214e085f1d 100644 --- a/polkadot/node/overseer/src/lib.rs +++ b/polkadot/node/overseer/src/lib.rs @@ -61,20 +61,19 @@ use std::fmt::{self, Debug}; use std::pin::Pin; -use std::sync::Arc; +use std::sync::{atomic::{self, AtomicUsize}, Arc}; use std::task::Poll; use std::time::Duration; use std::collections::{hash_map, HashMap}; -use futures::channel::{oneshot, mpsc}; +use futures::channel::{oneshot}; use futures::{ poll, select, future::BoxFuture, - stream::{FuturesUnordered, Fuse}, + stream::{self, FuturesUnordered, Fuse}, Future, FutureExt, StreamExt, }; use futures_timer::Delay; -use oorandom::Rand32; use polkadot_primitives::v1::{Block, BlockNumber, Hash}; use client::{BlockImportNotification, BlockchainEvents, FinalityNotification}; @@ -89,8 +88,8 @@ use polkadot_subsystem::messages::{ ApprovalVotingMessage, GossipSupportMessage, }; pub use polkadot_subsystem::{ - Subsystem, SubsystemContext, OverseerSignal, FromOverseer, SubsystemError, SubsystemResult, - SpawnedSubsystem, ActiveLeavesUpdate, ActivatedLeaf, DummySubsystem, jaeger, + Subsystem, SubsystemContext, SubsystemSender, OverseerSignal, FromOverseer, SubsystemError, + SubsystemResult, SpawnedSubsystem, ActiveLeavesUpdate, ActivatedLeaf, DummySubsystem, jaeger, }; use polkadot_node_subsystem_util::{TimeoutExt, metrics::{self, prometheus}, metered, Metronome}; use polkadot_node_primitives::SpawnNamed; @@ -101,467 +100,6 @@ const CHANNEL_CAPACITY: usize = 1024; const STOP_DELAY: u64 = 1; // Target for logs. const LOG_TARGET: &'static str = "parachain::overseer"; -// Rate at which messages are timed. -const MESSAGE_TIMER_METRIC_CAPTURE_RATE: f64 = 0.005; - - - -/// A type of messages that are sent from [`Subsystem`] to [`Overseer`]. -/// -/// It wraps a system-wide [`AllMessages`] type that represents all possible -/// messages in the system. -/// -/// [`AllMessages`]: enum.AllMessages.html -/// [`Subsystem`]: trait.Subsystem.html -/// [`Overseer`]: struct.Overseer.html -enum ToOverseer { - /// This is a message sent by a `Subsystem`. - SubsystemMessage(AllMessages), - - /// A message that wraps something the `Subsystem` is desiring to - /// spawn on the overseer and a `oneshot::Sender` to signal the result - /// of the spawn. - SpawnJob { - name: &'static str, - s: BoxFuture<'static, ()>, - }, - - /// Same as `SpawnJob` but for blocking tasks to be executed on a - /// dedicated thread pool. - SpawnBlockingJob { - name: &'static str, - s: BoxFuture<'static, ()>, - }, -} - -/// An event telling the `Overseer` on the particular block -/// that has been imported or finalized. -/// -/// This structure exists solely for the purposes of decoupling -/// `Overseer` code from the client code and the necessity to call -/// `HeaderBackend::block_number_from_id()`. -#[derive(Debug, Clone)] -pub struct BlockInfo { - /// hash of the block. - pub hash: Hash, - /// hash of the parent block. - pub parent_hash: Hash, - /// block's number. - pub number: BlockNumber, -} - -impl From> for BlockInfo { - fn from(n: BlockImportNotification) -> Self { - BlockInfo { - hash: n.hash, - parent_hash: n.header.parent_hash, - number: n.header.number, - } - } -} - -impl From> for BlockInfo { - fn from(n: FinalityNotification) -> Self { - BlockInfo { - hash: n.hash, - parent_hash: n.header.parent_hash, - number: n.header.number, - } - } -} - -/// Some event from the outer world. -enum Event { - BlockImported(BlockInfo), - BlockFinalized(BlockInfo), - MsgToSubsystem(AllMessages), - ExternalRequest(ExternalRequest), - Stop, -} - -/// Some request from outer world. -enum ExternalRequest { - WaitForActivation { - hash: Hash, - response_channel: oneshot::Sender>, - }, -} - -/// A handler used to communicate with the [`Overseer`]. -/// -/// [`Overseer`]: struct.Overseer.html -#[derive(Clone)] -pub struct OverseerHandler { - events_tx: metered::MeteredSender, -} - -impl OverseerHandler { - /// Inform the `Overseer` that that some block was imported. - #[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))] - pub async fn block_imported(&mut self, block: BlockInfo) { - self.send_and_log_error(Event::BlockImported(block)).await - } - - /// Send some message to one of the `Subsystem`s. - #[tracing::instrument(level = "trace", skip(self, msg), fields(subsystem = LOG_TARGET))] - pub async fn send_msg(&mut self, msg: impl Into) { - self.send_and_log_error(Event::MsgToSubsystem(msg.into())).await - } - - /// Inform the `Overseer` that some block was finalized. - #[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))] - pub async fn block_finalized(&mut self, block: BlockInfo) { - self.send_and_log_error(Event::BlockFinalized(block)).await - } - - /// Wait for a block with the given hash to be in the active-leaves set. - /// This method is used for external code like `Proposer` that doesn't subscribe to Overseer's signals. - /// - /// The response channel responds if the hash was activated and is closed if the hash was deactivated. - /// Note that due the fact the overseer doesn't store the whole active-leaves set, only deltas, - /// the response channel may never return if the hash was deactivated before this call. - /// In this case, it's the caller's responsibility to ensure a timeout is set. - #[tracing::instrument(level = "trace", skip(self, response_channel), fields(subsystem = LOG_TARGET))] - pub async fn wait_for_activation(&mut self, hash: Hash, response_channel: oneshot::Sender>) { - self.send_and_log_error(Event::ExternalRequest(ExternalRequest::WaitForActivation { - hash, - response_channel - })).await - } - - /// Tell `Overseer` to shutdown. - #[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))] - pub async fn stop(&mut self) { - self.send_and_log_error(Event::Stop).await - } - - async fn send_and_log_error(&mut self, event: Event) { - if self.events_tx.send(event).await.is_err() { - tracing::info!(target: LOG_TARGET, "Failed to send an event to Overseer"); - } - } -} - -/// Glues together the [`Overseer`] and `BlockchainEvents` by forwarding -/// import and finality notifications into the [`OverseerHandler`]. -/// -/// [`Overseer`]: struct.Overseer.html -/// [`OverseerHandler`]: struct.OverseerHandler.html -pub async fn forward_events>( - client: Arc

, - mut handler: OverseerHandler, -) { - let mut finality = client.finality_notification_stream(); - let mut imports = client.import_notification_stream(); - - loop { - select! { - f = finality.next() => { - match f { - Some(block) => { - handler.block_finalized(block.into()).await; - } - None => break, - } - }, - i = imports.next() => { - match i { - Some(block) => { - handler.block_imported(block.into()).await; - } - None => break, - } - }, - complete => break, - } - } -} - -impl Debug for ToOverseer { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - ToOverseer::SubsystemMessage(msg) => { - write!(f, "OverseerMessage::SubsystemMessage({:?})", msg) - } - ToOverseer::SpawnJob { .. } => write!(f, "OverseerMessage::Spawn(..)"), - ToOverseer::SpawnBlockingJob { .. } => write!(f, "OverseerMessage::SpawnBlocking(..)") - } - } -} - -/// A running instance of some [`Subsystem`]. -/// -/// [`Subsystem`]: trait.Subsystem.html -struct SubsystemInstance { - tx: metered::MeteredSender>, - name: &'static str, -} - -type MaybeTimer = Option; - -#[derive(Debug)] -struct MaybeTimed { - timer: MaybeTimer, - t: T, -} - -impl MaybeTimed { - fn into_inner(self) -> T { - self.t - } -} - -impl From for MaybeTimed { - fn from(t: T) -> Self { - Self { timer: None, t } - } -} - -/// A context type that is given to the [`Subsystem`] upon spawning. -/// It can be used by [`Subsystem`] to communicate with other [`Subsystem`]s -/// or to spawn it's [`SubsystemJob`]s. -/// -/// [`Overseer`]: struct.Overseer.html -/// [`Subsystem`]: trait.Subsystem.html -/// [`SubsystemJob`]: trait.SubsystemJob.html -#[derive(Debug)] -pub struct OverseerSubsystemContext{ - rx: metered::MeteredReceiver>, - tx: metered::UnboundedMeteredSender>, - metrics: Metrics, - rng: Rand32, - threshold: u32, -} - -impl OverseerSubsystemContext { - /// Create a new `OverseerSubsystemContext`. - /// - /// `increment` determines the initial increment of the internal RNG. - /// The internal RNG is used to determine which messages are timed. - /// - /// `capture_rate` determines what fraction of messages are timed. Its value is clamped - /// to the range `0.0..=1.0`. - fn new( - rx: metered::MeteredReceiver>, - tx: metered::UnboundedMeteredSender>, - metrics: Metrics, - increment: u64, - mut capture_rate: f64, - ) -> Self { - let rng = Rand32::new_inc(0, increment); - - if capture_rate < 0.0 { - capture_rate = 0.0; - } else if capture_rate > 1.0 { - capture_rate = 1.0; - } - let threshold = (capture_rate * u32::MAX as f64) as u32; - - OverseerSubsystemContext { rx, tx, metrics, rng, threshold } - } - - /// Create a new `OverseserSubsystemContext` with no metering. - /// - /// Intended for tests. - #[allow(unused)] - fn new_unmetered( - rx: metered::MeteredReceiver>, - tx: metered::UnboundedMeteredSender>, - ) -> Self { - let metrics = Metrics::default(); - OverseerSubsystemContext::new(rx, tx, metrics, 0, 0.0) - } - - fn maybe_timed(&mut self, t: T) -> MaybeTimed { - let timer = if self.rng.rand_u32() <= self.threshold { - self.metrics.time_message_hold() - } else { - None - }; - - MaybeTimed { timer, t } - } -} - -#[async_trait::async_trait] -impl SubsystemContext for OverseerSubsystemContext { - type Message = M; - - async fn try_recv(&mut self) -> Result>, ()> { - match poll!(self.rx.next()) { - Poll::Ready(Some(msg)) => Ok(Some(msg)), - Poll::Ready(None) => Err(()), - Poll::Pending => Ok(None), - } - } - - async fn recv(&mut self) -> SubsystemResult> { - self.rx.next().await - .ok_or(SubsystemError::Context( - "No more messages in rx queue to process" - .to_owned() - )) - } - - async fn spawn(&mut self, name: &'static str, s: Pin + Send>>) - -> SubsystemResult<()> - { - self.send_timed(ToOverseer::SpawnJob { - name, - s, - }).map_err(|s| s.into_send_error().into()) - } - - async fn spawn_blocking(&mut self, name: &'static str, s: Pin + Send>>) - -> SubsystemResult<()> - { - self.send_timed(ToOverseer::SpawnBlockingJob { - name, - s, - }).map_err(|s| s.into_send_error().into()) - } - - async fn send_message(&mut self, msg: AllMessages) { - self.send_and_log_error(ToOverseer::SubsystemMessage(msg)) - } - - async fn send_messages(&mut self, msgs: T) - where T: IntoIterator + Send, T::IntoIter: Send - { - for msg in msgs { - self.send_and_log_error(ToOverseer::SubsystemMessage(msg)); - } - } -} - -impl OverseerSubsystemContext { - fn send_and_log_error(&mut self, msg: ToOverseer) { - if self.send_timed(msg).is_err() { - tracing::debug!( - target: LOG_TARGET, - msg_type = std::any::type_name::(), - "Failed to send a message to Overseer", - ); - } - } - - fn send_timed(&mut self, msg: ToOverseer) -> Result< - (), - mpsc::TrySendError>, - > - { - let msg = self.maybe_timed(msg); - self.tx.unbounded_send(msg) - } -} - -/// A subsystem that we oversee. -/// -/// Ties together the [`Subsystem`] itself and it's running instance -/// (which may be missing if the [`Subsystem`] is not running at the moment -/// for whatever reason). -/// -/// [`Subsystem`]: trait.Subsystem.html -struct OverseenSubsystem { - instance: Option>, -} - -impl OverseenSubsystem { - /// Send a message to the wrapped subsystem. - /// - /// If the inner `instance` is `None`, nothing is happening. - async fn send_message(&mut self, msg: M) -> SubsystemResult<()> { - const MESSAGE_TIMEOUT: Duration = Duration::from_secs(10); - - if let Some(ref mut instance) = self.instance { - match instance.tx.send( - FromOverseer::Communication { msg } - ).timeout(MESSAGE_TIMEOUT).await - { - None => { - tracing::error!(target: LOG_TARGET, "Subsystem {} appears unresponsive.", instance.name); - Err(SubsystemError::SubsystemStalled(instance.name)) - } - Some(res) => res.map_err(Into::into), - } - } else { - Ok(()) - } - } - - /// Send a signal to the wrapped subsystem. - /// - /// If the inner `instance` is `None`, nothing is happening. - async fn send_signal(&mut self, signal: OverseerSignal) -> SubsystemResult<()> { - const SIGNAL_TIMEOUT: Duration = Duration::from_secs(10); - - if let Some(ref mut instance) = self.instance { - match instance.tx.send(FromOverseer::Signal(signal)).timeout(SIGNAL_TIMEOUT).await { - None => { - tracing::error!(target: LOG_TARGET, "Subsystem {} appears unresponsive.", instance.name); - Err(SubsystemError::SubsystemStalled(instance.name)) - } - Some(res) => res.map_err(Into::into), - } - } else { - Ok(()) - } - } -} - -/// The `Overseer` itself. -pub struct Overseer { - /// Handles to all subsystems. - subsystems: AllSubsystems< - OverseenSubsystem, - OverseenSubsystem, - OverseenSubsystem, - OverseenSubsystem, - OverseenSubsystem, - OverseenSubsystem, - OverseenSubsystem, - OverseenSubsystem, - OverseenSubsystem, - OverseenSubsystem, - OverseenSubsystem, - OverseenSubsystem, - OverseenSubsystem, - OverseenSubsystem, - OverseenSubsystem, - OverseenSubsystem, - OverseenSubsystem, - OverseenSubsystem, - >, - - /// Spawner to spawn tasks to. - s: S, - - /// Here we keep handles to spawned subsystems to be notified when they terminate. - running_subsystems: FuturesUnordered>>, - - /// Gather running subsystems' outbound streams into one. - to_overseer_rx: Fuse>>, - - /// Events that are sent to the overseer from the outside world - events_rx: metered::MeteredReceiver, - - /// External listeners waiting for a hash to be in the active-leave set. - activation_external_listeners: HashMap>>>, - - /// Stores the [`jaeger::Span`] per active leaf. - span_per_active_leaf: HashMap>, - - /// A set of leaves that `Overseer` starts working with. - /// - /// Drained at the beginning of `run` and never used again. - leaves: Vec<(Hash, BlockNumber)>, - - /// The set of the "active leaves". - active_leaves: HashMap, - - /// Various Prometheus metrics. - metrics: Metrics, -} trait MapSubsystem { type Output; @@ -585,6 +123,7 @@ impl MapSubsystem for F where F: Fn(T) -> U { /// Each [`Subsystem`] is supposed to implement some interface that is generic over /// message type that is specific to this [`Subsystem`]. At the moment not all /// subsystems are implemented and the rest can be mocked with the [`DummySubsystem`]. +#[derive(Debug, Clone)] pub struct AllSubsystems< CV = (), CB = (), CS = (), SD = (), AD = (), AR = (), BS = (), BD = (), P = (), RA = (), AS = (), NB = (), CA = (), CG = (), CP = (), ApD = (), ApV = (), @@ -1265,17 +804,779 @@ type AllSubsystemsSame = AllSubsystems< T, T, T, >; +/// A type of messages that are sent from [`Subsystem`] to [`Overseer`]. +/// +/// It wraps a system-wide [`AllMessages`] type that represents all possible +/// messages in the system. +/// +/// [`AllMessages`]: enum.AllMessages.html +/// [`Subsystem`]: trait.Subsystem.html +/// [`Overseer`]: struct.Overseer.html +enum ToOverseer { + /// A message that wraps something the `Subsystem` is desiring to + /// spawn on the overseer and a `oneshot::Sender` to signal the result + /// of the spawn. + SpawnJob { + name: &'static str, + s: BoxFuture<'static, ()>, + }, + + /// Same as `SpawnJob` but for blocking tasks to be executed on a + /// dedicated thread pool. + SpawnBlockingJob { + name: &'static str, + s: BoxFuture<'static, ()>, + }, +} + +/// An event telling the `Overseer` on the particular block +/// that has been imported or finalized. +/// +/// This structure exists solely for the purposes of decoupling +/// `Overseer` code from the client code and the necessity to call +/// `HeaderBackend::block_number_from_id()`. +#[derive(Debug, Clone)] +pub struct BlockInfo { + /// hash of the block. + pub hash: Hash, + /// hash of the parent block. + pub parent_hash: Hash, + /// block's number. + pub number: BlockNumber, +} + +impl From> for BlockInfo { + fn from(n: BlockImportNotification) -> Self { + BlockInfo { + hash: n.hash, + parent_hash: n.header.parent_hash, + number: n.header.number, + } + } +} + +impl From> for BlockInfo { + fn from(n: FinalityNotification) -> Self { + BlockInfo { + hash: n.hash, + parent_hash: n.header.parent_hash, + number: n.header.number, + } + } +} + +/// Some event from the outer world. +enum Event { + BlockImported(BlockInfo), + BlockFinalized(BlockInfo), + MsgToSubsystem(AllMessages), + ExternalRequest(ExternalRequest), + Stop, +} + +/// Some request from outer world. +enum ExternalRequest { + WaitForActivation { + hash: Hash, + response_channel: oneshot::Sender>, + }, +} + +/// A handler used to communicate with the [`Overseer`]. +/// +/// [`Overseer`]: struct.Overseer.html +#[derive(Clone)] +pub struct OverseerHandler { + events_tx: metered::MeteredSender, +} + +impl OverseerHandler { + /// Inform the `Overseer` that that some block was imported. + #[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))] + pub async fn block_imported(&mut self, block: BlockInfo) { + self.send_and_log_error(Event::BlockImported(block)).await + } + + /// Send some message to one of the `Subsystem`s. + #[tracing::instrument(level = "trace", skip(self, msg), fields(subsystem = LOG_TARGET))] + pub async fn send_msg(&mut self, msg: impl Into) { + self.send_and_log_error(Event::MsgToSubsystem(msg.into())).await + } + + /// Inform the `Overseer` that some block was finalized. + #[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))] + pub async fn block_finalized(&mut self, block: BlockInfo) { + self.send_and_log_error(Event::BlockFinalized(block)).await + } + + /// Wait for a block with the given hash to be in the active-leaves set. + /// This method is used for external code like `Proposer` that doesn't subscribe to Overseer's signals. + /// + /// The response channel responds if the hash was activated and is closed if the hash was deactivated. + /// Note that due the fact the overseer doesn't store the whole active-leaves set, only deltas, + /// the response channel may never return if the hash was deactivated before this call. + /// In this case, it's the caller's responsibility to ensure a timeout is set. + #[tracing::instrument(level = "trace", skip(self, response_channel), fields(subsystem = LOG_TARGET))] + pub async fn wait_for_activation(&mut self, hash: Hash, response_channel: oneshot::Sender>) { + self.send_and_log_error(Event::ExternalRequest(ExternalRequest::WaitForActivation { + hash, + response_channel + })).await + } + + /// Tell `Overseer` to shutdown. + #[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))] + pub async fn stop(&mut self) { + self.send_and_log_error(Event::Stop).await + } + + async fn send_and_log_error(&mut self, event: Event) { + if self.events_tx.send(event).await.is_err() { + tracing::info!(target: LOG_TARGET, "Failed to send an event to Overseer"); + } + } +} + +/// Glues together the [`Overseer`] and `BlockchainEvents` by forwarding +/// import and finality notifications into the [`OverseerHandler`]. +/// +/// [`Overseer`]: struct.Overseer.html +/// [`OverseerHandler`]: struct.OverseerHandler.html +pub async fn forward_events>( + client: Arc

, + mut handler: OverseerHandler, +) { + let mut finality = client.finality_notification_stream(); + let mut imports = client.import_notification_stream(); + + loop { + select! { + f = finality.next() => { + match f { + Some(block) => { + handler.block_finalized(block.into()).await; + } + None => break, + } + }, + i = imports.next() => { + match i { + Some(block) => { + handler.block_imported(block.into()).await; + } + None => break, + } + }, + complete => break, + } + } +} + +impl Debug for ToOverseer { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + ToOverseer::SpawnJob { .. } => write!(f, "OverseerMessage::Spawn(..)"), + ToOverseer::SpawnBlockingJob { .. } => write!(f, "OverseerMessage::SpawnBlocking(..)") + } + } +} + +/// A running instance of some [`Subsystem`]. +/// +/// [`Subsystem`]: trait.Subsystem.html +struct SubsystemInstance { + tx_signal: metered::MeteredSender, + tx_bounded: metered::MeteredSender>, + meters: SubsystemMeters, + signals_received: usize, + name: &'static str, +} + +#[derive(Debug)] +struct MessagePacket { + signals_received: usize, + message: T, +} + +fn make_packet(signals_received: usize, message: T) -> MessagePacket { + MessagePacket { + signals_received, + message, + } +} + +// The channels held by every subsystem to communicate with every other subsystem. +#[derive(Debug, Clone)] +struct ChannelsOut { + candidate_validation: metered::MeteredSender>, + candidate_backing: metered::MeteredSender>, + candidate_selection: metered::MeteredSender>, + statement_distribution: metered::MeteredSender>, + availability_distribution: metered::MeteredSender>, + availability_recovery: metered::MeteredSender>, + bitfield_signing: metered::MeteredSender>, + bitfield_distribution: metered::MeteredSender>, + provisioner: metered::MeteredSender>, + runtime_api: metered::MeteredSender>, + availability_store: metered::MeteredSender>, + network_bridge: metered::MeteredSender>, + chain_api: metered::MeteredSender>, + collation_generation: metered::MeteredSender>, + collator_protocol: metered::MeteredSender>, + approval_distribution: metered::MeteredSender>, + approval_voting: metered::MeteredSender>, + gossip_support: metered::MeteredSender>, + + candidate_validation_unbounded: metered::UnboundedMeteredSender>, + candidate_backing_unbounded: metered::UnboundedMeteredSender>, + candidate_selection_unbounded: metered::UnboundedMeteredSender>, + statement_distribution_unbounded: metered::UnboundedMeteredSender>, + availability_distribution_unbounded: metered::UnboundedMeteredSender>, + availability_recovery_unbounded: metered::UnboundedMeteredSender>, + bitfield_signing_unbounded: metered::UnboundedMeteredSender>, + bitfield_distribution_unbounded: metered::UnboundedMeteredSender>, + provisioner_unbounded: metered::UnboundedMeteredSender>, + runtime_api_unbounded: metered::UnboundedMeteredSender>, + availability_store_unbounded: metered::UnboundedMeteredSender>, + network_bridge_unbounded: metered::UnboundedMeteredSender>, + chain_api_unbounded: metered::UnboundedMeteredSender>, + collation_generation_unbounded: metered::UnboundedMeteredSender>, + collator_protocol_unbounded: metered::UnboundedMeteredSender>, + approval_distribution_unbounded: metered::UnboundedMeteredSender>, + approval_voting_unbounded: metered::UnboundedMeteredSender>, + gossip_support_unbounded: metered::UnboundedMeteredSender>, +} + +impl ChannelsOut { + async fn send_and_log_error( + &mut self, + signals_received: usize, + message: AllMessages, + ) { + let res = match message { + AllMessages::CandidateValidation(msg) => { + self.candidate_validation.send(make_packet(signals_received, msg)).await + }, + AllMessages::CandidateBacking(msg) => { + self.candidate_backing.send(make_packet(signals_received, msg)).await + }, + AllMessages::CandidateSelection(msg) => { + self.candidate_selection.send(make_packet(signals_received, msg)).await + }, + AllMessages::StatementDistribution(msg) => { + self.statement_distribution.send(make_packet(signals_received, msg)).await + }, + AllMessages::AvailabilityDistribution(msg) => { + self.availability_distribution.send(make_packet(signals_received, msg)).await + }, + AllMessages::AvailabilityRecovery(msg) => { + self.availability_recovery.send(make_packet(signals_received, msg)).await + }, + AllMessages::BitfieldDistribution(msg) => { + self.bitfield_distribution.send(make_packet(signals_received, msg)).await + }, + AllMessages::BitfieldSigning(msg) => { + self.bitfield_signing.send(make_packet(signals_received, msg)).await + }, + AllMessages::Provisioner(msg) => { + self.provisioner.send(make_packet(signals_received, msg)).await + }, + AllMessages::RuntimeApi(msg) => { + self.runtime_api.send(make_packet(signals_received, msg)).await + }, + AllMessages::AvailabilityStore(msg) => { + self.availability_store.send(make_packet(signals_received, msg)).await + }, + AllMessages::NetworkBridge(msg) => { + self.network_bridge.send(make_packet(signals_received, msg)).await + }, + AllMessages::ChainApi(msg) => { + self.chain_api.send(make_packet(signals_received, msg)).await + }, + AllMessages::CollationGeneration(msg) => { + self.collation_generation.send(make_packet(signals_received, msg)).await + }, + AllMessages::CollatorProtocol(msg) => { + self.collator_protocol.send(make_packet(signals_received, msg)).await + }, + AllMessages::ApprovalDistribution(msg) => { + self.approval_distribution.send(make_packet(signals_received, msg)).await + }, + AllMessages::ApprovalVoting(msg) => { + self.approval_voting.send(make_packet(signals_received, msg)).await + }, + AllMessages::GossipSupport(msg) => { + self.gossip_support.send(make_packet(signals_received, msg)).await + }, + }; + + if res.is_err() { + tracing::debug!( + target: LOG_TARGET, + "Failed to send a message to another subsystem", + ); + } + } + + + fn send_unbounded_and_log_error( + &self, + signals_received: usize, + message: AllMessages, + ) { + let res = match message { + AllMessages::CandidateValidation(msg) => { + self.candidate_validation_unbounded + .unbounded_send(make_packet(signals_received, msg)) + .map_err(|e| e.into_send_error()) + }, + AllMessages::CandidateBacking(msg) => { + self.candidate_backing_unbounded + .unbounded_send(make_packet(signals_received, msg)) + .map_err(|e| e.into_send_error()) + }, + AllMessages::CandidateSelection(msg) => { + self.candidate_selection_unbounded + .unbounded_send(make_packet(signals_received, msg)) + .map_err(|e| e.into_send_error()) + }, + AllMessages::StatementDistribution(msg) => { + self.statement_distribution_unbounded + .unbounded_send(make_packet(signals_received, msg)) + .map_err(|e| e.into_send_error()) + }, + AllMessages::AvailabilityDistribution(msg) => { + self.availability_distribution_unbounded + .unbounded_send(make_packet(signals_received, msg)) + .map_err(|e| e.into_send_error()) + }, + AllMessages::AvailabilityRecovery(msg) => { + self.availability_recovery_unbounded + .unbounded_send(make_packet(signals_received, msg)) + .map_err(|e| e.into_send_error()) + }, + AllMessages::BitfieldDistribution(msg) => { + self.bitfield_distribution_unbounded + .unbounded_send(make_packet(signals_received, msg)) + .map_err(|e| e.into_send_error()) + }, + AllMessages::BitfieldSigning(msg) => { + self.bitfield_signing_unbounded + .unbounded_send(make_packet(signals_received, msg)) + .map_err(|e| e.into_send_error()) + }, + AllMessages::Provisioner(msg) => { + self.provisioner_unbounded + .unbounded_send(make_packet(signals_received, msg)) + .map_err(|e| e.into_send_error()) + }, + AllMessages::RuntimeApi(msg) => { + self.runtime_api_unbounded + .unbounded_send(make_packet(signals_received, msg)) + .map_err(|e| e.into_send_error()) + }, + AllMessages::AvailabilityStore(msg) => { + self.availability_store_unbounded + .unbounded_send(make_packet(signals_received, msg)) + .map_err(|e| e.into_send_error()) + }, + AllMessages::NetworkBridge(msg) => { + self.network_bridge_unbounded + .unbounded_send(make_packet(signals_received, msg)) + .map_err(|e| e.into_send_error()) + }, + AllMessages::ChainApi(msg) => { + self.chain_api_unbounded + .unbounded_send(make_packet(signals_received, msg)) + .map_err(|e| e.into_send_error()) + }, + AllMessages::CollationGeneration(msg) => { + self.collation_generation_unbounded + .unbounded_send(make_packet(signals_received, msg)) + .map_err(|e| e.into_send_error()) + }, + AllMessages::CollatorProtocol(msg) => { + self.collator_protocol_unbounded + .unbounded_send(make_packet(signals_received, msg)) + .map_err(|e| e.into_send_error()) + }, + AllMessages::ApprovalDistribution(msg) => { + self.approval_distribution_unbounded + .unbounded_send(make_packet(signals_received, msg)) + .map_err(|e| e.into_send_error()) + }, + AllMessages::ApprovalVoting(msg) => { + self.approval_voting_unbounded + .unbounded_send(make_packet(signals_received, msg)) + .map_err(|e| e.into_send_error()) + }, + AllMessages::GossipSupport(msg) => { + self.gossip_support_unbounded + .unbounded_send(make_packet(signals_received, msg)) + .map_err(|e| e.into_send_error()) + }, + }; + + if res.is_err() { + tracing::debug!( + target: LOG_TARGET, + "Failed to send a message to another subsystem", + ); + } + } +} + +type SubsystemIncomingMessages = stream::Select< + metered::MeteredReceiver>, + metered::UnboundedMeteredReceiver>, +>; + +#[derive(Debug, Default, Clone)] +struct SignalsReceived(Arc); + +impl SignalsReceived { + fn load(&self) -> usize { + self.0.load(atomic::Ordering::SeqCst) + } + + fn inc(&self) { + self.0.fetch_add(1, atomic::Ordering::SeqCst); + } +} + +/// A sender from subsystems to other subsystems. +#[derive(Debug, Clone)] +pub struct OverseerSubsystemSender { + channels: ChannelsOut, + signals_received: SignalsReceived, +} + +#[async_trait::async_trait] +impl SubsystemSender for OverseerSubsystemSender { + async fn send_message(&mut self, msg: AllMessages) { + self.channels.send_and_log_error(self.signals_received.load(), msg).await; + } + + async fn send_messages(&mut self, msgs: T) + where T: IntoIterator + Send, T::IntoIter: Send + { + // This can definitely be optimized if necessary. + for msg in msgs { + self.send_message(msg).await; + } + } + + fn send_unbounded_message(&mut self, msg: AllMessages) { + self.channels.send_unbounded_and_log_error(self.signals_received.load(), msg); + } +} + +/// A context type that is given to the [`Subsystem`] upon spawning. +/// It can be used by [`Subsystem`] to communicate with other [`Subsystem`]s +/// or to spawn it's [`SubsystemJob`]s. +/// +/// [`Overseer`]: struct.Overseer.html +/// [`Subsystem`]: trait.Subsystem.html +/// [`SubsystemJob`]: trait.SubsystemJob.html +#[derive(Debug)] +pub struct OverseerSubsystemContext{ + signals: metered::MeteredReceiver, + messages: SubsystemIncomingMessages, + to_subsystems: OverseerSubsystemSender, + to_overseer: metered::UnboundedMeteredSender, + signals_received: SignalsReceived, + pending_incoming: Option<(usize, M)>, + metrics: Metrics, +} + +impl OverseerSubsystemContext { + /// Create a new `OverseerSubsystemContext`. + fn new( + signals: metered::MeteredReceiver, + messages: SubsystemIncomingMessages, + to_subsystems: ChannelsOut, + to_overseer: metered::UnboundedMeteredSender, + metrics: Metrics, + ) -> Self { + let signals_received = SignalsReceived::default(); + OverseerSubsystemContext { + signals, + messages, + to_subsystems: OverseerSubsystemSender { + channels: to_subsystems, + signals_received: signals_received.clone(), + }, + to_overseer, + signals_received, + pending_incoming: None, + metrics, + } + } + + /// Create a new `OverseserSubsystemContext` with no metering. + /// + /// Intended for tests. + #[allow(unused)] + fn new_unmetered( + signals: metered::MeteredReceiver, + messages: SubsystemIncomingMessages, + to_subsystems: ChannelsOut, + to_overseer: metered::UnboundedMeteredSender, + ) -> Self { + let metrics = Metrics::default(); + OverseerSubsystemContext::new(signals, messages, to_subsystems, to_overseer, metrics) + } +} + +#[async_trait::async_trait] +impl SubsystemContext for OverseerSubsystemContext { + type Message = M; + type Sender = OverseerSubsystemSender; + + async fn try_recv(&mut self) -> Result>, ()> { + match poll!(self.recv()) { + Poll::Ready(msg) => Ok(Some(msg.map_err(|_| ())?)), + Poll::Pending => Ok(None), + } + } + + async fn recv(&mut self) -> SubsystemResult> { + loop { + // If we have a message pending an overseer signal, we only poll for signals + // in the meantime. + if let Some((needs_signals_received, msg)) = self.pending_incoming.take() { + if needs_signals_received <= self.signals_received.load() { + return Ok(FromOverseer::Communication { msg }); + } else { + self.pending_incoming = Some((needs_signals_received, msg)); + + // wait for next signal. + let signal = self.signals.next().await + .ok_or(SubsystemError::Context( + "No more messages in rx queue to process" + .to_owned() + ))?; + + self.signals_received.inc(); + return Ok(FromOverseer::Signal(signal)) + } + } + + let mut await_message = self.messages.next().fuse(); + let mut await_signal = self.signals.next().fuse(); + let signals_received = self.signals_received.load(); + let pending_incoming = &mut self.pending_incoming; + + // Otherwise, wait for the next signal or incoming message. + let from_overseer = futures::select_biased! { + signal = await_signal => { + let signal = signal + .ok_or(SubsystemError::Context( + "No more messages in rx queue to process" + .to_owned() + ))?; + + FromOverseer::Signal(signal) + } + msg = await_message => { + let packet = msg + .ok_or(SubsystemError::Context( + "No more messages in rx queue to process" + .to_owned() + ))?; + + if packet.signals_received > signals_received { + // wait until we've received enough signals to return this message. + *pending_incoming = Some((packet.signals_received, packet.message)); + continue; + } else { + // we know enough to return this message. + FromOverseer::Communication { msg: packet.message} + } + } + }; + + if let FromOverseer::Signal(_) = from_overseer { + self.signals_received.inc(); + } + + return Ok(from_overseer); + } + } + + async fn spawn(&mut self, name: &'static str, s: Pin + Send>>) + -> SubsystemResult<()> + { + self.to_overseer.send(ToOverseer::SpawnJob { + name, + s, + }).await.map_err(Into::into) + } + + async fn spawn_blocking(&mut self, name: &'static str, s: Pin + Send>>) + -> SubsystemResult<()> + { + self.to_overseer.send(ToOverseer::SpawnBlockingJob { + name, + s, + }).await.map_err(Into::into) + } + + fn sender(&mut self) -> &mut OverseerSubsystemSender { + &mut self.to_subsystems + } +} + +/// A subsystem that we oversee. +/// +/// Ties together the [`Subsystem`] itself and it's running instance +/// (which may be missing if the [`Subsystem`] is not running at the moment +/// for whatever reason). +/// +/// [`Subsystem`]: trait.Subsystem.html +struct OverseenSubsystem { + instance: Option>, +} + +impl OverseenSubsystem { + /// Send a message to the wrapped subsystem. + /// + /// If the inner `instance` is `None`, nothing is happening. + async fn send_message(&mut self, msg: M) -> SubsystemResult<()> { + const MESSAGE_TIMEOUT: Duration = Duration::from_secs(10); + + if let Some(ref mut instance) = self.instance { + match instance.tx_bounded.send(MessagePacket { + signals_received: instance.signals_received, + message: msg.into() + }).timeout(MESSAGE_TIMEOUT).await + { + None => { + tracing::error!(target: LOG_TARGET, "Subsystem {} appears unresponsive.", instance.name); + Err(SubsystemError::SubsystemStalled(instance.name)) + } + Some(res) => res.map_err(Into::into), + } + } else { + Ok(()) + } + } + + /// Send a signal to the wrapped subsystem. + /// + /// If the inner `instance` is `None`, nothing is happening. + async fn send_signal(&mut self, signal: OverseerSignal) -> SubsystemResult<()> { + const SIGNAL_TIMEOUT: Duration = Duration::from_secs(10); + + if let Some(ref mut instance) = self.instance { + match instance.tx_signal.send(signal).timeout(SIGNAL_TIMEOUT).await { + None => { + tracing::error!(target: LOG_TARGET, "Subsystem {} appears unresponsive.", instance.name); + Err(SubsystemError::SubsystemStalled(instance.name)) + } + Some(res) => { + let res = res.map_err(Into::into); + if res.is_ok() { + instance.signals_received += 1; + } + res + } + } + } else { + Ok(()) + } + } +} + +#[derive(Clone)] +struct SubsystemMeters { + bounded: metered::Meter, + unbounded: metered::Meter, + signals: metered::Meter, +} + +impl SubsystemMeters { + fn read(&self) -> SubsystemMeterReadouts { + SubsystemMeterReadouts { + bounded: self.bounded.read(), + unbounded: self.unbounded.read(), + signals: self.signals.read(), + } + } +} + +struct SubsystemMeterReadouts { + bounded: metered::Readout, + unbounded: metered::Readout, + signals: metered::Readout, +} + +/// The `Overseer` itself. +pub struct Overseer { + /// Handles to all subsystems. + subsystems: AllSubsystems< + OverseenSubsystem, + OverseenSubsystem, + OverseenSubsystem, + OverseenSubsystem, + OverseenSubsystem, + OverseenSubsystem, + OverseenSubsystem, + OverseenSubsystem, + OverseenSubsystem, + OverseenSubsystem, + OverseenSubsystem, + OverseenSubsystem, + OverseenSubsystem, + OverseenSubsystem, + OverseenSubsystem, + OverseenSubsystem, + OverseenSubsystem, + OverseenSubsystem, + >, + + /// Spawner to spawn tasks to. + s: S, + + /// Here we keep handles to spawned subsystems to be notified when they terminate. + running_subsystems: FuturesUnordered>>, + + /// Gather running subsystems' outbound streams into one. + to_overseer_rx: Fuse>, + + /// Events that are sent to the overseer from the outside world + events_rx: metered::MeteredReceiver, + + /// External listeners waiting for a hash to be in the active-leave set. + activation_external_listeners: HashMap>>>, + + /// Stores the [`jaeger::Span`] per active leaf. + span_per_active_leaf: HashMap>, + + /// A set of leaves that `Overseer` starts working with. + /// + /// Drained at the beginning of `run` and never used again. + leaves: Vec<(Hash, BlockNumber)>, + + /// The set of the "active leaves". + active_leaves: HashMap, + + /// Various Prometheus metrics. + metrics: Metrics, +} + /// Overseer Prometheus metrics. #[derive(Clone)] struct MetricsInner { activated_heads_total: prometheus::Counter, deactivated_heads_total: prometheus::Counter, messages_relayed_total: prometheus::Counter, - message_relay_timings: prometheus::Histogram, - to_overseer_sent: prometheus::Gauge, - to_overseer_received: prometheus::Gauge, - from_overseer_sent: prometheus::GaugeVec, - from_overseer_received: prometheus::GaugeVec, + to_subsystem_bounded_sent: prometheus::GaugeVec, + to_subsystem_bounded_received: prometheus::GaugeVec, + to_subsystem_unbounded_sent: prometheus::GaugeVec, + to_subsystem_unbounded_received: prometheus::GaugeVec, + signals_sent: prometheus::GaugeVec, + signals_received: prometheus::GaugeVec, } #[derive(Default, Clone)] @@ -1300,27 +1601,31 @@ impl Metrics { } } - /// Provide a timer for the duration between receiving a message and passing it to `route_message` - fn time_message_hold(&self) -> MaybeTimer { - self.0.as_ref().map(|metrics| metrics.message_relay_timings.start_timer()) - } - fn channel_fill_level_snapshot( &self, - from_overseer: AllSubsystemsSame<(&'static str, metered::Readout)>, - to_overseer: metered::Readout, + to_subsystem: AllSubsystemsSame<(&'static str, SubsystemMeterReadouts)>, ) { self.0.as_ref().map(|metrics| { - from_overseer.map_subsystems(|(name, readout): (_, metered::Readout)| { - metrics.from_overseer_sent.with_label_values(&[name]) - .set(readout.sent as u64); + to_subsystem.map_subsystems( + |(name, readouts): (_, SubsystemMeterReadouts)| { + metrics.to_subsystem_bounded_sent.with_label_values(&[name]) + .set(readouts.bounded.sent as u64); - metrics.from_overseer_received.with_label_values(&[name]) - .set(readout.received as u64); - }); + metrics.to_subsystem_bounded_received.with_label_values(&[name]) + .set(readouts.bounded.received as u64); - metrics.to_overseer_sent.set(to_overseer.sent as u64); - metrics.to_overseer_received.set(to_overseer.received as u64); + metrics.to_subsystem_unbounded_sent.with_label_values(&[name]) + .set(readouts.unbounded.sent as u64); + + metrics.to_subsystem_unbounded_received.with_label_values(&[name]) + .set(readouts.unbounded.received as u64); + + metrics.signals_sent.with_label_values(&[name]) + .set(readouts.signals.sent as u64); + + metrics.signals_received.with_label_values(&[name]) + .set(readouts.signals.received as u64); + }); }); } } @@ -1349,33 +1654,11 @@ impl metrics::Metrics for Metrics { )?, registry, )?, - message_relay_timings: prometheus::register( - prometheus::Histogram::with_opts( - prometheus::HistogramOpts { - common_opts: prometheus::Opts::new( - "parachain_overseer_messages_relay_timings", - "Time spent holding a message in the overseer before passing it to `route_message`", - ), - // guessing at the desired resolution, but we know that messages will time - // out after 0.5 seconds, so the bucket set below seems plausible: - // `0.0001 * (1.6 ^ 18) ~= 0.472`. Prometheus auto-generates a final bucket - // for all values between the final value and `+Inf`, so this should work. - // - // The documented legal range for the inputs are: - // - // - `> 0.0` - // - `> 1.0` - // - `! 0` - buckets: prometheus::exponential_buckets(0.0001, 1.6, 18).expect("inputs are within documented range; qed"), - } - )?, - registry, - )?, - from_overseer_sent: prometheus::register( + to_subsystem_bounded_sent: prometheus::register( prometheus::GaugeVec::::new( prometheus::Opts::new( - "parachain_from_overseer_sent", - "Number of elements sent by the overseer to subsystems", + "parachain_subsystem_bounded_sent", + "Number of elements sent to subsystems' bounded queues", ), &[ "subsystem_name", @@ -1383,11 +1666,11 @@ impl metrics::Metrics for Metrics { )?, registry, )?, - from_overseer_received: prometheus::register( + to_subsystem_bounded_received: prometheus::register( prometheus::GaugeVec::::new( prometheus::Opts::new( - "parachain_from_overseer_received", - "Number of elements received by subsystems from overseer", + "parachain_subsystem_bounded_received", + "Number of elements received by subsystems' bounded queues", ), &[ "subsystem_name", @@ -1395,21 +1678,51 @@ impl metrics::Metrics for Metrics { )?, registry, )?, - to_overseer_sent: prometheus::register( - prometheus::Gauge::::with_opts( + to_subsystem_unbounded_sent: prometheus::register( + prometheus::GaugeVec::::new( prometheus::Opts::new( - "parachain_to_overseer_sent", - "Number of elements sent by subsystems to overseer", + "parachain_subsystem_unbounded_sent", + "Number of elements sent to subsystems' unbounded queues", ), + &[ + "subsystem_name", + ], )?, registry, )?, - to_overseer_received: prometheus::register( - prometheus::Gauge::::with_opts( + to_subsystem_unbounded_received: prometheus::register( + prometheus::GaugeVec::::new( prometheus::Opts::new( - "parachain_to_overseer_received", - "Number of element received by overseer from subsystems", + "parachain_subsystem_unbounded_received", + "Number of elements received by subsystems' unbounded queues", ), + &[ + "subsystem_name", + ], + )?, + registry, + )?, + signals_sent: prometheus::register( + prometheus::GaugeVec::::new( + prometheus::Opts::new( + "parachain_overseer_signals_sent", + "Number of signals sent by overseer to subsystems", + ), + &[ + "subsystem_name", + ], + )?, + registry, + )?, + signals_received: prometheus::register( + prometheus::GaugeVec::::new( + prometheus::Opts::new( + "parachain_overseer_signals_received", + "Number of signals received by subsystems from overseer", + ), + &[ + "subsystem_name", + ], )?, registry, )?, @@ -1537,7 +1850,7 @@ where ApV: Subsystem> + Send, GS: Subsystem> + Send, { - let (events_tx, events_rx) = metered::channel(CHANNEL_CAPACITY, "overseer_events"); + let (events_tx, events_rx) = metered::channel(CHANNEL_CAPACITY); let handler = OverseerHandler { events_tx: events_tx.clone(), @@ -1545,190 +1858,355 @@ where let metrics = ::register(prometheus_registry)?; - let (to_overseer_tx, to_overseer_rx) = metered::unbounded("to_overseer"); + let (to_overseer_tx, to_overseer_rx) = metered::unbounded(); let mut running_subsystems = FuturesUnordered::new(); - let mut seed = 0x533d; // arbitrary + let (candidate_validation_bounded_tx, candidate_validation_bounded_rx) + = metered::channel(CHANNEL_CAPACITY); + let (candidate_backing_bounded_tx, candidate_backing_bounded_rx) + = metered::channel(CHANNEL_CAPACITY); + let (candidate_selection_bounded_tx, candidate_selection_bounded_rx) + = metered::channel(CHANNEL_CAPACITY); + let (statement_distribution_bounded_tx, statement_distribution_bounded_rx) + = metered::channel(CHANNEL_CAPACITY); + let (availability_distribution_bounded_tx, availability_distribution_bounded_rx) + = metered::channel(CHANNEL_CAPACITY); + let (availability_recovery_bounded_tx, availability_recovery_bounded_rx) + = metered::channel(CHANNEL_CAPACITY); + let (bitfield_signing_bounded_tx, bitfield_signing_bounded_rx) + = metered::channel(CHANNEL_CAPACITY); + let (bitfield_distribution_bounded_tx, bitfield_distribution_bounded_rx) + = metered::channel(CHANNEL_CAPACITY); + let (provisioner_bounded_tx, provisioner_bounded_rx) + = metered::channel(CHANNEL_CAPACITY); + let (runtime_api_bounded_tx, runtime_api_bounded_rx) + = metered::channel(CHANNEL_CAPACITY); + let (availability_store_bounded_tx, availability_store_bounded_rx) + = metered::channel(CHANNEL_CAPACITY); + let (network_bridge_bounded_tx, network_bridge_bounded_rx) + = metered::channel(CHANNEL_CAPACITY); + let (chain_api_bounded_tx, chain_api_bounded_rx) + = metered::channel(CHANNEL_CAPACITY); + let (collator_protocol_bounded_tx, collator_protocol_bounded_rx) + = metered::channel(CHANNEL_CAPACITY); + let (collation_generation_bounded_tx, collation_generation_bounded_rx) + = metered::channel(CHANNEL_CAPACITY); + let (approval_distribution_bounded_tx, approval_distribution_bounded_rx) + = metered::channel(CHANNEL_CAPACITY); + let (approval_voting_bounded_tx, approval_voting_bounded_rx) + = metered::channel(CHANNEL_CAPACITY); + let (gossip_support_bounded_tx, gossip_support_bounded_rx) + = metered::channel(CHANNEL_CAPACITY); + + let (candidate_validation_unbounded_tx, candidate_validation_unbounded_rx) + = metered::unbounded(); + let (candidate_backing_unbounded_tx, candidate_backing_unbounded_rx) + = metered::unbounded(); + let (candidate_selection_unbounded_tx, candidate_selection_unbounded_rx) + = metered::unbounded(); + let (statement_distribution_unbounded_tx, statement_distribution_unbounded_rx) + = metered::unbounded(); + let (availability_distribution_unbounded_tx, availability_distribution_unbounded_rx) + = metered::unbounded(); + let (availability_recovery_unbounded_tx, availability_recovery_unbounded_rx) + = metered::unbounded(); + let (bitfield_signing_unbounded_tx, bitfield_signing_unbounded_rx) + = metered::unbounded(); + let (bitfield_distribution_unbounded_tx, bitfield_distribution_unbounded_rx) + = metered::unbounded(); + let (provisioner_unbounded_tx, provisioner_unbounded_rx) + = metered::unbounded(); + let (runtime_api_unbounded_tx, runtime_api_unbounded_rx) + = metered::unbounded(); + let (availability_store_unbounded_tx, availability_store_unbounded_rx) + = metered::unbounded(); + let (network_bridge_unbounded_tx, network_bridge_unbounded_rx) + = metered::unbounded(); + let (chain_api_unbounded_tx, chain_api_unbounded_rx) + = metered::unbounded(); + let (collator_protocol_unbounded_tx, collator_protocol_unbounded_rx) + = metered::unbounded(); + let (collation_generation_unbounded_tx, collation_generation_unbounded_rx) + = metered::unbounded(); + let (approval_distribution_unbounded_tx, approval_distribution_unbounded_rx) + = metered::unbounded(); + let (approval_voting_unbounded_tx, approval_voting_unbounded_rx) + = metered::unbounded(); + let (gossip_support_unbounded_tx, gossip_support_unbounded_rx) + = metered::unbounded(); + + let channels_out = ChannelsOut { + candidate_validation: candidate_validation_bounded_tx.clone(), + candidate_backing: candidate_backing_bounded_tx.clone(), + candidate_selection: candidate_selection_bounded_tx.clone(), + statement_distribution: statement_distribution_bounded_tx.clone(), + availability_distribution: availability_distribution_bounded_tx.clone(), + availability_recovery: availability_recovery_bounded_tx.clone(), + bitfield_signing: bitfield_signing_bounded_tx.clone(), + bitfield_distribution: bitfield_distribution_bounded_tx.clone(), + provisioner: provisioner_bounded_tx.clone(), + runtime_api: runtime_api_bounded_tx.clone(), + availability_store: availability_store_bounded_tx.clone(), + network_bridge: network_bridge_bounded_tx.clone(), + chain_api: chain_api_bounded_tx.clone(), + collator_protocol: collator_protocol_bounded_tx.clone(), + collation_generation: collation_generation_bounded_tx.clone(), + approval_distribution: approval_distribution_bounded_tx.clone(), + approval_voting: approval_voting_bounded_tx.clone(), + gossip_support: gossip_support_bounded_tx.clone(), + + candidate_validation_unbounded: candidate_validation_unbounded_tx.clone(), + candidate_backing_unbounded: candidate_backing_unbounded_tx.clone(), + candidate_selection_unbounded: candidate_selection_unbounded_tx.clone(), + statement_distribution_unbounded: statement_distribution_unbounded_tx.clone(), + availability_distribution_unbounded: availability_distribution_unbounded_tx.clone(), + availability_recovery_unbounded: availability_recovery_unbounded_tx.clone(), + bitfield_signing_unbounded: bitfield_signing_unbounded_tx.clone(), + bitfield_distribution_unbounded: bitfield_distribution_unbounded_tx.clone(), + provisioner_unbounded: provisioner_unbounded_tx.clone(), + runtime_api_unbounded: runtime_api_unbounded_tx.clone(), + availability_store_unbounded: availability_store_unbounded_tx.clone(), + network_bridge_unbounded: network_bridge_unbounded_tx.clone(), + chain_api_unbounded: chain_api_unbounded_tx.clone(), + collator_protocol_unbounded: collator_protocol_unbounded_tx.clone(), + collation_generation_unbounded: collation_generation_unbounded_tx.clone(), + approval_distribution_unbounded: approval_distribution_unbounded_tx.clone(), + approval_voting_unbounded: approval_voting_unbounded_tx.clone(), + gossip_support_unbounded: gossip_support_unbounded_tx.clone(), + }; let candidate_validation_subsystem = spawn( &mut s, - &mut running_subsystems, - metered::UnboundedMeteredSender::<_>::clone(&to_overseer_tx), + candidate_validation_bounded_tx, + stream::select(candidate_validation_bounded_rx, candidate_validation_unbounded_rx), + candidate_validation_unbounded_tx.meter().clone(), + channels_out.clone(), + to_overseer_tx.clone(), all_subsystems.candidate_validation, &metrics, - &mut seed, + &mut running_subsystems, TaskKind::Regular, )?; let candidate_backing_subsystem = spawn( &mut s, - &mut running_subsystems, - metered::UnboundedMeteredSender::<_>::clone(&to_overseer_tx), + candidate_backing_bounded_tx, + stream::select(candidate_backing_bounded_rx, candidate_backing_unbounded_rx), + candidate_backing_unbounded_tx.meter().clone(), + channels_out.clone(), + to_overseer_tx.clone(), all_subsystems.candidate_backing, &metrics, - &mut seed, + &mut running_subsystems, TaskKind::Regular, )?; let candidate_selection_subsystem = spawn( &mut s, - &mut running_subsystems, - metered::UnboundedMeteredSender::<_>::clone(&to_overseer_tx), + candidate_selection_bounded_tx, + stream::select(candidate_selection_bounded_rx, candidate_selection_unbounded_rx), + candidate_selection_unbounded_tx.meter().clone(), + channels_out.clone(), + to_overseer_tx.clone(), all_subsystems.candidate_selection, &metrics, - &mut seed, + &mut running_subsystems, TaskKind::Regular, )?; let statement_distribution_subsystem = spawn( &mut s, - &mut running_subsystems, - metered::UnboundedMeteredSender::<_>::clone(&to_overseer_tx), + statement_distribution_bounded_tx, + stream::select(statement_distribution_bounded_rx, statement_distribution_unbounded_rx), + candidate_validation_unbounded_tx.meter().clone(), + channels_out.clone(), + to_overseer_tx.clone(), all_subsystems.statement_distribution, &metrics, - &mut seed, + &mut running_subsystems, TaskKind::Regular, )?; let availability_distribution_subsystem = spawn( &mut s, - &mut running_subsystems, - metered::UnboundedMeteredSender::<_>::clone(&to_overseer_tx), + availability_distribution_bounded_tx, + stream::select(availability_distribution_bounded_rx, availability_distribution_unbounded_rx), + availability_distribution_unbounded_tx.meter().clone(), + channels_out.clone(), + to_overseer_tx.clone(), all_subsystems.availability_distribution, &metrics, - &mut seed, + &mut running_subsystems, TaskKind::Regular, )?; let availability_recovery_subsystem = spawn( &mut s, - &mut running_subsystems, - metered::UnboundedMeteredSender::<_>::clone(&to_overseer_tx), + availability_recovery_bounded_tx, + stream::select(availability_recovery_bounded_rx, availability_recovery_unbounded_rx), + availability_recovery_unbounded_tx.meter().clone(), + channels_out.clone(), + to_overseer_tx.clone(), all_subsystems.availability_recovery, &metrics, - &mut seed, + &mut running_subsystems, TaskKind::Regular, )?; let bitfield_signing_subsystem = spawn( &mut s, - &mut running_subsystems, - metered::UnboundedMeteredSender::<_>::clone(&to_overseer_tx), + bitfield_signing_bounded_tx, + stream::select(bitfield_signing_bounded_rx, bitfield_signing_unbounded_rx), + bitfield_signing_unbounded_tx.meter().clone(), + channels_out.clone(), + to_overseer_tx.clone(), all_subsystems.bitfield_signing, &metrics, - &mut seed, + &mut running_subsystems, TaskKind::Regular, )?; let bitfield_distribution_subsystem = spawn( &mut s, - &mut running_subsystems, - metered::UnboundedMeteredSender::<_>::clone(&to_overseer_tx), + bitfield_distribution_bounded_tx, + stream::select(bitfield_distribution_bounded_rx, bitfield_distribution_unbounded_rx), + bitfield_distribution_unbounded_tx.meter().clone(), + channels_out.clone(), + to_overseer_tx.clone(), all_subsystems.bitfield_distribution, &metrics, - &mut seed, + &mut running_subsystems, TaskKind::Regular, )?; let provisioner_subsystem = spawn( &mut s, - &mut running_subsystems, - metered::UnboundedMeteredSender::<_>::clone(&to_overseer_tx), + provisioner_bounded_tx, + stream::select(provisioner_bounded_rx, provisioner_unbounded_rx), + provisioner_unbounded_tx.meter().clone(), + channels_out.clone(), + to_overseer_tx.clone(), all_subsystems.provisioner, &metrics, - &mut seed, + &mut running_subsystems, TaskKind::Regular, )?; let runtime_api_subsystem = spawn( &mut s, - &mut running_subsystems, - metered::UnboundedMeteredSender::<_>::clone(&to_overseer_tx), + runtime_api_bounded_tx, + stream::select(runtime_api_bounded_rx, runtime_api_unbounded_rx), + runtime_api_unbounded_tx.meter().clone(), + channels_out.clone(), + to_overseer_tx.clone(), all_subsystems.runtime_api, &metrics, - &mut seed, + &mut running_subsystems, TaskKind::Regular, )?; let availability_store_subsystem = spawn( &mut s, - &mut running_subsystems, - metered::UnboundedMeteredSender::<_>::clone(&to_overseer_tx), + availability_store_bounded_tx, + stream::select(availability_store_bounded_rx, availability_store_unbounded_rx), + availability_store_unbounded_tx.meter().clone(), + channels_out.clone(), + to_overseer_tx.clone(), all_subsystems.availability_store, &metrics, - &mut seed, + &mut running_subsystems, TaskKind::Blocking, )?; let network_bridge_subsystem = spawn( &mut s, - &mut running_subsystems, - metered::UnboundedMeteredSender::<_>::clone(&to_overseer_tx), + network_bridge_bounded_tx, + stream::select(network_bridge_bounded_rx, network_bridge_unbounded_rx), + network_bridge_unbounded_tx.meter().clone(), + channels_out.clone(), + to_overseer_tx.clone(), all_subsystems.network_bridge, &metrics, - &mut seed, + &mut running_subsystems, TaskKind::Regular, )?; let chain_api_subsystem = spawn( &mut s, - &mut running_subsystems, - metered::UnboundedMeteredSender::<_>::clone(&to_overseer_tx), + chain_api_bounded_tx, + stream::select(chain_api_bounded_rx, chain_api_unbounded_rx), + chain_api_unbounded_tx.meter().clone(), + channels_out.clone(), + to_overseer_tx.clone(), all_subsystems.chain_api, &metrics, - &mut seed, + &mut running_subsystems, TaskKind::Blocking, )?; let collation_generation_subsystem = spawn( &mut s, - &mut running_subsystems, - metered::UnboundedMeteredSender::<_>::clone(&to_overseer_tx), + collation_generation_bounded_tx, + stream::select(collation_generation_bounded_rx, collation_generation_unbounded_rx), + collation_generation_unbounded_tx.meter().clone(), + channels_out.clone(), + to_overseer_tx.clone(), all_subsystems.collation_generation, &metrics, - &mut seed, + &mut running_subsystems, TaskKind::Regular, )?; - let collator_protocol_subsystem = spawn( &mut s, - &mut running_subsystems, - metered::UnboundedMeteredSender::<_>::clone(&to_overseer_tx), + collator_protocol_bounded_tx, + stream::select(collator_protocol_bounded_rx, collator_protocol_unbounded_rx), + collator_protocol_unbounded_tx.meter().clone(), + channels_out.clone(), + to_overseer_tx.clone(), all_subsystems.collator_protocol, &metrics, - &mut seed, + &mut running_subsystems, TaskKind::Regular, )?; let approval_distribution_subsystem = spawn( &mut s, - &mut running_subsystems, - metered::UnboundedMeteredSender::<_>::clone(&to_overseer_tx), + approval_distribution_bounded_tx, + stream::select(approval_distribution_bounded_rx, approval_distribution_unbounded_rx), + approval_distribution_unbounded_tx.meter().clone(), + channels_out.clone(), + to_overseer_tx.clone(), all_subsystems.approval_distribution, &metrics, - &mut seed, + &mut running_subsystems, TaskKind::Regular, )?; let approval_voting_subsystem = spawn( &mut s, - &mut running_subsystems, - metered::UnboundedMeteredSender::<_>::clone(&to_overseer_tx), + approval_voting_bounded_tx, + stream::select(approval_voting_bounded_rx, approval_voting_unbounded_rx), + approval_voting_unbounded_tx.meter().clone(), + channels_out.clone(), + to_overseer_tx.clone(), all_subsystems.approval_voting, &metrics, - &mut seed, + &mut running_subsystems, TaskKind::Blocking, )?; let gossip_support_subsystem = spawn( &mut s, - &mut running_subsystems, - metered::UnboundedMeteredSender::<_>::clone(&to_overseer_tx), + gossip_support_bounded_tx, + stream::select(gossip_support_bounded_rx, gossip_support_unbounded_rx), + gossip_support_unbounded_tx.meter().clone(), + channels_out.clone(), + to_overseer_tx.clone(), all_subsystems.gossip_support, &metrics, - &mut seed, + &mut running_subsystems, TaskKind::Regular, )?; @@ -1762,35 +2240,33 @@ where }; { - struct ExtractNameAndMeter; - impl<'a, T: 'a> MapSubsystem<&'a OverseenSubsystem> for ExtractNameAndMeter { - type Output = (&'static str, metered::Meter); + struct ExtractNameAndMeters; + impl<'a, T: 'a> MapSubsystem<&'a OverseenSubsystem> for ExtractNameAndMeters { + type Output = (&'static str, SubsystemMeters); fn map_subsystem(&self, subsystem: &'a OverseenSubsystem) -> Self::Output { let instance = subsystem.instance.as_ref() .expect("Extraction is done directly after spawning when subsystems\ have not concluded; qed"); - (instance.name, instance.tx.meter().clone()) + ( + instance.name, + instance.meters.clone(), + ) } } - let meter_external_to_overseer = events_rx.meter().clone(); - let meter_subsystem_to_overseer = to_overseer_rx.meter().clone(); - let subsystem_meters = subsystems.as_ref().map_subsystems(ExtractNameAndMeter); + let subsystem_meters = subsystems.as_ref().map_subsystems(ExtractNameAndMeters); let metronome_metrics = metrics.clone(); let metronome = Metronome::new(std::time::Duration::from_millis(950)) .for_each(move |_| { - let to_subsystem_counts = subsystem_meters.as_ref() - .map_subsystems(|&(name, ref meter): &(_, metered::Meter)| (name, meter.read())); + let subsystem_meters = subsystem_meters.as_ref() + .map_subsystems(|&(name, ref meters): &(_, SubsystemMeters)| (name, meters.read())); // We combine the amount of messages from subsystems to the overseer // as well as the amount of messages from external sources to the overseer // into one to_overseer value. - metronome_metrics.channel_fill_level_snapshot( - to_subsystem_counts, - meter_subsystem_to_overseer.read() + meter_external_to_overseer.read(), - ); + metronome_metrics.channel_fill_level_snapshot(subsystem_meters); async move { () @@ -1899,7 +2375,7 @@ where } }, msg = self.to_overseer_rx.next() => { - let MaybeTimed { timer, t: msg } = match msg { + let msg = match msg { Some(m) => m, None => { // This is a fused stream so we will shut down after receiving all @@ -1909,10 +2385,6 @@ where }; match msg { - ToOverseer::SubsystemMessage(msg) => { - let msg = MaybeTimed { timer, t: msg }; - self.route_message(msg).await? - }, ToOverseer::SpawnJob { name, s } => { self.spawn_job(name, s); } @@ -2018,8 +2490,7 @@ where } #[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))] - async fn route_message(&mut self, msg: MaybeTimed) -> SubsystemResult<()> { - let msg = msg.into_inner(); + async fn route_message(&mut self, msg: AllMessages) -> SubsystemResult<()> { self.metrics.on_message_relayed(); match msg { AllMessages::CandidateValidation(msg) => { @@ -2148,33 +2619,33 @@ enum TaskKind { fn spawn( spawner: &mut S, - futures: &mut FuturesUnordered>>, - to_overseer: metered::UnboundedMeteredSender>, + message_tx: metered::MeteredSender>, + message_rx: SubsystemIncomingMessages, + unbounded_meter: metered::Meter, + to_subsystems: ChannelsOut, + to_overseer_tx: metered::UnboundedMeteredSender, s: impl Subsystem>, metrics: &Metrics, - seed: &mut u64, + futures: &mut FuturesUnordered>>, task_kind: TaskKind, ) -> SubsystemResult> { - let (to_tx, to_rx) = metered::channel(CHANNEL_CAPACITY, "subsystem_spawn"); + let (signal_tx, signal_rx) = metered::channel(CHANNEL_CAPACITY); let ctx = OverseerSubsystemContext::new( - to_rx, - to_overseer, + signal_rx, + message_rx, + to_subsystems, + to_overseer_tx, metrics.clone(), - *seed, - MESSAGE_TIMER_METRIC_CAPTURE_RATE, ); let SpawnedSubsystem { future, name } = s.start(ctx); - // increment the seed now that it's been used, so the next context will have its own distinct RNG - *seed += 1; - let (tx, rx) = oneshot::channel(); let fut = Box::pin(async move { if let Err(e) = future.await { - tracing::error!(target: LOG_TARGET, subsystem=name, err = ?e, "subsystem exited with error"); + tracing::error!(subsystem=name, err = ?e, "subsystem exited with error"); } else { - tracing::debug!(target: LOG_TARGET, subsystem=name, "subsystem exited without an error"); + tracing::debug!(subsystem=name, "subsystem exited without an error"); } let _ = tx.send(()); }); @@ -2187,7 +2658,14 @@ fn spawn( futures.push(Box::pin(rx.map(|e| { tracing::warn!(err = ?e, "dropping error"); Ok(()) }))); let instance = Some(SubsystemInstance { - tx: to_tx, + meters: SubsystemMeters { + unbounded: unbounded_meter, + bounded: message_tx.meter().clone(), + signals: signal_tx.meter().clone(), + }, + tx_signal: signal_tx, + tx_bounded: message_tx, + signals_received: 0, name, }); @@ -2209,6 +2687,7 @@ mod tests { use polkadot_node_subsystem_util::metered; use sp_core::crypto::Pair as _; + use assert_matches::assert_matches; use super::*; @@ -2311,8 +2790,8 @@ mod tests { let spawner = sp_core::testing::TaskExecutor::new(); executor::block_on(async move { - let (s1_tx, s1_rx) = metered::channel::(64, "overseer_test"); - let (s2_tx, s2_rx) = metered::channel::(64, "overseer_test"); + let (s1_tx, s1_rx) = metered::channel::(64); + let (s2_tx, s2_rx) = metered::channel::(64); let mut s1_rx = s1_rx.fuse(); let mut s2_rx = s2_rx.fuse(); @@ -2423,9 +2902,6 @@ mod tests { assert_eq!(gather[0].get_name(), "parachain_activated_heads_total"); assert_eq!(gather[1].get_name(), "parachain_deactivated_heads_total"); assert_eq!(gather[2].get_name(), "parachain_messages_relayed_total"); - assert_eq!(gather[3].get_name(), "parachain_overseer_messages_relay_timings"); - assert_eq!(gather[4].get_name(), "parachain_to_overseer_received"); - assert_eq!(gather[5].get_name(), "parachain_to_overseer_sent"); let activated = gather[0].get_metric()[0].get_counter().get_value() as u64; let deactivated = gather[1].get_metric()[0].get_counter().get_value() as u64; let relayed = gather[2].get_metric()[0].get_counter().get_value() as u64; @@ -2546,8 +3022,8 @@ mod tests { number: 3, }; - let (tx_5, mut rx_5) = metered::channel(64, "overseer_test"); - let (tx_6, mut rx_6) = metered::channel(64, "overseer_test"); + let (tx_5, mut rx_5) = metered::channel(64); + let (tx_6, mut rx_6) = metered::channel(64); let all_subsystems = AllSubsystems::<()>::dummy() .replace_candidate_validation(TestSubsystem5(tx_5)) .replace_candidate_backing(TestSubsystem6(tx_6)); @@ -2648,8 +3124,8 @@ mod tests { number: 3, }; - let (tx_5, mut rx_5) = metered::channel(64, "overseer_test"); - let (tx_6, mut rx_6) = metered::channel(64, "overseer_test"); + let (tx_5, mut rx_5) = metered::channel(64); + let (tx_6, mut rx_6) = metered::channel(64); let all_subsystems = AllSubsystems::<()>::dummy() .replace_candidate_validation(TestSubsystem5(tx_5)) @@ -2748,7 +3224,7 @@ mod tests { number: 1, }; - let (tx_5, mut rx_5) = metered::channel(64, "overseer_test"); + let (tx_5, mut rx_5) = metered::channel(64); let all_subsystems = AllSubsystems::<()>::dummy() .replace_candidate_backing(TestSubsystem6(tx_5)); @@ -2960,8 +3436,11 @@ mod tests { // Checks that `stop`, `broadcast_signal` and `broadcast_message` are implemented correctly. #[test] fn overseer_all_subsystems_receive_signals_and_messages() { - let spawner = sp_core::testing::TaskExecutor::new(); + const NUM_SUBSYSTEMS: usize = 18; + // -3 for BitfieldSigning, GossipSupport and AvailabilityDistribution + const NUM_SUBSYSTEMS_MESSAGED: usize = NUM_SUBSYSTEMS - 3; + let spawner = sp_core::testing::TaskExecutor::new(); executor::block_on(async move { let stop_signals_received = Arc::new(atomic::AtomicUsize::new(0)); let signals_received = Arc::new(atomic::AtomicUsize::new(0)); @@ -3030,22 +3509,158 @@ mod tests { handler.send_msg(AllMessages::ApprovalDistribution(test_approval_distribution_msg())).await; handler.send_msg(AllMessages::ApprovalVoting(test_approval_voting_msg())).await; + // Wait until all subsystems have received. Otherwise the messages might race against + // the conclude signal. + loop { + match (&mut overseer_fut).timeout(Duration::from_millis(100)).await { + None => { + let r = msgs_received.load(atomic::Ordering::SeqCst); + if r < NUM_SUBSYSTEMS_MESSAGED { + Delay::new(Duration::from_millis(100)).await; + } else if r > NUM_SUBSYSTEMS_MESSAGED { + panic!("too many messages received??"); + } else { + break + } + } + Some(_) => panic!("exited too early"), + } + } + // send a stop signal to each subsystems handler.stop().await; - select! { - res = overseer_fut => { - const NUM_SUBSYSTEMS: usize = 18; + let res = overseer_fut.await; + assert_eq!(stop_signals_received.load(atomic::Ordering::SeqCst), NUM_SUBSYSTEMS); + assert_eq!(signals_received.load(atomic::Ordering::SeqCst), NUM_SUBSYSTEMS); + assert_eq!(msgs_received.load(atomic::Ordering::SeqCst), NUM_SUBSYSTEMS_MESSAGED); - assert_eq!(stop_signals_received.load(atomic::Ordering::SeqCst), NUM_SUBSYSTEMS); - assert_eq!(signals_received.load(atomic::Ordering::SeqCst), NUM_SUBSYSTEMS); - // -3 for BitfieldSigning, GossipSupport and AvailabilityDistribution - assert_eq!(msgs_received.load(atomic::Ordering::SeqCst), NUM_SUBSYSTEMS - 3); - - assert!(res.is_ok()); - }, - complete => (), - } + assert!(res.is_ok()); }); } + + #[test] + fn context_holds_onto_message_until_enough_signals_received() { + let (candidate_validation_bounded_tx, _) = metered::channel(CHANNEL_CAPACITY); + let (candidate_backing_bounded_tx, _) = metered::channel(CHANNEL_CAPACITY); + let (candidate_selection_bounded_tx, _) = metered::channel(CHANNEL_CAPACITY); + let (statement_distribution_bounded_tx, _) = metered::channel(CHANNEL_CAPACITY); + let (availability_distribution_bounded_tx, _) = metered::channel(CHANNEL_CAPACITY); + let (availability_recovery_bounded_tx, _) = metered::channel(CHANNEL_CAPACITY); + let (bitfield_signing_bounded_tx, _) = metered::channel(CHANNEL_CAPACITY); + let (bitfield_distribution_bounded_tx, _) = metered::channel(CHANNEL_CAPACITY); + let (provisioner_bounded_tx, _) = metered::channel(CHANNEL_CAPACITY); + let (runtime_api_bounded_tx, _) = metered::channel(CHANNEL_CAPACITY); + let (availability_store_bounded_tx, _) = metered::channel(CHANNEL_CAPACITY); + let (network_bridge_bounded_tx, _) = metered::channel(CHANNEL_CAPACITY); + let (chain_api_bounded_tx, _) = metered::channel(CHANNEL_CAPACITY); + let (collator_protocol_bounded_tx, _) = metered::channel(CHANNEL_CAPACITY); + let (collation_generation_bounded_tx, _) = metered::channel(CHANNEL_CAPACITY); + let (approval_distribution_bounded_tx, _) = metered::channel(CHANNEL_CAPACITY); + let (approval_voting_bounded_tx, _) = metered::channel(CHANNEL_CAPACITY); + let (gossip_support_bounded_tx, _) = metered::channel(CHANNEL_CAPACITY); + + let (candidate_validation_unbounded_tx, _) = metered::unbounded(); + let (candidate_backing_unbounded_tx, _) = metered::unbounded(); + let (candidate_selection_unbounded_tx, _) = metered::unbounded(); + let (statement_distribution_unbounded_tx, _) = metered::unbounded(); + let (availability_distribution_unbounded_tx, _) = metered::unbounded(); + let (availability_recovery_unbounded_tx, _) = metered::unbounded(); + let (bitfield_signing_unbounded_tx, _) = metered::unbounded(); + let (bitfield_distribution_unbounded_tx, _) = metered::unbounded(); + let (provisioner_unbounded_tx, _) = metered::unbounded(); + let (runtime_api_unbounded_tx, _) = metered::unbounded(); + let (availability_store_unbounded_tx, _) = metered::unbounded(); + let (network_bridge_unbounded_tx, _) = metered::unbounded(); + let (chain_api_unbounded_tx, _) = metered::unbounded(); + let (collator_protocol_unbounded_tx, _) = metered::unbounded(); + let (collation_generation_unbounded_tx, _) = metered::unbounded(); + let (approval_distribution_unbounded_tx, _) = metered::unbounded(); + let (approval_voting_unbounded_tx, _) = metered::unbounded(); + let (gossip_support_unbounded_tx, _) = metered::unbounded(); + + let channels_out = ChannelsOut { + candidate_validation: candidate_validation_bounded_tx.clone(), + candidate_backing: candidate_backing_bounded_tx.clone(), + candidate_selection: candidate_selection_bounded_tx.clone(), + statement_distribution: statement_distribution_bounded_tx.clone(), + availability_distribution: availability_distribution_bounded_tx.clone(), + availability_recovery: availability_recovery_bounded_tx.clone(), + bitfield_signing: bitfield_signing_bounded_tx.clone(), + bitfield_distribution: bitfield_distribution_bounded_tx.clone(), + provisioner: provisioner_bounded_tx.clone(), + runtime_api: runtime_api_bounded_tx.clone(), + availability_store: availability_store_bounded_tx.clone(), + network_bridge: network_bridge_bounded_tx.clone(), + chain_api: chain_api_bounded_tx.clone(), + collator_protocol: collator_protocol_bounded_tx.clone(), + collation_generation: collation_generation_bounded_tx.clone(), + approval_distribution: approval_distribution_bounded_tx.clone(), + approval_voting: approval_voting_bounded_tx.clone(), + gossip_support: gossip_support_bounded_tx.clone(), + + candidate_validation_unbounded: candidate_validation_unbounded_tx.clone(), + candidate_backing_unbounded: candidate_backing_unbounded_tx.clone(), + candidate_selection_unbounded: candidate_selection_unbounded_tx.clone(), + statement_distribution_unbounded: statement_distribution_unbounded_tx.clone(), + availability_distribution_unbounded: availability_distribution_unbounded_tx.clone(), + availability_recovery_unbounded: availability_recovery_unbounded_tx.clone(), + bitfield_signing_unbounded: bitfield_signing_unbounded_tx.clone(), + bitfield_distribution_unbounded: bitfield_distribution_unbounded_tx.clone(), + provisioner_unbounded: provisioner_unbounded_tx.clone(), + runtime_api_unbounded: runtime_api_unbounded_tx.clone(), + availability_store_unbounded: availability_store_unbounded_tx.clone(), + network_bridge_unbounded: network_bridge_unbounded_tx.clone(), + chain_api_unbounded: chain_api_unbounded_tx.clone(), + collator_protocol_unbounded: collator_protocol_unbounded_tx.clone(), + collation_generation_unbounded: collation_generation_unbounded_tx.clone(), + approval_distribution_unbounded: approval_distribution_unbounded_tx.clone(), + approval_voting_unbounded: approval_voting_unbounded_tx.clone(), + gossip_support_unbounded: gossip_support_unbounded_tx.clone(), + }; + + let (mut signal_tx, signal_rx) = metered::channel(CHANNEL_CAPACITY); + let (mut bounded_tx, bounded_rx) = metered::channel(CHANNEL_CAPACITY); + let (unbounded_tx, unbounded_rx) = metered::unbounded(); + let (to_overseer_tx, _to_overseer_rx) = metered::unbounded(); + + let mut ctx = OverseerSubsystemContext::<()>::new_unmetered( + signal_rx, + stream::select(bounded_rx, unbounded_rx), + channels_out, + to_overseer_tx, + ); + + assert_eq!(ctx.signals_received.load(), 0); + + let test_fut = async move { + signal_tx.send(OverseerSignal::Conclude).await.unwrap(); + assert_matches!(ctx.recv().await.unwrap(), FromOverseer::Signal(OverseerSignal::Conclude)); + + assert_eq!(ctx.signals_received.load(), 1); + bounded_tx.send(MessagePacket { + signals_received: 2, + message: (), + }).await.unwrap(); + unbounded_tx.unbounded_send(MessagePacket { + signals_received: 2, + message: (), + }).unwrap(); + + match poll!(ctx.recv()) { + Poll::Pending => {} + Poll::Ready(_) => panic!("ready too early"), + }; + + assert!(ctx.pending_incoming.is_some()); + + signal_tx.send(OverseerSignal::Conclude).await.unwrap(); + assert_matches!(ctx.recv().await.unwrap(), FromOverseer::Signal(OverseerSignal::Conclude)); + assert_matches!(ctx.recv().await.unwrap(), FromOverseer::Communication { msg: () }); + assert_matches!(ctx.recv().await.unwrap(), FromOverseer::Communication { msg: () }); + assert!(ctx.pending_incoming.is_none()); + }; + + futures::executor::block_on(test_fut); + } } diff --git a/polkadot/node/subsystem-test-helpers/src/lib.rs b/polkadot/node/subsystem-test-helpers/src/lib.rs index 71d9631fd8..76fd86a3f6 100644 --- a/polkadot/node/subsystem-test-helpers/src/lib.rs +++ b/polkadot/node/subsystem-test-helpers/src/lib.rs @@ -21,7 +21,7 @@ use polkadot_node_subsystem::messages::AllMessages; use polkadot_node_subsystem::{ FromOverseer, SubsystemContext, SubsystemError, SubsystemResult, Subsystem, - SpawnedSubsystem, OverseerSignal, + SpawnedSubsystem, OverseerSignal, SubsystemSender, }; use polkadot_node_subsystem_util::TimeoutExt; @@ -156,9 +156,41 @@ pub fn single_item_sink() -> (SingleItemSink, SingleItemStream) { (SingleItemSink(inner.clone()), SingleItemStream(inner)) } +/// A test subsystem sender. +#[derive(Clone)] +pub struct TestSubsystemSender { + tx: mpsc::UnboundedSender, +} + +#[async_trait::async_trait] +impl SubsystemSender for TestSubsystemSender { + async fn send_message(&mut self, msg: AllMessages) { + self.tx + .send(msg) + .await + .expect("test overseer no longer live"); + } + + async fn send_messages(&mut self, msgs: T) + where + T: IntoIterator + Send, + T::IntoIter: Send, + { + let mut iter = stream::iter(msgs.into_iter().map(Ok)); + self.tx + .send_all(&mut iter) + .await + .expect("test overseer no longer live"); + } + + fn send_unbounded_message(&mut self, msg: AllMessages) { + self.tx.unbounded_send(msg).expect("test overseer no longer live"); + } +} + /// A test subsystem context. pub struct TestSubsystemContext { - tx: mpsc::UnboundedSender, + tx: TestSubsystemSender, rx: SingleItemStream>, spawn: S, } @@ -168,6 +200,7 @@ impl SubsystemContext for TestSubsystemContext { type Message = M; + type Sender = TestSubsystemSender; async fn try_recv(&mut self) -> Result>, ()> { match poll!(self.rx.next()) { @@ -198,23 +231,8 @@ impl SubsystemContext Ok(()) } - async fn send_message(&mut self, msg: AllMessages) { - self.tx - .send(msg) - .await - .expect("test overseer no longer live"); - } - - async fn send_messages(&mut self, msgs: T) - where - T: IntoIterator + Send, - T::IntoIter: Send, - { - let mut iter = stream::iter(msgs.into_iter().map(Ok)); - self.tx - .send_all(&mut iter) - .await - .expect("test overseer no longer live"); + fn sender(&mut self) -> &mut TestSubsystemSender { + &mut self.tx } } @@ -260,7 +278,7 @@ pub fn make_subsystem_context( ( TestSubsystemContext { - tx: all_messages_tx, + tx: TestSubsystemSender { tx: all_messages_tx }, rx: overseer_rx, spawn, }, diff --git a/polkadot/node/subsystem/src/lib.rs b/polkadot/node/subsystem/src/lib.rs index f7cba512ab..3cc4894a49 100644 --- a/polkadot/node/subsystem/src/lib.rs +++ b/polkadot/node/subsystem/src/lib.rs @@ -210,6 +210,27 @@ pub struct SpawnedSubsystem { /// [`SubsystemError`]: struct.SubsystemError.html pub type SubsystemResult = Result; +/// A sender used by subsystems to communicate with other subsystems. +/// +/// Each clone of this type may add more capacity to the bounded buffer, so clones should +/// be used sparingly. +#[async_trait] +pub trait SubsystemSender: Send + Clone + 'static { + /// Send a direct message to some other `Subsystem`, routed based on message type. + async fn send_message(&mut self, msg: AllMessages); + + /// Send multiple direct messages to other `Subsystem`s, routed based on message type. + async fn send_messages(&mut self, msgs: T) + where T: IntoIterator + Send, T::IntoIter: Send; + + /// Send a message onto the unbounded queue of some other `Subsystem`, routed based on message + /// type. + /// + /// This function should be used only when there is some other bounding factor on the messages + /// sent with it. Otherwise, it risks a memory leak. + fn send_unbounded_message(&mut self, msg: AllMessages); +} + /// A context type that is given to the [`Subsystem`] upon spawning. /// It can be used by [`Subsystem`] to communicate with other [`Subsystem`]s /// or spawn jobs. @@ -217,11 +238,14 @@ pub type SubsystemResult = Result; /// [`Overseer`]: struct.Overseer.html /// [`SubsystemJob`]: trait.SubsystemJob.html #[async_trait] -pub trait SubsystemContext: Send + 'static { +pub trait SubsystemContext: Send + Sized + 'static { /// The message type of this context. Subsystems launched with this context will expect /// to receive messages of this type. type Message: Send; + /// The message sender type of this context. Clones of the sender should be used sparingly. + type Sender: SubsystemSender; + /// Try to asynchronously receive a message. /// /// This has to be used with caution, if you loop over this without @@ -241,12 +265,34 @@ pub trait SubsystemContext: Send + 'static { s: Pin + Send>>, ) -> SubsystemResult<()>; + /// Get a mutable reference to the sender. + fn sender(&mut self) -> &mut Self::Sender; + /// Send a direct message to some other `Subsystem`, routed based on message type. - async fn send_message(&mut self, msg: AllMessages); + async fn send_message(&mut self, msg: AllMessages) { + self.sender().send_message(msg).await + } /// Send multiple direct messages to other `Subsystem`s, routed based on message type. async fn send_messages(&mut self, msgs: T) - where T: IntoIterator + Send, T::IntoIter: Send; + where T: IntoIterator + Send, T::IntoIter: Send + { + self.sender().send_messages(msgs).await + } + + + /// Send a message onto the unbounded queue of some other `Subsystem`, routed based on message + /// type. + /// + /// This function should be used only when there is some other bounding factor on the messages + /// sent with it. Otherwise, it risks a memory leak. + /// + /// Generally, for this method to be used, these conditions should be met: + /// * There is a communication cycle between subsystems + /// * One of the parts of the cycle has a clear bound on the number of messages produced. + fn send_unbounded_message(&mut self, msg: AllMessages) { + self.sender().send_unbounded_message(msg) + } } /// A trait that describes the [`Subsystem`]s that can run on the [`Overseer`].