mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-12 07:41:08 +00:00
Network bridge metrics (#2818)
* add metrics (unused) to network bridge * fix test compilation * trigger metrics messages * add some more metrics * track sent and received notifications * restore metrics import * integrate into service * Update node/network/bridge/src/lib.rs Co-authored-by: Andronik Ordian <write@reusable.software> * Update node/network/bridge/src/lib.rs Co-authored-by: Andronik Ordian <write@reusable.software> Co-authored-by: Andronik Ordian <write@reusable.software>
This commit is contained in:
committed by
GitHub
parent
4df29e71ab
commit
ec5ad35e14
@@ -15,13 +15,13 @@ sc-network = { git = "https://github.com/paritytech/substrate", branch = "master
|
||||
sp-consensus = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||
polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" }
|
||||
polkadot-node-network-protocol = { path = "../protocol" }
|
||||
polkadot-node-subsystem-util = { path = "../../subsystem-util"}
|
||||
strum = "0.20.0"
|
||||
parking_lot = "0.11.1"
|
||||
|
||||
[dev-dependencies]
|
||||
assert_matches = "1.4.0"
|
||||
polkadot-node-subsystem-test-helpers = { path = "../../subsystem-test-helpers" }
|
||||
polkadot-node-subsystem-util = { path = "../../subsystem-util"}
|
||||
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||
sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||
futures-timer = "3"
|
||||
|
||||
@@ -40,6 +40,7 @@ use polkadot_node_network_protocol::{
|
||||
PeerId, peer_set::PeerSet, View, v1 as protocol_v1, OurView, UnifiedReputationChange as Rep,
|
||||
ObservedRole,
|
||||
};
|
||||
use polkadot_node_subsystem_util::metrics::{self, prometheus};
|
||||
|
||||
/// Peer set infos for network initialization.
|
||||
///
|
||||
@@ -77,6 +78,154 @@ const EMPTY_VIEW_COST: Rep = Rep::CostMajor("Peer sent us an empty view");
|
||||
// network bridge log target
|
||||
const LOG_TARGET: &'static str = "parachain::network-bridge";
|
||||
|
||||
/// Metrics for the network bridge.
///
/// Thin wrapper around an optional [`MetricsInner`]. The derived `Default`
/// yields `Metrics(None)`, in which case every recording method on this type
/// is a no-op; a value holding `Some(..)` is produced by `try_register`.
|
||||
#[derive(Clone, Default)]
|
||||
pub struct Metrics(Option<MetricsInner>);
|
||||
|
||||
impl Metrics {
|
||||
fn on_peer_connected(&self, peer_set: PeerSet) {
|
||||
self.0.as_ref().map(|metrics| metrics
|
||||
.connected_events
|
||||
.with_label_values(&[peer_set.get_protocol_name_static()])
|
||||
.inc()
|
||||
);
|
||||
}
|
||||
|
||||
fn on_peer_disconnected(&self, peer_set: PeerSet) {
|
||||
self.0.as_ref().map(|metrics| metrics
|
||||
.disconnected_events
|
||||
.with_label_values(&[peer_set.get_protocol_name_static()])
|
||||
.inc()
|
||||
);
|
||||
}
|
||||
|
||||
fn note_peer_count(&self, peer_set: PeerSet, count: usize) {
|
||||
self.0.as_ref().map(|metrics| metrics
|
||||
.peer_count
|
||||
.with_label_values(&[peer_set.get_protocol_name_static()])
|
||||
.set(count as u64)
|
||||
);
|
||||
}
|
||||
|
||||
fn on_notification_received(&self, peer_set: PeerSet, size: usize) {
|
||||
if let Some(metrics) = self.0.as_ref() {
|
||||
metrics.notifications_received
|
||||
.with_label_values(&[peer_set.get_protocol_name_static()])
|
||||
.inc();
|
||||
|
||||
metrics.bytes_received
|
||||
.with_label_values(&[peer_set.get_protocol_name_static()])
|
||||
.inc_by(size as u64);
|
||||
}
|
||||
}
|
||||
|
||||
fn on_notification_sent(&self, peer_set: PeerSet, size: usize, to_peers: usize) {
|
||||
if let Some(metrics) = self.0.as_ref() {
|
||||
metrics.notifications_sent
|
||||
.with_label_values(&[peer_set.get_protocol_name_static()])
|
||||
.inc_by(to_peers as u64);
|
||||
|
||||
metrics.bytes_sent
|
||||
.with_label_values(&[peer_set.get_protocol_name_static()])
|
||||
.inc_by((size * to_peers) as u64);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct MetricsInner {
|
||||
peer_count: prometheus::GaugeVec<prometheus::U64>,
|
||||
connected_events: prometheus::CounterVec<prometheus::U64>,
|
||||
disconnected_events: prometheus::CounterVec<prometheus::U64>,
|
||||
|
||||
notifications_received: prometheus::CounterVec<prometheus::U64>,
|
||||
notifications_sent: prometheus::CounterVec<prometheus::U64>,
|
||||
|
||||
bytes_received: prometheus::CounterVec<prometheus::U64>,
|
||||
bytes_sent: prometheus::CounterVec<prometheus::U64>,
|
||||
}
|
||||
|
||||
impl metrics::Metrics for Metrics {
|
||||
fn try_register(registry: &prometheus::Registry)
|
||||
-> std::result::Result<Self, prometheus::PrometheusError>
|
||||
{
|
||||
let metrics = MetricsInner {
|
||||
peer_count: prometheus::register(
|
||||
prometheus::GaugeVec::new(
|
||||
prometheus::Opts::new(
|
||||
"parachain_peer_count",
|
||||
"The number of peers on a parachain-related peer-set",
|
||||
),
|
||||
&["protocol"]
|
||||
)?,
|
||||
registry,
|
||||
)?,
|
||||
connected_events: prometheus::register(
|
||||
prometheus::CounterVec::new(
|
||||
prometheus::Opts::new(
|
||||
"parachain_peer_connect_events_total",
|
||||
"The number of peer connect events on a parachain notifications protocol",
|
||||
),
|
||||
&["protocol"]
|
||||
)?,
|
||||
registry,
|
||||
)?,
|
||||
disconnected_events: prometheus::register(
|
||||
prometheus::CounterVec::new(
|
||||
prometheus::Opts::new(
|
||||
"parachain_peer_disconnect_events_total",
|
||||
"The number of peer disconnect events on a parachain notifications protocol",
|
||||
),
|
||||
&["protocol"]
|
||||
)?,
|
||||
registry,
|
||||
)?,
|
||||
notifications_received: prometheus::register(
|
||||
prometheus::CounterVec::new(
|
||||
prometheus::Opts::new(
|
||||
"parachain_notifications_received_total",
|
||||
"The number of notifications received on a parachain protocol",
|
||||
),
|
||||
&["protocol"]
|
||||
)?,
|
||||
registry,
|
||||
)?,
|
||||
notifications_sent: prometheus::register(
|
||||
prometheus::CounterVec::new(
|
||||
prometheus::Opts::new(
|
||||
"parachain_notifications_sent_total",
|
||||
"The number of notifications sent on a parachain protocol",
|
||||
),
|
||||
&["protocol"]
|
||||
)?,
|
||||
registry,
|
||||
)?,
|
||||
bytes_received: prometheus::register(
|
||||
prometheus::CounterVec::new(
|
||||
prometheus::Opts::new(
|
||||
"parachain_notification_bytes_received_total",
|
||||
"The number of bytes received on a parachain notification protocol",
|
||||
),
|
||||
&["protocol"]
|
||||
)?,
|
||||
registry,
|
||||
)?,
|
||||
bytes_sent: prometheus::register(
|
||||
prometheus::CounterVec::new(
|
||||
prometheus::Opts::new(
|
||||
"parachain_notification_bytes_sent_total",
|
||||
"The number of bytes sent on a parachain notification protocol",
|
||||
),
|
||||
&["protocol"]
|
||||
)?,
|
||||
registry,
|
||||
)?,
|
||||
};
|
||||
|
||||
Ok(Metrics(Some(metrics)))
|
||||
}
|
||||
}
|
||||
|
||||
/// Messages from and to the network.
|
||||
///
|
||||
/// As transmitted to and received from subsystems.
|
||||
@@ -90,7 +239,6 @@ pub enum WireMessage<M> {
|
||||
ViewUpdate(View),
|
||||
}
|
||||
|
||||
|
||||
/// The network bridge subsystem.
|
||||
pub struct NetworkBridge<N, AD> {
|
||||
/// `Network` trait implementing type.
|
||||
@@ -98,6 +246,7 @@ pub struct NetworkBridge<N, AD> {
|
||||
authority_discovery_service: AD,
|
||||
request_multiplexer: RequestMultiplexer,
|
||||
sync_oracle: Box<dyn SyncOracle + Send>,
|
||||
metrics: Metrics,
|
||||
}
|
||||
|
||||
impl<N, AD> NetworkBridge<N, AD> {
|
||||
@@ -110,12 +259,14 @@ impl<N, AD> NetworkBridge<N, AD> {
|
||||
authority_discovery_service: AD,
|
||||
request_multiplexer: RequestMultiplexer,
|
||||
sync_oracle: Box<dyn SyncOracle + Send>,
|
||||
metrics: Metrics,
|
||||
) -> Self {
|
||||
NetworkBridge {
|
||||
network_service,
|
||||
authority_discovery_service,
|
||||
request_multiplexer,
|
||||
sync_oracle,
|
||||
metrics,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -190,6 +341,7 @@ async fn handle_subsystem_messages<Context, N, AD>(
|
||||
validator_discovery_notifications: mpsc::Receiver<ValidatorDiscoveryNotification>,
|
||||
shared: Shared,
|
||||
sync_oracle: Box<dyn SyncOracle + Send>,
|
||||
metrics: Metrics,
|
||||
) -> Result<(), UnexpectedAbort>
|
||||
where
|
||||
Context: SubsystemContext<Message = NetworkBridgeMessage>,
|
||||
@@ -243,6 +395,7 @@ where
|
||||
&live_heads,
|
||||
&shared,
|
||||
finalized_number,
|
||||
&metrics,
|
||||
).await?;
|
||||
}
|
||||
}
|
||||
@@ -292,6 +445,7 @@ where
|
||||
peers,
|
||||
PeerSet::Validation,
|
||||
WireMessage::ProtocolMessage(msg),
|
||||
&metrics,
|
||||
).await?
|
||||
}
|
||||
NetworkBridgeMessage::SendValidationMessages(msgs) => {
|
||||
@@ -307,6 +461,7 @@ where
|
||||
peers,
|
||||
PeerSet::Validation,
|
||||
WireMessage::ProtocolMessage(msg),
|
||||
&metrics,
|
||||
).await?
|
||||
}
|
||||
}
|
||||
@@ -322,6 +477,7 @@ where
|
||||
peers,
|
||||
PeerSet::Collation,
|
||||
WireMessage::ProtocolMessage(msg),
|
||||
&metrics,
|
||||
).await?
|
||||
}
|
||||
NetworkBridgeMessage::SendCollationMessages(msgs) => {
|
||||
@@ -337,6 +493,7 @@ where
|
||||
peers,
|
||||
PeerSet::Collation,
|
||||
WireMessage::ProtocolMessage(msg),
|
||||
&metrics,
|
||||
).await?
|
||||
}
|
||||
}
|
||||
@@ -402,6 +559,7 @@ async fn handle_network_messages(
|
||||
mut network_service: impl Network,
|
||||
mut request_multiplexer: RequestMultiplexer,
|
||||
mut validator_discovery_notifications: mpsc::Sender<ValidatorDiscoveryNotification>,
|
||||
metrics: Metrics,
|
||||
shared: Shared,
|
||||
) -> Result<(), UnexpectedAbort> {
|
||||
let mut network_stream = network_service.event_stream();
|
||||
@@ -442,6 +600,9 @@ async fn handle_network_messages(
|
||||
}
|
||||
}
|
||||
|
||||
metrics.on_peer_connected(peer_set);
|
||||
metrics.note_peer_count(peer_set, peer_map.len());
|
||||
|
||||
shared.local_view.clone().unwrap_or(View::default())
|
||||
};
|
||||
|
||||
@@ -472,6 +633,7 @@ async fn handle_network_messages(
|
||||
WireMessage::<protocol_v1::ValidationProtocol>::ViewUpdate(
|
||||
local_view,
|
||||
),
|
||||
&metrics,
|
||||
).await?;
|
||||
}
|
||||
PeerSet::Collation => {
|
||||
@@ -493,6 +655,7 @@ async fn handle_network_messages(
|
||||
WireMessage::<protocol_v1::CollationProtocol>::ViewUpdate(
|
||||
local_view,
|
||||
),
|
||||
&metrics,
|
||||
).await?;
|
||||
}
|
||||
}
|
||||
@@ -517,7 +680,12 @@ async fn handle_network_messages(
|
||||
PeerSet::Collation => &mut shared.collation_peers,
|
||||
};
|
||||
|
||||
peer_map.remove(&peer).is_some()
|
||||
let w = peer_map.remove(&peer).is_some();
|
||||
|
||||
metrics.on_peer_disconnected(peer_set);
|
||||
metrics.note_peer_count(peer_set, peer_map.len());
|
||||
|
||||
w
|
||||
};
|
||||
|
||||
// Failure here means that the other side of the network bridge
|
||||
@@ -545,7 +713,10 @@ async fn handle_network_messages(
|
||||
.filter(|(protocol, _)| {
|
||||
protocol == &PeerSet::Validation.into_protocol_name()
|
||||
})
|
||||
.map(|(_, msg_bytes)| WireMessage::decode(&mut msg_bytes.as_ref()))
|
||||
.map(|(_, msg_bytes)| {
|
||||
WireMessage::decode(&mut msg_bytes.as_ref())
|
||||
.map(|m| (m, msg_bytes.len()))
|
||||
})
|
||||
.collect();
|
||||
|
||||
let v_messages = match v_messages {
|
||||
@@ -566,7 +737,10 @@ async fn handle_network_messages(
|
||||
.filter(|(protocol, _)| {
|
||||
protocol == &PeerSet::Collation.into_protocol_name()
|
||||
})
|
||||
.map(|(_, msg_bytes)| WireMessage::decode(&mut msg_bytes.as_ref()))
|
||||
.map(|(_, msg_bytes)| {
|
||||
WireMessage::decode(&mut msg_bytes.as_ref())
|
||||
.map(|m| (m, msg_bytes.len()))
|
||||
})
|
||||
.collect();
|
||||
|
||||
match c_messages {
|
||||
@@ -594,8 +768,10 @@ async fn handle_network_messages(
|
||||
if !v_messages.is_empty() {
|
||||
let (events, reports) = handle_peer_messages(
|
||||
remote.clone(),
|
||||
PeerSet::Validation,
|
||||
&mut shared.0.lock().validation_peers,
|
||||
v_messages,
|
||||
&metrics,
|
||||
);
|
||||
|
||||
for report in reports {
|
||||
@@ -608,8 +784,10 @@ async fn handle_network_messages(
|
||||
if !c_messages.is_empty() {
|
||||
let (events, reports) = handle_peer_messages(
|
||||
remote.clone(),
|
||||
PeerSet::Collation,
|
||||
&mut shared.0.lock().collation_peers,
|
||||
c_messages,
|
||||
&metrics,
|
||||
);
|
||||
|
||||
for report in reports {
|
||||
@@ -666,6 +844,7 @@ where
|
||||
network_service,
|
||||
request_multiplexer,
|
||||
authority_discovery_service,
|
||||
metrics,
|
||||
sync_oracle,
|
||||
} = bridge;
|
||||
|
||||
@@ -676,6 +855,7 @@ where
|
||||
network_service.clone(),
|
||||
request_multiplexer,
|
||||
validation_worker_tx,
|
||||
metrics.clone(),
|
||||
shared.clone(),
|
||||
).remote_handle();
|
||||
|
||||
@@ -688,6 +868,7 @@ where
|
||||
validation_worker_rx,
|
||||
shared,
|
||||
sync_oracle,
|
||||
metrics,
|
||||
);
|
||||
|
||||
futures::pin_mut!(subsystem_event_handler);
|
||||
@@ -738,13 +919,14 @@ fn construct_view(live_heads: impl DoubleEndedIterator<Item = Hash>, finalized_n
|
||||
)
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "trace", skip(net, ctx, shared), fields(subsystem = LOG_TARGET))]
|
||||
#[tracing::instrument(level = "trace", skip(net, ctx, shared, metrics), fields(subsystem = LOG_TARGET))]
|
||||
async fn update_our_view(
|
||||
net: &mut impl Network,
|
||||
ctx: &mut impl SubsystemContext<Message = NetworkBridgeMessage>,
|
||||
live_heads: &[ActivatedLeaf],
|
||||
shared: &Shared,
|
||||
finalized_number: BlockNumber,
|
||||
metrics: &Metrics,
|
||||
) -> SubsystemResult<()> {
|
||||
let new_view = construct_view(live_heads.iter().map(|v| v.hash), finalized_number);
|
||||
|
||||
@@ -780,12 +962,14 @@ async fn update_our_view(
|
||||
net,
|
||||
validation_peers,
|
||||
WireMessage::ViewUpdate(new_view.clone()),
|
||||
metrics,
|
||||
).await?;
|
||||
|
||||
send_collation_message(
|
||||
net,
|
||||
collation_peers,
|
||||
WireMessage::ViewUpdate(new_view),
|
||||
metrics,
|
||||
).await?;
|
||||
|
||||
let our_view = OurView::new(
|
||||
@@ -808,11 +992,13 @@ async fn update_our_view(
|
||||
|
||||
// Handle messages on a specific peer-set. The peer is expected to be connected on that
|
||||
// peer-set.
|
||||
#[tracing::instrument(level = "trace", skip(peers, messages), fields(subsystem = LOG_TARGET))]
|
||||
#[tracing::instrument(level = "trace", skip(peers, messages, metrics), fields(subsystem = LOG_TARGET))]
|
||||
fn handle_peer_messages<M>(
|
||||
peer: PeerId,
|
||||
peer_set: PeerSet,
|
||||
peers: &mut HashMap<PeerId, PeerData>,
|
||||
messages: Vec<WireMessage<M>>,
|
||||
messages: Vec<(WireMessage<M>, usize)>,
|
||||
metrics: &Metrics,
|
||||
) -> (Vec<NetworkBridgeEvent<M>>, Vec<Rep>) {
|
||||
let peer_data = match peers.get_mut(&peer) {
|
||||
None => {
|
||||
@@ -824,7 +1010,9 @@ fn handle_peer_messages<M>(
|
||||
let mut outgoing_messages = Vec::with_capacity(messages.len());
|
||||
let mut reports = Vec::new();
|
||||
|
||||
for message in messages {
|
||||
for (message, size_bytes) in messages {
|
||||
metrics.on_notification_received(peer_set, size_bytes);
|
||||
|
||||
outgoing_messages.push(match message {
|
||||
WireMessage::ViewUpdate(new_view) => {
|
||||
if new_view.len() > MAX_VIEW_HEADS ||
|
||||
@@ -855,30 +1043,32 @@ fn handle_peer_messages<M>(
|
||||
(outgoing_messages, reports)
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "trace", skip(net, peers), fields(subsystem = LOG_TARGET))]
|
||||
#[tracing::instrument(level = "trace", skip(net, peers, metrics), fields(subsystem = LOG_TARGET))]
|
||||
async fn send_validation_message<I>(
|
||||
net: &mut impl Network,
|
||||
peers: I,
|
||||
message: WireMessage<protocol_v1::ValidationProtocol>,
|
||||
metrics: &Metrics,
|
||||
) -> SubsystemResult<()>
|
||||
where
|
||||
I: IntoIterator<Item=PeerId>,
|
||||
I::IntoIter: ExactSizeIterator,
|
||||
{
|
||||
send_message(net, peers, PeerSet::Validation, message).await
|
||||
send_message(net, peers, PeerSet::Validation, message, metrics).await
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "trace", skip(net, peers), fields(subsystem = LOG_TARGET))]
|
||||
#[tracing::instrument(level = "trace", skip(net, peers, metrics), fields(subsystem = LOG_TARGET))]
|
||||
async fn send_collation_message<I>(
|
||||
net: &mut impl Network,
|
||||
peers: I,
|
||||
message: WireMessage<protocol_v1::CollationProtocol>,
|
||||
metrics: &Metrics,
|
||||
) -> SubsystemResult<()>
|
||||
where
|
||||
I: IntoIterator<Item=PeerId>,
|
||||
I::IntoIter: ExactSizeIterator,
|
||||
{
|
||||
send_message(net, peers, PeerSet::Collation, message).await
|
||||
send_message(net, peers, PeerSet::Collation, message, metrics).await
|
||||
}
|
||||
|
||||
|
||||
@@ -1193,6 +1383,7 @@ mod tests {
|
||||
network_service: network,
|
||||
authority_discovery_service: discovery,
|
||||
request_multiplexer,
|
||||
metrics: Metrics(None),
|
||||
sync_oracle,
|
||||
};
|
||||
|
||||
|
||||
@@ -50,6 +50,7 @@ pub(crate) async fn send_message<M, I>(
|
||||
peers: I,
|
||||
peer_set: PeerSet,
|
||||
message: M,
|
||||
metrics: &super::Metrics,
|
||||
) -> SubsystemResult<()>
|
||||
where
|
||||
M: Encode + Clone,
|
||||
@@ -59,7 +60,12 @@ where
|
||||
let mut message_producer = stream::iter({
|
||||
let peers = peers.into_iter();
|
||||
let n_peers = peers.len();
|
||||
let mut message = Some(message.encode());
|
||||
let mut message = {
|
||||
let encoded = message.encode();
|
||||
metrics.on_notification_sent(peer_set, encoded.len(), n_peers);
|
||||
|
||||
Some(encoded)
|
||||
};
|
||||
|
||||
peers.enumerate().map(move |(i, peer)| {
|
||||
// optimization: avoid cloning the message for the last peer in the
|
||||
|
||||
@@ -63,12 +63,12 @@ impl Metrics {
|
||||
}
|
||||
|
||||
/// Provide a timer for handling `ConnectionRequest` which observes on drop.
|
||||
fn time_handle_connection_request(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
|
||||
fn time_handle_connection_request(&self) -> Option<prometheus::prometheus::HistogramTimer> {
|
||||
self.0.as_ref().map(|metrics| metrics.handle_connection_request.start_timer())
|
||||
}
|
||||
|
||||
/// Provide a timer for `process_msg` which observes on drop.
|
||||
fn time_process_msg(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
|
||||
fn time_process_msg(&self) -> Option<prometheus::prometheus::HistogramTimer> {
|
||||
self.0.as_ref().map(|metrics| metrics.process_msg.start_timer())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -548,6 +548,7 @@ where
|
||||
authority_discovery,
|
||||
request_multiplexer,
|
||||
Box::new(network_service.clone()),
|
||||
Metrics::register(registry)?,
|
||||
),
|
||||
provisioner: ProvisionerSubsystem::new(
|
||||
spawner.clone(),
|
||||
|
||||
Reference in New Issue
Block a user