diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock index 0899ba01c7..cc12858193 100644 --- a/polkadot/Cargo.lock +++ b/polkadot/Cargo.lock @@ -12953,6 +12953,7 @@ dependencies = [ name = "tracing-gum" version = "0.9.43" dependencies = [ + "coarsetime", "polkadot-node-jaeger", "polkadot-primitives", "tracing", diff --git a/polkadot/node/gum/Cargo.toml b/polkadot/node/gum/Cargo.toml index 13cb9954df..6bd4f07588 100644 --- a/polkadot/node/gum/Cargo.toml +++ b/polkadot/node/gum/Cargo.toml @@ -6,6 +6,7 @@ edition.workspace = true description = "Stick logs together with the TraceID as provided by tempo" [dependencies] +coarsetime = "0.1.22" tracing = "0.1.35" jaeger = { path = "../jaeger", package = "polkadot-node-jaeger" } gum-proc-macro = { path = "./proc-macro", package = "tracing-gum-proc-macro" } diff --git a/polkadot/node/gum/proc-macro/src/lib.rs b/polkadot/node/gum/proc-macro/src/lib.rs index d51742e9ca..e8b6b59917 100644 --- a/polkadot/node/gum/proc-macro/src/lib.rs +++ b/polkadot/node/gum/proc-macro/src/lib.rs @@ -43,6 +43,27 @@ pub fn warn(item: proc_macro::TokenStream) -> proc_macro::TokenStream { gum(item, Level::Warn) } +/// Print a warning or debug level message depending on their frequency +#[proc_macro] +pub fn warn_if_frequent(item: proc_macro::TokenStream) -> proc_macro::TokenStream { + let ArgsIfFrequent { freq, max_rate, rest } = parse2(item.into()).unwrap(); + + let freq_expr = freq.expr; + let max_rate_expr = max_rate.expr; + let debug: proc_macro2::TokenStream = gum(rest.clone().into(), Level::Debug).into(); + let warn: proc_macro2::TokenStream = gum(rest.into(), Level::Warn).into(); + + let stream = quote! { + if #freq_expr .is_frequent(#max_rate_expr) { + #warn + } else { + #debug + } + }; + + stream.into() +} + /// Print a info level message. #[proc_macro] pub fn info(item: proc_macro::TokenStream) -> proc_macro::TokenStream { diff --git a/polkadot/node/gum/proc-macro/src/types.rs b/polkadot/node/gum/proc-macro/src/types.rs index 6926e1ade9..635347a875 100644 --- a/polkadot/node/gum/proc-macro/src/types.rs +++ b/polkadot/node/gum/proc-macro/src/types.rs @@ -23,6 +23,8 @@ use syn::{ pub(crate) mod kw { syn::custom_keyword!(target); + syn::custom_keyword!(freq); + syn::custom_keyword!(max_rate); } #[derive(Debug, Clone, PartialEq, Eq)] @@ -248,6 +250,50 @@ impl ToTokens for FmtGroup { } } +#[derive(Debug, Clone, PartialEq, Eq)] +pub(crate) struct Freq { + kw: kw::freq, + colon: Token![:], + pub expr: syn::Expr, +} + +impl Parse for Freq { + fn parse(input: ParseStream) -> Result { + Ok(Self { kw: input.parse()?, colon: input.parse()?, expr: input.parse()? }) + } +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub(crate) struct MaxRate { + kw: kw::max_rate, + colon: Token![:], + pub expr: syn::Expr, +} + +impl Parse for MaxRate { + fn parse(input: ParseStream) -> Result { + Ok(Self { kw: input.parse()?, colon: input.parse()?, expr: input.parse()? }) + } +} + +pub(crate) struct ArgsIfFrequent { + pub freq: Freq, + pub max_rate: MaxRate, + pub rest: TokenStream, +} + +impl Parse for ArgsIfFrequent { + fn parse(input: ParseStream) -> Result { + let freq = input.parse()?; + let _: Token![,] = input.parse()?; + let max_rate = input.parse()?; + let _: Token![,] = input.parse()?; + let rest = input.parse()?; + + Ok(Self { freq, max_rate, rest }) + } +} + /// Full set of arguments as provided to the `gum::warn!` call. #[derive(Debug, Clone)] pub(crate) struct Args { diff --git a/polkadot/node/gum/src/lib.rs b/polkadot/node/gum/src/lib.rs index c2d62d98a6..e989a15ae4 100644 --- a/polkadot/node/gum/src/lib.rs +++ b/polkadot/node/gum/src/lib.rs @@ -112,7 +112,84 @@ pub use jaeger::hash_to_trace_identifier; #[doc(hidden)] pub use polkadot_primitives::{CandidateHash, Hash}; -pub use gum_proc_macro::{debug, error, info, trace, warn}; +pub use gum_proc_macro::{debug, error, info, trace, warn, warn_if_frequent}; #[cfg(test)] mod tests; + +const FREQ_SMOOTHING_FACTOR: f32 = 0.5; + +/// Exponential moving average +#[derive(Debug, Default)] +struct EmaBucket { + current: f32, + count: u32, +} + +impl EmaBucket { + fn update(&mut self, value: f32, alpha: f32) { + if self.count == 0 { + self.current = value; + } else { + self.current += alpha * (value - self.current); + } + self.count += 1; + } +} + +/// Utility struct to compare the rate of its own calls. +pub struct Freq { + ema: EmaBucket, + last: u64, +} + +impl Freq { + /// Initiates a new instance + pub fn new() -> Self { + Self { ema: Default::default(), last: Default::default() } + } + + /// Compares the rate of its own calls with the passed one. + pub fn is_frequent(&mut self, max_rate: Times) -> bool { + self.record(); + + // Two attempts is not enough to call something as frequent. + if self.ema.count < 3 { + return false + } + + let rate = 1000.0 / self.ema.current; // Current EMA represents interval in ms + rate > max_rate.into() + } + + fn record(&mut self) { + let now = coarsetime::Clock::now_since_epoch().as_millis() as u64; + if self.last > 0 { + self.ema.update((now - self.last) as f32, FREQ_SMOOTHING_FACTOR); + } + self.last = now; + } +} + +/// Represents frequency per second, minute, hour and day +pub enum Times { + /// Per second + PerSecond(u32), + /// Per minute + PerMinute(u32), + /// Per hour + PerHour(u32), + /// Per day + PerDay(u32), +} + +impl From for f32 { + fn from(value: Times) -> Self { + match value { + Times::PerSecond(v) => v as f32, + Times::PerMinute(v) => v as f32 / 60.0, + Times::PerHour(v) => v as f32 / (60.0 * 60.0), + Times::PerDay(v) => v as f32 / (60.0 * 60.0 * 24.0), + } + } +} diff --git a/polkadot/node/gum/src/tests.rs b/polkadot/node/gum/src/tests.rs index 941abf234d..3883239d62 100644 --- a/polkadot/node/gum/src/tests.rs +++ b/polkadot/node/gum/src/tests.rs @@ -50,6 +50,21 @@ fn wo_unnecessary() { ); } +#[test] +fn if_frequent() { + let a: i32 = 7; + let mut f = Freq::new(); + warn_if_frequent!( + freq: f, + max_rate: Times::PerSecond(1), + target: "bar", + a = a, + b = ?Y::default(), + "fff {c}", + c = a, + ); +} + #[test] fn w_candidate_hash_value_assignment() { let a: i32 = 7; @@ -102,3 +117,50 @@ fn w_candidate_hash_aliased_unnecessary() { "xxx", ); } + +#[test] +fn frequent_at_fourth_time() { + let mut freq = Freq::new(); + + assert!(!freq.is_frequent(Times::PerSecond(1))); + assert!(!freq.is_frequent(Times::PerSecond(1))); + assert!(!freq.is_frequent(Times::PerSecond(1))); + + assert!(freq.is_frequent(Times::PerSecond(1))); +} + +#[test] +fn not_frequent_at_fourth_time_if_slow() { + let mut freq = Freq::new(); + + assert!(!freq.is_frequent(Times::PerSecond(1000))); + assert!(!freq.is_frequent(Times::PerSecond(1000))); + assert!(!freq.is_frequent(Times::PerSecond(1000))); + + std::thread::sleep(std::time::Duration::from_millis(10)); + assert!(!freq.is_frequent(Times::PerSecond(1000))); +} + +#[test] +fn calculate_rate_per_second() { + let rate: f32 = Times::PerSecond(100).into(); + assert_eq!(rate, 100.0) +} + +#[test] +fn calculate_rate_per_minute() { + let rate: f32 = Times::PerMinute(100).into(); + assert_eq!(rate, 1.6666666) +} + +#[test] +fn calculate_rate_per_hour() { + let rate: f32 = Times::PerHour(100).into(); + assert_eq!(rate, 0.027777778) +} + +#[test] +fn calculate_rate_per_day() { + let rate: f32 = Times::PerDay(100).into(); + assert_eq!(rate, 0.0011574074) +} diff --git a/polkadot/node/network/availability-distribution/src/error.rs b/polkadot/node/network/availability-distribution/src/error.rs index 3e060723e7..c547a1abbc 100644 --- a/polkadot/node/network/availability-distribution/src/error.rs +++ b/polkadot/node/network/availability-distribution/src/error.rs @@ -91,7 +91,11 @@ pub type Result = std::result::Result; /// /// We basically always want to try and continue on error. This utility function is meant to /// consume top-level errors by simply logging them -pub fn log_error(result: Result<()>, ctx: &'static str) -> std::result::Result<(), FatalError> { +pub fn log_error( + result: Result<()>, + ctx: &'static str, + warn_freq: &mut gum::Freq, +) -> std::result::Result<(), FatalError> { match result.into_nested()? { Ok(()) => Ok(()), Err(jfyi) => { @@ -104,7 +108,8 @@ pub fn log_error(result: Result<()>, ctx: &'static str) -> std::result::Result<( JfyiError::FetchPoV(_) | JfyiError::SendResponse | JfyiError::NoSuchPoV | - JfyiError::Runtime(_) => gum::debug!(target: LOG_TARGET, error = ?jfyi, ctx), + JfyiError::Runtime(_) => + gum::warn_if_frequent!(freq: warn_freq, max_rate: gum::Times::PerHour(100), target: LOG_TARGET, error = ?jfyi, ctx), } Ok(()) }, diff --git a/polkadot/node/network/availability-distribution/src/lib.rs b/polkadot/node/network/availability-distribution/src/lib.rs index bbefe12f11..c62ce1dd98 100644 --- a/polkadot/node/network/availability-distribution/src/lib.rs +++ b/polkadot/node/network/availability-distribution/src/lib.rs @@ -97,6 +97,7 @@ impl AvailabilityDistributionSubsystem { let IncomingRequestReceivers { pov_req_receiver, chunk_req_receiver } = recvs; let mut requester = Requester::new(metrics.clone()).fuse(); + let mut warn_freq = gum::Freq::new(); { let sender = ctx.sender().clone(); @@ -147,6 +148,7 @@ impl AvailabilityDistributionSubsystem { .update_fetching_heads(&mut ctx, &mut runtime, update, &spans) .await, "Error in Requester::update_fetching_heads", + &mut warn_freq, )?; }, FromOrchestra::Signal(OverseerSignal::BlockFinalized(hash, _)) => { @@ -188,6 +190,7 @@ impl AvailabilityDistributionSubsystem { ) .await, "pov_requester::fetch_pov", + &mut warn_freq, )?; }, } diff --git a/polkadot/node/network/availability-distribution/src/requester/fetch_task/mod.rs b/polkadot/node/network/availability-distribution/src/requester/fetch_task/mod.rs index 119bcdaa94..f87e1888bb 100644 --- a/polkadot/node/network/availability-distribution/src/requester/fetch_task/mod.rs +++ b/polkadot/node/network/availability-distribution/src/requester/fetch_task/mod.rs @@ -260,6 +260,8 @@ impl RunningTask { let mut succeeded = false; let mut count: u32 = 0; let mut span = self.span.child("run-fetch-chunk-task").with_relay_parent(self.relay_parent); + let mut network_error_freq = gum::Freq::new(); + let mut canceled_freq = gum::Freq::new(); // Try validators in reverse order: while let Some(validator) = self.group.pop() { // Report retries: @@ -272,7 +274,10 @@ impl RunningTask { .with_chunk_index(self.request.index.0) .with_stage(jaeger::Stage::AvailabilityDistribution); // Send request: - let resp = match self.do_request(&validator).await { + let resp = match self + .do_request(&validator, &mut network_error_freq, &mut canceled_freq) + .await + { Ok(resp) => resp, Err(TaskError::ShuttingDown) => { gum::info!( @@ -342,6 +347,8 @@ impl RunningTask { async fn do_request( &mut self, validator: &AuthorityDiscoveryId, + nerwork_error_freq: &mut gum::Freq, + canceled_freq: &mut gum::Freq, ) -> std::result::Result { gum::trace!( target: LOG_TARGET, @@ -386,7 +393,9 @@ impl RunningTask { Err(TaskError::PeerError) }, Err(RequestError::NetworkError(err)) => { - gum::debug!( + gum::warn_if_frequent!( + freq: nerwork_error_freq, + max_rate: gum::Times::PerHour(100), target: LOG_TARGET, origin = ?validator, relay_parent = ?self.relay_parent, @@ -400,7 +409,9 @@ impl RunningTask { Err(TaskError::PeerError) }, Err(RequestError::Canceled(oneshot::Canceled)) => { - gum::debug!( + gum::warn_if_frequent!( + freq: canceled_freq, + max_rate: gum::Times::PerHour(100), target: LOG_TARGET, origin = ?validator, relay_parent = ?self.relay_parent, diff --git a/polkadot/node/network/collator-protocol/src/validator_side/mod.rs b/polkadot/node/network/collator-protocol/src/validator_side/mod.rs index f098609034..d8896afad0 100644 --- a/polkadot/node/network/collator-protocol/src/validator_side/mod.rs +++ b/polkadot/node/network/collator-protocol/src/validator_side/mod.rs @@ -1270,6 +1270,9 @@ async fn run_inner( let check_collations_stream = tick_stream(CHECK_COLLATIONS_POLL); futures::pin_mut!(check_collations_stream); + let mut network_error_freq = gum::Freq::new(); + let mut canceled_freq = gum::Freq::new(); + loop { select! { _ = reputation_delay => { @@ -1312,6 +1315,8 @@ async fn run_inner( &mut state.requested_collations, &state.metrics, &state.span_per_relay_parent, + &mut network_error_freq, + &mut canceled_freq, ).await; for (peer_id, rep) in reputation_changes { @@ -1328,14 +1333,22 @@ async fn poll_requests( requested_collations: &mut HashMap, metrics: &Metrics, span_per_relay_parent: &HashMap, + network_error_freq: &mut gum::Freq, + canceled_freq: &mut gum::Freq, ) -> Vec<(PeerId, Rep)> { let mut retained_requested = HashSet::new(); let mut reputation_changes = Vec::new(); for (pending_collation, per_req) in requested_collations.iter_mut() { // Despite the await, this won't block on the response itself. - let result = - poll_collation_response(metrics, span_per_relay_parent, pending_collation, per_req) - .await; + let result = poll_collation_response( + metrics, + span_per_relay_parent, + pending_collation, + per_req, + network_error_freq, + canceled_freq, + ) + .await; if !result.is_ready() { retained_requested.insert(pending_collation.clone()); @@ -1479,6 +1492,8 @@ async fn poll_collation_response( spans: &HashMap, pending_collation: &PendingCollation, per_req: &mut PerRequest, + network_error_freq: &mut gum::Freq, + canceled_freq: &mut gum::Freq, ) -> CollationFetchResult { if never!(per_req.from_collator.is_terminated()) { gum::error!( @@ -1522,7 +1537,9 @@ async fn poll_collation_response( CollationFetchResult::Error(None) }, Err(RequestError::NetworkError(err)) => { - gum::debug!( + gum::warn_if_frequent!( + freq: network_error_freq, + max_rate: gum::Times::PerHour(100), target: LOG_TARGET, hash = ?pending_collation.relay_parent, para_id = ?pending_collation.para_id, @@ -1537,7 +1554,9 @@ async fn poll_collation_response( CollationFetchResult::Error(Some(COST_NETWORK_ERROR)) }, Err(RequestError::Canceled(err)) => { - gum::debug!( + gum::warn_if_frequent!( + freq: canceled_freq, + max_rate: gum::Times::PerHour(100), target: LOG_TARGET, hash = ?pending_collation.relay_parent, para_id = ?pending_collation.para_id, diff --git a/polkadot/node/network/statement-distribution/src/error.rs b/polkadot/node/network/statement-distribution/src/error.rs index 14917af99f..e0895989ee 100644 --- a/polkadot/node/network/statement-distribution/src/error.rs +++ b/polkadot/node/network/statement-distribution/src/error.rs @@ -82,13 +82,18 @@ pub enum Error { /// /// We basically always want to try and continue on error. This utility function is meant to /// consume top-level errors by simply logging them. -pub fn log_error(result: Result<()>, ctx: &'static str) -> std::result::Result<(), FatalError> { +pub fn log_error( + result: Result<()>, + ctx: &'static str, + warn_freq: &mut gum::Freq, +) -> std::result::Result<(), FatalError> { match result.into_nested()? { Err(jfyi) => { match jfyi { JfyiError::RequestedUnannouncedCandidate(_, _) => gum::warn!(target: LOG_TARGET, error = %jfyi, ctx), - _ => gum::debug!(target: LOG_TARGET, error = %jfyi, ctx), + _ => + gum::warn_if_frequent!(freq: warn_freq, max_rate: gum::Times::PerHour(100), target: LOG_TARGET, error = %jfyi, ctx), } Ok(()) }, diff --git a/polkadot/node/network/statement-distribution/src/lib.rs b/polkadot/node/network/statement-distribution/src/lib.rs index 1c97cbfab5..4fa05865bb 100644 --- a/polkadot/node/network/statement-distribution/src/lib.rs +++ b/polkadot/node/network/statement-distribution/src/lib.rs @@ -1810,6 +1810,8 @@ impl StatementDistributionSubsystem { ) .map_err(FatalError::SpawnTask)?; + let mut warn_freq = gum::Freq::new(); + loop { select! { _ = reputation_delay => { @@ -1851,7 +1853,7 @@ impl StatementDistributionSubsystem { result.ok_or(FatalError::RequesterReceiverFinished)?, ) .await; - log_error(result.map_err(From::from), "handle_requester_message")?; + log_error(result.map_err(From::from), "handle_requester_message", &mut warn_freq)?; }, MuxedMessage::Responder(result) => { let result = self @@ -1861,7 +1863,7 @@ impl StatementDistributionSubsystem { result.ok_or(FatalError::ResponderReceiverFinished)?, ) .await; - log_error(result.map_err(From::from), "handle_responder_message")?; + log_error(result.map_err(From::from), "handle_responder_message", &mut warn_freq)?; }, }; }