mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-12 07:41:08 +00:00
bitfield dist logging cleanup and enhancements (#5147)
* split metrics from bitfield signing * cleanup all logging * add a unit test for subset generation * chore: add one more test to assert need is properly represented * u8 as usize * chore: overseer fixin * fix test * Update node/network/bitfield-distribution/src/metrics.rs Co-authored-by: Andrei Sandu <54316454+sandreim@users.noreply.github.com> * Update node/network/bitfield-distribution/src/metrics.rs Co-authored-by: Andrei Sandu <54316454+sandreim@users.noreply.github.com> * fallout from suggested rename * consistency Co-authored-by: Andrei Sandu <54316454+sandreim@users.noreply.github.com>
This commit is contained in:
committed by
GitHub
parent
61a6004cf1
commit
6e3b5f1888
Generated
+1
@@ -6344,6 +6344,7 @@ dependencies = [
|
||||
"polkadot-primitives",
|
||||
"sp-application-crypto",
|
||||
"sp-core",
|
||||
"sp-keyring",
|
||||
"sp-keystore",
|
||||
"tracing-gum",
|
||||
]
|
||||
|
||||
@@ -18,6 +18,7 @@ bitvec = { version = "1.0.0", default-features = false, features = ["alloc"] }
|
||||
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||
sp-application-crypto = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||
sp-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||
sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||
maplit = "1.0.2"
|
||||
log = "0.4.13"
|
||||
env_logger = "0.9.0"
|
||||
|
||||
@@ -27,11 +27,7 @@ use futures::{channel::oneshot, FutureExt};
|
||||
use polkadot_node_network_protocol::{
|
||||
v1 as protocol_v1, OurView, PeerId, UnifiedReputationChange as Rep, View,
|
||||
};
|
||||
use polkadot_node_subsystem_util::{
|
||||
self as util,
|
||||
metrics::{self, prometheus},
|
||||
MIN_GOSSIP_PEERS,
|
||||
};
|
||||
use polkadot_node_subsystem_util::{self as util, MIN_GOSSIP_PEERS};
|
||||
use polkadot_primitives::v2::{Hash, SignedAvailabilityBitfield, SigningContext, ValidatorId};
|
||||
use polkadot_subsystem::{
|
||||
jaeger, messages::*, overseer, ActiveLeavesUpdate, FromOverseer, OverseerSignal, PerLeafSpan,
|
||||
@@ -39,6 +35,10 @@ use polkadot_subsystem::{
|
||||
};
|
||||
use std::collections::{HashMap, HashSet};
|
||||
|
||||
use self::metrics::Metrics;
|
||||
|
||||
mod metrics;
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
|
||||
@@ -137,19 +137,20 @@ impl PerRelayParentData {
|
||||
}
|
||||
}
|
||||
|
||||
/// Determines if that particular message signed by a validator is needed by the given peer.
|
||||
/// Determines if that particular message signed by a
|
||||
/// validator is needed by the given peer.
|
||||
fn message_from_validator_needed_by_peer(
|
||||
&self,
|
||||
peer: &PeerId,
|
||||
validator: &ValidatorId,
|
||||
signed_by: &ValidatorId,
|
||||
) -> bool {
|
||||
self.message_sent_to_peer
|
||||
.get(peer)
|
||||
.map(|v| !v.contains(validator))
|
||||
.map(|pubkeys| !pubkeys.contains(signed_by))
|
||||
.unwrap_or(true) &&
|
||||
self.message_received_from_peer
|
||||
.get(peer)
|
||||
.map(|v| !v.contains(validator))
|
||||
.map(|pubkeys| !pubkeys.contains(signed_by))
|
||||
.unwrap_or(true)
|
||||
}
|
||||
}
|
||||
@@ -178,21 +179,29 @@ impl BitfieldDistribution {
|
||||
loop {
|
||||
let message = match ctx.recv().await {
|
||||
Ok(message) => message,
|
||||
Err(e) => {
|
||||
gum::debug!(target: LOG_TARGET, err = ?e, "Failed to receive a message from Overseer, exiting");
|
||||
Err(err) => {
|
||||
gum::error!(
|
||||
target: LOG_TARGET,
|
||||
?err,
|
||||
"Failed to receive a message from Overseer, exiting"
|
||||
);
|
||||
return
|
||||
},
|
||||
};
|
||||
match message {
|
||||
FromOverseer::Communication {
|
||||
msg: BitfieldDistributionMessage::DistributeBitfield(hash, signed_availability),
|
||||
msg:
|
||||
BitfieldDistributionMessage::DistributeBitfield(
|
||||
relay_parent,
|
||||
signed_availability,
|
||||
),
|
||||
} => {
|
||||
gum::trace!(target: LOG_TARGET, ?hash, "Processing DistributeBitfield");
|
||||
gum::trace!(target: LOG_TARGET, ?relay_parent, "Processing DistributeBitfield");
|
||||
handle_bitfield_distribution(
|
||||
&mut ctx,
|
||||
&mut state,
|
||||
&self.metrics,
|
||||
hash,
|
||||
relay_parent,
|
||||
signed_availability,
|
||||
)
|
||||
.await;
|
||||
@@ -213,7 +222,7 @@ impl BitfieldDistribution {
|
||||
for activated in activated {
|
||||
let relay_parent = activated.hash;
|
||||
|
||||
gum::trace!(target: LOG_TARGET, relay_parent = %relay_parent, "activated");
|
||||
gum::trace!(target: LOG_TARGET, ?relay_parent, "activated");
|
||||
let span = PerLeafSpan::new(activated.span, "bitfield-distribution");
|
||||
let _span = span.child("query-basics");
|
||||
|
||||
@@ -230,18 +239,18 @@ impl BitfieldDistribution {
|
||||
PerRelayParentData::new(signing_context, validator_set, span),
|
||||
);
|
||||
},
|
||||
Err(e) => {
|
||||
gum::warn!(target: LOG_TARGET, err = ?e, "query_basics has failed");
|
||||
Err(err) => {
|
||||
gum::warn!(target: LOG_TARGET, ?err, "query_basics has failed");
|
||||
},
|
||||
_ => {},
|
||||
}
|
||||
}
|
||||
},
|
||||
FromOverseer::Signal(OverseerSignal::BlockFinalized(hash, number)) => {
|
||||
gum::trace!(target: LOG_TARGET, hash = %hash, number = %number, "block finalized");
|
||||
gum::trace!(target: LOG_TARGET, ?hash, %number, "block finalized");
|
||||
},
|
||||
FromOverseer::Signal(OverseerSignal::Conclude) => {
|
||||
gum::trace!(target: LOG_TARGET, "Conclude");
|
||||
gum::info!(target: LOG_TARGET, "Conclude");
|
||||
return
|
||||
},
|
||||
}
|
||||
@@ -250,11 +259,11 @@ impl BitfieldDistribution {
|
||||
}
|
||||
|
||||
/// Modify the reputation of a peer based on its behavior.
|
||||
async fn modify_reputation<Context>(ctx: &mut Context, peer: PeerId, rep: Rep)
|
||||
async fn modify_reputation<Context>(ctx: &mut Context, relay_parent: Hash, peer: PeerId, rep: Rep)
|
||||
where
|
||||
Context: SubsystemContext<Message = BitfieldDistributionMessage>,
|
||||
{
|
||||
gum::trace!(target: LOG_TARGET, ?rep, peer_id = %peer, "reputation change");
|
||||
gum::trace!(target: LOG_TARGET, ?relay_parent, ?rep, %peer, "reputation change");
|
||||
|
||||
ctx.send_message(NetworkBridgeMessage::ReportPeer(peer, rep)).await
|
||||
}
|
||||
@@ -278,9 +287,9 @@ async fn handle_bitfield_distribution<Context>(
|
||||
let job_data: &mut _ = if let Some(ref mut job_data) = job_data {
|
||||
job_data
|
||||
} else {
|
||||
gum::trace!(
|
||||
gum::debug!(
|
||||
target: LOG_TARGET,
|
||||
relay_parent = %relay_parent,
|
||||
?relay_parent,
|
||||
"Not supposed to work on relay parent related data",
|
||||
);
|
||||
|
||||
@@ -288,7 +297,7 @@ async fn handle_bitfield_distribution<Context>(
|
||||
};
|
||||
let validator_set = &job_data.validator_set;
|
||||
if validator_set.is_empty() {
|
||||
gum::trace!(target: LOG_TARGET, relay_parent = %relay_parent, "validator set is empty");
|
||||
gum::debug!(target: LOG_TARGET, ?relay_parent, "validator set is empty");
|
||||
return
|
||||
}
|
||||
|
||||
@@ -296,7 +305,7 @@ async fn handle_bitfield_distribution<Context>(
|
||||
let validator = if let Some(validator) = validator_set.get(validator_index) {
|
||||
validator.clone()
|
||||
} else {
|
||||
gum::trace!(target: LOG_TARGET, "Could not find a validator for index {}", validator_index);
|
||||
gum::debug!(target: LOG_TARGET, validator_index, "Could not find a validator for index");
|
||||
return
|
||||
};
|
||||
|
||||
@@ -306,7 +315,7 @@ async fn handle_bitfield_distribution<Context>(
|
||||
let peer_views = &mut state.peer_views;
|
||||
relay_message(ctx, job_data, gossip_peers, peer_views, validator, msg).await;
|
||||
|
||||
metrics.on_own_bitfield_gossipped();
|
||||
metrics.on_own_bitfield_sent();
|
||||
}
|
||||
|
||||
/// Distribute a given valid and signature checked bitfield message.
|
||||
@@ -322,13 +331,14 @@ async fn relay_message<Context>(
|
||||
) where
|
||||
Context: SubsystemContext<Message = BitfieldDistributionMessage>,
|
||||
{
|
||||
let relay_parent = message.relay_parent;
|
||||
let span = job_data.span.child("relay-msg");
|
||||
|
||||
let _span = span.child("provisionable");
|
||||
// notify the overseer about a new and valid signed bitfield
|
||||
ctx.send_message(ProvisionerMessage::ProvisionableData(
|
||||
message.relay_parent,
|
||||
ProvisionableData::Bitfield(message.relay_parent, message.signed_availability.clone()),
|
||||
relay_parent,
|
||||
ProvisionableData::Bitfield(relay_parent, message.signed_availability.clone()),
|
||||
))
|
||||
.await;
|
||||
|
||||
@@ -372,7 +382,7 @@ async fn relay_message<Context>(
|
||||
if interested_peers.is_empty() {
|
||||
gum::trace!(
|
||||
target: LOG_TARGET,
|
||||
relay_parent = %message.relay_parent,
|
||||
?relay_parent,
|
||||
"no peers are interested in gossip for relay parent",
|
||||
);
|
||||
} else {
|
||||
@@ -398,13 +408,13 @@ async fn process_incoming_peer_message<Context>(
|
||||
let protocol_v1::BitfieldDistributionMessage::Bitfield(relay_parent, bitfield) = message;
|
||||
gum::trace!(
|
||||
target: LOG_TARGET,
|
||||
peer_id = %origin,
|
||||
peer = %origin,
|
||||
?relay_parent,
|
||||
"received bitfield gossip from peer"
|
||||
);
|
||||
// we don't care about this, not part of our view.
|
||||
if !state.view.contains(&relay_parent) {
|
||||
modify_reputation(ctx, origin, COST_NOT_IN_VIEW).await;
|
||||
modify_reputation(ctx, relay_parent, origin, COST_NOT_IN_VIEW).await;
|
||||
return
|
||||
}
|
||||
|
||||
@@ -413,7 +423,7 @@ async fn process_incoming_peer_message<Context>(
|
||||
let job_data: &mut _ = if let Some(ref mut job_data) = job_data {
|
||||
job_data
|
||||
} else {
|
||||
modify_reputation(ctx, origin, COST_NOT_IN_VIEW).await;
|
||||
modify_reputation(ctx, relay_parent, origin, COST_NOT_IN_VIEW).await;
|
||||
return
|
||||
};
|
||||
|
||||
@@ -423,18 +433,14 @@ async fn process_incoming_peer_message<Context>(
|
||||
.span
|
||||
.child("msg-received")
|
||||
.with_peer_id(&origin)
|
||||
.with_relay_parent(relay_parent)
|
||||
.with_claimed_validator_index(validator_index)
|
||||
.with_stage(jaeger::Stage::BitfieldDistribution);
|
||||
|
||||
let validator_set = &job_data.validator_set;
|
||||
if validator_set.is_empty() {
|
||||
gum::trace!(
|
||||
target: LOG_TARGET,
|
||||
relay_parent = %relay_parent,
|
||||
?origin,
|
||||
"Validator set is empty",
|
||||
);
|
||||
modify_reputation(ctx, origin, COST_MISSING_PEER_SESSION_KEY).await;
|
||||
gum::trace!(target: LOG_TARGET, ?relay_parent, ?origin, "Validator set is empty",);
|
||||
modify_reputation(ctx, relay_parent, origin, COST_MISSING_PEER_SESSION_KEY).await;
|
||||
return
|
||||
}
|
||||
|
||||
@@ -444,7 +450,7 @@ async fn process_incoming_peer_message<Context>(
|
||||
let validator = if let Some(validator) = validator_set.get(validator_index.0 as usize) {
|
||||
validator.clone()
|
||||
} else {
|
||||
modify_reputation(ctx, origin, COST_VALIDATOR_INDEX_INVALID).await;
|
||||
modify_reputation(ctx, relay_parent, origin, COST_VALIDATOR_INDEX_INVALID).await;
|
||||
return
|
||||
};
|
||||
|
||||
@@ -457,13 +463,13 @@ async fn process_incoming_peer_message<Context>(
|
||||
received_set.insert(validator.clone());
|
||||
} else {
|
||||
gum::trace!(target: LOG_TARGET, ?validator_index, ?origin, "Duplicate message");
|
||||
modify_reputation(ctx, origin, COST_PEER_DUPLICATE_MESSAGE).await;
|
||||
modify_reputation(ctx, relay_parent, origin, COST_PEER_DUPLICATE_MESSAGE).await;
|
||||
return
|
||||
};
|
||||
|
||||
let one_per_validator = &mut (job_data.one_per_validator);
|
||||
|
||||
// only relay_message a message of a validator once
|
||||
// relay a message received from a validator at most _once_
|
||||
if let Some(old_message) = one_per_validator.get(&validator) {
|
||||
gum::trace!(
|
||||
target: LOG_TARGET,
|
||||
@@ -471,13 +477,13 @@ async fn process_incoming_peer_message<Context>(
|
||||
"already received a message for validator",
|
||||
);
|
||||
if old_message.signed_availability.as_unchecked() == &bitfield {
|
||||
modify_reputation(ctx, origin, BENEFIT_VALID_MESSAGE).await;
|
||||
modify_reputation(ctx, relay_parent, origin, BENEFIT_VALID_MESSAGE).await;
|
||||
}
|
||||
return
|
||||
}
|
||||
let signed_availability = match bitfield.try_into_checked(&signing_context, &validator) {
|
||||
Err(_) => {
|
||||
modify_reputation(ctx, origin, COST_SIGNATURE_INVALID).await;
|
||||
modify_reputation(ctx, relay_parent, origin, COST_SIGNATURE_INVALID).await;
|
||||
return
|
||||
},
|
||||
Ok(bitfield) => bitfield,
|
||||
@@ -491,7 +497,7 @@ async fn process_incoming_peer_message<Context>(
|
||||
relay_message(ctx, job_data, &state.gossip_peers, &mut state.peer_views, validator, message)
|
||||
.await;
|
||||
|
||||
modify_reputation(ctx, origin, BENEFIT_VALID_MESSAGE_FIRST).await
|
||||
modify_reputation(ctx, relay_parent, origin, BENEFIT_VALID_MESSAGE_FIRST).await
|
||||
}
|
||||
|
||||
/// Deal with network bridge updates and track what needs to be tracked
|
||||
@@ -507,32 +513,35 @@ async fn handle_network_msg<Context>(
|
||||
let _timer = metrics.time_handle_network_msg();
|
||||
|
||||
match bridge_message {
|
||||
NetworkBridgeEvent::PeerConnected(peerid, role, _) => {
|
||||
gum::trace!(target: LOG_TARGET, ?peerid, ?role, "Peer connected");
|
||||
NetworkBridgeEvent::PeerConnected(peer, role, _) => {
|
||||
gum::trace!(target: LOG_TARGET, ?peer, ?role, "Peer connected");
|
||||
// insert if none already present
|
||||
state.peer_views.entry(peerid).or_default();
|
||||
state.peer_views.entry(peer).or_default();
|
||||
},
|
||||
NetworkBridgeEvent::PeerDisconnected(peerid) => {
|
||||
gum::trace!(target: LOG_TARGET, ?peerid, "Peer disconnected");
|
||||
NetworkBridgeEvent::PeerDisconnected(peer) => {
|
||||
gum::trace!(target: LOG_TARGET, ?peer, "Peer disconnected");
|
||||
// get rid of superfluous data
|
||||
state.peer_views.remove(&peerid);
|
||||
state.peer_views.remove(&peer);
|
||||
},
|
||||
NetworkBridgeEvent::NewGossipTopology(peers) => {
|
||||
let newly_added: Vec<PeerId> = peers.difference(&state.gossip_peers).cloned().collect();
|
||||
state.gossip_peers = peers;
|
||||
for peer in newly_added {
|
||||
if let Some(view) = state.peer_views.remove(&peer) {
|
||||
handle_peer_view_change(ctx, state, peer, view).await;
|
||||
for new_peer in newly_added {
|
||||
// in case we already knew that peer in the past
|
||||
// it might have had an existing view, we use to initialize
|
||||
// and minimize the delta on `PeerViewChange` to be sent
|
||||
if let Some(old_view) = state.peer_views.remove(&new_peer) {
|
||||
handle_peer_view_change(ctx, state, new_peer, old_view).await;
|
||||
}
|
||||
}
|
||||
},
|
||||
NetworkBridgeEvent::PeerViewChange(peerid, view) => {
|
||||
gum::trace!(target: LOG_TARGET, ?peerid, ?view, "Peer view change");
|
||||
handle_peer_view_change(ctx, state, peerid, view).await;
|
||||
NetworkBridgeEvent::PeerViewChange(peerid, new_view) => {
|
||||
gum::trace!(target: LOG_TARGET, ?peerid, ?new_view, "Peer view change");
|
||||
handle_peer_view_change(ctx, state, peerid, new_view).await;
|
||||
},
|
||||
NetworkBridgeEvent::OurViewChange(view) => {
|
||||
gum::trace!(target: LOG_TARGET, ?view, "Our view change");
|
||||
handle_our_view_change(state, view);
|
||||
NetworkBridgeEvent::OurViewChange(new_view) => {
|
||||
gum::trace!(target: LOG_TARGET, ?new_view, "Our view change");
|
||||
handle_our_view_change(state, new_view);
|
||||
},
|
||||
NetworkBridgeEvent::PeerMessage(remote, message) =>
|
||||
process_incoming_peer_message(ctx, state, metrics, remote, message).await,
|
||||
@@ -545,10 +554,12 @@ fn handle_our_view_change(state: &mut ProtocolState, view: OurView) {
|
||||
|
||||
for added in state.view.difference(&old_view) {
|
||||
if !state.per_relay_parent.contains_key(&added) {
|
||||
gum::warn!(
|
||||
// Is guaranteed to be handled in `ActiveHead` update
|
||||
// so this should never happen.
|
||||
gum::error!(
|
||||
target: LOG_TARGET,
|
||||
added = %added,
|
||||
"Our view contains {} but the overseer never told use we should work on this",
|
||||
%added,
|
||||
"Our view contains {}, but not in active heads",
|
||||
&added
|
||||
);
|
||||
}
|
||||
@@ -692,100 +703,16 @@ where
|
||||
.await;
|
||||
|
||||
match (validators_rx.await?, session_rx.await?) {
|
||||
(Ok(v), Ok(s)) =>
|
||||
Ok(Some((v, SigningContext { parent_hash: relay_parent, session_index: s }))),
|
||||
(Err(e), _) | (_, Err(e)) => {
|
||||
gum::warn!(target: LOG_TARGET, err = ?e, "Failed to fetch basics from runtime API");
|
||||
(Ok(validators), Ok(session_index)) =>
|
||||
Ok(Some((validators, SigningContext { parent_hash: relay_parent, session_index }))),
|
||||
(Err(err), _) | (_, Err(err)) => {
|
||||
gum::warn!(
|
||||
target: LOG_TARGET,
|
||||
?relay_parent,
|
||||
?err,
|
||||
"Failed to fetch basics from runtime API"
|
||||
);
|
||||
Ok(None)
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct MetricsInner {
|
||||
gossipped_own_availability_bitfields: prometheus::Counter<prometheus::U64>,
|
||||
received_availability_bitfields: prometheus::Counter<prometheus::U64>,
|
||||
active_leaves_update: prometheus::Histogram,
|
||||
handle_bitfield_distribution: prometheus::Histogram,
|
||||
handle_network_msg: prometheus::Histogram,
|
||||
}
|
||||
|
||||
/// Bitfield Distribution metrics.
|
||||
#[derive(Default, Clone)]
|
||||
pub struct Metrics(Option<MetricsInner>);
|
||||
|
||||
impl Metrics {
|
||||
fn on_own_bitfield_gossipped(&self) {
|
||||
if let Some(metrics) = &self.0 {
|
||||
metrics.gossipped_own_availability_bitfields.inc();
|
||||
}
|
||||
}
|
||||
|
||||
fn on_bitfield_received(&self) {
|
||||
if let Some(metrics) = &self.0 {
|
||||
metrics.received_availability_bitfields.inc();
|
||||
}
|
||||
}
|
||||
|
||||
/// Provide a timer for `active_leaves_update` which observes on drop.
|
||||
fn time_active_leaves_update(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
|
||||
self.0.as_ref().map(|metrics| metrics.active_leaves_update.start_timer())
|
||||
}
|
||||
|
||||
/// Provide a timer for `handle_bitfield_distribution` which observes on drop.
|
||||
fn time_handle_bitfield_distribution(
|
||||
&self,
|
||||
) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
|
||||
self.0
|
||||
.as_ref()
|
||||
.map(|metrics| metrics.handle_bitfield_distribution.start_timer())
|
||||
}
|
||||
|
||||
/// Provide a timer for `handle_network_msg` which observes on drop.
|
||||
fn time_handle_network_msg(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
|
||||
self.0.as_ref().map(|metrics| metrics.handle_network_msg.start_timer())
|
||||
}
|
||||
}
|
||||
|
||||
impl metrics::Metrics for Metrics {
|
||||
fn try_register(registry: &prometheus::Registry) -> Result<Self, prometheus::PrometheusError> {
|
||||
let metrics = MetricsInner {
|
||||
gossipped_own_availability_bitfields: prometheus::register(
|
||||
prometheus::Counter::new(
|
||||
"polkadot_parachain_gossipped_own_availabilty_bitfields_total",
|
||||
"Number of own availability bitfields sent to other peers.",
|
||||
)?,
|
||||
registry,
|
||||
)?,
|
||||
received_availability_bitfields: prometheus::register(
|
||||
prometheus::Counter::new(
|
||||
"polkadot_parachain_received_availabilty_bitfields_total",
|
||||
"Number of valid availability bitfields received from other peers.",
|
||||
)?,
|
||||
registry,
|
||||
)?,
|
||||
active_leaves_update: prometheus::register(
|
||||
prometheus::Histogram::with_opts(prometheus::HistogramOpts::new(
|
||||
"polkadot_parachain_bitfield_distribution_active_leaves_update",
|
||||
"Time spent within `bitfield_distribution::active_leaves_update`",
|
||||
))?,
|
||||
registry,
|
||||
)?,
|
||||
handle_bitfield_distribution: prometheus::register(
|
||||
prometheus::Histogram::with_opts(prometheus::HistogramOpts::new(
|
||||
"polkadot_parachain_bitfield_distribution_handle_bitfield_distribution",
|
||||
"Time spent within `bitfield_distribution::handle_bitfield_distribution`",
|
||||
))?,
|
||||
registry,
|
||||
)?,
|
||||
handle_network_msg: prometheus::register(
|
||||
prometheus::Histogram::with_opts(prometheus::HistogramOpts::new(
|
||||
"polkadot_parachain_bitfield_distribution_handle_network_msg",
|
||||
"Time spent within `bitfield_distribution::handle_network_msg`",
|
||||
))?,
|
||||
registry,
|
||||
)?,
|
||||
};
|
||||
Ok(Metrics(Some(metrics)))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,108 @@
|
||||
// Copyright 2020 Parity Technologies (UK) Ltd.
|
||||
// This file is part of Polkadot.
|
||||
|
||||
// Polkadot is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// Polkadot is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
use polkadot_node_subsystem_util::metrics::{prometheus, Metrics as MetricsTrait};
|
||||
|
||||
#[derive(Clone)]
|
||||
struct MetricsInner {
|
||||
sent_own_availability_bitfields: prometheus::Counter<prometheus::U64>,
|
||||
received_availability_bitfields: prometheus::Counter<prometheus::U64>,
|
||||
active_leaves_update: prometheus::Histogram,
|
||||
handle_bitfield_distribution: prometheus::Histogram,
|
||||
handle_network_msg: prometheus::Histogram,
|
||||
}
|
||||
|
||||
/// Bitfield Distribution metrics.
|
||||
#[derive(Default, Clone)]
|
||||
pub struct Metrics(Option<MetricsInner>);
|
||||
|
||||
impl Metrics {
|
||||
pub(crate) fn on_own_bitfield_sent(&self) {
|
||||
if let Some(metrics) = &self.0 {
|
||||
metrics.sent_own_availability_bitfields.inc();
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn on_bitfield_received(&self) {
|
||||
if let Some(metrics) = &self.0 {
|
||||
metrics.received_availability_bitfields.inc();
|
||||
}
|
||||
}
|
||||
|
||||
/// Provide a timer for `active_leaves_update` which observes on drop.
|
||||
pub(crate) fn time_active_leaves_update(
|
||||
&self,
|
||||
) -> Option<prometheus::prometheus::HistogramTimer> {
|
||||
self.0.as_ref().map(|metrics| metrics.active_leaves_update.start_timer())
|
||||
}
|
||||
|
||||
/// Provide a timer for `handle_bitfield_distribution` which observes on drop.
|
||||
pub(crate) fn time_handle_bitfield_distribution(
|
||||
&self,
|
||||
) -> Option<prometheus::prometheus::HistogramTimer> {
|
||||
self.0
|
||||
.as_ref()
|
||||
.map(|metrics| metrics.handle_bitfield_distribution.start_timer())
|
||||
}
|
||||
|
||||
/// Provide a timer for `handle_network_msg` which observes on drop.
|
||||
pub(crate) fn time_handle_network_msg(&self) -> Option<prometheus::prometheus::HistogramTimer> {
|
||||
self.0.as_ref().map(|metrics| metrics.handle_network_msg.start_timer())
|
||||
}
|
||||
}
|
||||
|
||||
impl MetricsTrait for Metrics {
|
||||
fn try_register(registry: &prometheus::Registry) -> Result<Self, prometheus::PrometheusError> {
|
||||
let metrics = MetricsInner {
|
||||
sent_own_availability_bitfields: prometheus::register(
|
||||
prometheus::Counter::new(
|
||||
"polkadot_parachain_sent_own_availabilty_bitfields_total",
|
||||
"Number of own availability bitfields sent to other peers.",
|
||||
)?,
|
||||
registry,
|
||||
)?,
|
||||
received_availability_bitfields: prometheus::register(
|
||||
prometheus::Counter::new(
|
||||
"polkadot_parachain_received_availabilty_bitfields_total",
|
||||
"Number of valid availability bitfields received from other peers.",
|
||||
)?,
|
||||
registry,
|
||||
)?,
|
||||
active_leaves_update: prometheus::register(
|
||||
prometheus::Histogram::with_opts(prometheus::HistogramOpts::new(
|
||||
"polkadot_parachain_bitfield_distribution_active_leaves_update",
|
||||
"Time spent within `bitfield_distribution::active_leaves_update`",
|
||||
))?,
|
||||
registry,
|
||||
)?,
|
||||
handle_bitfield_distribution: prometheus::register(
|
||||
prometheus::Histogram::with_opts(prometheus::HistogramOpts::new(
|
||||
"polkadot_parachain_bitfield_distribution_handle_bitfield_distribution",
|
||||
"Time spent within `bitfield_distribution::handle_bitfield_distribution`",
|
||||
))?,
|
||||
registry,
|
||||
)?,
|
||||
handle_network_msg: prometheus::register(
|
||||
prometheus::Histogram::with_opts(prometheus::HistogramOpts::new(
|
||||
"polkadot_parachain_bitfield_distribution_handle_network_msg",
|
||||
"Time spent within `bitfield_distribution::handle_network_msg`",
|
||||
))?,
|
||||
registry,
|
||||
)?,
|
||||
};
|
||||
Ok(Metrics(Some(metrics)))
|
||||
}
|
||||
}
|
||||
@@ -23,9 +23,15 @@ use polkadot_node_network_protocol::{our_view, view, ObservedRole};
|
||||
use polkadot_node_subsystem_test_helpers::make_subsystem_context;
|
||||
use polkadot_node_subsystem_util::TimeoutExt;
|
||||
use polkadot_primitives::v2::{AvailabilityBitfield, Signed, ValidatorIndex};
|
||||
use polkadot_subsystem::jaeger;
|
||||
use polkadot_subsystem::{
|
||||
jaeger,
|
||||
jaeger::{PerLeafSpan, Span},
|
||||
};
|
||||
use sp_application_crypto::AppKey;
|
||||
use sp_core::Pair as PairT;
|
||||
use sp_keyring::Sr25519Keyring;
|
||||
use sp_keystore::{testing::KeyStore, SyncCryptoStore, SyncCryptoStorePtr};
|
||||
|
||||
use std::{iter::FromIterator as _, sync::Arc, time::Duration};
|
||||
|
||||
macro_rules! launch {
|
||||
@@ -720,3 +726,63 @@ fn do_not_send_message_back_to_origin() {
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn need_message_works() {
|
||||
let validators = vec![Sr25519Keyring::Alice.pair(), Sr25519Keyring::Bob.pair()];
|
||||
|
||||
let validator_set = Vec::from_iter(validators.iter().map(|k| ValidatorId::from(k.public())));
|
||||
|
||||
let signing_context = SigningContext { session_index: 1, parent_hash: Hash::repeat_byte(0x00) };
|
||||
let mut state = PerRelayParentData::new(
|
||||
signing_context,
|
||||
validator_set.clone(),
|
||||
PerLeafSpan::new(Arc::new(Span::Disabled), "foo"),
|
||||
);
|
||||
|
||||
let peer_a = PeerId::random();
|
||||
let peer_b = PeerId::random();
|
||||
assert_ne!(peer_a, peer_b);
|
||||
|
||||
let pretend_send =
|
||||
|state: &mut PerRelayParentData, dest_peer: PeerId, signed_by: &ValidatorId| -> bool {
|
||||
if state.message_from_validator_needed_by_peer(&dest_peer, signed_by) {
|
||||
state
|
||||
.message_sent_to_peer
|
||||
.entry(dest_peer)
|
||||
.or_default()
|
||||
.insert(signed_by.clone());
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
};
|
||||
|
||||
let pretend_receive =
|
||||
|state: &mut PerRelayParentData, source_peer: PeerId, signed_by: &ValidatorId| {
|
||||
state
|
||||
.message_received_from_peer
|
||||
.entry(source_peer)
|
||||
.or_default()
|
||||
.insert(signed_by.clone());
|
||||
};
|
||||
|
||||
assert!(true == pretend_send(&mut state, peer_a, &validator_set[0]));
|
||||
assert!(true == pretend_send(&mut state, peer_b, &validator_set[1]));
|
||||
// sending the same thing must not be allowed
|
||||
assert!(false == pretend_send(&mut state, peer_a, &validator_set[0]));
|
||||
|
||||
// receive by Alice
|
||||
pretend_receive(&mut state, peer_a, &validator_set[0]);
|
||||
// must be marked as not needed by Alice, so attempt to send to Alice must be false
|
||||
assert!(false == pretend_send(&mut state, peer_a, &validator_set[0]));
|
||||
// but ok for Bob
|
||||
assert!(false == pretend_send(&mut state, peer_b, &validator_set[1]));
|
||||
|
||||
// receive by Bob
|
||||
pretend_receive(&mut state, peer_a, &validator_set[0]);
|
||||
// not ok for Alice
|
||||
assert!(false == pretend_send(&mut state, peer_a, &validator_set[0]));
|
||||
// also not ok for Bob
|
||||
assert!(false == pretend_send(&mut state, peer_b, &validator_set[1]));
|
||||
}
|
||||
|
||||
@@ -577,7 +577,7 @@ where
|
||||
SupportsParachains: HeadSupportsParachains,
|
||||
S: SpawnNamed,
|
||||
{
|
||||
/// Stop the overseer.
|
||||
/// Stop the `Overseer`.
|
||||
async fn stop(mut self) {
|
||||
let _ = self.wait_terminate(OverseerSignal::Conclude, Duration::from_secs(1_u64)).await;
|
||||
}
|
||||
|
||||
@@ -245,3 +245,14 @@ fn tick_tack_metronome() {
|
||||
)
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn subset_generation_check() {
|
||||
let values = (0_u8..=25).collect::<Vec<_>>();
|
||||
// 12 even numbers exist
|
||||
let mut chosen = choose_random_subset::<u8, _>(|v| v & 0x01 == 0, values, 12);
|
||||
chosen.sort();
|
||||
for (idx, v) in dbg!(chosen).into_iter().enumerate() {
|
||||
assert_eq!(v as usize, idx * 2);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user