pvf-checker-subsystem: metrics (#4741)

* pvf-checker-subsystem: metrics

This commits adds metrics to the PVF pre-checking subsystem.

* Apply suggestions from code review

Co-authored-by: sandreim <54316454+sandreim@users.noreply.github.com>

Co-authored-by: sandreim <54316454+sandreim@users.noreply.github.com>
This commit is contained in:
Sergei Shulepov
2022-01-18 17:44:11 +01:00
committed by GitHub
parent 3800b01fe1
commit fb0258e735
5 changed files with 186 additions and 14 deletions
@@ -70,6 +70,14 @@ impl PvfData {
} }
} }
/// The result of [`InterestView::on_leaves_update`].
pub struct OnLeavesUpdateOutcome {
/// The list of PVFs that we first seen in the activated block.
pub newcomers: Vec<ValidationCodeHash>,
/// The number of PVFs that were removed from the view.
pub left_num: usize,
}
/// A structure that keeps track of relevant PVFs and judgements about them. A relevant PVF is one /// A structure that keeps track of relevant PVFs and judgements about them. A relevant PVF is one
/// that resides in at least a single active leaf. /// that resides in at least a single active leaf.
#[derive(Debug)] #[derive(Debug)]
@@ -87,7 +95,7 @@ impl InterestView {
&mut self, &mut self,
activated: Option<(Hash, Vec<ValidationCodeHash>)>, activated: Option<(Hash, Vec<ValidationCodeHash>)>,
deactivated: &[Hash], deactivated: &[Hash],
) -> Vec<ValidationCodeHash> { ) -> OnLeavesUpdateOutcome {
let mut newcomers = Vec::new(); let mut newcomers = Vec::new();
if let Some((leaf, pending_pvfs)) = activated { if let Some((leaf, pending_pvfs)) = activated {
@@ -105,19 +113,21 @@ impl InterestView {
self.active_leaves.entry(leaf).or_default().extend(pending_pvfs); self.active_leaves.entry(leaf).or_default().extend(pending_pvfs);
} }
let mut left_num = 0;
for leaf in deactivated { for leaf in deactivated {
let pvfs = self.active_leaves.remove(leaf); let pvfs = self.active_leaves.remove(leaf);
for pvf in pvfs.into_iter().flatten() { for pvf in pvfs.into_iter().flatten() {
if let btree_map::Entry::Occupied(mut o) = self.pvfs.entry(pvf) { if let btree_map::Entry::Occupied(mut o) = self.pvfs.entry(pvf) {
let now_empty = o.get_mut().remove_origin(leaf); let now_empty = o.get_mut().remove_origin(leaf);
if now_empty { if now_empty {
left_num += 1;
o.remove(); o.remove();
} }
} }
} }
} }
newcomers OnLeavesUpdateOutcome { newcomers, left_num }
} }
/// Handles a new judgement for the given `pvf`. /// Handles a new judgement for the given `pvf`.
+38 -10
View File
@@ -36,22 +36,27 @@ use std::collections::HashSet;
const LOG_TARGET: &str = "parachain::pvf-checker"; const LOG_TARGET: &str = "parachain::pvf-checker";
mod interest_view; mod interest_view;
mod metrics;
mod runtime_api; mod runtime_api;
#[cfg(test)] #[cfg(test)]
mod tests; mod tests;
use self::interest_view::{InterestView, Judgement}; use self::{
interest_view::{InterestView, Judgement},
metrics::Metrics,
};
/// PVF pre-checking subsystem. /// PVF pre-checking subsystem.
pub struct PvfCheckerSubsystem { pub struct PvfCheckerSubsystem {
enabled: bool, enabled: bool,
keystore: SyncCryptoStorePtr, keystore: SyncCryptoStorePtr,
metrics: Metrics,
} }
impl PvfCheckerSubsystem { impl PvfCheckerSubsystem {
pub fn new(enabled: bool, keystore: SyncCryptoStorePtr) -> Self { pub fn new(enabled: bool, keystore: SyncCryptoStorePtr, metrics: Metrics) -> Self {
PvfCheckerSubsystem { enabled, keystore } PvfCheckerSubsystem { enabled, keystore, metrics }
} }
} }
@@ -62,7 +67,7 @@ where
{ {
fn start(self, ctx: Context) -> SpawnedSubsystem { fn start(self, ctx: Context) -> SpawnedSubsystem {
if self.enabled { if self.enabled {
let future = run(ctx, self.keystore) let future = run(ctx, self.keystore, self.metrics)
.map_err(|e| SubsystemError::with_origin("pvf-checker", e)) .map_err(|e| SubsystemError::with_origin("pvf-checker", e))
.boxed(); .boxed();
@@ -118,7 +123,11 @@ struct State {
FuturesUnordered<BoxFuture<'static, Option<(PreCheckOutcome, ValidationCodeHash)>>>, FuturesUnordered<BoxFuture<'static, Option<(PreCheckOutcome, ValidationCodeHash)>>>,
} }
async fn run<Context>(mut ctx: Context, keystore: SyncCryptoStorePtr) -> SubsystemResult<()> async fn run<Context>(
mut ctx: Context,
keystore: SyncCryptoStorePtr,
metrics: Metrics,
) -> SubsystemResult<()>
where where
Context: SubsystemContext<Message = PvfCheckerMessage>, Context: SubsystemContext<Message = PvfCheckerMessage>,
Context: overseer::SubsystemContext<Message = PvfCheckerMessage>, Context: overseer::SubsystemContext<Message = PvfCheckerMessage>,
@@ -141,6 +150,7 @@ where
&mut state, &mut state,
&mut sender, &mut sender,
&keystore, &keystore,
&metrics,
outcome, outcome,
validation_code_hash, validation_code_hash,
).await; ).await;
@@ -154,6 +164,7 @@ where
&mut state, &mut state,
&mut sender, &mut sender,
&keystore, &keystore,
&metrics,
from_overseer?, from_overseer?,
) )
.await; .await;
@@ -170,6 +181,7 @@ async fn handle_pvf_check(
state: &mut State, state: &mut State,
sender: &mut impl SubsystemSender, sender: &mut impl SubsystemSender,
keystore: &SyncCryptoStorePtr, keystore: &SyncCryptoStorePtr,
metrics: &Metrics,
outcome: PreCheckOutcome, outcome: PreCheckOutcome,
validation_code_hash: ValidationCodeHash, validation_code_hash: ValidationCodeHash,
) { ) {
@@ -218,6 +230,7 @@ async fn handle_pvf_check(
keystore, keystore,
&mut state.voted, &mut state.voted,
credentials, credentials,
metrics,
recent_block.1, recent_block.1,
session_index, session_index,
judgement, judgement,
@@ -236,6 +249,7 @@ async fn handle_from_overseer(
state: &mut State, state: &mut State,
sender: &mut impl SubsystemSender, sender: &mut impl SubsystemSender,
keystore: &SyncCryptoStorePtr, keystore: &SyncCryptoStorePtr,
metrics: &Metrics,
from_overseer: FromOverseer<PvfCheckerMessage>, from_overseer: FromOverseer<PvfCheckerMessage>,
) -> Option<Conclude> { ) -> Option<Conclude> {
match from_overseer { match from_overseer {
@@ -248,7 +262,7 @@ async fn handle_from_overseer(
None None
}, },
FromOverseer::Signal(OverseerSignal::ActiveLeaves(update)) => { FromOverseer::Signal(OverseerSignal::ActiveLeaves(update)) => {
handle_leaves_update(state, sender, keystore, update).await; handle_leaves_update(state, sender, keystore, metrics, update).await;
None None
}, },
FromOverseer::Communication { msg } => match msg { FromOverseer::Communication { msg } => match msg {
@@ -261,6 +275,7 @@ async fn handle_leaves_update(
state: &mut State, state: &mut State,
sender: &mut impl SubsystemSender, sender: &mut impl SubsystemSender,
keystore: &SyncCryptoStorePtr, keystore: &SyncCryptoStorePtr,
metrics: &Metrics,
update: ActiveLeavesUpdate, update: ActiveLeavesUpdate,
) { ) {
if let Some(activated) = update.activated { if let Some(activated) = update.activated {
@@ -280,11 +295,13 @@ async fn handle_leaves_update(
state.recent_block = Some(recent_block); state.recent_block = Some(recent_block);
// Update the PVF view and get the previously unseen PVFs and start working on them. // Update the PVF view and get the previously unseen PVFs and start working on them.
let newcomers = state let outcome = state
.view .view
.on_leaves_update(Some((activated.hash, pending_pvfs)), &update.deactivated); .on_leaves_update(Some((activated.hash, pending_pvfs)), &update.deactivated);
for newcomer in newcomers { metrics.on_pvf_observed(outcome.newcomers.len());
initiate_precheck(state, sender, recent_block_hash, newcomer).await; metrics.on_pvf_left(outcome.left_num);
for newcomer in outcome.newcomers {
initiate_precheck(state, sender, recent_block_hash, newcomer, metrics).await;
} }
if let Some((new_session_index, credentials)) = new_session_index { if let Some((new_session_index, credentials)) = new_session_index {
@@ -305,6 +322,7 @@ async fn handle_leaves_update(
keystore, keystore,
&mut state.voted, &mut state.voted,
credentials, credentials,
metrics,
recent_block_hash, recent_block_hash,
new_session_index, new_session_index,
judgement, judgement,
@@ -429,6 +447,7 @@ async fn sign_and_submit_pvf_check_statement(
keystore: &SyncCryptoStorePtr, keystore: &SyncCryptoStorePtr,
voted: &mut HashSet<ValidationCodeHash>, voted: &mut HashSet<ValidationCodeHash>,
credentials: &SigningCredentials, credentials: &SigningCredentials,
metrics: &Metrics,
relay_parent: Hash, relay_parent: Hash,
session_index: SessionIndex, session_index: SessionIndex,
judgement: Judgement, judgement: Judgement,
@@ -442,6 +461,8 @@ async fn sign_and_submit_pvf_check_statement(
judgement, judgement,
); );
metrics.on_vote_submission_started();
if voted.contains(&validation_code_hash) { if voted.contains(&validation_code_hash) {
tracing::trace!( tracing::trace!(
target: LOG_TARGET, target: LOG_TARGET,
@@ -449,6 +470,7 @@ async fn sign_and_submit_pvf_check_statement(
?validation_code_hash, ?validation_code_hash,
"already voted for this validation code", "already voted for this validation code",
); );
metrics.on_vote_duplicate();
return return
} }
@@ -492,7 +514,9 @@ async fn sign_and_submit_pvf_check_statement(
}; };
match runtime_api::submit_pvf_check_statement(sender, relay_parent, stmt, signature).await { match runtime_api::submit_pvf_check_statement(sender, relay_parent, stmt, signature).await {
Ok(()) => (), Ok(()) => {
metrics.on_vote_submitted();
},
Err(e) => { Err(e) => {
tracing::warn!( tracing::warn!(
target: LOG_TARGET, target: LOG_TARGET,
@@ -514,6 +538,7 @@ async fn initiate_precheck(
sender: &mut impl SubsystemSender, sender: &mut impl SubsystemSender,
relay_parent: Hash, relay_parent: Hash,
validation_code_hash: ValidationCodeHash, validation_code_hash: ValidationCodeHash,
metrics: &Metrics,
) { ) {
tracing::debug!( tracing::debug!(
target: LOG_TARGET, target: LOG_TARGET,
@@ -528,7 +553,10 @@ async fn initiate_precheck(
CandidateValidationMessage::PreCheck(relay_parent, validation_code_hash, tx).into(), CandidateValidationMessage::PreCheck(relay_parent, validation_code_hash, tx).into(),
) )
.await; .await;
let timer = metrics.time_pre_check_judgement();
state.currently_checking.push(Box::pin(async move { state.currently_checking.push(Box::pin(async move {
let _timer = timer;
match rx.await { match rx.await {
Ok(accept) => Some((accept, validation_code_hash)), Ok(accept) => Some((accept, validation_code_hash)),
Err(oneshot::Canceled) => { Err(oneshot::Canceled) => {
@@ -0,0 +1,130 @@
// Copyright 2022 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Metrics definitions for the PVF pre-checking subsystem.
use polkadot_node_subsystem_util::metrics::{self, prometheus};
#[derive(Clone)]
struct MetricsInner {
pre_check_judgement: prometheus::Histogram,
votes_total: prometheus::Counter<prometheus::U64>,
votes_started: prometheus::Counter<prometheus::U64>,
votes_duplicate: prometheus::Counter<prometheus::U64>,
pvfs_observed: prometheus::Counter<prometheus::U64>,
pvfs_left: prometheus::Counter<prometheus::U64>,
}
#[derive(Default, Clone)]
pub struct Metrics(Option<MetricsInner>);
impl Metrics {
/// Time between sending the pre-check request to receiving the response.
pub(crate) fn time_pre_check_judgement(
&self,
) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
self.0.as_ref().map(|metrics| metrics.pre_check_judgement.start_timer())
}
/// Called when a PVF vote/statement is submitted.
pub(crate) fn on_vote_submitted(&self) {
if let Some(metrics) = &self.0 {
metrics.votes_total.inc();
}
}
/// Called when a PVF vote/statement is started submission.
pub(crate) fn on_vote_submission_started(&self) {
if let Some(metrics) = &self.0 {
metrics.votes_started.inc();
}
}
/// Called when the vote is a duplicate.
pub(crate) fn on_vote_duplicate(&self) {
if let Some(metrics) = &self.0 {
metrics.votes_duplicate.inc();
}
}
/// Called when a new PVF is observed.
pub(crate) fn on_pvf_observed(&self, num: usize) {
if let Some(metrics) = &self.0 {
metrics.pvfs_observed.inc_by(num as u64);
}
}
/// Called when a PVF left the view.
pub(crate) fn on_pvf_left(&self, num: usize) {
if let Some(metrics) = &self.0 {
metrics.pvfs_left.inc_by(num as u64);
}
}
}
impl metrics::Metrics for Metrics {
fn try_register(registry: &prometheus::Registry) -> Result<Self, prometheus::PrometheusError> {
let metrics = MetricsInner {
pre_check_judgement: prometheus::register(
prometheus::Histogram::with_opts(
prometheus::HistogramOpts::new(
"polkadot_pvf_precheck_judgement",
"Time between sending the pre-check request to receiving the response.",
)
.buckets(vec![0.1, 0.5, 1.0, 10.0, 20.0, 30.0, 40.0, 50.0, 60.0]),
)?,
registry,
)?,
votes_total: prometheus::register(
prometheus::Counter::new(
"polkadot_pvf_precheck_votes_total",
"The total number of votes submitted.",
)?,
registry,
)?,
votes_started: prometheus::register(
prometheus::Counter::new(
"polkadot_pvf_precheck_votes_started",
"The number of votes that are pending submission",
)?,
registry,
)?,
votes_duplicate: prometheus::register(
prometheus::Counter::new(
"polkadot_pvf_precheck_votes_duplicate",
"The number of votes that are submitted more than once for the same code within\
the same session.",
)?,
registry,
)?,
pvfs_observed: prometheus::register(
prometheus::Counter::new(
"polkadot_pvf_precheck_pvfs_observed",
"The number of new PVFs observed.",
)?,
registry,
)?,
pvfs_left: prometheus::register(
prometheus::Counter::new(
"polkadot_pvf_precheck_pvfs_left",
"The number of PVFs removed from the view.",
)?,
registry,
)?,
};
Ok(Self(Some(metrics)))
}
}
+1 -1
View File
@@ -372,7 +372,7 @@ fn test_harness(test: impl FnOnce(TestState, VirtualOverseer) -> BoxFuture<'stat
) )
.expect("Generating keys for our node failed"); .expect("Generating keys for our node failed");
let subsystem_task = crate::run(ctx, keystore).map(|x| x.unwrap()); let subsystem_task = crate::run(ctx, keystore, crate::Metrics::default()).map(|x| x.unwrap());
let test_state = TestState::new(); let test_state = TestState::new();
let test_task = test(test_state, handle); let test_task = test(test_state, handle);
+5 -1
View File
@@ -213,7 +213,11 @@ where
Metrics::register(registry)?, // candidate-validation metrics Metrics::register(registry)?, // candidate-validation metrics
Metrics::register(registry)?, // validation host metrics Metrics::register(registry)?, // validation host metrics
)) ))
.pvf_checker(PvfCheckerSubsystem::new(pvf_checker_enabled, keystore.clone())) .pvf_checker(PvfCheckerSubsystem::new(
pvf_checker_enabled,
keystore.clone(),
Metrics::register(registry)?,
))
.chain_api(ChainApiSubsystem::new(runtime_client.clone(), Metrics::register(registry)?)) .chain_api(ChainApiSubsystem::new(runtime_client.clone(), Metrics::register(registry)?))
.collation_generation(CollationGenerationSubsystem::new(Metrics::register(registry)?)) .collation_generation(CollationGenerationSubsystem::new(Metrics::register(registry)?))
.collator_protocol({ .collator_protocol({