Adding Dispute Participation Metrics (#6838)

* Added participation and queue sizes metrics

* First draft of all metric code

* Tests pass

* Changed Metrics to field on participation + queues

* fmt

* Improving naming

* Refactor, placing timer in ParticipationRequest

* fmt

* Final cleanup

* Revert "Final cleanup"

This reverts commit 02e5608df64b2e0f7810905e4508673b2037d351.

* Changing metric names

* Implementing Eq only for unit tests

* fmt
This commit is contained in:
Bradley Olson
2023-03-10 16:38:24 -08:00
committed by GitHub
parent 3777f2db42
commit c99362e56c
7 changed files with 171 additions and 31 deletions
@@ -96,7 +96,7 @@ impl Initialized {
let DisputeCoordinatorSubsystem { config: _, store: _, keystore, metrics } = subsystem;
let (participation_sender, participation_receiver) = mpsc::channel(1);
let participation = Participation::new(participation_sender);
let participation = Participation::new(participation_sender, metrics.clone());
let highest_session = rolling_session_window.latest_session();
Self {
@@ -916,12 +916,17 @@ impl Initialized {
} else {
self.metrics.on_queued_best_effort_participation();
}
let request_timer = Arc::new(self.metrics.time_participation_pipeline());
let r = self
.participation
.queue_participation(
ctx,
priority,
ParticipationRequest::new(new_state.candidate_receipt().clone(), session),
ParticipationRequest::new(
new_state.candidate_receipt().clone(),
session,
request_timer,
),
)
.await;
log_error(r)?;
@@ -347,11 +347,13 @@ impl DisputeCoordinatorSubsystem {
?candidate_hash,
"Found valid dispute, with no vote from us on startup - participating."
);
let request_timer = Arc::new(self.metrics.time_participation_pipeline());
participation_requests.push((
ParticipationPriority::with_priority_if(is_included),
ParticipationRequest::new(
vote_state.votes().candidate_receipt.clone(),
session,
request_timer,
),
));
}
@@ -32,6 +32,16 @@ struct MetricsInner {
vote_cleanup_time: prometheus::Histogram,
/// Number of refrained participations.
refrained_participations: prometheus::Counter<prometheus::U64>,
/// Distribution of participation durations.
participation_durations: prometheus::Histogram,
/// Measures the duration of the full participation pipeline: From when
/// a participation request is first queued to when participation in the
/// requested dispute is complete.
participation_pipeline_durations: prometheus::Histogram,
/// Size of participation priority queue
participation_priority_queue_size: prometheus::Gauge<prometheus::U64>,
/// Size of participation best effort queue
participation_best_effort_queue_size: prometheus::Gauge<prometheus::U64>,
}
/// Candidate validation metrics.
@@ -96,6 +106,36 @@ impl Metrics {
metrics.refrained_participations.inc();
}
}
/// Start a timer for a single participation.
///
/// Yields `None` when the inner metrics are absent (`self.0` is `None`).
/// Otherwise the returned timer belongs to the `participation_durations`
/// histogram and updates it when dropped.
pub(crate) fn time_participation(
    &self,
) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
    let inner = self.0.as_ref()?;
    Some(inner.participation_durations.start_timer())
}
/// Start a timer for the full participation pipeline.
///
/// Yields `None` when the inner metrics are absent (`self.0` is `None`).
/// Otherwise the returned timer belongs to the
/// `participation_pipeline_durations` histogram and updates it when dropped.
pub(crate) fn time_participation_pipeline(
    &self,
) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
    let inner = self.0.as_ref()?;
    Some(inner.participation_pipeline_durations.start_timer())
}
/// Set the priority_queue_size metric
pub fn report_priority_queue_size(&self, size: u64) {
if let Some(metrics) = &self.0 {
metrics.participation_priority_queue_size.set(size);
}
}
/// Set the best_effort_queue_size metric
pub fn report_best_effort_queue_size(&self, size: u64) {
if let Some(metrics) = &self.0 {
metrics.participation_best_effort_queue_size.set(size);
}
}
}
impl metrics::Metrics for Metrics {
@@ -163,6 +203,34 @@ impl metrics::Metrics for Metrics {
))?,
registry,
)?,
participation_durations: prometheus::register(
prometheus::Histogram::with_opts(
prometheus::HistogramOpts::new(
"polkadot_parachain_dispute_participation_durations",
"Time spent within fn Participation::participate",
)
)?,
registry,
)?,
participation_pipeline_durations: prometheus::register(
prometheus::Histogram::with_opts(
prometheus::HistogramOpts::new(
"polkadot_parachain_dispute_participation_pipeline_durations",
"Measures the duration of the full participation pipeline: From when a participation request is first queued to when participation in the requested dispute is complete.",
)
)?,
registry,
)?,
participation_priority_queue_size: prometheus::register(
prometheus::Gauge::new("polkadot_parachain_dispute_participation_priority_queue_size",
"Number of disputes waiting for local participation in the priority queue.")?,
registry,
)?,
participation_best_effort_queue_size: prometheus::register(
prometheus::Gauge::new("polkadot_parachain_dispute_participation_best_effort_queue_size",
"Number of disputes waiting for local participation in the best effort queue.")?,
registry,
)?,
};
Ok(Metrics(Some(metrics)))
}
@@ -48,6 +48,9 @@ mod queues;
use queues::Queues;
pub use queues::{ParticipationPriority, ParticipationRequest, QueueError};
use crate::metrics::Metrics;
use polkadot_node_subsystem_util::metrics::prometheus::prometheus;
/// The maximum number of participation processes we want to run in parallel.
///
/// This should be a relatively low value, while we might have a speedup once we fetched the data,
@@ -71,6 +74,8 @@ pub struct Participation {
worker_sender: WorkerMessageSender,
/// Some recent block for retrieving validation code from chain.
recent_block: Option<(BlockNumber, Hash)>,
/// Metrics handle cloned from Initialized
metrics: Metrics,
}
/// Message from worker tasks.
@@ -135,12 +140,13 @@ impl Participation {
/// The passed in sender will be used by background workers to communicate back their results.
/// The calling context should make sure to call `Participation::on_worker_message()` for the
/// received messages.
pub fn new(sender: WorkerMessageSender) -> Self {
pub fn new(sender: WorkerMessageSender, metrics: Metrics) -> Self {
Self {
running_participations: HashSet::new(),
queue: Queues::new(),
queue: Queues::new(metrics.clone()),
worker_sender: sender,
recent_block: None,
metrics,
}
}
@@ -253,11 +259,19 @@ impl Participation {
req: ParticipationRequest,
recent_head: Hash,
) -> FatalResult<()> {
let participation_timer = self.metrics.time_participation();
if self.running_participations.insert(*req.candidate_hash()) {
let sender = ctx.sender().clone();
ctx.spawn(
"participation-worker",
participate(self.worker_sender.clone(), sender, recent_head, req).boxed(),
participate(
self.worker_sender.clone(),
sender,
recent_head,
req,
participation_timer,
)
.boxed(),
)
.map_err(FatalError::SpawnFailed)?;
}
@@ -269,7 +283,8 @@ async fn participate(
mut result_sender: WorkerMessageSender,
mut sender: impl overseer::DisputeCoordinatorSenderTrait,
block_hash: Hash,
req: ParticipationRequest,
req: ParticipationRequest, // Sends metric data via request_timer field when dropped
_participation_timer: Option<prometheus::HistogramTimer>, // Sends metric data when dropped
) {
#[cfg(test)]
// Hack for tests, so we get recovery messages not too early.
@@ -14,7 +14,7 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use std::{cmp::Ordering, collections::BTreeMap};
use std::{cmp::Ordering, collections::BTreeMap, sync::Arc};
use futures::channel::oneshot;
use polkadot_node_subsystem::{messages::ChainApiMessage, overseer};
@@ -25,6 +25,9 @@ use crate::{
LOG_TARGET,
};
use crate::metrics::Metrics;
use polkadot_node_subsystem_util::metrics::prometheus::prometheus;
#[cfg(test)]
mod tests;
@@ -56,14 +59,18 @@ pub struct Queues {
/// Priority queue.
priority: BTreeMap<CandidateComparator, ParticipationRequest>,
/// Handle for recording queues data in metrics
metrics: Metrics,
}
/// A dispute participation request that can be queued.
#[derive(Debug, PartialEq, Eq, Clone)]
#[derive(Debug, Clone)]
pub struct ParticipationRequest {
candidate_hash: CandidateHash,
candidate_receipt: CandidateReceipt,
session: SessionIndex,
_request_timer: Arc<Option<prometheus::HistogramTimer>>, // Sends metric data when request is dropped
}
/// Whether a `ParticipationRequest` should be put on best-effort or the priority queue.
@@ -107,8 +114,17 @@ pub enum QueueError {
impl ParticipationRequest {
/// Create a new `ParticipationRequest` to be queued.
pub fn new(candidate_receipt: CandidateReceipt, session: SessionIndex) -> Self {
Self { candidate_hash: candidate_receipt.hash(), candidate_receipt, session }
pub fn new(
candidate_receipt: CandidateReceipt,
session: SessionIndex,
request_timer: Arc<Option<prometheus::HistogramTimer>>,
) -> Self {
Self {
candidate_hash: candidate_receipt.hash(),
candidate_receipt,
session,
_request_timer: request_timer,
}
}
pub fn candidate_receipt(&'_ self) -> &'_ CandidateReceipt {
@@ -126,10 +142,29 @@ impl ParticipationRequest {
}
}
// Unit tests need to compare participation requests, so equality is
// implemented for test builds only.
#[cfg(test)]
impl PartialEq for ParticipationRequest {
    /// Two requests are equal when candidate receipt, candidate hash and
    /// session all match; the attached request timer is not compared.
    fn eq(&self, other: &Self) -> bool {
        // Name every field in the pattern, so that adding a field to the struct
        // forces this comparison to be revisited.
        let Self { candidate_hash, candidate_receipt, session, _request_timer: _ } = self;
        let same_receipt = candidate_receipt == other.candidate_receipt();
        let same_hash = candidate_hash == other.candidate_hash();
        let same_session = *session == other.session();
        same_receipt && same_hash && same_session
    }
}
// `Eq` is a marker trait on top of `PartialEq`; like the `PartialEq` impl for
// this type, it is only compiled for test builds.
#[cfg(test)]
impl Eq for ParticipationRequest {}
impl Queues {
/// Create new `Queues`.
pub fn new() -> Self {
Self { best_effort: BTreeMap::new(), priority: BTreeMap::new() }
pub fn new(metrics: Metrics) -> Self {
Self { best_effort: BTreeMap::new(), priority: BTreeMap::new(), metrics }
}
/// Will put message in queue, either priority or best effort depending on priority.
@@ -154,9 +189,14 @@ impl Queues {
/// First the priority queue is considered and then the best effort one.
pub fn dequeue(&mut self) -> Option<ParticipationRequest> {
if let Some(req) = self.pop_priority() {
self.metrics.report_priority_queue_size(self.priority.len() as u64);
return Some(req.1)
}
self.pop_best_effort().map(|d| d.1)
if let Some(req) = self.pop_best_effort() {
self.metrics.report_best_effort_queue_size(self.best_effort.len() as u64);
return Some(req.1)
}
None
}
/// Reprioritizes any participation requests pertaining to the
@@ -180,6 +220,9 @@ impl Queues {
}
if let Some(request) = self.best_effort.remove(&comparator) {
self.priority.insert(comparator, request);
// Report changes to both queue sizes
self.metrics.report_priority_queue_size(self.priority.len() as u64);
self.metrics.report_best_effort_queue_size(self.best_effort.len() as u64);
}
Ok(())
}
@@ -197,6 +240,8 @@ impl Queues {
// Remove any best effort entry:
self.best_effort.remove(&comparator);
self.priority.insert(comparator, req);
self.metrics.report_priority_queue_size(self.priority.len() as u64);
self.metrics.report_best_effort_queue_size(self.best_effort.len() as u64);
} else {
if self.priority.contains_key(&comparator) {
// The candidate is already in priority queue - don't
@@ -207,6 +252,7 @@ impl Queues {
return Err(QueueError::BestEffortFull)
}
self.best_effort.insert(comparator, req);
self.metrics.report_best_effort_queue_size(self.best_effort.len() as u64);
}
Ok(())
}
@@ -14,10 +14,11 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use crate::ParticipationPriority;
use crate::{metrics::Metrics, ParticipationPriority};
use ::test_helpers::{dummy_candidate_receipt, dummy_hash};
use assert_matches::assert_matches;
use polkadot_primitives::{BlockNumber, Hash};
use std::sync::Arc;
use super::{CandidateComparator, ParticipationRequest, QueueError, Queues};
@@ -26,7 +27,8 @@ fn make_participation_request(hash: Hash) -> ParticipationRequest {
let mut receipt = dummy_candidate_receipt(dummy_hash());
// make it differ:
receipt.commitments_hash = hash;
ParticipationRequest::new(receipt, 1)
let request_timer = Arc::new(Metrics::default().time_participation_pipeline());
ParticipationRequest::new(receipt, 1, request_timer)
}
/// Make dummy comparator for request, based on the given block number.
@@ -44,7 +46,8 @@ fn make_dummy_comparator(
/// block number should be treated with lowest priority.
#[test]
fn ordering_works_as_expected() {
let mut queue = Queues::new();
let metrics = Metrics::default();
let mut queue = Queues::new(metrics.clone());
let req1 = make_participation_request(Hash::repeat_byte(0x01));
let req_prio = make_participation_request(Hash::repeat_byte(0x02));
let req3 = make_participation_request(Hash::repeat_byte(0x03));
@@ -91,7 +94,7 @@ fn ordering_works_as_expected() {
queue.queue_with_comparator(
make_dummy_comparator(&req_prio_full, Some(3)),
ParticipationPriority::Priority,
req_prio_full
req_prio_full,
),
Err(QueueError::PriorityFull)
);
@@ -99,7 +102,7 @@ fn ordering_works_as_expected() {
queue.queue_with_comparator(
make_dummy_comparator(&req_full, Some(3)),
ParticipationPriority::BestEffort,
req_full
req_full,
),
Err(QueueError::BestEffortFull)
);
@@ -118,7 +121,8 @@ fn ordering_works_as_expected() {
/// No matter how often a candidate gets queued, it should only ever get dequeued once.
#[test]
fn candidate_is_only_dequeued_once() {
let mut queue = Queues::new();
let metrics = Metrics::default();
let mut queue = Queues::new(metrics.clone());
let req1 = make_participation_request(Hash::repeat_byte(0x01));
let req_prio = make_participation_request(Hash::repeat_byte(0x02));
let req_best_effort_then_prio = make_participation_request(Hash::repeat_byte(0x03));
@@ -154,7 +158,6 @@ fn candidate_is_only_dequeued_once() {
req_prio.clone(),
)
.unwrap();
// Insert first as best effort:
queue
.queue_with_comparator(
@@ -195,5 +198,5 @@ fn candidate_is_only_dequeued_once() {
assert_eq!(queue.dequeue(), Some(req_best_effort_then_prio));
assert_eq!(queue.dequeue(), Some(req_prio_then_best_effort));
assert_eq!(queue.dequeue(), Some(req1));
assert_eq!(queue.dequeue(), None);
assert_matches!(queue.dequeue(), None);
}
@@ -72,7 +72,8 @@ async fn participate_with_commitments_hash<Context>(
};
let session = 1;
let req = ParticipationRequest::new(candidate_receipt, session);
let request_timer = Arc::new(participation.metrics.time_participation_pipeline());
let req = ParticipationRequest::new(candidate_receipt, session, request_timer);
participation
.queue_participation(ctx, ParticipationPriority::BestEffort, req)
@@ -189,7 +190,7 @@ fn same_req_wont_get_queued_if_participation_is_already_running() {
let (mut ctx, mut ctx_handle) = make_our_subsystem_context(TaskExecutor::new());
let (sender, mut worker_receiver) = mpsc::channel(1);
let mut participation = Participation::new(sender);
let mut participation = Participation::new(sender, Metrics::default());
activate_leaf(&mut ctx, &mut participation, 10).await.unwrap();
participate(&mut ctx, &mut participation).await.unwrap();
for _ in 0..MAX_PARALLEL_PARTICIPATIONS {
@@ -228,7 +229,7 @@ fn reqs_get_queued_when_out_of_capacity() {
let test = async {
let (sender, mut worker_receiver) = mpsc::channel(1);
let mut participation = Participation::new(sender);
let mut participation = Participation::new(sender, Metrics::default());
activate_leaf(&mut ctx, &mut participation, 10).await.unwrap();
participate(&mut ctx, &mut participation).await.unwrap();
for i in 0..MAX_PARALLEL_PARTICIPATIONS {
@@ -292,7 +293,7 @@ fn reqs_get_queued_on_no_recent_block() {
let (mut unblock_test, mut wait_for_verification) = mpsc::channel(0);
let test = async {
let (sender, _worker_receiver) = mpsc::channel(1);
let mut participation = Participation::new(sender);
let mut participation = Participation::new(sender, Metrics::default());
participate(&mut ctx, &mut participation).await.unwrap();
// We have initiated participation but we'll block `active_leaf` so that we can check that
@@ -342,7 +343,7 @@ fn cannot_participate_if_cannot_recover_available_data() {
let (mut ctx, mut ctx_handle) = make_our_subsystem_context(TaskExecutor::new());
let (sender, mut worker_receiver) = mpsc::channel(1);
let mut participation = Participation::new(sender);
let mut participation = Participation::new(sender, Metrics::default());
activate_leaf(&mut ctx, &mut participation, 10).await.unwrap();
participate(&mut ctx, &mut participation).await.unwrap();
@@ -372,7 +373,7 @@ fn cannot_participate_if_cannot_recover_validation_code() {
let (mut ctx, mut ctx_handle) = make_our_subsystem_context(TaskExecutor::new());
let (sender, mut worker_receiver) = mpsc::channel(1);
let mut participation = Participation::new(sender);
let mut participation = Participation::new(sender, Metrics::default());
activate_leaf(&mut ctx, &mut participation, 10).await.unwrap();
participate(&mut ctx, &mut participation).await.unwrap();
@@ -409,7 +410,7 @@ fn cast_invalid_vote_if_available_data_is_invalid() {
let (mut ctx, mut ctx_handle) = make_our_subsystem_context(TaskExecutor::new());
let (sender, mut worker_receiver) = mpsc::channel(1);
let mut participation = Participation::new(sender);
let mut participation = Participation::new(sender, Metrics::default());
activate_leaf(&mut ctx, &mut participation, 10).await.unwrap();
participate(&mut ctx, &mut participation).await.unwrap();
@@ -440,7 +441,7 @@ fn cast_invalid_vote_if_validation_fails_or_is_invalid() {
let (mut ctx, mut ctx_handle) = make_our_subsystem_context(TaskExecutor::new());
let (sender, mut worker_receiver) = mpsc::channel(1);
let mut participation = Participation::new(sender);
let mut participation = Participation::new(sender, Metrics::default());
activate_leaf(&mut ctx, &mut participation, 10).await.unwrap();
participate(&mut ctx, &mut participation).await.unwrap();
@@ -477,7 +478,7 @@ fn cast_invalid_vote_if_commitments_dont_match() {
let (mut ctx, mut ctx_handle) = make_our_subsystem_context(TaskExecutor::new());
let (sender, mut worker_receiver) = mpsc::channel(1);
let mut participation = Participation::new(sender);
let mut participation = Participation::new(sender, Metrics::default());
activate_leaf(&mut ctx, &mut participation, 10).await.unwrap();
participate(&mut ctx, &mut participation).await.unwrap();
@@ -514,7 +515,7 @@ fn cast_valid_vote_if_validation_passes() {
let (mut ctx, mut ctx_handle) = make_our_subsystem_context(TaskExecutor::new());
let (sender, mut worker_receiver) = mpsc::channel(1);
let mut participation = Participation::new(sender);
let mut participation = Participation::new(sender, Metrics::default());
activate_leaf(&mut ctx, &mut participation, 10).await.unwrap();
participate(&mut ctx, &mut participation).await.unwrap();