diff --git a/polkadot/node/network/bridge/Cargo.toml b/polkadot/node/network/bridge/Cargo.toml index 8bc471e7f9..40ec8982d8 100644 --- a/polkadot/node/network/bridge/Cargo.toml +++ b/polkadot/node/network/bridge/Cargo.toml @@ -15,13 +15,13 @@ sc-network = { git = "https://github.com/paritytech/substrate", branch = "master sp-consensus = { git = "https://github.com/paritytech/substrate", branch = "master" } polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" } polkadot-node-network-protocol = { path = "../protocol" } +polkadot-node-subsystem-util = { path = "../../subsystem-util"} strum = "0.20.0" parking_lot = "0.11.1" [dev-dependencies] assert_matches = "1.4.0" polkadot-node-subsystem-test-helpers = { path = "../../subsystem-test-helpers" } -polkadot-node-subsystem-util = { path = "../../subsystem-util"} sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master" } futures-timer = "3" diff --git a/polkadot/node/network/bridge/src/lib.rs b/polkadot/node/network/bridge/src/lib.rs index ef1c0ac1fd..ccdaf4dc79 100644 --- a/polkadot/node/network/bridge/src/lib.rs +++ b/polkadot/node/network/bridge/src/lib.rs @@ -40,6 +40,7 @@ use polkadot_node_network_protocol::{ PeerId, peer_set::PeerSet, View, v1 as protocol_v1, OurView, UnifiedReputationChange as Rep, ObservedRole, }; +use polkadot_node_subsystem_util::metrics::{self, prometheus}; /// Peer set infos for network initialization. /// @@ -77,6 +78,154 @@ const EMPTY_VIEW_COST: Rep = Rep::CostMajor("Peer sent us an empty view"); // network bridge log target const LOG_TARGET: &'static str = "parachain::network-bridge"; +/// Metrics for the network bridge. +#[derive(Clone, Default)] +pub struct Metrics(Option); + +impl Metrics { + fn on_peer_connected(&self, peer_set: PeerSet) { + self.0.as_ref().map(|metrics| metrics + .connected_events + .with_label_values(&[peer_set.get_protocol_name_static()]) + .inc() + ); + } + + fn on_peer_disconnected(&self, peer_set: PeerSet) { + self.0.as_ref().map(|metrics| metrics + .disconnected_events + .with_label_values(&[peer_set.get_protocol_name_static()]) + .inc() + ); + } + + fn note_peer_count(&self, peer_set: PeerSet, count: usize) { + self.0.as_ref().map(|metrics| metrics + .peer_count + .with_label_values(&[peer_set.get_protocol_name_static()]) + .set(count as u64) + ); + } + + fn on_notification_received(&self, peer_set: PeerSet, size: usize) { + if let Some(metrics) = self.0.as_ref() { + metrics.notifications_received + .with_label_values(&[peer_set.get_protocol_name_static()]) + .inc(); + + metrics.bytes_received + .with_label_values(&[peer_set.get_protocol_name_static()]) + .inc_by(size as u64); + } + } + + fn on_notification_sent(&self, peer_set: PeerSet, size: usize, to_peers: usize) { + if let Some(metrics) = self.0.as_ref() { + metrics.notifications_sent + .with_label_values(&[peer_set.get_protocol_name_static()]) + .inc_by(to_peers as u64); + + metrics.bytes_sent + .with_label_values(&[peer_set.get_protocol_name_static()]) + .inc_by((size * to_peers) as u64); + } + } +} + +#[derive(Clone)] +struct MetricsInner { + peer_count: prometheus::GaugeVec, + connected_events: prometheus::CounterVec, + disconnected_events: prometheus::CounterVec, + + notifications_received: prometheus::CounterVec, + notifications_sent: prometheus::CounterVec, + + bytes_received: prometheus::CounterVec, + bytes_sent: prometheus::CounterVec, +} + +impl metrics::Metrics for Metrics { + fn try_register(registry: &prometheus::Registry) + -> std::result::Result + { + let metrics = MetricsInner { + peer_count: prometheus::register( + prometheus::GaugeVec::new( + prometheus::Opts::new( + "parachain_peer_count", + "The number of peers on a parachain-related peer-set", + ), + &["protocol"] + )?, + registry, + )?, + connected_events: prometheus::register( + prometheus::CounterVec::new( + prometheus::Opts::new( + "parachain_peer_connect_events_total", + "The number of peer connect events on a parachain notifications protocol", + ), + &["protocol"] + )?, + registry, + )?, + disconnected_events: prometheus::register( + prometheus::CounterVec::new( + prometheus::Opts::new( + "parachain_peer_disconnect_events_total", + "The number of peer disconnect events on a parachain notifications protocol", + ), + &["protocol"] + )?, + registry, + )?, + notifications_received: prometheus::register( + prometheus::CounterVec::new( + prometheus::Opts::new( + "parachain_notifications_received_total", + "The number of notifications received on a parachain protocol", + ), + &["protocol"] + )?, + registry, + )?, + notifications_sent: prometheus::register( + prometheus::CounterVec::new( + prometheus::Opts::new( + "parachain_notifications_sent_total", + "The number of notifications sent on a parachain protocol", + ), + &["protocol"] + )?, + registry, + )?, + bytes_received: prometheus::register( + prometheus::CounterVec::new( + prometheus::Opts::new( + "parachain_notification_bytes_received_total", + "The number of bytes received on a parachain notification protocol", + ), + &["protocol"] + )?, + registry, + )?, + bytes_sent: prometheus::register( + prometheus::CounterVec::new( + prometheus::Opts::new( + "parachain_notification_bytes_sent_total", + "The number of bytes sent on a parachain notification protocol", + ), + &["protocol"] + )?, + registry, + )?, + }; + + Ok(Metrics(Some(metrics))) + } +} + /// Messages from and to the network. /// /// As transmitted to and received from subsystems. @@ -90,7 +239,6 @@ pub enum WireMessage { ViewUpdate(View), } - /// The network bridge subsystem. pub struct NetworkBridge { /// `Network` trait implementing type. @@ -98,6 +246,7 @@ pub struct NetworkBridge { authority_discovery_service: AD, request_multiplexer: RequestMultiplexer, sync_oracle: Box, + metrics: Metrics, } impl NetworkBridge { @@ -110,12 +259,14 @@ impl NetworkBridge { authority_discovery_service: AD, request_multiplexer: RequestMultiplexer, sync_oracle: Box, + metrics: Metrics, ) -> Self { NetworkBridge { network_service, authority_discovery_service, request_multiplexer, sync_oracle, + metrics, } } } @@ -190,6 +341,7 @@ async fn handle_subsystem_messages( validator_discovery_notifications: mpsc::Receiver, shared: Shared, sync_oracle: Box, + metrics: Metrics, ) -> Result<(), UnexpectedAbort> where Context: SubsystemContext, @@ -243,6 +395,7 @@ where &live_heads, &shared, finalized_number, + &metrics, ).await?; } } @@ -292,6 +445,7 @@ where peers, PeerSet::Validation, WireMessage::ProtocolMessage(msg), + &metrics, ).await? } NetworkBridgeMessage::SendValidationMessages(msgs) => { @@ -307,6 +461,7 @@ where peers, PeerSet::Validation, WireMessage::ProtocolMessage(msg), + &metrics, ).await? } } @@ -322,6 +477,7 @@ where peers, PeerSet::Collation, WireMessage::ProtocolMessage(msg), + &metrics, ).await? } NetworkBridgeMessage::SendCollationMessages(msgs) => { @@ -337,6 +493,7 @@ where peers, PeerSet::Collation, WireMessage::ProtocolMessage(msg), + &metrics, ).await? } } @@ -402,6 +559,7 @@ async fn handle_network_messages( mut network_service: impl Network, mut request_multiplexer: RequestMultiplexer, mut validator_discovery_notifications: mpsc::Sender, + metrics: Metrics, shared: Shared, ) -> Result<(), UnexpectedAbort> { let mut network_stream = network_service.event_stream(); @@ -442,6 +600,9 @@ async fn handle_network_messages( } } + metrics.on_peer_connected(peer_set); + metrics.note_peer_count(peer_set, peer_map.len()); + shared.local_view.clone().unwrap_or(View::default()) }; @@ -472,6 +633,7 @@ async fn handle_network_messages( WireMessage::::ViewUpdate( local_view, ), + &metrics, ).await?; } PeerSet::Collation => { @@ -493,6 +655,7 @@ async fn handle_network_messages( WireMessage::::ViewUpdate( local_view, ), + &metrics, ).await?; } } @@ -517,7 +680,12 @@ async fn handle_network_messages( PeerSet::Collation => &mut shared.collation_peers, }; - peer_map.remove(&peer).is_some() + let w = peer_map.remove(&peer).is_some(); + + metrics.on_peer_disconnected(peer_set); + metrics.note_peer_count(peer_set, peer_map.len()); + + w }; // Failure here means that the other side of the network bridge @@ -545,7 +713,10 @@ async fn handle_network_messages( .filter(|(protocol, _)| { protocol == &PeerSet::Validation.into_protocol_name() }) - .map(|(_, msg_bytes)| WireMessage::decode(&mut msg_bytes.as_ref())) + .map(|(_, msg_bytes)| { + WireMessage::decode(&mut msg_bytes.as_ref()) + .map(|m| (m, msg_bytes.len())) + }) .collect(); let v_messages = match v_messages { @@ -566,7 +737,10 @@ async fn handle_network_messages( .filter(|(protocol, _)| { protocol == &PeerSet::Collation.into_protocol_name() }) - .map(|(_, msg_bytes)| WireMessage::decode(&mut msg_bytes.as_ref())) + .map(|(_, msg_bytes)| { + WireMessage::decode(&mut msg_bytes.as_ref()) + .map(|m| (m, msg_bytes.len())) + }) .collect(); match c_messages { @@ -594,8 +768,10 @@ async fn handle_network_messages( if !v_messages.is_empty() { let (events, reports) = handle_peer_messages( remote.clone(), + PeerSet::Validation, &mut shared.0.lock().validation_peers, v_messages, + &metrics, ); for report in reports { @@ -608,8 +784,10 @@ async fn handle_network_messages( if !c_messages.is_empty() { let (events, reports) = handle_peer_messages( remote.clone(), + PeerSet::Collation, &mut shared.0.lock().collation_peers, c_messages, + &metrics, ); for report in reports { @@ -666,6 +844,7 @@ where network_service, request_multiplexer, authority_discovery_service, + metrics, sync_oracle, } = bridge; @@ -676,6 +855,7 @@ where network_service.clone(), request_multiplexer, validation_worker_tx, + metrics.clone(), shared.clone(), ).remote_handle(); @@ -688,6 +868,7 @@ where validation_worker_rx, shared, sync_oracle, + metrics, ); futures::pin_mut!(subsystem_event_handler); @@ -738,13 +919,14 @@ fn construct_view(live_heads: impl DoubleEndedIterator, finalized_n ) } -#[tracing::instrument(level = "trace", skip(net, ctx, shared), fields(subsystem = LOG_TARGET))] +#[tracing::instrument(level = "trace", skip(net, ctx, shared, metrics), fields(subsystem = LOG_TARGET))] async fn update_our_view( net: &mut impl Network, ctx: &mut impl SubsystemContext, live_heads: &[ActivatedLeaf], shared: &Shared, finalized_number: BlockNumber, + metrics: &Metrics, ) -> SubsystemResult<()> { let new_view = construct_view(live_heads.iter().map(|v| v.hash), finalized_number); @@ -780,12 +962,14 @@ async fn update_our_view( net, validation_peers, WireMessage::ViewUpdate(new_view.clone()), + metrics, ).await?; send_collation_message( net, collation_peers, WireMessage::ViewUpdate(new_view), + metrics, ).await?; let our_view = OurView::new( @@ -808,11 +992,13 @@ async fn update_our_view( // Handle messages on a specific peer-set. The peer is expected to be connected on that // peer-set. -#[tracing::instrument(level = "trace", skip(peers, messages), fields(subsystem = LOG_TARGET))] +#[tracing::instrument(level = "trace", skip(peers, messages, metrics), fields(subsystem = LOG_TARGET))] fn handle_peer_messages( peer: PeerId, + peer_set: PeerSet, peers: &mut HashMap, - messages: Vec>, + messages: Vec<(WireMessage, usize)>, + metrics: &Metrics, ) -> (Vec>, Vec) { let peer_data = match peers.get_mut(&peer) { None => { @@ -824,7 +1010,9 @@ fn handle_peer_messages( let mut outgoing_messages = Vec::with_capacity(messages.len()); let mut reports = Vec::new(); - for message in messages { + for (message, size_bytes) in messages { + metrics.on_notification_received(peer_set, size_bytes); + outgoing_messages.push(match message { WireMessage::ViewUpdate(new_view) => { if new_view.len() > MAX_VIEW_HEADS || @@ -855,30 +1043,32 @@ fn handle_peer_messages( (outgoing_messages, reports) } -#[tracing::instrument(level = "trace", skip(net, peers), fields(subsystem = LOG_TARGET))] +#[tracing::instrument(level = "trace", skip(net, peers, metrics), fields(subsystem = LOG_TARGET))] async fn send_validation_message( net: &mut impl Network, peers: I, message: WireMessage, + metrics: &Metrics, ) -> SubsystemResult<()> where I: IntoIterator, I::IntoIter: ExactSizeIterator, { - send_message(net, peers, PeerSet::Validation, message).await + send_message(net, peers, PeerSet::Validation, message, metrics).await } -#[tracing::instrument(level = "trace", skip(net, peers), fields(subsystem = LOG_TARGET))] +#[tracing::instrument(level = "trace", skip(net, peers, metrics), fields(subsystem = LOG_TARGET))] async fn send_collation_message( net: &mut impl Network, peers: I, message: WireMessage, + metrics: &Metrics, ) -> SubsystemResult<()> where I: IntoIterator, I::IntoIter: ExactSizeIterator, { - send_message(net, peers, PeerSet::Collation, message).await + send_message(net, peers, PeerSet::Collation, message, metrics).await } @@ -1193,6 +1383,7 @@ mod tests { network_service: network, authority_discovery_service: discovery, request_multiplexer, + metrics: Metrics(None), sync_oracle, }; diff --git a/polkadot/node/network/bridge/src/network.rs b/polkadot/node/network/bridge/src/network.rs index a4fb7150d0..8d52b60923 100644 --- a/polkadot/node/network/bridge/src/network.rs +++ b/polkadot/node/network/bridge/src/network.rs @@ -50,6 +50,7 @@ pub(crate) async fn send_message( peers: I, peer_set: PeerSet, message: M, + metrics: &super::Metrics, ) -> SubsystemResult<()> where M: Encode + Clone, @@ -59,7 +60,12 @@ where let mut message_producer = stream::iter({ let peers = peers.into_iter(); let n_peers = peers.len(); - let mut message = Some(message.encode()); + let mut message = { + let encoded = message.encode(); + metrics.on_notification_sent(peer_set, encoded.len(), n_peers); + + Some(encoded) + }; peers.enumerate().map(move |(i, peer)| { // optimization: avoid cloning the message for the last peer in the diff --git a/polkadot/node/network/collator-protocol/src/collator_side.rs b/polkadot/node/network/collator-protocol/src/collator_side.rs index b661c6f956..012f81504e 100644 --- a/polkadot/node/network/collator-protocol/src/collator_side.rs +++ b/polkadot/node/network/collator-protocol/src/collator_side.rs @@ -63,12 +63,12 @@ impl Metrics { } /// Provide a timer for handling `ConnectionRequest` which observes on drop. - fn time_handle_connection_request(&self) -> Option { + fn time_handle_connection_request(&self) -> Option { self.0.as_ref().map(|metrics| metrics.handle_connection_request.start_timer()) } /// Provide a timer for `process_msg` which observes on drop. - fn time_process_msg(&self) -> Option { + fn time_process_msg(&self) -> Option { self.0.as_ref().map(|metrics| metrics.process_msg.start_timer()) } } diff --git a/polkadot/node/service/src/lib.rs b/polkadot/node/service/src/lib.rs index 6110154c49..ac3c53b744 100644 --- a/polkadot/node/service/src/lib.rs +++ b/polkadot/node/service/src/lib.rs @@ -548,6 +548,7 @@ where authority_discovery, request_multiplexer, Box::new(network_service.clone()), + Metrics::register(registry)?, ), provisioner: ProvisionerSubsystem::new( spawner.clone(),