Alert on frequent network errors (#7410)

* Introduce is_frequent util

* Add dirty warn_if_frequent! implementation

* Add freq

* Fix order in condition

* Update

* Update docs

* Fix

* Remove old impl

* Fix errors

* Add wif to av-distr

* Add wif to col prot

* Rename

* Add wif to state-distr

* Address review comments

* Change Freq implementation

* Remove the zero division check

* Make rate explicit

* Fix typo

* Update rate constant

* Introduce explicit rates

* Update docs

* Split errors freq

* Downgrade coarsetime
This commit is contained in:
Andrei Eres
2023-07-17 14:05:57 +02:00
committed by GitHub
parent dd7d2f924b
commit 174f23d1cc
12 changed files with 268 additions and 15 deletions
+1
View File
@@ -12953,6 +12953,7 @@ dependencies = [
name = "tracing-gum"
version = "0.9.43"
dependencies = [
"coarsetime",
"polkadot-node-jaeger",
"polkadot-primitives",
"tracing",
+1
View File
@@ -6,6 +6,7 @@ edition.workspace = true
description = "Stick logs together with the TraceID as provided by tempo"
[dependencies]
coarsetime = "0.1.22"
tracing = "0.1.35"
jaeger = { path = "../jaeger", package = "polkadot-node-jaeger" }
gum-proc-macro = { path = "./proc-macro", package = "tracing-gum-proc-macro" }
+21
View File
@@ -43,6 +43,27 @@ pub fn warn(item: proc_macro::TokenStream) -> proc_macro::TokenStream {
gum(item, Level::Warn)
}
/// Print a warning or debug level message depending on their frequency
#[proc_macro]
pub fn warn_if_frequent(item: proc_macro::TokenStream) -> proc_macro::TokenStream {
let ArgsIfFrequent { freq, max_rate, rest } = parse2(item.into()).unwrap();
let freq_expr = freq.expr;
let max_rate_expr = max_rate.expr;
let debug: proc_macro2::TokenStream = gum(rest.clone().into(), Level::Debug).into();
let warn: proc_macro2::TokenStream = gum(rest.into(), Level::Warn).into();
let stream = quote! {
if #freq_expr .is_frequent(#max_rate_expr) {
#warn
} else {
#debug
}
};
stream.into()
}
/// Print a info level message.
#[proc_macro]
pub fn info(item: proc_macro::TokenStream) -> proc_macro::TokenStream {
+46
View File
@@ -23,6 +23,8 @@ use syn::{
pub(crate) mod kw {
syn::custom_keyword!(target);
syn::custom_keyword!(freq);
syn::custom_keyword!(max_rate);
}
#[derive(Debug, Clone, PartialEq, Eq)]
@@ -248,6 +250,50 @@ impl ToTokens for FmtGroup {
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct Freq {
kw: kw::freq,
colon: Token![:],
pub expr: syn::Expr,
}
impl Parse for Freq {
fn parse(input: ParseStream) -> Result<Self> {
Ok(Self { kw: input.parse()?, colon: input.parse()?, expr: input.parse()? })
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct MaxRate {
kw: kw::max_rate,
colon: Token![:],
pub expr: syn::Expr,
}
impl Parse for MaxRate {
fn parse(input: ParseStream) -> Result<Self> {
Ok(Self { kw: input.parse()?, colon: input.parse()?, expr: input.parse()? })
}
}
pub(crate) struct ArgsIfFrequent {
pub freq: Freq,
pub max_rate: MaxRate,
pub rest: TokenStream,
}
impl Parse for ArgsIfFrequent {
fn parse(input: ParseStream) -> Result<Self> {
let freq = input.parse()?;
let _: Token![,] = input.parse()?;
let max_rate = input.parse()?;
let _: Token![,] = input.parse()?;
let rest = input.parse()?;
Ok(Self { freq, max_rate, rest })
}
}
/// Full set of arguments as provided to the `gum::warn!` call.
#[derive(Debug, Clone)]
pub(crate) struct Args {
+78 -1
View File
@@ -112,7 +112,84 @@ pub use jaeger::hash_to_trace_identifier;
#[doc(hidden)]
pub use polkadot_primitives::{CandidateHash, Hash};
pub use gum_proc_macro::{debug, error, info, trace, warn};
pub use gum_proc_macro::{debug, error, info, trace, warn, warn_if_frequent};
#[cfg(test)]
mod tests;
const FREQ_SMOOTHING_FACTOR: f32 = 0.5;
/// Exponential moving average
#[derive(Debug, Default)]
struct EmaBucket {
current: f32,
count: u32,
}
impl EmaBucket {
fn update(&mut self, value: f32, alpha: f32) {
if self.count == 0 {
self.current = value;
} else {
self.current += alpha * (value - self.current);
}
self.count += 1;
}
}
/// Utility struct to compare the rate of its own calls.
pub struct Freq {
ema: EmaBucket,
last: u64,
}
impl Freq {
/// Initiates a new instance
pub fn new() -> Self {
Self { ema: Default::default(), last: Default::default() }
}
/// Compares the rate of its own calls with the passed one.
pub fn is_frequent(&mut self, max_rate: Times) -> bool {
self.record();
// Two attempts is not enough to call something as frequent.
if self.ema.count < 3 {
return false
}
let rate = 1000.0 / self.ema.current; // Current EMA represents interval in ms
rate > max_rate.into()
}
fn record(&mut self) {
let now = coarsetime::Clock::now_since_epoch().as_millis() as u64;
if self.last > 0 {
self.ema.update((now - self.last) as f32, FREQ_SMOOTHING_FACTOR);
}
self.last = now;
}
}
/// Represents frequency per second, minute, hour and day
pub enum Times {
/// Per second
PerSecond(u32),
/// Per minute
PerMinute(u32),
/// Per hour
PerHour(u32),
/// Per day
PerDay(u32),
}
impl From<Times> for f32 {
fn from(value: Times) -> Self {
match value {
Times::PerSecond(v) => v as f32,
Times::PerMinute(v) => v as f32 / 60.0,
Times::PerHour(v) => v as f32 / (60.0 * 60.0),
Times::PerDay(v) => v as f32 / (60.0 * 60.0 * 24.0),
}
}
}
+62
View File
@@ -50,6 +50,21 @@ fn wo_unnecessary() {
);
}
#[test]
fn if_frequent() {
let a: i32 = 7;
let mut f = Freq::new();
warn_if_frequent!(
freq: f,
max_rate: Times::PerSecond(1),
target: "bar",
a = a,
b = ?Y::default(),
"fff {c}",
c = a,
);
}
#[test]
fn w_candidate_hash_value_assignment() {
let a: i32 = 7;
@@ -102,3 +117,50 @@ fn w_candidate_hash_aliased_unnecessary() {
"xxx",
);
}
#[test]
fn frequent_at_fourth_time() {
let mut freq = Freq::new();
assert!(!freq.is_frequent(Times::PerSecond(1)));
assert!(!freq.is_frequent(Times::PerSecond(1)));
assert!(!freq.is_frequent(Times::PerSecond(1)));
assert!(freq.is_frequent(Times::PerSecond(1)));
}
#[test]
fn not_frequent_at_fourth_time_if_slow() {
let mut freq = Freq::new();
assert!(!freq.is_frequent(Times::PerSecond(1000)));
assert!(!freq.is_frequent(Times::PerSecond(1000)));
assert!(!freq.is_frequent(Times::PerSecond(1000)));
std::thread::sleep(std::time::Duration::from_millis(10));
assert!(!freq.is_frequent(Times::PerSecond(1000)));
}
#[test]
fn calculate_rate_per_second() {
let rate: f32 = Times::PerSecond(100).into();
assert_eq!(rate, 100.0)
}
#[test]
fn calculate_rate_per_minute() {
let rate: f32 = Times::PerMinute(100).into();
assert_eq!(rate, 1.6666666)
}
#[test]
fn calculate_rate_per_hour() {
let rate: f32 = Times::PerHour(100).into();
assert_eq!(rate, 0.027777778)
}
#[test]
fn calculate_rate_per_day() {
let rate: f32 = Times::PerDay(100).into();
assert_eq!(rate, 0.0011574074)
}
@@ -91,7 +91,11 @@ pub type Result<T> = std::result::Result<T, Error>;
///
/// We basically always want to try and continue on error. This utility function is meant to
/// consume top-level errors by simply logging them
pub fn log_error(result: Result<()>, ctx: &'static str) -> std::result::Result<(), FatalError> {
pub fn log_error(
result: Result<()>,
ctx: &'static str,
warn_freq: &mut gum::Freq,
) -> std::result::Result<(), FatalError> {
match result.into_nested()? {
Ok(()) => Ok(()),
Err(jfyi) => {
@@ -104,7 +108,8 @@ pub fn log_error(result: Result<()>, ctx: &'static str) -> std::result::Result<(
JfyiError::FetchPoV(_) |
JfyiError::SendResponse |
JfyiError::NoSuchPoV |
JfyiError::Runtime(_) => gum::debug!(target: LOG_TARGET, error = ?jfyi, ctx),
JfyiError::Runtime(_) =>
gum::warn_if_frequent!(freq: warn_freq, max_rate: gum::Times::PerHour(100), target: LOG_TARGET, error = ?jfyi, ctx),
}
Ok(())
},
@@ -97,6 +97,7 @@ impl AvailabilityDistributionSubsystem {
let IncomingRequestReceivers { pov_req_receiver, chunk_req_receiver } = recvs;
let mut requester = Requester::new(metrics.clone()).fuse();
let mut warn_freq = gum::Freq::new();
{
let sender = ctx.sender().clone();
@@ -147,6 +148,7 @@ impl AvailabilityDistributionSubsystem {
.update_fetching_heads(&mut ctx, &mut runtime, update, &spans)
.await,
"Error in Requester::update_fetching_heads",
&mut warn_freq,
)?;
},
FromOrchestra::Signal(OverseerSignal::BlockFinalized(hash, _)) => {
@@ -188,6 +190,7 @@ impl AvailabilityDistributionSubsystem {
)
.await,
"pov_requester::fetch_pov",
&mut warn_freq,
)?;
},
}
@@ -260,6 +260,8 @@ impl RunningTask {
let mut succeeded = false;
let mut count: u32 = 0;
let mut span = self.span.child("run-fetch-chunk-task").with_relay_parent(self.relay_parent);
let mut network_error_freq = gum::Freq::new();
let mut canceled_freq = gum::Freq::new();
// Try validators in reverse order:
while let Some(validator) = self.group.pop() {
// Report retries:
@@ -272,7 +274,10 @@ impl RunningTask {
.with_chunk_index(self.request.index.0)
.with_stage(jaeger::Stage::AvailabilityDistribution);
// Send request:
let resp = match self.do_request(&validator).await {
let resp = match self
.do_request(&validator, &mut network_error_freq, &mut canceled_freq)
.await
{
Ok(resp) => resp,
Err(TaskError::ShuttingDown) => {
gum::info!(
@@ -342,6 +347,8 @@ impl RunningTask {
async fn do_request(
&mut self,
validator: &AuthorityDiscoveryId,
nerwork_error_freq: &mut gum::Freq,
canceled_freq: &mut gum::Freq,
) -> std::result::Result<ChunkFetchingResponse, TaskError> {
gum::trace!(
target: LOG_TARGET,
@@ -386,7 +393,9 @@ impl RunningTask {
Err(TaskError::PeerError)
},
Err(RequestError::NetworkError(err)) => {
gum::debug!(
gum::warn_if_frequent!(
freq: nerwork_error_freq,
max_rate: gum::Times::PerHour(100),
target: LOG_TARGET,
origin = ?validator,
relay_parent = ?self.relay_parent,
@@ -400,7 +409,9 @@ impl RunningTask {
Err(TaskError::PeerError)
},
Err(RequestError::Canceled(oneshot::Canceled)) => {
gum::debug!(
gum::warn_if_frequent!(
freq: canceled_freq,
max_rate: gum::Times::PerHour(100),
target: LOG_TARGET,
origin = ?validator,
relay_parent = ?self.relay_parent,
@@ -1270,6 +1270,9 @@ async fn run_inner<Context>(
let check_collations_stream = tick_stream(CHECK_COLLATIONS_POLL);
futures::pin_mut!(check_collations_stream);
let mut network_error_freq = gum::Freq::new();
let mut canceled_freq = gum::Freq::new();
loop {
select! {
_ = reputation_delay => {
@@ -1312,6 +1315,8 @@ async fn run_inner<Context>(
&mut state.requested_collations,
&state.metrics,
&state.span_per_relay_parent,
&mut network_error_freq,
&mut canceled_freq,
).await;
for (peer_id, rep) in reputation_changes {
@@ -1328,14 +1333,22 @@ async fn poll_requests(
requested_collations: &mut HashMap<PendingCollation, PerRequest>,
metrics: &Metrics,
span_per_relay_parent: &HashMap<Hash, PerLeafSpan>,
network_error_freq: &mut gum::Freq,
canceled_freq: &mut gum::Freq,
) -> Vec<(PeerId, Rep)> {
let mut retained_requested = HashSet::new();
let mut reputation_changes = Vec::new();
for (pending_collation, per_req) in requested_collations.iter_mut() {
// Despite the await, this won't block on the response itself.
let result =
poll_collation_response(metrics, span_per_relay_parent, pending_collation, per_req)
.await;
let result = poll_collation_response(
metrics,
span_per_relay_parent,
pending_collation,
per_req,
network_error_freq,
canceled_freq,
)
.await;
if !result.is_ready() {
retained_requested.insert(pending_collation.clone());
@@ -1479,6 +1492,8 @@ async fn poll_collation_response(
spans: &HashMap<Hash, PerLeafSpan>,
pending_collation: &PendingCollation,
per_req: &mut PerRequest,
network_error_freq: &mut gum::Freq,
canceled_freq: &mut gum::Freq,
) -> CollationFetchResult {
if never!(per_req.from_collator.is_terminated()) {
gum::error!(
@@ -1522,7 +1537,9 @@ async fn poll_collation_response(
CollationFetchResult::Error(None)
},
Err(RequestError::NetworkError(err)) => {
gum::debug!(
gum::warn_if_frequent!(
freq: network_error_freq,
max_rate: gum::Times::PerHour(100),
target: LOG_TARGET,
hash = ?pending_collation.relay_parent,
para_id = ?pending_collation.para_id,
@@ -1537,7 +1554,9 @@ async fn poll_collation_response(
CollationFetchResult::Error(Some(COST_NETWORK_ERROR))
},
Err(RequestError::Canceled(err)) => {
gum::debug!(
gum::warn_if_frequent!(
freq: canceled_freq,
max_rate: gum::Times::PerHour(100),
target: LOG_TARGET,
hash = ?pending_collation.relay_parent,
para_id = ?pending_collation.para_id,
@@ -82,13 +82,18 @@ pub enum Error {
///
/// We basically always want to try and continue on error. This utility function is meant to
/// consume top-level errors by simply logging them.
pub fn log_error(result: Result<()>, ctx: &'static str) -> std::result::Result<(), FatalError> {
pub fn log_error(
result: Result<()>,
ctx: &'static str,
warn_freq: &mut gum::Freq,
) -> std::result::Result<(), FatalError> {
match result.into_nested()? {
Err(jfyi) => {
match jfyi {
JfyiError::RequestedUnannouncedCandidate(_, _) =>
gum::warn!(target: LOG_TARGET, error = %jfyi, ctx),
_ => gum::debug!(target: LOG_TARGET, error = %jfyi, ctx),
_ =>
gum::warn_if_frequent!(freq: warn_freq, max_rate: gum::Times::PerHour(100), target: LOG_TARGET, error = %jfyi, ctx),
}
Ok(())
},
@@ -1810,6 +1810,8 @@ impl<R: rand::Rng> StatementDistributionSubsystem<R> {
)
.map_err(FatalError::SpawnTask)?;
let mut warn_freq = gum::Freq::new();
loop {
select! {
_ = reputation_delay => {
@@ -1851,7 +1853,7 @@ impl<R: rand::Rng> StatementDistributionSubsystem<R> {
result.ok_or(FatalError::RequesterReceiverFinished)?,
)
.await;
log_error(result.map_err(From::from), "handle_requester_message")?;
log_error(result.map_err(From::from), "handle_requester_message", &mut warn_freq)?;
},
MuxedMessage::Responder(result) => {
let result = self
@@ -1861,7 +1863,7 @@ impl<R: rand::Rng> StatementDistributionSubsystem<R> {
result.ok_or(FatalError::ResponderReceiverFinished)?,
)
.await;
log_error(result.map_err(From::from), "handle_responder_message")?;
log_error(result.map_err(From::from), "handle_responder_message", &mut warn_freq)?;
},
};
}