Handling timers for repeat dispute participation requests (#6901)

* Added participation and queue sizes metrics

* First draft of all metric code

* Tests pass

* Changed Metrics to field on participation + queues

* fmt

* Improving naming

* Refactor, placing timer in ParticipationRequest

* fmt

* Final cleanup

* Revert "Final cleanup"

This reverts commit 02e5608df64b2e0f7810905e4508673b2037d351.

* Changing metric names

* Implementing Eq only for unit tests

* fmt

* Removing Clone trait from ParticipationRequest

* fmt

* Moved clone functionality to tests helper

* fmt

* Fixing dropped timers on repeat requests

* Keep older best effort timers

* Removing comment redundency and explaining better

* Updating queue() to use single mem read

* fmt
This commit is contained in:
Bradley Olson
2023-03-20 17:55:35 -07:00
committed by GitHub
parent e0b8b30288
commit b26cf3b7d0
2 changed files with 51 additions and 21 deletions
@@ -14,7 +14,10 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use std::{cmp::Ordering, collections::BTreeMap};
use std::{
cmp::Ordering,
collections::{btree_map::Entry, BTreeMap},
};
use futures::channel::oneshot;
use polkadot_node_subsystem::{messages::ChainApiMessage, overseer};
@@ -70,7 +73,7 @@ pub struct ParticipationRequest {
candidate_hash: CandidateHash,
candidate_receipt: CandidateReceipt,
session: SessionIndex,
_request_timer: Option<prometheus::HistogramTimer>, // Sends metric data when request is dropped
request_timer: Option<prometheus::HistogramTimer>, // Sends metric data when request is dropped
}
/// Whether a `ParticipationRequest` should be put on best-effort or the priority queue.
@@ -119,12 +122,7 @@ impl ParticipationRequest {
session: SessionIndex,
request_timer: Option<prometheus::HistogramTimer>,
) -> Self {
Self {
candidate_hash: candidate_receipt.hash(),
candidate_receipt,
session,
_request_timer: request_timer,
}
Self { candidate_hash: candidate_receipt.hash(), candidate_receipt, session, request_timer }
}
pub fn candidate_receipt(&'_ self) -> &'_ CandidateReceipt {
@@ -147,15 +145,11 @@ impl ParticipationRequest {
#[cfg(test)]
impl PartialEq for ParticipationRequest {
fn eq(&self, other: &Self) -> bool {
let ParticipationRequest {
candidate_receipt,
candidate_hash,
session: _session,
_request_timer,
} = self;
let ParticipationRequest { candidate_receipt, candidate_hash, session, request_timer: _ } =
self;
candidate_receipt == other.candidate_receipt() &&
candidate_hash == other.candidate_hash() &&
self.session == other.session()
*session == other.session()
}
}
#[cfg(test)]
@@ -227,19 +221,46 @@ impl Queues {
Ok(())
}
/// Will put message in queue, either priority or best effort depending on priority.
///
/// If the message was already previously present on best effort, it will be moved to priority
/// if it is considered priority now.
///
/// Returns error in case a queue was found full already.
///
/// # Request timers
///
/// [`ParticipationRequest`]s contain request timers.
/// Where an old request would be replaced by a new one, we keep the old request.
/// This prevents request timers from resetting on each new request.
fn queue_with_comparator(
&mut self,
comparator: CandidateComparator,
priority: ParticipationPriority,
req: ParticipationRequest,
mut req: ParticipationRequest,
) -> std::result::Result<(), QueueError> {
if priority.is_priority() {
if self.priority.len() >= PRIORITY_QUEUE_SIZE {
return Err(QueueError::PriorityFull)
}
// Remove any best effort entry:
self.best_effort.remove(&comparator);
self.priority.insert(comparator, req);
// Remove any best effort entry, using it to replace our new
// request.
if let Some(older_request) = self.best_effort.remove(&comparator) {
if let Some(timer) = req.request_timer {
timer.stop_and_discard();
}
req = older_request;
}
// Keeping old request if any.
match self.priority.entry(comparator) {
Entry::Occupied(_) =>
if let Some(timer) = req.request_timer {
timer.stop_and_discard();
},
Entry::Vacant(vac) => {
vac.insert(req);
},
}
self.metrics.report_priority_queue_size(self.priority.len() as u64);
self.metrics.report_best_effort_queue_size(self.best_effort.len() as u64);
} else {
@@ -251,7 +272,16 @@ impl Queues {
if self.best_effort.len() >= BEST_EFFORT_QUEUE_SIZE {
return Err(QueueError::BestEffortFull)
}
self.best_effort.insert(comparator, req);
// Keeping old request if any.
match self.best_effort.entry(comparator) {
Entry::Occupied(_) =>
if let Some(timer) = req.request_timer {
timer.stop_and_discard();
},
Entry::Vacant(vac) => {
vac.insert(req);
},
}
self.metrics.report_best_effort_queue_size(self.best_effort.len() as u64);
}
Ok(())
@@ -46,7 +46,7 @@ fn clone_request(request: &ParticipationRequest) -> ParticipationRequest {
candidate_receipt: request.candidate_receipt.clone(),
candidate_hash: request.candidate_hash.clone(),
session: request.session,
_request_timer: None,
request_timer: None,
}
}