Add metrics for out of view statement distribution errors (#4972)

* Add a simple metric for statements out-of-view

* Avoid repeated out-of-view peer reputation change messages

* Log reporting status

* Address review comments

* Use counter to store a number of unexpected messages from a peer

* Distinguish different unexpected statements in the metrics

* Fix labels cardinality

* Rename metric name to `statements_unexpected`

* Move metrics to a separate unit, avoid unnecessary enum

* Prefer specific methods in lieu of public constants
This commit is contained in:
Vsevolod Stakhov
2022-02-25 20:29:51 +00:00
committed by GitHub
parent 86f2d65a72
commit 44f66825c7
3 changed files with 218 additions and 115 deletions
@@ -32,11 +32,8 @@ use polkadot_node_network_protocol::{
IfDisconnected, PeerId, UnifiedReputationChange as Rep, View,
};
use polkadot_node_primitives::{SignedFullStatement, Statement, UncheckedSignedFullStatement};
use polkadot_node_subsystem_util::{
self as util,
metrics::{self, prometheus},
MIN_GOSSIP_PEERS,
};
use polkadot_node_subsystem_util::{self as util, MIN_GOSSIP_PEERS};
use polkadot_primitives::v1::{
AuthorityDiscoveryId, CandidateHash, CommittedCandidateReceipt, CompactStatement, Hash,
SigningContext, ValidatorId, ValidatorIndex, ValidatorSignature,
@@ -75,6 +72,10 @@ use requester::{fetch, RequesterMessage};
mod responder;
use responder::{respond, ResponderMessage};
/// Metrics for the statement distribution
pub(crate) mod metrics;
use metrics::Metrics;
#[cfg(test)]
mod tests;
@@ -118,7 +119,7 @@ pub struct StatementDistributionSubsystem {
keystore: SyncCryptoStorePtr,
/// Receiver for incoming large statement requests.
req_receiver: Option<IncomingRequestReceiver<request_v1::StatementFetchingRequest>>,
// Prometheus metrics
/// Prometheus metrics
metrics: Metrics,
}
@@ -241,6 +242,10 @@ struct PeerRelayParentKnowledge {
/// connecting again with new peer ids, but we assume that the resulting effective bandwidth
/// for such an attack would be too low.
large_statement_count: usize,
/// We have seen a message that that is unexpected from this peer, so note this fact
/// and stop subsequent logging and peer reputation flood.
unexpected_count: usize,
}
impl PeerRelayParentKnowledge {
@@ -504,6 +509,17 @@ impl PeerData {
.check_can_receive(fingerprint, max_message_count)
}
/// Receive a notice about out of view statement and returns the value of the old flag
fn receive_unexpected(&mut self, relay_parent: &Hash) -> usize {
self.view_knowledge
.get_mut(relay_parent)
.map_or(0_usize, |relay_parent_peer_knowledge| {
let old = relay_parent_peer_knowledge.unexpected_count;
relay_parent_peer_knowledge.unexpected_count += 1_usize;
old
})
}
/// Basic flood protection for large statements.
fn receive_large_statement(&mut self, relay_parent: &Hash) -> std::result::Result<(), Rep> {
self.view_knowledge
@@ -1332,15 +1348,42 @@ async fn handle_incoming_message<'a>(
// perform only basic checks before verifying the signature
// as it's more computationally heavy
if let Err(rep) = peer_data.check_can_receive(&relay_parent, &fingerprint, max_message_count) {
// This situation can happen when a peer's Seconded message was lost
// but we have received the Valid statement.
// So we check it once and then ignore repeated violation to avoid
// reputation change flood.
let unexpected_count = peer_data.receive_unexpected(&relay_parent);
tracing::debug!(
target: LOG_TARGET,
?relay_parent,
?peer,
?message,
?rep,
?unexpected_count,
"Error inserting received statement"
);
report_peer(ctx, peer, rep).await;
match rep {
// This happens when a Valid statement has been received but there is no corresponding Seconded
COST_UNEXPECTED_STATEMENT_UNKNOWN_CANDIDATE => {
metrics.on_unexpected_statement_valid();
// Report peer merely if this is not a duplicate out-of-view statement that
// was caused by a missing Seconded statement from this peer
if unexpected_count == 0_usize {
report_peer(ctx, peer, rep).await;
}
},
// This happens when we have an unexpected remote peer that announced Seconded
COST_UNEXPECTED_STATEMENT_REMOTE => {
metrics.on_unexpected_statement_seconded();
report_peer(ctx, peer, rep).await;
},
_ => {
report_peer(ctx, peer, rep).await;
},
}
return None
}
@@ -1920,110 +1963,3 @@ fn requesting_peer_knows_about_candidate(
.ok_or_else(|| JfyiError::NoSuchHead(*relay_parent))?;
Ok(knowledge.sent_candidates.get(&candidate_hash).is_some())
}
#[derive(Clone)]
struct MetricsInner {
statements_distributed: prometheus::Counter<prometheus::U64>,
sent_requests: prometheus::Counter<prometheus::U64>,
received_responses: prometheus::CounterVec<prometheus::U64>,
active_leaves_update: prometheus::Histogram,
share: prometheus::Histogram,
network_bridge_update_v1: prometheus::Histogram,
}
/// Statement Distribution metrics.
#[derive(Default, Clone)]
pub struct Metrics(Option<MetricsInner>);
impl Metrics {
fn on_statement_distributed(&self) {
if let Some(metrics) = &self.0 {
metrics.statements_distributed.inc();
}
}
fn on_sent_request(&self) {
if let Some(metrics) = &self.0 {
metrics.sent_requests.inc();
}
}
fn on_received_response(&self, success: bool) {
if let Some(metrics) = &self.0 {
let label = if success { "succeeded" } else { "failed" };
metrics.received_responses.with_label_values(&[label]).inc();
}
}
/// Provide a timer for `active_leaves_update` which observes on drop.
fn time_active_leaves_update(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
self.0.as_ref().map(|metrics| metrics.active_leaves_update.start_timer())
}
/// Provide a timer for `share` which observes on drop.
fn time_share(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
self.0.as_ref().map(|metrics| metrics.share.start_timer())
}
/// Provide a timer for `network_bridge_update_v1` which observes on drop.
fn time_network_bridge_update_v1(
&self,
) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
self.0.as_ref().map(|metrics| metrics.network_bridge_update_v1.start_timer())
}
}
impl metrics::Metrics for Metrics {
fn try_register(
registry: &prometheus::Registry,
) -> std::result::Result<Self, prometheus::PrometheusError> {
let metrics = MetricsInner {
statements_distributed: prometheus::register(
prometheus::Counter::new(
"polkadot_parachain_statements_distributed_total",
"Number of candidate validity statements distributed to other peers.",
)?,
registry,
)?,
sent_requests: prometheus::register(
prometheus::Counter::new(
"polkadot_parachain_statement_distribution_sent_requests_total",
"Number of large statement fetching requests sent.",
)?,
registry,
)?,
received_responses: prometheus::register(
prometheus::CounterVec::new(
prometheus::Opts::new(
"polkadot_parachain_statement_distribution_received_responses_total",
"Number of received responses for large statement data.",
),
&["success"],
)?,
registry,
)?,
active_leaves_update: prometheus::register(
prometheus::Histogram::with_opts(prometheus::HistogramOpts::new(
"polkadot_parachain_statement_distribution_active_leaves_update",
"Time spent within `statement_distribution::active_leaves_update`",
))?,
registry,
)?,
share: prometheus::register(
prometheus::Histogram::with_opts(prometheus::HistogramOpts::new(
"polkadot_parachain_statement_distribution_share",
"Time spent within `statement_distribution::share`",
))?,
registry,
)?,
network_bridge_update_v1: prometheus::register(
prometheus::Histogram::with_opts(prometheus::HistogramOpts::new(
"polkadot_parachain_statement_distribution_network_bridge_update_v1",
"Time spent within `statement_distribution::network_bridge_update_v1`",
))?,
registry,
)?,
};
Ok(Metrics(Some(metrics)))
}
}
@@ -0,0 +1,165 @@
// Copyright 2022 Parity Technologies (UK) Ltd.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Metrics for the statement distribution module
use polkadot_node_subsystem_util::metrics::{self, prometheus};
#[derive(Clone)]
struct MetricsInner {
statements_distributed: prometheus::Counter<prometheus::U64>,
sent_requests: prometheus::Counter<prometheus::U64>,
received_responses: prometheus::CounterVec<prometheus::U64>,
active_leaves_update: prometheus::Histogram,
share: prometheus::Histogram,
network_bridge_update_v1: prometheus::Histogram,
statements_unexpected: prometheus::CounterVec<prometheus::U64>,
}
/// Statement Distribution metrics.
#[derive(Default, Clone)]
pub struct Metrics(Option<MetricsInner>);
impl Metrics {
/// Update statements distributed counter
pub fn on_statement_distributed(&self) {
if let Some(metrics) = &self.0 {
metrics.statements_distributed.inc();
}
}
/// Update sent requests counter
/// This counter is updated merely for the statements sent via request/response method,
/// meaning that it counts large statements only
pub fn on_sent_request(&self) {
if let Some(metrics) = &self.0 {
metrics.sent_requests.inc();
}
}
/// Update counters for the received responses with `succeeded` or `failed` labels
/// These counters are updated merely for the statements received via request/response method,
/// meaning that they count large statements only
pub fn on_received_response(&self, success: bool) {
if let Some(metrics) = &self.0 {
let label = if success { "succeeded" } else { "failed" };
metrics.received_responses.with_label_values(&[label]).inc();
}
}
/// Provide a timer for `active_leaves_update` which observes on drop.
pub fn time_active_leaves_update(
&self,
) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
self.0.as_ref().map(|metrics| metrics.active_leaves_update.start_timer())
}
/// Provide a timer for `share` which observes on drop.
pub fn time_share(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
self.0.as_ref().map(|metrics| metrics.share.start_timer())
}
/// Provide a timer for `network_bridge_update_v1` which observes on drop.
pub fn time_network_bridge_update_v1(
&self,
) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
self.0.as_ref().map(|metrics| metrics.network_bridge_update_v1.start_timer())
}
/// Update the out-of-view statements counter for unexpected valid statements
pub fn on_unexpected_statement_valid(&self) {
if let Some(metrics) = &self.0 {
metrics.statements_unexpected.with_label_values(&["valid"]).inc();
}
}
/// Update the out-of-view statements counter for unexpected seconded statements
pub fn on_unexpected_statement_seconded(&self) {
if let Some(metrics) = &self.0 {
metrics.statements_unexpected.with_label_values(&["seconded"]).inc();
}
}
/// Update the out-of-view statements counter for unexpected large statements
pub fn on_unexpected_statement_large(&self) {
if let Some(metrics) = &self.0 {
metrics.statements_unexpected.with_label_values(&["large"]).inc();
}
}
}
impl metrics::Metrics for Metrics {
fn try_register(
registry: &prometheus::Registry,
) -> std::result::Result<Self, prometheus::PrometheusError> {
let metrics = MetricsInner {
statements_distributed: prometheus::register(
prometheus::Counter::new(
"polkadot_parachain_statements_distributed_total",
"Number of candidate validity statements distributed to other peers.",
)?,
registry,
)?,
sent_requests: prometheus::register(
prometheus::Counter::new(
"polkadot_parachain_statement_distribution_sent_requests_total",
"Number of large statement fetching requests sent.",
)?,
registry,
)?,
received_responses: prometheus::register(
prometheus::CounterVec::new(
prometheus::Opts::new(
"polkadot_parachain_statement_distribution_received_responses_total",
"Number of received responses for large statement data.",
),
&["success"],
)?,
registry,
)?,
active_leaves_update: prometheus::register(
prometheus::Histogram::with_opts(prometheus::HistogramOpts::new(
"polkadot_parachain_statement_distribution_active_leaves_update",
"Time spent within `statement_distribution::active_leaves_update`",
))?,
registry,
)?,
share: prometheus::register(
prometheus::Histogram::with_opts(prometheus::HistogramOpts::new(
"polkadot_parachain_statement_distribution_share",
"Time spent within `statement_distribution::share`",
))?,
registry,
)?,
network_bridge_update_v1: prometheus::register(
prometheus::Histogram::with_opts(prometheus::HistogramOpts::new(
"polkadot_parachain_statement_distribution_network_bridge_update_v1",
"Time spent within `statement_distribution::network_bridge_update_v1`",
))?,
registry,
)?,
statements_unexpected: prometheus::register(
prometheus::CounterVec::new(
prometheus::Opts::new(
"polkadot_parachain_statement_distribution_statements_unexpected",
"Number of statements that were not expected to be received.",
),
&["type"],
)?,
registry,
)?,
};
Ok(Metrics(Some(metrics)))
}
}
@@ -32,7 +32,7 @@ use polkadot_node_subsystem_util::TimeoutExt;
use polkadot_primitives::v1::{CandidateHash, CommittedCandidateReceipt, Hash};
use polkadot_subsystem::{Span, Stage};
use crate::{Metrics, COST_WRONG_HASH, LOG_TARGET};
use crate::{metrics::Metrics, COST_WRONG_HASH, LOG_TARGET};
// In case we failed fetching from our known peers, how long we should wait before attempting a
// retry, even though we have not yet discovered any new peers. Or in other words how long to
@@ -117,6 +117,7 @@ pub async fn fetch(
Ok(StatementFetchingResponse::Statement(statement)) => {
if statement.hash() != candidate_hash {
metrics.on_received_response(false);
metrics.on_unexpected_statement_large();
if let Err(err) =
sender.feed(RequesterMessage::ReportPeer(peer, COST_WRONG_HASH)).await
@@ -161,6 +162,7 @@ pub async fn fetch(
);
metrics.on_received_response(false);
metrics.on_unexpected_statement_large();
},
}