diff --git a/polkadot/node/network/statement-distribution/src/lib.rs b/polkadot/node/network/statement-distribution/src/lib.rs index 9b425a770f..5b42cbaecd 100644 --- a/polkadot/node/network/statement-distribution/src/lib.rs +++ b/polkadot/node/network/statement-distribution/src/lib.rs @@ -32,11 +32,8 @@ use polkadot_node_network_protocol::{ IfDisconnected, PeerId, UnifiedReputationChange as Rep, View, }; use polkadot_node_primitives::{SignedFullStatement, Statement, UncheckedSignedFullStatement}; -use polkadot_node_subsystem_util::{ - self as util, - metrics::{self, prometheus}, - MIN_GOSSIP_PEERS, -}; +use polkadot_node_subsystem_util::{self as util, MIN_GOSSIP_PEERS}; + use polkadot_primitives::v1::{ AuthorityDiscoveryId, CandidateHash, CommittedCandidateReceipt, CompactStatement, Hash, SigningContext, ValidatorId, ValidatorIndex, ValidatorSignature, @@ -75,6 +72,10 @@ use requester::{fetch, RequesterMessage}; mod responder; use responder::{respond, ResponderMessage}; +/// Metrics for the statement distribution +pub(crate) mod metrics; +use metrics::Metrics; + #[cfg(test)] mod tests; @@ -118,7 +119,7 @@ pub struct StatementDistributionSubsystem { keystore: SyncCryptoStorePtr, /// Receiver for incoming large statement requests. req_receiver: Option>, - // Prometheus metrics + /// Prometheus metrics metrics: Metrics, } @@ -241,6 +242,10 @@ struct PeerRelayParentKnowledge { /// connecting again with new peer ids, but we assume that the resulting effective bandwidth /// for such an attack would be too low. large_statement_count: usize, + + /// We have seen a message that that is unexpected from this peer, so note this fact + /// and stop subsequent logging and peer reputation flood. + unexpected_count: usize, } impl PeerRelayParentKnowledge { @@ -504,6 +509,17 @@ impl PeerData { .check_can_receive(fingerprint, max_message_count) } + /// Receive a notice about out of view statement and returns the value of the old flag + fn receive_unexpected(&mut self, relay_parent: &Hash) -> usize { + self.view_knowledge + .get_mut(relay_parent) + .map_or(0_usize, |relay_parent_peer_knowledge| { + let old = relay_parent_peer_knowledge.unexpected_count; + relay_parent_peer_knowledge.unexpected_count += 1_usize; + old + }) + } + /// Basic flood protection for large statements. fn receive_large_statement(&mut self, relay_parent: &Hash) -> std::result::Result<(), Rep> { self.view_knowledge @@ -1332,15 +1348,42 @@ async fn handle_incoming_message<'a>( // perform only basic checks before verifying the signature // as it's more computationally heavy if let Err(rep) = peer_data.check_can_receive(&relay_parent, &fingerprint, max_message_count) { + // This situation can happen when a peer's Seconded message was lost + // but we have received the Valid statement. + // So we check it once and then ignore repeated violation to avoid + // reputation change flood. + let unexpected_count = peer_data.receive_unexpected(&relay_parent); + tracing::debug!( target: LOG_TARGET, ?relay_parent, ?peer, ?message, ?rep, + ?unexpected_count, "Error inserting received statement" ); - report_peer(ctx, peer, rep).await; + + match rep { + // This happens when a Valid statement has been received but there is no corresponding Seconded + COST_UNEXPECTED_STATEMENT_UNKNOWN_CANDIDATE => { + metrics.on_unexpected_statement_valid(); + // Report peer merely if this is not a duplicate out-of-view statement that + // was caused by a missing Seconded statement from this peer + if unexpected_count == 0_usize { + report_peer(ctx, peer, rep).await; + } + }, + // This happens when we have an unexpected remote peer that announced Seconded + COST_UNEXPECTED_STATEMENT_REMOTE => { + metrics.on_unexpected_statement_seconded(); + report_peer(ctx, peer, rep).await; + }, + _ => { + report_peer(ctx, peer, rep).await; + }, + } + return None } @@ -1920,110 +1963,3 @@ fn requesting_peer_knows_about_candidate( .ok_or_else(|| JfyiError::NoSuchHead(*relay_parent))?; Ok(knowledge.sent_candidates.get(&candidate_hash).is_some()) } - -#[derive(Clone)] -struct MetricsInner { - statements_distributed: prometheus::Counter, - sent_requests: prometheus::Counter, - received_responses: prometheus::CounterVec, - active_leaves_update: prometheus::Histogram, - share: prometheus::Histogram, - network_bridge_update_v1: prometheus::Histogram, -} - -/// Statement Distribution metrics. -#[derive(Default, Clone)] -pub struct Metrics(Option); - -impl Metrics { - fn on_statement_distributed(&self) { - if let Some(metrics) = &self.0 { - metrics.statements_distributed.inc(); - } - } - - fn on_sent_request(&self) { - if let Some(metrics) = &self.0 { - metrics.sent_requests.inc(); - } - } - - fn on_received_response(&self, success: bool) { - if let Some(metrics) = &self.0 { - let label = if success { "succeeded" } else { "failed" }; - metrics.received_responses.with_label_values(&[label]).inc(); - } - } - - /// Provide a timer for `active_leaves_update` which observes on drop. - fn time_active_leaves_update(&self) -> Option { - self.0.as_ref().map(|metrics| metrics.active_leaves_update.start_timer()) - } - - /// Provide a timer for `share` which observes on drop. - fn time_share(&self) -> Option { - self.0.as_ref().map(|metrics| metrics.share.start_timer()) - } - - /// Provide a timer for `network_bridge_update_v1` which observes on drop. - fn time_network_bridge_update_v1( - &self, - ) -> Option { - self.0.as_ref().map(|metrics| metrics.network_bridge_update_v1.start_timer()) - } -} - -impl metrics::Metrics for Metrics { - fn try_register( - registry: &prometheus::Registry, - ) -> std::result::Result { - let metrics = MetricsInner { - statements_distributed: prometheus::register( - prometheus::Counter::new( - "polkadot_parachain_statements_distributed_total", - "Number of candidate validity statements distributed to other peers.", - )?, - registry, - )?, - sent_requests: prometheus::register( - prometheus::Counter::new( - "polkadot_parachain_statement_distribution_sent_requests_total", - "Number of large statement fetching requests sent.", - )?, - registry, - )?, - received_responses: prometheus::register( - prometheus::CounterVec::new( - prometheus::Opts::new( - "polkadot_parachain_statement_distribution_received_responses_total", - "Number of received responses for large statement data.", - ), - &["success"], - )?, - registry, - )?, - active_leaves_update: prometheus::register( - prometheus::Histogram::with_opts(prometheus::HistogramOpts::new( - "polkadot_parachain_statement_distribution_active_leaves_update", - "Time spent within `statement_distribution::active_leaves_update`", - ))?, - registry, - )?, - share: prometheus::register( - prometheus::Histogram::with_opts(prometheus::HistogramOpts::new( - "polkadot_parachain_statement_distribution_share", - "Time spent within `statement_distribution::share`", - ))?, - registry, - )?, - network_bridge_update_v1: prometheus::register( - prometheus::Histogram::with_opts(prometheus::HistogramOpts::new( - "polkadot_parachain_statement_distribution_network_bridge_update_v1", - "Time spent within `statement_distribution::network_bridge_update_v1`", - ))?, - registry, - )?, - }; - Ok(Metrics(Some(metrics))) - } -} diff --git a/polkadot/node/network/statement-distribution/src/metrics.rs b/polkadot/node/network/statement-distribution/src/metrics.rs new file mode 100644 index 0000000000..1cd6e468a1 --- /dev/null +++ b/polkadot/node/network/statement-distribution/src/metrics.rs @@ -0,0 +1,165 @@ +// Copyright 2022 Parity Technologies (UK) Ltd. +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! Metrics for the statement distribution module + +use polkadot_node_subsystem_util::metrics::{self, prometheus}; + +#[derive(Clone)] +struct MetricsInner { + statements_distributed: prometheus::Counter, + sent_requests: prometheus::Counter, + received_responses: prometheus::CounterVec, + active_leaves_update: prometheus::Histogram, + share: prometheus::Histogram, + network_bridge_update_v1: prometheus::Histogram, + statements_unexpected: prometheus::CounterVec, +} + +/// Statement Distribution metrics. +#[derive(Default, Clone)] +pub struct Metrics(Option); + +impl Metrics { + /// Update statements distributed counter + pub fn on_statement_distributed(&self) { + if let Some(metrics) = &self.0 { + metrics.statements_distributed.inc(); + } + } + + /// Update sent requests counter + /// This counter is updated merely for the statements sent via request/response method, + /// meaning that it counts large statements only + pub fn on_sent_request(&self) { + if let Some(metrics) = &self.0 { + metrics.sent_requests.inc(); + } + } + + /// Update counters for the received responses with `succeeded` or `failed` labels + /// These counters are updated merely for the statements received via request/response method, + /// meaning that they count large statements only + pub fn on_received_response(&self, success: bool) { + if let Some(metrics) = &self.0 { + let label = if success { "succeeded" } else { "failed" }; + metrics.received_responses.with_label_values(&[label]).inc(); + } + } + + /// Provide a timer for `active_leaves_update` which observes on drop. + pub fn time_active_leaves_update( + &self, + ) -> Option { + self.0.as_ref().map(|metrics| metrics.active_leaves_update.start_timer()) + } + + /// Provide a timer for `share` which observes on drop. + pub fn time_share(&self) -> Option { + self.0.as_ref().map(|metrics| metrics.share.start_timer()) + } + + /// Provide a timer for `network_bridge_update_v1` which observes on drop. + pub fn time_network_bridge_update_v1( + &self, + ) -> Option { + self.0.as_ref().map(|metrics| metrics.network_bridge_update_v1.start_timer()) + } + + /// Update the out-of-view statements counter for unexpected valid statements + pub fn on_unexpected_statement_valid(&self) { + if let Some(metrics) = &self.0 { + metrics.statements_unexpected.with_label_values(&["valid"]).inc(); + } + } + + /// Update the out-of-view statements counter for unexpected seconded statements + pub fn on_unexpected_statement_seconded(&self) { + if let Some(metrics) = &self.0 { + metrics.statements_unexpected.with_label_values(&["seconded"]).inc(); + } + } + + /// Update the out-of-view statements counter for unexpected large statements + pub fn on_unexpected_statement_large(&self) { + if let Some(metrics) = &self.0 { + metrics.statements_unexpected.with_label_values(&["large"]).inc(); + } + } +} + +impl metrics::Metrics for Metrics { + fn try_register( + registry: &prometheus::Registry, + ) -> std::result::Result { + let metrics = MetricsInner { + statements_distributed: prometheus::register( + prometheus::Counter::new( + "polkadot_parachain_statements_distributed_total", + "Number of candidate validity statements distributed to other peers.", + )?, + registry, + )?, + sent_requests: prometheus::register( + prometheus::Counter::new( + "polkadot_parachain_statement_distribution_sent_requests_total", + "Number of large statement fetching requests sent.", + )?, + registry, + )?, + received_responses: prometheus::register( + prometheus::CounterVec::new( + prometheus::Opts::new( + "polkadot_parachain_statement_distribution_received_responses_total", + "Number of received responses for large statement data.", + ), + &["success"], + )?, + registry, + )?, + active_leaves_update: prometheus::register( + prometheus::Histogram::with_opts(prometheus::HistogramOpts::new( + "polkadot_parachain_statement_distribution_active_leaves_update", + "Time spent within `statement_distribution::active_leaves_update`", + ))?, + registry, + )?, + share: prometheus::register( + prometheus::Histogram::with_opts(prometheus::HistogramOpts::new( + "polkadot_parachain_statement_distribution_share", + "Time spent within `statement_distribution::share`", + ))?, + registry, + )?, + network_bridge_update_v1: prometheus::register( + prometheus::Histogram::with_opts(prometheus::HistogramOpts::new( + "polkadot_parachain_statement_distribution_network_bridge_update_v1", + "Time spent within `statement_distribution::network_bridge_update_v1`", + ))?, + registry, + )?, + statements_unexpected: prometheus::register( + prometheus::CounterVec::new( + prometheus::Opts::new( + "polkadot_parachain_statement_distribution_statements_unexpected", + "Number of statements that were not expected to be received.", + ), + &["type"], + )?, + registry, + )?, + }; + Ok(Metrics(Some(metrics))) + } +} diff --git a/polkadot/node/network/statement-distribution/src/requester.rs b/polkadot/node/network/statement-distribution/src/requester.rs index e3a4afd61a..bf82b14812 100644 --- a/polkadot/node/network/statement-distribution/src/requester.rs +++ b/polkadot/node/network/statement-distribution/src/requester.rs @@ -32,7 +32,7 @@ use polkadot_node_subsystem_util::TimeoutExt; use polkadot_primitives::v1::{CandidateHash, CommittedCandidateReceipt, Hash}; use polkadot_subsystem::{Span, Stage}; -use crate::{Metrics, COST_WRONG_HASH, LOG_TARGET}; +use crate::{metrics::Metrics, COST_WRONG_HASH, LOG_TARGET}; // In case we failed fetching from our known peers, how long we should wait before attempting a // retry, even though we have not yet discovered any new peers. Or in other words how long to @@ -117,6 +117,7 @@ pub async fn fetch( Ok(StatementFetchingResponse::Statement(statement)) => { if statement.hash() != candidate_hash { metrics.on_received_response(false); + metrics.on_unexpected_statement_large(); if let Err(err) = sender.feed(RequesterMessage::ReportPeer(peer, COST_WRONG_HASH)).await @@ -161,6 +162,7 @@ pub async fn fetch( ); metrics.on_received_response(false); + metrics.on_unexpected_statement_large(); }, }