From cd58c02cd3333cb4f0aa7f9aeebfc1381c2fbbb6 Mon Sep 17 00:00:00 2001 From: Peter Goodspeed-Niklaus Date: Wed, 6 Jan 2021 14:25:04 +0100 Subject: [PATCH] Add metrics timing message passing from OverseerSubsystemContext to Overseer::route_message (#2201) * add timing setup to OverseerSubsystemContext * figure out how to initialize the rng * attach a timer to a portion of the messages traveling to the Overseer This timer only exists / logs a fraction of the time (configurable by `MESSAGE_TIMER_METRIC_CAPTURE_RATE`). When it exists, it tracks the span between the `OverSubsystemContext` receiving the message and its receipt in `Overseer::run`. * propagate message timing to the start of route_message This should be more accurate; it ensures that the timer runs at least as long as that function. As `route_message` is async, it may not actually run for some time after it is called (or ever). * fix failing test * rand_chacha apparently implicitly has getrandom feature * change rng initialization The previous impl using `from_entropy` depends on the `getrandom` crate, which uses the system entropy source, and which does not work on `wasm32-unknown-unknown` because it wants to fall back to a JS implementation which we can't assume exists. This impl depends only on `rand::thread_rng`, which has no documentation stating that it's similarly limited. * remove randomness in favor of a simpler 1 of N procedure This deserves a bit of explanation, as the motivating issue explicitly requested randomness. In short, it's hard to get randomness to compile for `wasm32-unknown-unknown` because that is explicitly intended to be as deterministic as practical. Additionally, even though it would never be used for consensus purposes, it still felt offputting to intentionally introduce randomness into a node's operations. Except, it wasn't really random, either: it was a deterministic PRNG varying only in its state, and getting the state to work right for that target would have required initializing from a constant. Given that it was a deterministic sequence anyway, it seemed much simpler and more explicit to simply select one of each N messages instead of attempting any kind of realistic randomness. * reinstate randomness for better statistical properties This partially reverts commit 0ab8594c328b3f9ce1f696fe405556d4000630e9. `oorandom` is much lighter than the previous `rand`-based implementation, which makes this easier to work with. This implementation gives each subsystem and each child RNG a distinct increment, which should ensure they produce distinct streams of values. --- polkadot/Cargo.lock | 7 + polkadot/node/overseer/Cargo.toml | 17 +- polkadot/node/overseer/src/lib.rs | 255 ++++++++++++++++++++++++++---- 3 files changed, 241 insertions(+), 38 deletions(-) diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock index e087f1b74c..dd1fe284ad 100644 --- a/polkadot/Cargo.lock +++ b/polkadot/Cargo.lock @@ -3841,6 +3841,12 @@ dependencies = [ "parking_lot 0.11.1", ] +[[package]] +name = "oorandom" +version = "11.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575" + [[package]] name = "opaque-debug" version = "0.2.3" @@ -5337,6 +5343,7 @@ dependencies = [ "futures 0.3.8", "futures-timer 3.0.2", "kv-log-macro", + "oorandom", "polkadot-node-network-protocol", "polkadot-node-primitives", "polkadot-node-subsystem", diff --git a/polkadot/node/overseer/Cargo.toml b/polkadot/node/overseer/Cargo.toml index dd9ca81d3f..41c114f15d 100644 --- a/polkadot/node/overseer/Cargo.toml +++ b/polkadot/node/overseer/Cargo.toml @@ -5,17 +5,18 @@ authors = ["Parity Technologies "] edition = "2018" [dependencies] +async-trait = "0.1.42" +client = { package = "sc-client-api", git = "https://github.com/paritytech/substrate", branch = "master" } futures = "0.3.8" +futures-timer = "3.0.2" +oorandom = "11.1.3" +polkadot-node-primitives = { package = "polkadot-node-primitives", path = "../primitives" } +polkadot-node-subsystem-util = { path = "../subsystem-util" } +polkadot-primitives = { path = "../../primitives" } +polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../subsystem" } +streamunordered = "0.5.1" tracing = "0.1.22" tracing-futures = "0.2.4" -futures-timer = "3.0.2" -streamunordered = "0.5.1" -polkadot-primitives = { path = "../../primitives" } -client = { package = "sc-client-api", git = "https://github.com/paritytech/substrate", branch = "master" } -polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../subsystem" } -polkadot-node-subsystem-util = { path = "../subsystem-util" } -polkadot-node-primitives = { package = "polkadot-node-primitives", path = "../primitives" } -async-trait = "0.1.42" [dev-dependencies] sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" } diff --git a/polkadot/node/overseer/src/lib.rs b/polkadot/node/overseer/src/lib.rs index 687a9ceab2..60e0215d51 100644 --- a/polkadot/node/overseer/src/lib.rs +++ b/polkadot/node/overseer/src/lib.rs @@ -59,7 +59,7 @@ // yielding false positives #![warn(missing_docs)] -use std::fmt::Debug; +use std::fmt::{self, Debug}; use std::pin::Pin; use std::sync::Arc; use std::task::Poll; @@ -74,6 +74,7 @@ use futures::{ Future, FutureExt, SinkExt, StreamExt, }; use futures_timer::Delay; +use oorandom::Rand32; use streamunordered::{StreamYield, StreamUnordered}; use polkadot_primitives::v1::{Block, BlockNumber, Hash}; @@ -99,6 +100,8 @@ const CHANNEL_CAPACITY: usize = 1024; const STOP_DELAY: u64 = 1; // Target for logs. const LOG_TARGET: &'static str = "overseer"; +// Rate at which messages are timed. +const MESSAGE_TIMER_METRIC_CAPTURE_RATE: f64 = 0.005; /// A type of messages that are sent from [`Subsystem`] to [`Overseer`]. /// @@ -291,6 +294,26 @@ struct SubsystemInstance { name: &'static str, } +type MaybeTimer = Option; + +#[derive(Debug)] +struct MaybeTimed { + timer: MaybeTimer, + t: T, +} + +impl MaybeTimed { + fn into_inner(self) -> T { + self.t + } +} + +impl From for MaybeTimed { + fn from(t: T) -> Self { + Self { timer: None, t } + } +} + /// A context type that is given to the [`Subsystem`] upon spawning. /// It can be used by [`Subsystem`] to communicate with other [`Subsystem`]s /// or to spawn it's [`SubsystemJob`]s. @@ -301,7 +324,82 @@ struct SubsystemInstance { #[derive(Debug)] pub struct OverseerSubsystemContext{ rx: mpsc::Receiver>, - tx: mpsc::Sender, + tx: mpsc::Sender>, + metrics: Metrics, + rng: Rand32, + threshold: u32, +} + +impl OverseerSubsystemContext { + /// Create a new `OverseerSubsystemContext`. + /// + /// `increment` determines the initial increment of the internal RNG. + /// The internal RNG is used to determine which messages are timed. + /// + /// `capture_rate` determines what fraction of messages are timed. Its value is clamped + /// to the range `0.0..=1.0`. + fn new( + rx: mpsc::Receiver>, + tx: mpsc::Sender>, + metrics: Metrics, + increment: u64, + mut capture_rate: f64, + ) -> Self { + let rng = Rand32::new_inc(0, increment); + + if capture_rate < 0.0 { + capture_rate = 0.0; + } else if capture_rate > 1.0 { + capture_rate = 1.0; + } + let threshold = (capture_rate * u32::MAX as f64) as u32; + + OverseerSubsystemContext { rx, tx, metrics, rng, threshold } + } + + /// Create a new `OverseserSubsystemContext` with no metering. + /// + /// Intended for tests. + #[allow(unused)] + fn new_unmetered(rx: mpsc::Receiver>, tx: mpsc::Sender>) -> Self { + let metrics = Metrics::default(); + OverseerSubsystemContext::new(rx, tx, metrics, 0, 0.0) + } + + fn maybe_timed(&mut self, t: T) -> MaybeTimed { + let timer = if self.rng.rand_u32() <= self.threshold { + self.metrics.time_message_hold() + } else { + None + }; + + MaybeTimed { timer, t } + } + + /// Make a standalone function which can construct a `MaybeTimed` wrapper around some `T` + /// without borrowing `self`. + /// + /// This is somewhat more expensive than `self.maybe_timed` because it must clone some stuff. + fn make_maybe_timed(&mut self) -> impl FnMut(T) -> MaybeTimed { + // We don't want to simply clone this RNG because we don't want to duplicate its state. + // It's not ever going to be used for cryptographic purposes, but it's still better to + // keep good habits. + let (seed, increment) = self.rng.state(); + let mut rng = Rand32::new_inc(seed, increment + 1); + + let metrics = self.metrics.clone(); + let threshold = self.threshold; + + move |t| { + let timer = if rng.rand_u32() <= threshold { + metrics.time_message_hold() + } else { + None + }; + + MaybeTimed { timer, t } + } + } } #[async_trait::async_trait] @@ -327,7 +425,7 @@ impl SubsystemContext for OverseerSubsystemContext { async fn spawn(&mut self, name: &'static str, s: Pin + Send>>) -> SubsystemResult<()> { - self.tx.send(ToOverseer::SpawnJob { + self.send_timed(ToOverseer::SpawnJob { name, s, }).await.map_err(Into::into) @@ -336,7 +434,7 @@ impl SubsystemContext for OverseerSubsystemContext { async fn spawn_blocking(&mut self, name: &'static str, s: Pin + Send>>) -> SubsystemResult<()> { - self.tx.send(ToOverseer::SpawnBlockingJob { + self.send_timed(ToOverseer::SpawnBlockingJob { name, s, }).await.map_err(Into::into) @@ -349,21 +447,13 @@ impl SubsystemContext for OverseerSubsystemContext { async fn send_messages(&mut self, msgs: T) where T: IntoIterator + Send, T::IntoIter: Send { - let mut msgs = stream::iter(msgs.into_iter().map(ToOverseer::SubsystemMessage).map(Ok)); - if self.tx.send_all(&mut msgs).await.is_err() { - tracing::debug!( - target: LOG_TARGET, - msg_type = std::any::type_name::(), - "Failed to send messages to Overseer", - ); - - } + self.send_all_timed_or_log(msgs).await } } impl OverseerSubsystemContext { async fn send_and_log_error(&mut self, msg: ToOverseer) { - if self.tx.send(msg).await.is_err() { + if self.send_timed(msg).await.is_err() { tracing::debug!( target: LOG_TARGET, msg_type = std::any::type_name::(), @@ -371,6 +461,35 @@ impl OverseerSubsystemContext { ); } } + + async fn send_timed(&mut self, msg: ToOverseer) -> Result< + (), + > as futures::Sink>>::Error + > + { + let msg = self.maybe_timed(msg); + self.tx.send(msg).await + } + + async fn send_all_timed_or_log(&mut self, msgs: Msgs) + where + Msgs: IntoIterator + Send, + Msgs::IntoIter: Send, + Msg: Into + Send, + { + let mut maybe_timed = self.make_maybe_timed(); + let mut msgs = stream::iter( + msgs.into_iter() + .map(move |msg| Ok(maybe_timed(ToOverseer::SubsystemMessage(msg.into())))) + ); + if self.tx.send_all(&mut msgs).await.is_err() { + tracing::debug!( + target: LOG_TARGET, + msg_type = std::any::type_name::(), + "Failed to send messages to Overseer", + ); + } + } } /// A subsystem that we oversee. @@ -480,8 +599,8 @@ pub struct Overseer { /// Here we keep handles to spawned subsystems to be notified when they terminate. running_subsystems: FuturesUnordered>>, - /// Gather running subsystms' outbound streams into one. - running_subsystems_rx: StreamUnordered>, + /// Gather running subsystems' outbound streams into one. + running_subsystems_rx: StreamUnordered>>, /// Events that are sent to the overseer from the outside world events_rx: mpsc::Receiver, @@ -966,6 +1085,7 @@ struct MetricsInner { activated_heads_total: prometheus::Counter, deactivated_heads_total: prometheus::Counter, messages_relayed_total: prometheus::Counter, + message_relay_timing: prometheus::Histogram, } #[derive(Default, Clone)] @@ -989,6 +1109,11 @@ impl Metrics { metrics.messages_relayed_total.inc(); } } + + /// Provide a timer for the duration between receiving a message and passing it to `route_message` + fn time_message_hold(&self) -> MaybeTimer { + self.0.as_ref().map(|metrics| metrics.message_relay_timing.start_timer()) + } } impl metrics::Metrics for Metrics { @@ -1015,11 +1140,39 @@ impl metrics::Metrics for Metrics { )?, registry, )?, + message_relay_timing: prometheus::register( + prometheus::Histogram::with_opts( + prometheus::HistogramOpts { + common_opts: prometheus::Opts::new( + "overseer_messages_relay_timing", + "Time spent holding a message in the overseer before passing it to `route_message`", + ), + // guessing at the desired resolution, but we know that messages will time + // out after 0.5 seconds, so the bucket set below seems plausible: + // `0.0001 * (1.6 ^ 18) ~= 0.472`. Prometheus auto-generates a final bucket + // for all values between the final value and `+Inf`, so this should work. + // + // The documented legal range for the inputs are: + // + // - `> 0.0` + // - `> 1.0` + // - `! 0` + buckets: prometheus::exponential_buckets(0.0001, 1.6, 18).expect("inputs are within documented range; qed"), + } + )?, + registry, + )?, }; Ok(Metrics(Some(metrics))) } } +impl fmt::Debug for Metrics { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("Metrics {{...}}") + } +} + impl Overseer where S: SpawnNamed, @@ -1136,14 +1289,20 @@ where events_tx: events_tx.clone(), }; + let metrics = ::register(prometheus_registry)?; + let mut running_subsystems_rx = StreamUnordered::new(); let mut running_subsystems = FuturesUnordered::new(); + let mut seed = 0x533d; // arbitrary + let candidate_validation_subsystem = spawn( &mut s, &mut running_subsystems, &mut running_subsystems_rx, all_subsystems.candidate_validation, + &metrics, + &mut seed, )?; let candidate_backing_subsystem = spawn( @@ -1151,6 +1310,8 @@ where &mut running_subsystems, &mut running_subsystems_rx, all_subsystems.candidate_backing, + &metrics, + &mut seed, )?; let candidate_selection_subsystem = spawn( @@ -1158,6 +1319,8 @@ where &mut running_subsystems, &mut running_subsystems_rx, all_subsystems.candidate_selection, + &metrics, + &mut seed, )?; let statement_distribution_subsystem = spawn( @@ -1165,6 +1328,8 @@ where &mut running_subsystems, &mut running_subsystems_rx, all_subsystems.statement_distribution, + &metrics, + &mut seed, )?; let availability_distribution_subsystem = spawn( @@ -1172,6 +1337,8 @@ where &mut running_subsystems, &mut running_subsystems_rx, all_subsystems.availability_distribution, + &metrics, + &mut seed, )?; let bitfield_signing_subsystem = spawn( @@ -1179,6 +1346,8 @@ where &mut running_subsystems, &mut running_subsystems_rx, all_subsystems.bitfield_signing, + &metrics, + &mut seed, )?; let bitfield_distribution_subsystem = spawn( @@ -1186,6 +1355,8 @@ where &mut running_subsystems, &mut running_subsystems_rx, all_subsystems.bitfield_distribution, + &metrics, + &mut seed, )?; let provisioner_subsystem = spawn( @@ -1193,6 +1364,8 @@ where &mut running_subsystems, &mut running_subsystems_rx, all_subsystems.provisioner, + &metrics, + &mut seed, )?; let pov_distribution_subsystem = spawn( @@ -1200,6 +1373,8 @@ where &mut running_subsystems, &mut running_subsystems_rx, all_subsystems.pov_distribution, + &metrics, + &mut seed, )?; let runtime_api_subsystem = spawn( @@ -1207,6 +1382,8 @@ where &mut running_subsystems, &mut running_subsystems_rx, all_subsystems.runtime_api, + &metrics, + &mut seed, )?; let availability_store_subsystem = spawn( @@ -1214,6 +1391,8 @@ where &mut running_subsystems, &mut running_subsystems_rx, all_subsystems.availability_store, + &metrics, + &mut seed, )?; let network_bridge_subsystem = spawn( @@ -1221,6 +1400,8 @@ where &mut running_subsystems, &mut running_subsystems_rx, all_subsystems.network_bridge, + &metrics, + &mut seed, )?; let chain_api_subsystem = spawn( @@ -1228,6 +1409,8 @@ where &mut running_subsystems, &mut running_subsystems_rx, all_subsystems.chain_api, + &metrics, + &mut seed, )?; let collation_generation_subsystem = spawn( @@ -1235,6 +1418,8 @@ where &mut running_subsystems, &mut running_subsystems_rx, all_subsystems.collation_generation, + &metrics, + &mut seed, )?; @@ -1243,6 +1428,8 @@ where &mut running_subsystems, &mut running_subsystems_rx, all_subsystems.collator_protocol, + &metrics, + &mut seed, )?; let leaves = leaves @@ -1251,8 +1438,6 @@ where .collect(); let active_leaves = HashMap::new(); - - let metrics = ::register(prometheus_registry)?; let activation_external_listeners = HashMap::new(); let this = Self { @@ -1342,7 +1527,7 @@ where match msg { Event::MsgToSubsystem(msg) => { - self.route_message(msg).await?; + self.route_message(msg.into()).await?; } Event::Stop => { self.stop().await; @@ -1360,14 +1545,17 @@ where } }, msg = self.running_subsystems_rx.next().fuse() => { - let msg = if let Some((StreamYield::Item(msg), _)) = msg { + let MaybeTimed { timer, t: msg } = if let Some((StreamYield::Item(msg), _)) = msg { msg } else { continue }; match msg { - ToOverseer::SubsystemMessage(msg) => self.route_message(msg).await?, + ToOverseer::SubsystemMessage(msg) => { + let msg = MaybeTimed { timer, t: msg }; + self.route_message(msg).await? + }, ToOverseer::SpawnJob { name, s } => { self.spawn_job(name, s); } @@ -1462,7 +1650,8 @@ where } #[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))] - async fn route_message(&mut self, msg: AllMessages) -> SubsystemResult<()> { + async fn route_message(&mut self, msg: MaybeTimed) -> SubsystemResult<()> { + let msg = msg.into_inner(); self.metrics.on_message_relayed(); match msg { AllMessages::CandidateValidation(msg) => { @@ -1572,14 +1761,19 @@ where fn spawn( spawner: &mut S, futures: &mut FuturesUnordered>>, - streams: &mut StreamUnordered>, + streams: &mut StreamUnordered>>, s: impl Subsystem>, + metrics: &Metrics, + seed: &mut u64, ) -> SubsystemResult> { let (to_tx, to_rx) = mpsc::channel(CHANNEL_CAPACITY); let (from_tx, from_rx) = mpsc::channel(CHANNEL_CAPACITY); - let ctx = OverseerSubsystemContext { rx: to_rx, tx: from_tx }; + let ctx = OverseerSubsystemContext::new(to_rx, from_tx, metrics.clone(), *seed, MESSAGE_TIMER_METRIC_CAPTURE_RATE); let SpawnedSubsystem { future, name } = s.start(ctx); + // increment the seed now that it's been used, so the next context will have its own distinct RNG + *seed += 1; + let (tx, rx) = oneshot::channel(); let fut = Box::pin(async move { @@ -1827,12 +2021,13 @@ mod tests { fn extract_metrics(registry: &prometheus::Registry) -> HashMap<&'static str, u64> { let gather = registry.gather(); - assert_eq!(gather[0].get_name(), "parachain_activated_heads_total"); - assert_eq!(gather[1].get_name(), "parachain_deactivated_heads_total"); - assert_eq!(gather[2].get_name(), "parachain_messages_relayed_total"); - let activated = gather[0].get_metric()[0].get_counter().get_value() as u64; - let deactivated = gather[1].get_metric()[0].get_counter().get_value() as u64; - let relayed = gather[2].get_metric()[0].get_counter().get_value() as u64; + assert_eq!(gather[0].get_name(), "overseer_messages_relay_timing"); + assert_eq!(gather[1].get_name(), "parachain_activated_heads_total"); + assert_eq!(gather[2].get_name(), "parachain_deactivated_heads_total"); + assert_eq!(gather[3].get_name(), "parachain_messages_relayed_total"); + let activated = gather[1].get_metric()[0].get_counter().get_value() as u64; + let deactivated = gather[2].get_metric()[0].get_counter().get_value() as u64; + let relayed = gather[3].get_metric()[0].get_counter().get_value() as u64; let mut result = HashMap::new(); result.insert("activated", activated); result.insert("deactivated", deactivated);