implement remaining subsystem metrics (#1770)

* overseer metrics: messages relayed

* provisioner metrics: cosmetic changes

* candidate selection metrics: cosmetic changes

* availability bitfields metrics

* availability distribution metrics

* PoV distribution metrics

* statement-distribution: small simplification

* statement-distribution: extract log target into a const

* statement-distribution: metrics

* address review nits
This commit is contained in:
Andronik Ordian
2020-10-01 12:08:03 +02:00
committed by GitHub
parent 693d40831d
commit 579614d127
12 changed files with 450 additions and 177 deletions
+2
View File
@@ -5144,6 +5144,7 @@ dependencies = [
"polkadot-node-primitives", "polkadot-node-primitives",
"polkadot-node-subsystem", "polkadot-node-subsystem",
"polkadot-node-subsystem-test-helpers", "polkadot-node-subsystem-test-helpers",
"polkadot-node-subsystem-util",
"polkadot-primitives", "polkadot-primitives",
"sp-core", "sp-core",
"sp-runtime", "sp-runtime",
@@ -5499,6 +5500,7 @@ dependencies = [
"polkadot-node-primitives", "polkadot-node-primitives",
"polkadot-node-subsystem", "polkadot-node-subsystem",
"polkadot-node-subsystem-test-helpers", "polkadot-node-subsystem-test-helpers",
"polkadot-node-subsystem-util",
"polkadot-primitives", "polkadot-primitives",
"sp-core", "sp-core",
"sp-keyring", "sp-keyring",
@@ -282,18 +282,18 @@ impl CandidateSelectionJob {
candidate_receipt candidate_receipt
); );
let succeeded = let result =
if let Err(err) = forward_invalidity_note(received_from, &mut self.sender).await { if let Err(err) = forward_invalidity_note(received_from, &mut self.sender).await {
log::warn!( log::warn!(
target: TARGET, target: TARGET,
"failed to forward invalidity note: {:?}", "failed to forward invalidity note: {:?}",
err err
); );
false Err(())
} else { } else {
true Ok(())
}; };
self.metrics.on_invalid_selection(succeeded); self.metrics.on_invalid_selection(result);
} }
} }
@@ -363,11 +363,11 @@ async fn second_candidate(
{ {
Err(err) => { Err(err) => {
log::warn!(target: TARGET, "failed to send a seconding message"); log::warn!(target: TARGET, "failed to send a seconding message");
metrics.on_second(false); metrics.on_second(Err(()));
Err(err.into()) Err(err.into())
} }
Ok(_) => { Ok(_) => {
metrics.on_second(true); metrics.on_second(Ok(()));
Ok(()) Ok(())
} }
} }
@@ -391,21 +391,21 @@ struct MetricsInner {
invalid_selections: prometheus::CounterVec<prometheus::U64>, invalid_selections: prometheus::CounterVec<prometheus::U64>,
} }
/// Candidate backing metrics. /// Candidate selection metrics.
#[derive(Default, Clone)] #[derive(Default, Clone)]
pub struct Metrics(Option<MetricsInner>); pub struct Metrics(Option<MetricsInner>);
impl Metrics { impl Metrics {
fn on_second(&self, succeeded: bool) { fn on_second(&self, result: Result<(), ()>) {
if let Some(metrics) = &self.0 { if let Some(metrics) = &self.0 {
let label = if succeeded { "succeeded" } else { "failed" }; let label = if result.is_ok() { "succeeded" } else { "failed" };
metrics.seconds.with_label_values(&[label]).inc(); metrics.seconds.with_label_values(&[label]).inc();
} }
} }
fn on_invalid_selection(&self, succeeded: bool) { fn on_invalid_selection(&self, result: Result<(), ()>) {
if let Some(metrics) = &self.0 { if let Some(metrics) = &self.0 {
let label = if succeeded { "succeeded" } else { "failed" }; let label = if result.is_ok() { "succeeded" } else { "failed" };
metrics.invalid_selections.with_label_values(&[label]).inc(); metrics.invalid_selections.with_label_values(&[label]).inc();
} }
} }
+7 -8
View File
@@ -196,9 +196,9 @@ impl ProvisioningJob {
.await .await
{ {
log::warn!(target: "provisioner", "failed to assemble or send inherent data: {:?}", err); log::warn!(target: "provisioner", "failed to assemble or send inherent data: {:?}", err);
self.metrics.on_inherent_data_request(false); self.metrics.on_inherent_data_request(Err(()));
} else { } else {
self.metrics.on_inherent_data_request(true); self.metrics.on_inherent_data_request(Ok(()));
} }
} }
ToJob::Provisioner(RequestBlockAuthorshipData(_, sender)) => { ToJob::Provisioner(RequestBlockAuthorshipData(_, sender)) => {
@@ -467,17 +467,16 @@ struct MetricsInner {
inherent_data_requests: prometheus::CounterVec<prometheus::U64>, inherent_data_requests: prometheus::CounterVec<prometheus::U64>,
} }
/// Candidate backing metrics. /// Provisioner metrics.
#[derive(Default, Clone)] #[derive(Default, Clone)]
pub struct Metrics(Option<MetricsInner>); pub struct Metrics(Option<MetricsInner>);
impl Metrics { impl Metrics {
fn on_inherent_data_request(&self, succeeded: bool) { fn on_inherent_data_request(&self, response: Result<(), ()>) {
if let Some(metrics) = &self.0 { if let Some(metrics) = &self.0 {
if succeeded { match response {
metrics.inherent_data_requests.with_label_values(&["succeded"]).inc(); Ok(()) => metrics.inherent_data_requests.with_label_values(&["succeded"]).inc(),
} else { Err(()) => metrics.inherent_data_requests.with_label_values(&["failed"]).inc(),
metrics.inherent_data_requests.with_label_values(&["failed"]).inc();
} }
} }
} }
@@ -49,6 +49,9 @@ use polkadot_subsystem::{
ActiveLeavesUpdate, FromOverseer, OverseerSignal, SpawnedSubsystem, Subsystem, ActiveLeavesUpdate, FromOverseer, OverseerSignal, SpawnedSubsystem, Subsystem,
SubsystemContext, SubsystemError, SubsystemContext, SubsystemError,
}; };
use polkadot_node_subsystem_util::{
metrics::{self, prometheus},
};
use polkadot_node_network_protocol::{ use polkadot_node_network_protocol::{
v1 as protocol_v1, View, ReputationChange as Rep, PeerId, v1 as protocol_v1, View, ReputationChange as Rep, PeerId,
NetworkBridgeEvent, NetworkBridgeEvent,
@@ -292,6 +295,7 @@ async fn handle_network_msg<Context>(
ctx: &mut Context, ctx: &mut Context,
keystore: KeyStorePtr, keystore: KeyStorePtr,
state: &mut ProtocolState, state: &mut ProtocolState,
metrics: &Metrics,
bridge_message: NetworkBridgeEvent<protocol_v1::AvailabilityDistributionMessage>, bridge_message: NetworkBridgeEvent<protocol_v1::AvailabilityDistributionMessage>,
) -> Result<()> ) -> Result<()>
where where
@@ -307,10 +311,10 @@ where
state.peer_views.remove(&peerid); state.peer_views.remove(&peerid);
} }
NetworkBridgeEvent::PeerViewChange(peerid, view) => { NetworkBridgeEvent::PeerViewChange(peerid, view) => {
handle_peer_view_change(ctx, state, peerid, view).await?; handle_peer_view_change(ctx, state, peerid, view, metrics).await?;
} }
NetworkBridgeEvent::OurViewChange(view) => { NetworkBridgeEvent::OurViewChange(view) => {
handle_our_view_change(ctx, keystore, state, view).await?; handle_our_view_change(ctx, keystore, state, view, metrics).await?;
} }
NetworkBridgeEvent::PeerMessage(remote, msg) => { NetworkBridgeEvent::PeerMessage(remote, msg) => {
let gossiped_availability = match msg { let gossiped_availability = match msg {
@@ -318,7 +322,7 @@ where
AvailabilityGossipMessage { candidate_hash, erasure_chunk: chunk } AvailabilityGossipMessage { candidate_hash, erasure_chunk: chunk }
}; };
process_incoming_peer_message(ctx, state, remote, gossiped_availability).await?; process_incoming_peer_message(ctx, state, remote, gossiped_availability, metrics).await?;
} }
} }
Ok(()) Ok(())
@@ -331,6 +335,7 @@ async fn handle_our_view_change<Context>(
keystore: KeyStorePtr, keystore: KeyStorePtr,
state: &mut ProtocolState, state: &mut ProtocolState,
view: View, view: View,
metrics: &Metrics,
) -> Result<()> ) -> Result<()>
where where
Context: SubsystemContext<Message = AvailabilityDistributionMessage>, Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
@@ -426,7 +431,7 @@ where
erasure_chunk, erasure_chunk,
}; };
send_tracked_gossip_message_to_peers(ctx, per_candidate, peers, message).await?; send_tracked_gossip_message_to_peers(ctx, per_candidate, metrics, peers, message).await?;
} }
} }
@@ -442,31 +447,34 @@ where
async fn send_tracked_gossip_message_to_peers<Context>( async fn send_tracked_gossip_message_to_peers<Context>(
ctx: &mut Context, ctx: &mut Context,
per_candidate: &mut PerCandidate, per_candidate: &mut PerCandidate,
metrics: &Metrics,
peers: Vec<PeerId>, peers: Vec<PeerId>,
message: AvailabilityGossipMessage, message: AvailabilityGossipMessage,
) -> Result<()> ) -> Result<()>
where where
Context: SubsystemContext<Message = AvailabilityDistributionMessage>, Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{ {
send_tracked_gossip_messages_to_peers(ctx, per_candidate, peers, iter::once(message)).await send_tracked_gossip_messages_to_peers(ctx, per_candidate, metrics, peers, iter::once(message)).await
} }
#[inline(always)] #[inline(always)]
async fn send_tracked_gossip_messages_to_peer<Context>( async fn send_tracked_gossip_messages_to_peer<Context>(
ctx: &mut Context, ctx: &mut Context,
per_candidate: &mut PerCandidate, per_candidate: &mut PerCandidate,
metrics: &Metrics,
peer: PeerId, peer: PeerId,
message_iter: impl IntoIterator<Item = AvailabilityGossipMessage>, message_iter: impl IntoIterator<Item = AvailabilityGossipMessage>,
) -> Result<()> ) -> Result<()>
where where
Context: SubsystemContext<Message = AvailabilityDistributionMessage>, Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{ {
send_tracked_gossip_messages_to_peers(ctx, per_candidate, vec![peer], message_iter).await send_tracked_gossip_messages_to_peers(ctx, per_candidate, metrics, vec![peer], message_iter).await
} }
async fn send_tracked_gossip_messages_to_peers<Context>( async fn send_tracked_gossip_messages_to_peers<Context>(
ctx: &mut Context, ctx: &mut Context,
per_candidate: &mut PerCandidate, per_candidate: &mut PerCandidate,
metrics: &Metrics,
peers: Vec<PeerId>, peers: Vec<PeerId>,
message_iter: impl IntoIterator<Item = AvailabilityGossipMessage>, message_iter: impl IntoIterator<Item = AvailabilityGossipMessage>,
) -> Result<()> ) -> Result<()>
@@ -503,6 +511,8 @@ where
)) ))
.await .await
.map_err::<Error, _>(Into::into)?; .map_err::<Error, _>(Into::into)?;
metrics.on_chunk_distributed();
} }
Ok(()) Ok(())
@@ -515,6 +525,7 @@ async fn handle_peer_view_change<Context>(
state: &mut ProtocolState, state: &mut ProtocolState,
origin: PeerId, origin: PeerId,
view: View, view: View,
metrics: &Metrics,
) -> Result<()> ) -> Result<()>
where where
Context: SubsystemContext<Message = AvailabilityDistributionMessage>, Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
@@ -560,7 +571,7 @@ where
.cloned() .cloned()
.collect::<HashSet<_>>(); .collect::<HashSet<_>>();
send_tracked_gossip_messages_to_peer(ctx, per_candidate, origin.clone(), messages).await?; send_tracked_gossip_messages_to_peer(ctx, per_candidate, metrics, origin.clone(), messages).await?;
} }
Ok(()) Ok(())
} }
@@ -588,6 +599,7 @@ async fn process_incoming_peer_message<Context>(
state: &mut ProtocolState, state: &mut ProtocolState,
origin: PeerId, origin: PeerId,
message: AvailabilityGossipMessage, message: AvailabilityGossipMessage,
metrics: &Metrics,
) -> Result<()> ) -> Result<()>
where where
Context: SubsystemContext<Message = AvailabilityDistributionMessage>, Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
@@ -694,13 +706,15 @@ where
.collect::<Vec<_>>(); .collect::<Vec<_>>();
// gossip that message to interested peers // gossip that message to interested peers
send_tracked_gossip_message_to_peers(ctx, per_candidate, peers, message).await send_tracked_gossip_message_to_peers(ctx, per_candidate, metrics, peers, message).await
} }
/// The bitfield distribution subsystem. /// The bitfield distribution subsystem.
pub struct AvailabilityDistributionSubsystem { pub struct AvailabilityDistributionSubsystem {
/// Pointer to a keystore, which is required for determining this nodes validator index. /// Pointer to a keystore, which is required for determining this nodes validator index.
keystore: KeyStorePtr, keystore: KeyStorePtr,
/// Prometheus metrics.
metrics: Metrics,
} }
impl AvailabilityDistributionSubsystem { impl AvailabilityDistributionSubsystem {
@@ -708,8 +722,8 @@ impl AvailabilityDistributionSubsystem {
const K: usize = 3; const K: usize = 3;
/// Create a new instance of the availability distribution. /// Create a new instance of the availability distribution.
pub fn new(keystore: KeyStorePtr) -> Self { pub fn new(keystore: KeyStorePtr, metrics: Metrics) -> Self {
Self { keystore } Self { keystore, metrics }
} }
/// Start processing work as passed on from the Overseer. /// Start processing work as passed on from the Overseer.
@@ -729,7 +743,8 @@ impl AvailabilityDistributionSubsystem {
&mut ctx, &mut ctx,
self.keystore.clone(), self.keystore.clone(),
&mut state, &mut state,
event &self.metrics,
event,
).await { ).await {
warn!( warn!(
target: TARGET, target: TARGET,
@@ -1073,5 +1088,37 @@ where
} }
#[derive(Clone)]
struct MetricsInner {
gossipped_availability_chunks: prometheus::Counter<prometheus::U64>,
}
/// Availability Distribution metrics.
#[derive(Default, Clone)]
pub struct Metrics(Option<MetricsInner>);
impl Metrics {
fn on_chunk_distributed(&self) {
if let Some(metrics) = &self.0 {
metrics.gossipped_availability_chunks.inc();
}
}
}
impl metrics::Metrics for Metrics {
fn try_register(registry: &prometheus::Registry) -> std::result::Result<Self, prometheus::PrometheusError> {
let metrics = MetricsInner {
gossipped_availability_chunks: prometheus::register(
prometheus::Counter::new(
"parachain_gossipped_availability_chunks_total",
"Number of availability chunks gossipped to other peers."
)?,
registry,
)?,
};
Ok(Metrics(Some(metrics)))
}
}
#[cfg(test)] #[cfg(test)]
mod tests; mod tests;
@@ -72,7 +72,7 @@ fn test_harness<T: Future<Output = ()>>(
let (context, virtual_overseer) = test_helpers::make_subsystem_context(pool.clone()); let (context, virtual_overseer) = test_helpers::make_subsystem_context(pool.clone());
let subsystem = AvailabilityDistributionSubsystem::new(keystore); let subsystem = AvailabilityDistributionSubsystem::new(keystore, Default::default());
let subsystem = subsystem.run(context); let subsystem = subsystem.run(context);
let test_fut = test(TestHarness { virtual_overseer }); let test_fut = test(TestHarness { virtual_overseer });
@@ -13,7 +13,7 @@ codec = { package="parity-scale-codec", version = "1.3.4" }
node-primitives = { package = "polkadot-node-primitives", path = "../../primitives" } node-primitives = { package = "polkadot-node-primitives", path = "../../primitives" }
polkadot-primitives = { path = "../../../primitives" } polkadot-primitives = { path = "../../../primitives" }
polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" } polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" }
polkadot-node-subsystem-util = { package = "polkadot-node-subsystem-util", path = "../../subsystem-util" } polkadot-node-subsystem-util = { path = "../../subsystem-util" }
polkadot-network-bridge = { path = "../../network/bridge" } polkadot-network-bridge = { path = "../../network/bridge" }
polkadot-node-network-protocol = { path = "../../network/protocol" } polkadot-node-network-protocol = { path = "../../network/protocol" }
sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" } sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" }
@@ -28,6 +28,9 @@ use polkadot_subsystem::messages::*;
use polkadot_subsystem::{ use polkadot_subsystem::{
ActiveLeavesUpdate, FromOverseer, OverseerSignal, SpawnedSubsystem, Subsystem, SubsystemContext, SubsystemResult, ActiveLeavesUpdate, FromOverseer, OverseerSignal, SpawnedSubsystem, Subsystem, SubsystemContext, SubsystemResult,
}; };
use polkadot_node_subsystem_util::{
metrics::{self, prometheus},
};
use polkadot_primitives::v1::{Hash, SignedAvailabilityBitfield, SigningContext, ValidatorId}; use polkadot_primitives::v1::{Hash, SignedAvailabilityBitfield, SigningContext, ValidatorId};
use polkadot_node_network_protocol::{v1 as protocol_v1, PeerId, NetworkBridgeEvent, View, ReputationChange}; use polkadot_node_network_protocol::{v1 as protocol_v1, PeerId, NetworkBridgeEvent, View, ReputationChange};
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
@@ -130,11 +133,18 @@ impl PerRelayParentData {
const TARGET: &'static str = "bitd"; const TARGET: &'static str = "bitd";
/// The bitfield distribution subsystem. /// The bitfield distribution subsystem.
pub struct BitfieldDistribution; pub struct BitfieldDistribution {
metrics: Metrics,
}
impl BitfieldDistribution { impl BitfieldDistribution {
/// Create a new instance of the `BitfieldDistribution` subsystem.
pub fn new(metrics: Metrics) -> Self {
Self { metrics }
}
/// Start processing work as passed on from the Overseer. /// Start processing work as passed on from the Overseer.
async fn run<Context>(mut ctx: Context) -> SubsystemResult<()> async fn run<Context>(self, mut ctx: Context) -> SubsystemResult<()>
where where
Context: SubsystemContext<Message = BitfieldDistributionMessage>, Context: SubsystemContext<Message = BitfieldDistributionMessage>,
{ {
@@ -147,7 +157,7 @@ impl BitfieldDistribution {
msg: BitfieldDistributionMessage::DistributeBitfield(hash, signed_availability), msg: BitfieldDistributionMessage::DistributeBitfield(hash, signed_availability),
} => { } => {
trace!(target: TARGET, "Processing DistributeBitfield"); trace!(target: TARGET, "Processing DistributeBitfield");
handle_bitfield_distribution(&mut ctx, &mut state, hash, signed_availability) handle_bitfield_distribution(&mut ctx, &mut state, &self.metrics, hash, signed_availability)
.await?; .await?;
} }
FromOverseer::Communication { FromOverseer::Communication {
@@ -155,7 +165,7 @@ impl BitfieldDistribution {
} => { } => {
trace!(target: TARGET, "Processing NetworkMessage"); trace!(target: TARGET, "Processing NetworkMessage");
// a network message was received // a network message was received
if let Err(e) = handle_network_msg(&mut ctx, &mut state, event).await { if let Err(e) = handle_network_msg(&mut ctx, &mut state, &self.metrics, event).await {
warn!(target: TARGET, "Failed to handle incomming network messages: {:?}", e); warn!(target: TARGET, "Failed to handle incomming network messages: {:?}", e);
} }
} }
@@ -221,6 +231,7 @@ where
async fn handle_bitfield_distribution<Context>( async fn handle_bitfield_distribution<Context>(
ctx: &mut Context, ctx: &mut Context,
state: &mut ProtocolState, state: &mut ProtocolState,
metrics: &Metrics,
relay_parent: Hash, relay_parent: Hash,
signed_availability: SignedAvailabilityBitfield, signed_availability: SignedAvailabilityBitfield,
) -> SubsystemResult<()> ) -> SubsystemResult<()>
@@ -262,6 +273,8 @@ where
relay_message(ctx, job_data, peer_views, validator, msg).await?; relay_message(ctx, job_data, peer_views, validator, msg).await?;
metrics.on_own_bitfield_gossipped();
Ok(()) Ok(())
} }
@@ -330,6 +343,7 @@ where
async fn process_incoming_peer_message<Context>( async fn process_incoming_peer_message<Context>(
ctx: &mut Context, ctx: &mut Context,
state: &mut ProtocolState, state: &mut ProtocolState,
metrics: &Metrics,
origin: PeerId, origin: PeerId,
message: BitfieldGossipMessage, message: BitfieldGossipMessage,
) -> SubsystemResult<()> ) -> SubsystemResult<()>
@@ -388,6 +402,7 @@ where
.check_signature(&signing_context, &validator) .check_signature(&signing_context, &validator)
.is_ok() .is_ok()
{ {
metrics.on_bitfield_received();
let one_per_validator = &mut (job_data.one_per_validator); let one_per_validator = &mut (job_data.one_per_validator);
// only relay_message a message of a validator once // only relay_message a message of a validator once
@@ -415,6 +430,7 @@ where
async fn handle_network_msg<Context>( async fn handle_network_msg<Context>(
ctx: &mut Context, ctx: &mut Context,
state: &mut ProtocolState, state: &mut ProtocolState,
metrics: &Metrics,
bridge_message: NetworkBridgeEvent<protocol_v1::BitfieldDistributionMessage>, bridge_message: NetworkBridgeEvent<protocol_v1::BitfieldDistributionMessage>,
) -> SubsystemResult<()> ) -> SubsystemResult<()>
where where
@@ -443,7 +459,7 @@ where
relay_parent, relay_parent,
signed_availability: bitfield, signed_availability: bitfield,
}; };
process_incoming_peer_message(ctx, state, remote, gossiped_bitfield).await?; process_incoming_peer_message(ctx, state, metrics, remote, gossiped_bitfield).await?;
} }
} }
} }
@@ -564,7 +580,7 @@ where
fn start(self, ctx: C) -> SpawnedSubsystem { fn start(self, ctx: C) -> SpawnedSubsystem {
SpawnedSubsystem { SpawnedSubsystem {
name: "bitfield-distribution-subsystem", name: "bitfield-distribution-subsystem",
future: Box::pin(async move { Self::run(ctx) }.map(|_| ())), future: Box::pin(async move { Self::run(self, ctx) }.map(|_| ())),
} }
} }
} }
@@ -605,6 +621,53 @@ where
} }
} }
#[derive(Clone)]
struct MetricsInner {
gossipped_own_availability_bitfields: prometheus::Counter<prometheus::U64>,
received_availability_bitfields: prometheus::Counter<prometheus::U64>,
}
/// Bitfield Distribution metrics.
#[derive(Default, Clone)]
pub struct Metrics(Option<MetricsInner>);
impl Metrics {
fn on_own_bitfield_gossipped(&self) {
if let Some(metrics) = &self.0 {
metrics.gossipped_own_availability_bitfields.inc();
}
}
fn on_bitfield_received(&self) {
if let Some(metrics) = &self.0 {
metrics.received_availability_bitfields.inc();
}
}
}
impl metrics::Metrics for Metrics {
fn try_register(registry: &prometheus::Registry) -> Result<Self, prometheus::PrometheusError> {
let metrics = MetricsInner {
gossipped_own_availability_bitfields: prometheus::register(
prometheus::Counter::new(
"parachain_gossipped_own_availabilty_bitfields_total",
"Number of own availability bitfields sent to other peers."
)?,
registry,
)?,
received_availability_bitfields: prometheus::register(
prometheus::Counter::new(
"parachain_received_availabilty_bitfields_total",
"Number of valid availability bitfields received from other peers."
)?,
registry,
)?,
};
Ok(Metrics(Some(metrics)))
}
}
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use super::*; use super::*;
@@ -748,6 +811,7 @@ mod test {
launch!(handle_network_msg( launch!(handle_network_msg(
&mut ctx, &mut ctx,
&mut state, &mut state,
&Default::default(),
NetworkBridgeEvent::PeerMessage(peer_b.clone(), msg.into_network_message()), NetworkBridgeEvent::PeerMessage(peer_b.clone(), msg.into_network_message()),
)); ));
@@ -801,6 +865,7 @@ mod test {
launch!(handle_network_msg( launch!(handle_network_msg(
&mut ctx, &mut ctx,
&mut state, &mut state,
&Default::default(),
NetworkBridgeEvent::PeerMessage(peer_b.clone(), msg.into_network_message()), NetworkBridgeEvent::PeerMessage(peer_b.clone(), msg.into_network_message()),
)); ));
@@ -854,6 +919,7 @@ mod test {
launch!(handle_network_msg( launch!(handle_network_msg(
&mut ctx, &mut ctx,
&mut state, &mut state,
&Default::default(),
NetworkBridgeEvent::PeerMessage( NetworkBridgeEvent::PeerMessage(
peer_b.clone(), peer_b.clone(),
msg.clone().into_network_message(), msg.clone().into_network_message(),
@@ -887,6 +953,7 @@ mod test {
launch!(handle_network_msg( launch!(handle_network_msg(
&mut ctx, &mut ctx,
&mut state, &mut state,
&Default::default(),
NetworkBridgeEvent::PeerMessage( NetworkBridgeEvent::PeerMessage(
peer_a.clone(), peer_a.clone(),
msg.clone().into_network_message(), msg.clone().into_network_message(),
@@ -907,6 +974,7 @@ mod test {
launch!(handle_network_msg( launch!(handle_network_msg(
&mut ctx, &mut ctx,
&mut state, &mut state,
&Default::default(),
NetworkBridgeEvent::PeerMessage( NetworkBridgeEvent::PeerMessage(
peer_b.clone(), peer_b.clone(),
msg.clone().into_network_message(), msg.clone().into_network_message(),
@@ -960,6 +1028,7 @@ mod test {
launch!(handle_network_msg( launch!(handle_network_msg(
&mut ctx, &mut ctx,
&mut state, &mut state,
&Default::default(),
NetworkBridgeEvent::PeerConnected(peer_b.clone(), ObservedRole::Full), NetworkBridgeEvent::PeerConnected(peer_b.clone(), ObservedRole::Full),
)); ));
@@ -967,6 +1036,7 @@ mod test {
launch!(handle_network_msg( launch!(handle_network_msg(
&mut ctx, &mut ctx,
&mut state, &mut state,
&Default::default(),
NetworkBridgeEvent::PeerViewChange(peer_b.clone(), view![hash_a, hash_b]), NetworkBridgeEvent::PeerViewChange(peer_b.clone(), view![hash_a, hash_b]),
)); ));
@@ -976,6 +1046,7 @@ mod test {
launch!(handle_network_msg( launch!(handle_network_msg(
&mut ctx, &mut ctx,
&mut state, &mut state,
&Default::default(),
NetworkBridgeEvent::PeerMessage( NetworkBridgeEvent::PeerMessage(
peer_b.clone(), peer_b.clone(),
msg.clone().into_network_message(), msg.clone().into_network_message(),
@@ -1018,6 +1089,7 @@ mod test {
launch!(handle_network_msg( launch!(handle_network_msg(
&mut ctx, &mut ctx,
&mut state, &mut state,
&Default::default(),
NetworkBridgeEvent::PeerViewChange(peer_b.clone(), view![]), NetworkBridgeEvent::PeerViewChange(peer_b.clone(), view![]),
)); ));
@@ -1032,6 +1104,7 @@ mod test {
launch!(handle_network_msg( launch!(handle_network_msg(
&mut ctx, &mut ctx,
&mut state, &mut state,
&Default::default(),
NetworkBridgeEvent::PeerMessage( NetworkBridgeEvent::PeerMessage(
peer_b.clone(), peer_b.clone(),
msg.clone().into_network_message(), msg.clone().into_network_message(),
@@ -1052,6 +1125,7 @@ mod test {
launch!(handle_network_msg( launch!(handle_network_msg(
&mut ctx, &mut ctx,
&mut state, &mut state,
&Default::default(),
NetworkBridgeEvent::PeerDisconnected(peer_b.clone()), NetworkBridgeEvent::PeerDisconnected(peer_b.clone()),
)); ));
@@ -1063,6 +1137,7 @@ mod test {
launch!(handle_network_msg( launch!(handle_network_msg(
&mut ctx, &mut ctx,
&mut state, &mut state,
&Default::default(),
NetworkBridgeEvent::PeerMessage( NetworkBridgeEvent::PeerMessage(
peer_a.clone(), peer_a.clone(),
msg.clone().into_network_message(), msg.clone().into_network_message(),
@@ -14,6 +14,7 @@ node-primitives = { package = "polkadot-node-primitives", path = "../../primitiv
parity-scale-codec = "1.3.4" parity-scale-codec = "1.3.4"
sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master" }
polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" } polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" }
polkadot-node-subsystem-util = { path = "../../subsystem-util" }
polkadot-node-network-protocol = { path = "../../network/protocol" } polkadot-node-network-protocol = { path = "../../network/protocol" }
[dev-dependencies] [dev-dependencies]
+111 -42
View File
@@ -22,9 +22,12 @@
use polkadot_primitives::v1::{Hash, PoV, CandidateDescriptor}; use polkadot_primitives::v1::{Hash, PoV, CandidateDescriptor};
use polkadot_subsystem::{ use polkadot_subsystem::{
ActiveLeavesUpdate, OverseerSignal, SubsystemContext, Subsystem, SubsystemResult, FromOverseer, SpawnedSubsystem, ActiveLeavesUpdate, OverseerSignal, SubsystemContext, Subsystem, SubsystemResult, FromOverseer, SpawnedSubsystem,
messages::{
PoVDistributionMessage, RuntimeApiMessage, RuntimeApiRequest, AllMessages, NetworkBridgeMessage,
},
}; };
use polkadot_subsystem::messages::{ use polkadot_node_subsystem_util::{
PoVDistributionMessage, RuntimeApiMessage, RuntimeApiRequest, AllMessages, NetworkBridgeMessage, metrics::{self, prometheus},
}; };
use polkadot_node_network_protocol::{ use polkadot_node_network_protocol::{
v1 as protocol_v1, ReputationChange as Rep, NetworkBridgeEvent, PeerId, View, v1 as protocol_v1, ReputationChange as Rep, NetworkBridgeEvent, PeerId, View,
@@ -46,7 +49,10 @@ const BENEFIT_LATE_POV: Rep = Rep::new(10, "Peer supplied us with an awaited PoV
but was not the first to do so"); but was not the first to do so");
/// The PoV Distribution Subsystem. /// The PoV Distribution Subsystem.
pub struct PoVDistribution; pub struct PoVDistribution {
// Prometheus metrics
metrics: Metrics,
}
impl<C> Subsystem<C> for PoVDistribution impl<C> Subsystem<C> for PoVDistribution
where C: SubsystemContext<Message = PoVDistributionMessage> where C: SubsystemContext<Message = PoVDistributionMessage>
@@ -56,7 +62,7 @@ impl<C> Subsystem<C> for PoVDistribution
// within `run`. // within `run`.
SpawnedSubsystem { SpawnedSubsystem {
name: "pov-distribution-subsystem", name: "pov-distribution-subsystem",
future: run(ctx).map(|_| ()).boxed(), future: self.run(ctx).map(|_| ()).boxed(),
} }
} }
} }
@@ -65,6 +71,7 @@ struct State {
relay_parent_state: HashMap<Hash, BlockBasedState>, relay_parent_state: HashMap<Hash, BlockBasedState>,
peer_state: HashMap<PeerId, PeerState>, peer_state: HashMap<PeerId, PeerState>,
our_view: View, our_view: View,
metrics: Metrics,
} }
struct BlockBasedState { struct BlockBasedState {
@@ -206,6 +213,7 @@ async fn notify_one_we_are_awaiting_many(
async fn distribute_to_awaiting( async fn distribute_to_awaiting(
peers: &mut HashMap<PeerId, PeerState>, peers: &mut HashMap<PeerId, PeerState>,
ctx: &mut impl SubsystemContext<Message = PoVDistributionMessage>, ctx: &mut impl SubsystemContext<Message = PoVDistributionMessage>,
metrics: &Metrics,
relay_parent: Hash, relay_parent: Hash,
pov_hash: Hash, pov_hash: Hash,
pov: &PoV, pov: &PoV,
@@ -230,7 +238,11 @@ async fn distribute_to_awaiting(
ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage( ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage(
peers_to_send, peers_to_send,
payload, payload,
))).await ))).await?;
metrics.on_pov_distributed();
Ok(())
} }
/// Handles a `FetchPoV` message. /// Handles a `FetchPoV` message.
@@ -307,6 +319,7 @@ async fn handle_distribute(
distribute_to_awaiting( distribute_to_awaiting(
&mut state.peer_state, &mut state.peer_state,
ctx, ctx,
&state.metrics,
relay_parent, relay_parent,
descriptor.pov_hash, descriptor.pov_hash,
&*pov, &*pov,
@@ -436,6 +449,7 @@ async fn handle_incoming_pov(
distribute_to_awaiting( distribute_to_awaiting(
&mut state.peer_state, &mut state.peer_state,
ctx, ctx,
&state.metrics,
relay_parent, relay_parent,
pov_hash, pov_hash,
&*pov, &*pov,
@@ -508,48 +522,91 @@ async fn handle_network_update(
} }
} }
async fn run( impl PoVDistribution {
mut ctx: impl SubsystemContext<Message = PoVDistributionMessage>, /// Create a new instance of `PovDistribution`.
) -> SubsystemResult<()> { pub fn new(metrics: Metrics) -> Self {
let mut state = State { Self { metrics }
relay_parent_state: HashMap::new(), }
peer_state: HashMap::new(),
our_view: View(Vec::new()),
};
loop { async fn run(
match ctx.recv().await? { self,
FromOverseer::Signal(signal) => if handle_signal(&mut state, &mut ctx, signal).await? { mut ctx: impl SubsystemContext<Message = PoVDistributionMessage>,
return Ok(()); ) -> SubsystemResult<()> {
}, let mut state = State {
FromOverseer::Communication { msg } => match msg { relay_parent_state: HashMap::new(),
PoVDistributionMessage::FetchPoV(relay_parent, descriptor, response_sender) => peer_state: HashMap::new(),
handle_fetch( our_view: View(Vec::new()),
&mut state, metrics: self.metrics,
&mut ctx, };
relay_parent,
descriptor, loop {
response_sender, match ctx.recv().await? {
).await?, FromOverseer::Signal(signal) => if handle_signal(&mut state, &mut ctx, signal).await? {
PoVDistributionMessage::DistributePoV(relay_parent, descriptor, pov) => return Ok(());
handle_distribute( },
&mut state, FromOverseer::Communication { msg } => match msg {
&mut ctx, PoVDistributionMessage::FetchPoV(relay_parent, descriptor, response_sender) =>
relay_parent, handle_fetch(
descriptor, &mut state,
pov, &mut ctx,
).await?, relay_parent,
PoVDistributionMessage::NetworkBridgeUpdateV1(event) => descriptor,
handle_network_update( response_sender,
&mut state, ).await?,
&mut ctx, PoVDistributionMessage::DistributePoV(relay_parent, descriptor, pov) =>
event, handle_distribute(
).await?, &mut state,
}, &mut ctx,
relay_parent,
descriptor,
pov,
).await?,
PoVDistributionMessage::NetworkBridgeUpdateV1(event) =>
handle_network_update(
&mut state,
&mut ctx,
event,
).await?,
},
}
} }
} }
} }
#[derive(Clone)]
struct MetricsInner {
povs_distributed: prometheus::Counter<prometheus::U64>,
}
/// Availability Distribution metrics.
#[derive(Default, Clone)]
pub struct Metrics(Option<MetricsInner>);
impl Metrics {
fn on_pov_distributed(&self) {
if let Some(metrics) = &self.0 {
metrics.povs_distributed.inc();
}
}
}
impl metrics::Metrics for Metrics {
fn try_register(registry: &prometheus::Registry) -> std::result::Result<Self, prometheus::PrometheusError> {
let metrics = MetricsInner {
povs_distributed: prometheus::register(
prometheus::Counter::new(
"parachain_povs_distributed_total",
"Number of PoVs distributed to other peers."
)?,
registry,
)?,
};
Ok(Metrics(Some(metrics)))
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
@@ -619,6 +676,7 @@ mod tests {
s s
}, },
our_view: View(vec![hash_a, hash_b]), our_view: View(vec![hash_a, hash_b]),
metrics: Default::default(),
}; };
let pool = sp_core::testing::TaskExecutor::new(); let pool = sp_core::testing::TaskExecutor::new();
@@ -698,6 +756,7 @@ mod tests {
s s
}, },
our_view: View(vec![hash_a]), our_view: View(vec![hash_a]),
metrics: Default::default(),
}; };
let pool = sp_core::testing::TaskExecutor::new(); let pool = sp_core::testing::TaskExecutor::new();
@@ -775,6 +834,7 @@ mod tests {
s s
}, },
our_view: View(vec![hash_a]), our_view: View(vec![hash_a]),
metrics: Default::default(),
}; };
let pool = sp_core::testing::TaskExecutor::new(); let pool = sp_core::testing::TaskExecutor::new();
@@ -846,6 +906,7 @@ mod tests {
s s
}, },
our_view: View(vec![hash_a]), our_view: View(vec![hash_a]),
metrics: Default::default(),
}; };
let pool = sp_core::testing::TaskExecutor::new(); let pool = sp_core::testing::TaskExecutor::new();
@@ -934,6 +995,7 @@ mod tests {
s s
}, },
our_view: View(vec![hash_a]), our_view: View(vec![hash_a]),
metrics: Default::default(),
}; };
let pool = sp_core::testing::TaskExecutor::new(); let pool = sp_core::testing::TaskExecutor::new();
@@ -997,6 +1059,7 @@ mod tests {
s s
}, },
our_view: View(vec![hash_a]), our_view: View(vec![hash_a]),
metrics: Default::default(),
}; };
let pool = sp_core::testing::TaskExecutor::new(); let pool = sp_core::testing::TaskExecutor::new();
@@ -1058,6 +1121,7 @@ mod tests {
s s
}, },
our_view: View(vec![hash_a]), our_view: View(vec![hash_a]),
metrics: Default::default(),
}; };
let pool = sp_core::testing::TaskExecutor::new(); let pool = sp_core::testing::TaskExecutor::new();
@@ -1116,6 +1180,7 @@ mod tests {
s s
}, },
our_view: View(vec![hash_a]), our_view: View(vec![hash_a]),
metrics: Default::default(),
}; };
let pool = sp_core::testing::TaskExecutor::new(); let pool = sp_core::testing::TaskExecutor::new();
@@ -1201,6 +1266,7 @@ mod tests {
s s
}, },
our_view: View(vec![hash_a, hash_b]), our_view: View(vec![hash_a, hash_b]),
metrics: Default::default(),
}; };
let pool = sp_core::testing::TaskExecutor::new(); let pool = sp_core::testing::TaskExecutor::new();
@@ -1263,6 +1329,7 @@ mod tests {
s s
}, },
our_view: View(vec![hash_a]), our_view: View(vec![hash_a]),
metrics: Default::default(),
}; };
let pool = sp_core::testing::TaskExecutor::new(); let pool = sp_core::testing::TaskExecutor::new();
@@ -1340,6 +1407,7 @@ mod tests {
s s
}, },
our_view: View(vec![hash_a]), our_view: View(vec![hash_a]),
metrics: Default::default(),
}; };
let pool = sp_core::testing::TaskExecutor::new(); let pool = sp_core::testing::TaskExecutor::new();
@@ -1422,6 +1490,7 @@ mod tests {
s s
}, },
our_view: View(vec![hash_a]), our_view: View(vec![hash_a]),
metrics: Default::default(),
}; };
let pool = sp_core::testing::TaskExecutor::new(); let pool = sp_core::testing::TaskExecutor::new();
@@ -16,6 +16,7 @@ parity-scale-codec = "1.3.4"
sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-staking = { git = "https://github.com/paritytech/substrate", branch = "master", default-features = false } sp-staking = { git = "https://github.com/paritytech/substrate", branch = "master", default-features = false }
polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" } polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" }
polkadot-node-subsystem-util = { path = "../../subsystem-util" }
polkadot-node-network-protocol = { path = "../../network/protocol" } polkadot-node-network-protocol = { path = "../../network/protocol" }
arrayvec = "0.5.1" arrayvec = "0.5.1"
indexmap = "1.4.0" indexmap = "1.4.0"
@@ -22,10 +22,13 @@
use polkadot_subsystem::{ use polkadot_subsystem::{
Subsystem, SubsystemResult, SubsystemContext, SpawnedSubsystem, Subsystem, SubsystemResult, SubsystemContext, SpawnedSubsystem,
ActiveLeavesUpdate, FromOverseer, OverseerSignal, ActiveLeavesUpdate, FromOverseer, OverseerSignal,
messages::{
AllMessages, NetworkBridgeMessage, StatementDistributionMessage, CandidateBackingMessage,
RuntimeApiMessage, RuntimeApiRequest,
},
}; };
use polkadot_subsystem::messages::{ use polkadot_node_subsystem_util::{
AllMessages, NetworkBridgeMessage, StatementDistributionMessage, CandidateBackingMessage, metrics::{self, prometheus},
RuntimeApiMessage, RuntimeApiRequest,
}; };
use node_primitives::SignedFullStatement; use node_primitives::SignedFullStatement;
use polkadot_primitives::v1::{ use polkadot_primitives::v1::{
@@ -59,8 +62,13 @@ const BENEFIT_VALID_STATEMENT_FIRST: Rep = Rep::new(
/// Typically we will only keep 1, but when a validator equivocates we will need to track 2. /// Typically we will only keep 1, but when a validator equivocates we will need to track 2.
const VC_THRESHOLD: usize = 2; const VC_THRESHOLD: usize = 2;
const LOG_TARGET: &str = "statement_distribution";
/// The statement distribution subsystem. /// The statement distribution subsystem.
pub struct StatementDistribution; pub struct StatementDistribution {
// Prometheus metrics
metrics: Metrics,
}
impl<C> Subsystem<C> for StatementDistribution impl<C> Subsystem<C> for StatementDistribution
where C: SubsystemContext<Message=StatementDistributionMessage> where C: SubsystemContext<Message=StatementDistributionMessage>
@@ -70,7 +78,7 @@ impl<C> Subsystem<C> for StatementDistribution
// within `run`. // within `run`.
SpawnedSubsystem { SpawnedSubsystem {
name: "statement-distribution-subsystem", name: "statement-distribution-subsystem",
future: run(ctx).map(|_| ()).boxed(), future: self.run(ctx).map(|_| ()).boxed(),
} }
} }
} }
@@ -111,14 +119,7 @@ fn note_hash(
) -> bool { ) -> bool {
if observed.contains(&h) { return true; } if observed.contains(&h) { return true; }
if observed.is_full() { observed.try_push(h).is_ok()
false
} else {
observed.try_push(h).expect("length of storage guarded above; \
only panics if length exceeds capacity; qed");
true
}
} }
/// knowledge that a peer has about goings-on in a relay parent. /// knowledge that a peer has about goings-on in a relay parent.
@@ -502,6 +503,7 @@ async fn circulate_statement_and_dependents(
ctx: &mut impl SubsystemContext<Message = StatementDistributionMessage>, ctx: &mut impl SubsystemContext<Message = StatementDistributionMessage>,
relay_parent: Hash, relay_parent: Hash,
statement: SignedFullStatement, statement: SignedFullStatement,
metrics: &Metrics,
) -> SubsystemResult<()> { ) -> SubsystemResult<()> {
if let Some(active_head)= active_heads.get_mut(&relay_parent) { if let Some(active_head)= active_heads.get_mut(&relay_parent) {
@@ -529,7 +531,8 @@ async fn circulate_statement_and_dependents(
ctx, ctx,
relay_parent, relay_parent,
candidate_hash, candidate_hash,
&*active_head &*active_head,
metrics,
).await?; ).await?;
} }
} }
@@ -589,6 +592,7 @@ async fn send_statements_about(
relay_parent: Hash, relay_parent: Hash,
candidate_hash: Hash, candidate_hash: Hash,
active_head: &ActiveHeadData, active_head: &ActiveHeadData,
metrics: &Metrics,
) -> SubsystemResult<()> { ) -> SubsystemResult<()> {
for statement in active_head.statements_about(candidate_hash) { for statement in active_head.statements_about(candidate_hash) {
if peer_data.send(&relay_parent, &statement.fingerprint()).is_some() { if peer_data.send(&relay_parent, &statement.fingerprint()).is_some() {
@@ -600,6 +604,8 @@ async fn send_statements_about(
ctx.send_message(AllMessages::NetworkBridge( ctx.send_message(AllMessages::NetworkBridge(
NetworkBridgeMessage::SendValidationMessage(vec![peer.clone()], payload) NetworkBridgeMessage::SendValidationMessage(vec![peer.clone()], payload)
)).await?; )).await?;
metrics.on_statement_distributed();
} }
} }
@@ -612,7 +618,8 @@ async fn send_statements(
peer_data: &mut PeerData, peer_data: &mut PeerData,
ctx: &mut impl SubsystemContext<Message = StatementDistributionMessage>, ctx: &mut impl SubsystemContext<Message = StatementDistributionMessage>,
relay_parent: Hash, relay_parent: Hash,
active_head: &ActiveHeadData active_head: &ActiveHeadData,
metrics: &Metrics,
) -> SubsystemResult<()> { ) -> SubsystemResult<()> {
for statement in active_head.statements() { for statement in active_head.statements() {
if peer_data.send(&relay_parent, &statement.fingerprint()).is_some() { if peer_data.send(&relay_parent, &statement.fingerprint()).is_some() {
@@ -624,6 +631,8 @@ async fn send_statements(
ctx.send_message(AllMessages::NetworkBridge( ctx.send_message(AllMessages::NetworkBridge(
NetworkBridgeMessage::SendValidationMessage(vec![peer.clone()], payload) NetworkBridgeMessage::SendValidationMessage(vec![peer.clone()], payload)
)).await?; )).await?;
metrics.on_statement_distributed();
} }
} }
@@ -652,6 +661,7 @@ async fn handle_incoming_message<'a>(
active_heads: &'a mut HashMap<Hash, ActiveHeadData>, active_heads: &'a mut HashMap<Hash, ActiveHeadData>,
ctx: &mut impl SubsystemContext<Message = StatementDistributionMessage>, ctx: &mut impl SubsystemContext<Message = StatementDistributionMessage>,
message: protocol_v1::StatementDistributionMessage, message: protocol_v1::StatementDistributionMessage,
metrics: &Metrics,
) -> SubsystemResult<Option<(Hash, &'a StoredStatement)>> { ) -> SubsystemResult<Option<(Hash, &'a StoredStatement)>> {
let (relay_parent, statement) = match message { let (relay_parent, statement) = match message {
protocol_v1::StatementDistributionMessage::Statement(r, s) => (r, s), protocol_v1::StatementDistributionMessage::Statement(r, s) => (r, s),
@@ -697,6 +707,7 @@ async fn handle_incoming_message<'a>(
relay_parent, relay_parent,
fingerprint.0.candidate_hash().clone(), fingerprint.0.candidate_hash().clone(),
&*active_head, &*active_head,
metrics,
).await? ).await?
} }
Ok(false) => {} Ok(false) => {}
@@ -724,6 +735,7 @@ async fn update_peer_view_and_send_unlocked(
ctx: &mut impl SubsystemContext<Message = StatementDistributionMessage>, ctx: &mut impl SubsystemContext<Message = StatementDistributionMessage>,
active_heads: &HashMap<Hash, ActiveHeadData>, active_heads: &HashMap<Hash, ActiveHeadData>,
new_view: View, new_view: View,
metrics: &Metrics,
) -> SubsystemResult<()> { ) -> SubsystemResult<()> {
let old_view = std::mem::replace(&mut peer_data.view, new_view); let old_view = std::mem::replace(&mut peer_data.view, new_view);
@@ -745,6 +757,7 @@ async fn update_peer_view_and_send_unlocked(
ctx, ctx,
new, new,
active_head, active_head,
metrics,
).await?; ).await?;
} }
} }
@@ -758,6 +771,7 @@ async fn handle_network_update(
ctx: &mut impl SubsystemContext<Message = StatementDistributionMessage>, ctx: &mut impl SubsystemContext<Message = StatementDistributionMessage>,
our_view: &mut View, our_view: &mut View,
update: NetworkBridgeEvent<protocol_v1::StatementDistributionMessage>, update: NetworkBridgeEvent<protocol_v1::StatementDistributionMessage>,
metrics: &Metrics,
) -> SubsystemResult<()> { ) -> SubsystemResult<()> {
match update { match update {
NetworkBridgeEvent::PeerConnected(peer, _role) => { NetworkBridgeEvent::PeerConnected(peer, _role) => {
@@ -782,6 +796,7 @@ async fn handle_network_update(
active_heads, active_heads,
ctx, ctx,
message, message,
metrics,
).await?; ).await?;
if let Some((relay_parent, new)) = new_stored { if let Some((relay_parent, new)) = new_stored {
@@ -808,6 +823,7 @@ async fn handle_network_update(
ctx, ctx,
&*active_heads, &*active_heads,
view, view,
metrics,
).await ).await
} }
None => Ok(()), None => Ok(()),
@@ -819,7 +835,7 @@ async fn handle_network_update(
for new in our_view.difference(&old_view) { for new in our_view.difference(&old_view) {
if !active_heads.contains_key(&new) { if !active_heads.contains_key(&new) {
log::warn!(target: "statement_distribution", "Our network bridge view update \ log::warn!(target: LOG_TARGET, "Our network bridge view update \
inconsistent with `StartWork` messages we have received from overseer. \ inconsistent with `StartWork` messages we have received from overseer. \
Contains unknown hash {}", new); Contains unknown hash {}", new);
} }
@@ -831,94 +847,132 @@ async fn handle_network_update(
} }
async fn run( impl StatementDistribution {
mut ctx: impl SubsystemContext<Message = StatementDistributionMessage>, async fn run(
) -> SubsystemResult<()> { self,
let mut peers: HashMap<PeerId, PeerData> = HashMap::new(); mut ctx: impl SubsystemContext<Message = StatementDistributionMessage>,
let mut our_view = View::default(); ) -> SubsystemResult<()> {
let mut active_heads: HashMap<Hash, ActiveHeadData> = HashMap::new(); let mut peers: HashMap<PeerId, PeerData> = HashMap::new();
let mut statement_listeners: Vec<mpsc::Sender<SignedFullStatement>> = Vec::new(); let mut our_view = View::default();
let mut active_heads: HashMap<Hash, ActiveHeadData> = HashMap::new();
let mut statement_listeners: Vec<mpsc::Sender<SignedFullStatement>> = Vec::new();
let metrics = self.metrics;
loop { loop {
let message = ctx.recv().await?; let message = ctx.recv().await?;
match message { match message {
FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { activated, .. })) => { FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { activated, .. })) => {
for relay_parent in activated { for relay_parent in activated {
let (validators, session_index) = { let (validators, session_index) = {
let (val_tx, val_rx) = oneshot::channel(); let (val_tx, val_rx) = oneshot::channel();
let (session_tx, session_rx) = oneshot::channel(); let (session_tx, session_rx) = oneshot::channel();
let val_message = AllMessages::RuntimeApi( let val_message = AllMessages::RuntimeApi(
RuntimeApiMessage::Request( RuntimeApiMessage::Request(
relay_parent, relay_parent,
RuntimeApiRequest::Validators(val_tx), RuntimeApiRequest::Validators(val_tx),
), ),
); );
let session_message = AllMessages::RuntimeApi( let session_message = AllMessages::RuntimeApi(
RuntimeApiMessage::Request( RuntimeApiMessage::Request(
relay_parent, relay_parent,
RuntimeApiRequest::SessionIndexForChild(session_tx), RuntimeApiRequest::SessionIndexForChild(session_tx),
), ),
); );
ctx.send_messages( ctx.send_messages(
std::iter::once(val_message).chain(std::iter::once(session_message)) std::iter::once(val_message).chain(std::iter::once(session_message))
).await?; ).await?;
match (val_rx.await?, session_rx.await?) { match (val_rx.await?, session_rx.await?) {
(Ok(v), Ok(s)) => (v, s), (Ok(v), Ok(s)) => (v, s),
(Err(e), _) | (_, Err(e)) => { (Err(e), _) | (_, Err(e)) => {
log::warn!( log::warn!(
target: "statement_distribution", target: LOG_TARGET,
"Failed to fetch runtime API data for active leaf: {:?}", "Failed to fetch runtime API data for active leaf: {:?}",
e, e,
); );
// Lacking this bookkeeping might make us behave funny, although // Lacking this bookkeeping might make us behave funny, although
// not in any slashable way. But we shouldn't take down the node // not in any slashable way. But we shouldn't take down the node
// on what are likely spurious runtime API errors. // on what are likely spurious runtime API errors.
continue; continue;
}
} }
} };
};
active_heads.entry(relay_parent) active_heads.entry(relay_parent)
.or_insert(ActiveHeadData::new(validators, session_index)); .or_insert(ActiveHeadData::new(validators, session_index));
}
} }
} FromOverseer::Signal(OverseerSignal::BlockFinalized(_block_hash)) => {
FromOverseer::Signal(OverseerSignal::BlockFinalized(_block_hash)) => { // do nothing
// do nothing
}
FromOverseer::Signal(OverseerSignal::Conclude) => break,
FromOverseer::Communication { msg } => match msg {
StatementDistributionMessage::Share(relay_parent, statement) => {
inform_statement_listeners(
&statement,
&mut statement_listeners,
).await;
circulate_statement_and_dependents(
&mut peers,
&mut active_heads,
&mut ctx,
relay_parent,
statement,
).await?;
} }
StatementDistributionMessage::NetworkBridgeUpdateV1(event) => FromOverseer::Signal(OverseerSignal::Conclude) => break,
handle_network_update( FromOverseer::Communication { msg } => match msg {
&mut peers, StatementDistributionMessage::Share(relay_parent, statement) => {
&mut active_heads, inform_statement_listeners(
&mut ctx, &statement,
&mut our_view, &mut statement_listeners,
event, ).await;
).await?, circulate_statement_and_dependents(
StatementDistributionMessage::RegisterStatementListener(tx) => { &mut peers,
statement_listeners.push(tx); &mut active_heads,
&mut ctx,
relay_parent,
statement,
&metrics,
).await?;
}
StatementDistributionMessage::NetworkBridgeUpdateV1(event) =>
handle_network_update(
&mut peers,
&mut active_heads,
&mut ctx,
&mut our_view,
event,
&metrics,
).await?,
StatementDistributionMessage::RegisterStatementListener(tx) => {
statement_listeners.push(tx);
}
} }
} }
} }
Ok(())
}
}
#[derive(Clone)]
struct MetricsInner {
statements_distributed: prometheus::Counter<prometheus::U64>,
}
/// Statement Distribution metrics.
#[derive(Default, Clone)]
pub struct Metrics(Option<MetricsInner>);
impl Metrics {
fn on_statement_distributed(&self) {
if let Some(metrics) = &self.0 {
metrics.statements_distributed.inc();
}
}
}
impl metrics::Metrics for Metrics {
fn try_register(registry: &prometheus::Registry) -> std::result::Result<Self, prometheus::PrometheusError> {
let metrics = MetricsInner {
statements_distributed: prometheus::register(
prometheus::Counter::new(
"parachain_statements_distributed_total",
"Number of candidate validity statements distributed to other peers."
)?,
registry,
)?,
};
Ok(Metrics(Some(metrics)))
} }
Ok(())
} }
#[cfg(test)] #[cfg(test)]
@@ -1256,6 +1310,7 @@ mod tests {
&mut ctx, &mut ctx,
&active_heads, &active_heads,
new_view.clone(), new_view.clone(),
&Default::default(),
).await.unwrap(); ).await.unwrap();
assert_eq!(peer_data.view, new_view); assert_eq!(peer_data.view, new_view);
+30 -6
View File
@@ -470,6 +470,7 @@ pub struct AllSubsystems<CV, CB, CS, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA, CG
struct MetricsInner { struct MetricsInner {
activated_heads_total: prometheus::Counter<prometheus::U64>, activated_heads_total: prometheus::Counter<prometheus::U64>,
deactivated_heads_total: prometheus::Counter<prometheus::U64>, deactivated_heads_total: prometheus::Counter<prometheus::U64>,
messages_relayed_total: prometheus::Counter<prometheus::U64>,
} }
#[derive(Default, Clone)] #[derive(Default, Clone)]
@@ -487,6 +488,12 @@ impl Metrics {
metrics.deactivated_heads_total.inc(); metrics.deactivated_heads_total.inc();
} }
} }
fn on_message_relayed(&self) {
if let Some(metrics) = &self.0 {
metrics.messages_relayed_total.inc();
}
}
} }
impl metrics::Metrics for Metrics { impl metrics::Metrics for Metrics {
@@ -506,6 +513,13 @@ impl metrics::Metrics for Metrics {
)?, )?,
registry, registry,
)?, )?,
messages_relayed_total: prometheus::register(
prometheus::Counter::new(
"parachain_messages_relayed_total",
"Number of messages relayed by Overseer."
)?,
registry,
)?,
}; };
Ok(Metrics(Some(metrics))) Ok(Metrics(Some(metrics)))
} }
@@ -1046,10 +1060,11 @@ where
} }
async fn route_message(&mut self, msg: AllMessages) { async fn route_message(&mut self, msg: AllMessages) {
self.metrics.on_message_relayed();
match msg { match msg {
AllMessages::CandidateValidation(msg) => { AllMessages::CandidateValidation(msg) => {
if let Some(ref mut s) = self.candidate_validation_subsystem.instance { if let Some(ref mut s) = self.candidate_validation_subsystem.instance {
let _= s.tx.send(FromOverseer::Communication { msg }).await; let _ = s.tx.send(FromOverseer::Communication { msg }).await;
} }
} }
AllMessages::CandidateBacking(msg) => { AllMessages::CandidateBacking(msg) => {
@@ -1209,6 +1224,7 @@ fn spawn<S: SpawnNamed, M: Send + 'static>(
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::sync::atomic; use std::sync::atomic;
use std::collections::HashMap;
use futures::{executor, pin_mut, select, channel::mpsc, FutureExt}; use futures::{executor, pin_mut, select, channel::mpsc, FutureExt};
use polkadot_primitives::v1::{BlockData, CollatorPair, PoV}; use polkadot_primitives::v1::{BlockData, CollatorPair, PoV};
@@ -1435,27 +1451,35 @@ mod tests {
handler.block_imported(second_block).await.unwrap(); handler.block_imported(second_block).await.unwrap();
handler.block_imported(third_block).await.unwrap(); handler.block_imported(third_block).await.unwrap();
handler.send_msg(AllMessages::CandidateValidation(test_candidate_validation_msg())).await.unwrap();
handler.stop().await.unwrap(); handler.stop().await.unwrap();
select! { select! {
res = overseer_fut => { res = overseer_fut => {
assert!(res.is_ok()); assert!(res.is_ok());
let (activated, deactivated) = extract_metrics(&registry); let metrics = extract_metrics(&registry);
assert_eq!(activated, 3); assert_eq!(metrics["activated"], 3);
assert_eq!(deactivated, 2); assert_eq!(metrics["deactivated"], 2);
assert_eq!(metrics["relayed"], 1);
}, },
complete => (), complete => (),
} }
}); });
} }
fn extract_metrics(registry: &prometheus::Registry) -> (u64, u64) { fn extract_metrics(registry: &prometheus::Registry) -> HashMap<&'static str, u64> {
let gather = registry.gather(); let gather = registry.gather();
assert_eq!(gather[0].get_name(), "parachain_activated_heads_total"); assert_eq!(gather[0].get_name(), "parachain_activated_heads_total");
assert_eq!(gather[1].get_name(), "parachain_deactivated_heads_total"); assert_eq!(gather[1].get_name(), "parachain_deactivated_heads_total");
assert_eq!(gather[2].get_name(), "parachain_messages_relayed_total");
let activated = gather[0].get_metric()[0].get_counter().get_value() as u64; let activated = gather[0].get_metric()[0].get_counter().get_value() as u64;
let deactivated = gather[1].get_metric()[0].get_counter().get_value() as u64; let deactivated = gather[1].get_metric()[0].get_counter().get_value() as u64;
(activated, deactivated) let relayed = gather[2].get_metric()[0].get_counter().get_value() as u64;
let mut result = HashMap::new();
result.insert("activated", activated);
result.insert("deactivated", deactivated);
result.insert("relayed", relayed);
result
} }
// Spawn a subsystem that immediately exits. // Spawn a subsystem that immediately exits.