approval distribution improvements (#5161)

* minor approval voting refactor

* ignore recently outdated ones

* check in another branch

* don't be a doofus

* add newline

* remove a superfluous check

* fix missing ,

* consistency

* Update node/network/approval-distribution/src/lib.rs

Co-authored-by: Andronik <write@reusable.software>

* fixup

Co-authored-by: Andronik <write@reusable.software>
This commit is contained in:
Bernhard Schuster
2022-03-21 13:17:54 +01:00
committed by GitHub
parent 8e01ba9c03
commit 189dfdc006
2 changed files with 228 additions and 172 deletions
@@ -35,15 +35,15 @@ use polkadot_node_subsystem::{
overseer, ActiveLeavesUpdate, FromOverseer, OverseerSignal, SpawnedSubsystem, SubsystemContext,
SubsystemError,
};
use polkadot_node_subsystem_util::{
self as util,
metrics::{self, prometheus},
MIN_GOSSIP_PEERS,
};
use polkadot_node_subsystem_util::{self as util, MIN_GOSSIP_PEERS};
use polkadot_primitives::v2::{
BlockNumber, CandidateIndex, Hash, ValidatorIndex, ValidatorSignature,
};
use std::collections::{hash_map, BTreeMap, HashMap, HashSet};
use std::collections::{hash_map, BTreeMap, HashMap, HashSet, VecDeque};
use self::metrics::Metrics;
mod metrics;
#[cfg(test)]
mod tests;
@@ -66,6 +66,29 @@ pub struct ApprovalDistribution {
metrics: Metrics,
}
/// Remembers the hashes of blocks that were recently finalized
/// or pruned due to finalization.
///
/// The record is bounded: once it is full, noting a new hash evicts the
/// oldest one.
#[derive(Default)]
struct RecentlyOutdated {
    /// Noted hashes, oldest at the front and newest at the back.
    buf: VecDeque<Hash>,
}

impl RecentlyOutdated {
    /// Records `hash` as outdated and then trims the front of the buffer so
    /// that at most `MAX_BUF_LEN` hashes remain.
    fn note_outdated(&mut self, hash: Hash) {
        const MAX_BUF_LEN: usize = 20;

        self.buf.push_back(hash);
        // Everything beyond the cap is dropped from the oldest end.
        let excess = self.buf.len().saturating_sub(MAX_BUF_LEN);
        self.buf.drain(..excess);
    }

    /// Returns `true` if `hash` is still held in the buffer.
    fn is_recent_outdated(&self, hash: &Hash) -> bool {
        self.buf.iter().any(|known| known == hash)
    }
}
/// The [`State`] struct is responsible for tracking the overall state of the subsystem.
///
/// It tracks metadata about our view of the unfinalized chain,
@@ -90,6 +113,9 @@ struct State {
/// Track all our neighbors in the current gossip topology.
/// We're not necessarily connected to all of them.
gossip_peers: HashSet<PeerId>,
/// Tracks recently finalized blocks.
recent_outdated_blocks: RecentlyOutdated,
}
/// A short description of a validator's assignment or approval.
@@ -500,12 +526,14 @@ impl State {
// split_off returns everything after the given key, including the key
let split_point = finalized_number.saturating_add(1);
let mut old_blocks = self.blocks_by_number.split_off(&split_point);
// after split_off old_blocks actually contains new blocks, we need to swap
std::mem::swap(&mut self.blocks_by_number, &mut old_blocks);
// now that we pruned `self.blocks_by_number`, let's clean up `self.blocks` too
old_blocks.values().flatten().for_each(|h| {
self.blocks.remove(h);
old_blocks.values().flatten().for_each(|relay_block| {
self.recent_outdated_blocks.note_outdated(*relay_block);
self.blocks.remove(relay_block);
});
}
@@ -528,11 +556,13 @@ impl State {
gum::trace!(
target: LOG_TARGET,
?peer_id,
?block_hash,
hash = ?block_hash,
?validator_index,
"Unexpected assignment",
);
modify_reputation(ctx, peer_id, COST_UNEXPECTED_MESSAGE).await;
if !self.recent_outdated_blocks.is_recent_outdated(&block_hash) {
modify_reputation(ctx, peer_id, COST_UNEXPECTED_MESSAGE).await;
}
}
return
},
@@ -548,16 +578,17 @@ impl State {
hash_map::Entry::Occupied(mut peer_knowledge) => {
let peer_knowledge = peer_knowledge.get_mut();
if peer_knowledge.contains(&fingerprint) {
if peer_knowledge.received.contains(&fingerprint) {
// wasn't included before
if !peer_knowledge.received.insert(fingerprint.clone()) {
gum::debug!(
target: LOG_TARGET,
?peer_id,
hash = ?block_hash,
?fingerprint,
"Duplicate assignment",
);
modify_reputation(ctx, peer_id, COST_DUPLICATE_MESSAGE).await;
}
peer_knowledge.received.insert(fingerprint);
return
}
},
@@ -565,6 +596,7 @@ impl State {
gum::debug!(
target: LOG_TARGET,
?peer_id,
hash = ?block_hash,
?fingerprint,
"Assignment from a peer is out of view",
);
@@ -601,7 +633,7 @@ impl State {
};
drop(timer);
gum::trace!(target: LOG_TARGET, ?source, ?fingerprint, ?result, "Checked assignment",);
gum::trace!(target: LOG_TARGET, hash = ?block_hash, ?source, ?fingerprint, ?result, "Checked assignment",);
match result {
AssignmentCheckResult::Accepted => {
modify_reputation(ctx, peer_id.clone(), BENEFIT_VALID_MESSAGE_FIRST).await;
@@ -619,6 +651,7 @@ impl State {
}
gum::debug!(
target: LOG_TARGET,
hash = ?block_hash,
?peer_id,
"Got an `AcceptedDuplicate` assignment",
);
@@ -627,6 +660,7 @@ impl State {
AssignmentCheckResult::TooFarInFuture => {
gum::debug!(
target: LOG_TARGET,
hash = ?block_hash,
?peer_id,
"Got an assignment too far in the future",
);
@@ -636,6 +670,7 @@ impl State {
AssignmentCheckResult::Bad(error) => {
gum::info!(
target: LOG_TARGET,
hash = ?block_hash,
?peer_id,
%error,
"Got a bad assignment from peer",
@@ -741,7 +776,9 @@ impl State {
Some(entry) if entry.candidates.get(candidate_index as usize).is_some() => entry,
_ => {
if let Some(peer_id) = source.peer_id() {
modify_reputation(ctx, peer_id, COST_UNEXPECTED_MESSAGE).await;
if !self.recent_outdated_blocks.is_recent_outdated(&block_hash) {
modify_reputation(ctx, peer_id, COST_UNEXPECTED_MESSAGE).await;
}
}
return
},
@@ -774,7 +811,7 @@ impl State {
hash_map::Entry::Occupied(mut knowledge) => {
let peer_knowledge = knowledge.get_mut();
if peer_knowledge.contains(&fingerprint) {
if peer_knowledge.received.contains(&fingerprint) {
if !peer_knowledge.received.insert(fingerprint.clone()) {
gum::debug!(
target: LOG_TARGET,
?peer_id,
@@ -784,7 +821,6 @@ impl State {
modify_reputation(ctx, peer_id, COST_DUPLICATE_MESSAGE).await;
}
peer_knowledge.received.insert(fingerprint);
return
}
},
@@ -1221,61 +1257,15 @@ impl ApprovalDistribution {
},
};
match message {
FromOverseer::Communication {
msg: ApprovalDistributionMessage::NetworkBridgeUpdateV1(event),
} => {
state.handle_network_msg(&mut ctx, &self.metrics, event).await;
},
FromOverseer::Communication {
msg: ApprovalDistributionMessage::NewBlocks(metas),
} => {
gum::debug!(target: LOG_TARGET, "Processing NewBlocks");
state.handle_new_blocks(&mut ctx, &self.metrics, metas).await;
},
FromOverseer::Communication {
msg: ApprovalDistributionMessage::DistributeAssignment(cert, candidate_index),
} => {
gum::debug!(
target: LOG_TARGET,
"Distributing our assignment on candidate (block={}, index={})",
cert.block_hash,
candidate_index,
);
state
.import_and_circulate_assignment(
&mut ctx,
&self.metrics,
MessageSource::Local,
cert,
candidate_index,
)
.await;
},
FromOverseer::Communication {
msg: ApprovalDistributionMessage::DistributeApproval(vote),
} => {
gum::debug!(
target: LOG_TARGET,
"Distributing our approval vote on candidate (block={}, index={})",
vote.block_hash,
vote.candidate_index,
);
state
.import_and_circulate_approval(
&mut ctx,
&self.metrics,
MessageSource::Local,
vote,
)
.await;
},
FromOverseer::Communication { msg } =>
Self::handle_incoming(&mut ctx, state, msg, &self.metrics).await,
FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
..
})) => {
gum::trace!(target: LOG_TARGET, "active leaves signal (ignored)");
// handled by NewBlocks
// the relay chain blocks relevant to the approval subsystems
// are those that are available, but not finalized yet
// activated and deactivated heads are hence irrelevant to this subsystem
},
FromOverseer::Signal(OverseerSignal::BlockFinalized(_hash, number)) => {
gum::trace!(target: LOG_TARGET, number = %number, "finalized signal");
@@ -1285,6 +1275,55 @@ impl ApprovalDistribution {
}
}
}
/// Dispatches a single [`ApprovalDistributionMessage`] to the matching
/// handler on `state`.
///
/// * `NetworkBridgeUpdateV1` is forwarded to `State::handle_network_msg`.
/// * `NewBlocks` is forwarded to `State::handle_new_blocks`.
/// * `DistributeAssignment` and `DistributeApproval` are imported and
///   circulated with [`MessageSource::Local`] as their source.
///
/// `metrics` is handed through unchanged to whichever handler is invoked.
async fn handle_incoming<Context>(
    ctx: &mut Context,
    state: &mut State,
    msg: ApprovalDistributionMessage,
    metrics: &Metrics,
) where
    Context: SubsystemContext<Message = ApprovalDistributionMessage>,
    Context: overseer::SubsystemContext<Message = ApprovalDistributionMessage>,
{
    match msg {
        ApprovalDistributionMessage::NetworkBridgeUpdateV1(event) => {
            state.handle_network_msg(ctx, metrics, event).await;
        },
        ApprovalDistributionMessage::NewBlocks(metas) => {
            state.handle_new_blocks(ctx, metrics, metas).await;
        },
        ApprovalDistributionMessage::DistributeAssignment(cert, candidate_index) => {
            gum::debug!(
                target: LOG_TARGET,
                "Distributing our assignment on candidate (block={}, index={})",
                cert.block_hash,
                candidate_index,
            );

            // `metrics` is already a reference; pass it as-is like the other arms do.
            state
                .import_and_circulate_assignment(
                    ctx,
                    metrics,
                    MessageSource::Local,
                    cert,
                    candidate_index,
                )
                .await;
        },
        ApprovalDistributionMessage::DistributeApproval(vote) => {
            gum::debug!(
                target: LOG_TARGET,
                "Distributing our approval vote on candidate (block={}, index={})",
                vote.block_hash,
                vote.candidate_index,
            );

            state
                .import_and_circulate_approval(ctx, metrics, MessageSource::Local, vote)
                .await;
        },
    }
}
}
impl<Context> overseer::Subsystem<Context, SubsystemError> for ApprovalDistribution
@@ -1298,108 +1337,3 @@ where
SpawnedSubsystem { name: "approval-distribution-subsystem", future }
}
}
/// Approval Distribution metrics.
///
/// Wraps an optional [`MetricsInner`]. The derived `Default` yields a value
/// holding `None`, in which case the counter methods do nothing and the
/// timer methods return `None`.
#[derive(Default, Clone)]
pub struct Metrics(Option<MetricsInner>);
/// The set of Prometheus collectors backing [`Metrics`].
#[derive(Clone)]
struct MetricsInner {
/// Number of valid assignments imported locally or from other peers.
assignments_imported_total: prometheus::Counter<prometheus::U64>,
/// Number of valid approvals imported locally or from other peers.
approvals_imported_total: prometheus::Counter<prometheus::U64>,
/// Number of times `unify_with_peer` is called.
unified_with_peer_total: prometheus::Counter<prometheus::U64>,
/// Time spent within fn `unify_with_peer`.
time_unify_with_peer: prometheus::Histogram,
/// Time spent on importing pending assignments and approvals.
time_import_pending_now_known: prometheus::Histogram,
/// Time spent awaiting a reply from the Approval Voting Subsystem.
time_awaiting_approval_voting: prometheus::Histogram,
}
impl Metrics {
    /// Increments the imported-assignments counter, if metrics are present.
    fn on_assignment_imported(&self) {
        if let Some(inner) = self.0.as_ref() {
            inner.assignments_imported_total.inc();
        }
    }

    /// Increments the imported-approvals counter, if metrics are present.
    fn on_approval_imported(&self) {
        if let Some(inner) = self.0.as_ref() {
            inner.approvals_imported_total.inc();
        }
    }

    /// Increments the `unify_with_peer` call counter, if metrics are present.
    fn on_unify_with_peer(&self) {
        if let Some(inner) = self.0.as_ref() {
            inner.unified_with_peer_total.inc();
        }
    }

    /// Starts a timer on the `time_unify_with_peer` histogram.
    ///
    /// Returns `None` when no metrics are present.
    fn time_unify_with_peer(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
        let inner = self.0.as_ref()?;
        Some(inner.time_unify_with_peer.start_timer())
    }

    /// Starts a timer on the `time_import_pending_now_known` histogram.
    ///
    /// Returns `None` when no metrics are present.
    fn time_import_pending_now_known(
        &self,
    ) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
        let inner = self.0.as_ref()?;
        Some(inner.time_import_pending_now_known.start_timer())
    }

    /// Starts a timer on the `time_awaiting_approval_voting` histogram.
    ///
    /// Returns `None` when no metrics are present.
    fn time_awaiting_approval_voting(
        &self,
    ) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
        let inner = self.0.as_ref()?;
        Some(inner.time_awaiting_approval_voting.start_timer())
    }
}
impl metrics::Metrics for Metrics {
    /// Creates every collector and registers it with `registry`.
    ///
    /// Collectors are created and registered one at a time, in field order;
    /// the first failure is returned immediately via `?`.
    fn try_register(registry: &prometheus::Registry) -> Result<Self, prometheus::PrometheusError> {
        // Counters.
        let assignments_imported_total = prometheus::register(
            prometheus::Counter::new(
                "polkadot_parachain_assignments_imported_total",
                "Number of valid assignments imported locally or from other peers.",
            )?,
            registry,
        )?;
        let approvals_imported_total = prometheus::register(
            prometheus::Counter::new(
                "polkadot_parachain_approvals_imported_total",
                "Number of valid approvals imported locally or from other peers.",
            )?,
            registry,
        )?;
        let unified_with_peer_total = prometheus::register(
            prometheus::Counter::new(
                "polkadot_parachain_unified_with_peer_total",
                "Number of times `unify_with_peer` is called.",
            )?,
            registry,
        )?;

        // Histograms.
        let time_unify_with_peer = prometheus::register(
            prometheus::Histogram::with_opts(prometheus::HistogramOpts::new(
                "polkadot_parachain_time_unify_with_peer",
                "Time spent within fn `unify_with_peer`.",
            ))?,
            registry,
        )?;
        let time_import_pending_now_known = prometheus::register(
            prometheus::Histogram::with_opts(prometheus::HistogramOpts::new(
                "polkadot_parachain_time_import_pending_now_known",
                "Time spent on importing pending assignments and approvals.",
            ))?,
            registry,
        )?;
        let time_awaiting_approval_voting = prometheus::register(
            prometheus::Histogram::with_opts(prometheus::HistogramOpts::new(
                "polkadot_parachain_time_awaiting_approval_voting",
                "Time spent awaiting a reply from the Approval Voting Subsystem.",
            ))?,
            registry,
        )?;

        Ok(Metrics(Some(MetricsInner {
            assignments_imported_total,
            approvals_imported_total,
            unified_with_peer_total,
            time_unify_with_peer,
            time_import_pending_now_known,
            time_awaiting_approval_voting,
        })))
    }
}
@@ -0,0 +1,122 @@
// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use polkadot_node_subsystem_util::metrics::{prometheus, Metrics as MetricsTrait};
/// Approval Distribution metrics.
///
/// Wraps an optional [`MetricsInner`]. The derived `Default` yields a value
/// holding `None`, in which case the counter methods do nothing and the
/// timer methods return `None`.
#[derive(Default, Clone)]
pub struct Metrics(Option<MetricsInner>);
/// The set of Prometheus collectors backing [`Metrics`].
#[derive(Clone)]
struct MetricsInner {
/// Number of valid assignments imported locally or from other peers.
assignments_imported_total: prometheus::Counter<prometheus::U64>,
/// Number of valid approvals imported locally or from other peers.
approvals_imported_total: prometheus::Counter<prometheus::U64>,
/// Number of times `unify_with_peer` is called.
unified_with_peer_total: prometheus::Counter<prometheus::U64>,
/// Time spent within fn `unify_with_peer`.
time_unify_with_peer: prometheus::Histogram,
/// Time spent on importing pending assignments and approvals.
time_import_pending_now_known: prometheus::Histogram,
/// Time spent awaiting a reply from the Approval Voting Subsystem.
time_awaiting_approval_voting: prometheus::Histogram,
}
impl Metrics {
    /// Increments the imported-assignments counter, if metrics are present.
    pub(crate) fn on_assignment_imported(&self) {
        if let Some(inner) = self.0.as_ref() {
            inner.assignments_imported_total.inc();
        }
    }

    /// Increments the imported-approvals counter, if metrics are present.
    pub(crate) fn on_approval_imported(&self) {
        if let Some(inner) = self.0.as_ref() {
            inner.approvals_imported_total.inc();
        }
    }

    /// Increments the `unify_with_peer` call counter, if metrics are present.
    pub(crate) fn on_unify_with_peer(&self) {
        if let Some(inner) = self.0.as_ref() {
            inner.unified_with_peer_total.inc();
        }
    }

    /// Starts a timer on the `time_unify_with_peer` histogram.
    ///
    /// Returns `None` when no metrics are present.
    pub(crate) fn time_unify_with_peer(&self) -> Option<prometheus::prometheus::HistogramTimer> {
        let inner = self.0.as_ref()?;
        Some(inner.time_unify_with_peer.start_timer())
    }

    /// Starts a timer on the `time_import_pending_now_known` histogram.
    ///
    /// Returns `None` when no metrics are present.
    pub(crate) fn time_import_pending_now_known(
        &self,
    ) -> Option<prometheus::prometheus::HistogramTimer> {
        let inner = self.0.as_ref()?;
        Some(inner.time_import_pending_now_known.start_timer())
    }

    /// Starts a timer on the `time_awaiting_approval_voting` histogram.
    ///
    /// Returns `None` when no metrics are present.
    pub(crate) fn time_awaiting_approval_voting(
        &self,
    ) -> Option<prometheus::prometheus::HistogramTimer> {
        let inner = self.0.as_ref()?;
        Some(inner.time_awaiting_approval_voting.start_timer())
    }
}
impl MetricsTrait for Metrics {
    /// Creates every collector and registers it with `registry`.
    ///
    /// Collectors are created and registered one at a time, in field order;
    /// the first failure is returned immediately via `?`.
    fn try_register(registry: &prometheus::Registry) -> Result<Self, prometheus::PrometheusError> {
        // Counters.
        let assignments_imported_total = prometheus::register(
            prometheus::Counter::new(
                "polkadot_parachain_assignments_imported_total",
                "Number of valid assignments imported locally or from other peers.",
            )?,
            registry,
        )?;
        let approvals_imported_total = prometheus::register(
            prometheus::Counter::new(
                "polkadot_parachain_approvals_imported_total",
                "Number of valid approvals imported locally or from other peers.",
            )?,
            registry,
        )?;
        let unified_with_peer_total = prometheus::register(
            prometheus::Counter::new(
                "polkadot_parachain_unified_with_peer_total",
                "Number of times `unify_with_peer` is called.",
            )?,
            registry,
        )?;

        // Histograms.
        let time_unify_with_peer = prometheus::register(
            prometheus::Histogram::with_opts(prometheus::HistogramOpts::new(
                "polkadot_parachain_time_unify_with_peer",
                "Time spent within fn `unify_with_peer`.",
            ))?,
            registry,
        )?;
        let time_import_pending_now_known = prometheus::register(
            prometheus::Histogram::with_opts(prometheus::HistogramOpts::new(
                "polkadot_parachain_time_import_pending_now_known",
                "Time spent on importing pending assignments and approvals.",
            ))?,
            registry,
        )?;
        let time_awaiting_approval_voting = prometheus::register(
            prometheus::Histogram::with_opts(prometheus::HistogramOpts::new(
                "polkadot_parachain_time_awaiting_approval_voting",
                "Time spent awaiting a reply from the Approval Voting Subsystem.",
            ))?,
            registry,
        )?;

        Ok(Metrics(Some(MetricsInner {
            assignments_imported_total,
            approvals_imported_total,
            unified_with_peer_total,
            time_unify_with_peer,
            time_import_pending_now_known,
            time_awaiting_approval_voting,
        })))
    }
}