mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-04-29 00:57:57 +00:00
Add more prometheus metrics to network::Protocol. (#5145)
This commit is contained in:
@@ -38,6 +38,7 @@ use sp_arithmetic::traits::SaturatedConversion;
|
||||
use message::{BlockAnnounce, BlockAttributes, Direction, FromBlock, Message, RequestId};
|
||||
use message::generic::Message as GenericMessage;
|
||||
use light_dispatch::{LightDispatch, LightDispatchNetwork, RequestData};
|
||||
use prometheus_endpoint::{Registry, Gauge, register, PrometheusError, U64};
|
||||
use sync::{ChainSync, SyncState};
|
||||
use crate::service::{TransactionPool, ExHashT};
|
||||
use crate::config::{BoxFinalityProofRequestBuilder, Roles};
|
||||
@@ -135,6 +136,105 @@ mod rep {
|
||||
pub const BAD_RESPONSE: Rep = Rep::new(-(1 << 12), "Incomplete response");
|
||||
}
|
||||
|
||||
struct Metrics {
|
||||
handshaking_peers: Gauge<U64>,
|
||||
obsolete_requests: Gauge<U64>,
|
||||
peers: Gauge<U64>,
|
||||
queued_blocks: Gauge<U64>,
|
||||
fork_targets: Gauge<U64>,
|
||||
finality_proofs_pending: Gauge<U64>,
|
||||
finality_proofs_active: Gauge<U64>,
|
||||
finality_proofs_failed: Gauge<U64>,
|
||||
finality_proofs_importing: Gauge<U64>,
|
||||
justifications_pending: Gauge<U64>,
|
||||
justifications_active: Gauge<U64>,
|
||||
justifications_failed: Gauge<U64>,
|
||||
justifications_importing: Gauge<U64>
|
||||
}
|
||||
|
||||
impl Metrics {
|
||||
fn register(r: &Registry) -> Result<Self, PrometheusError> {
|
||||
Ok(Metrics {
|
||||
handshaking_peers: {
|
||||
let g = Gauge::new("sync_handshaking_peers", "number of newly connected peers")?;
|
||||
register(g, r)?
|
||||
},
|
||||
obsolete_requests: {
|
||||
let g = Gauge::new("sync_obsolete_requests", "total number of obsolete requests")?;
|
||||
register(g, r)?
|
||||
},
|
||||
peers: {
|
||||
let g = Gauge::new("sync_peers", "number of peers we sync with")?;
|
||||
register(g, r)?
|
||||
},
|
||||
queued_blocks: {
|
||||
let g = Gauge::new("sync_queued_blocks", "number of blocks in import queue")?;
|
||||
register(g, r)?
|
||||
},
|
||||
fork_targets: {
|
||||
let g = Gauge::new("sync_fork_targets", "fork sync targets")?;
|
||||
register(g, r)?
|
||||
},
|
||||
justifications_pending: {
|
||||
let g = Gauge::new(
|
||||
"sync_extra_justifications_pending",
|
||||
"number of pending extra justifications requests"
|
||||
)?;
|
||||
register(g, r)?
|
||||
},
|
||||
justifications_active: {
|
||||
let g = Gauge::new(
|
||||
"sync_extra_justifications_active",
|
||||
"number of active extra justifications requests"
|
||||
)?;
|
||||
register(g, r)?
|
||||
},
|
||||
justifications_failed: {
|
||||
let g = Gauge::new(
|
||||
"sync_extra_justifications_failed",
|
||||
"number of failed extra justifications requests"
|
||||
)?;
|
||||
register(g, r)?
|
||||
},
|
||||
justifications_importing: {
|
||||
let g = Gauge::new(
|
||||
"sync_extra_justifications_importing",
|
||||
"number of importing extra justifications requests"
|
||||
)?;
|
||||
register(g, r)?
|
||||
},
|
||||
finality_proofs_pending: {
|
||||
let g = Gauge::new(
|
||||
"sync_extra_finality_proofs_pending",
|
||||
"number of pending extra finality proof requests"
|
||||
)?;
|
||||
register(g, r)?
|
||||
},
|
||||
finality_proofs_active: {
|
||||
let g = Gauge::new(
|
||||
"sync_extra_finality_proofs_active",
|
||||
"number of active extra finality proof requests"
|
||||
)?;
|
||||
register(g, r)?
|
||||
},
|
||||
finality_proofs_failed: {
|
||||
let g = Gauge::new(
|
||||
"sync_extra_finality_proofs_failed",
|
||||
"number of failed extra finality proof requests"
|
||||
)?;
|
||||
register(g, r)?
|
||||
},
|
||||
finality_proofs_importing: {
|
||||
let g = Gauge::new(
|
||||
"sync_extra_finality_proofs_importing",
|
||||
"number of importing extra finality proof requests"
|
||||
)?;
|
||||
register(g, r)?
|
||||
},
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// Lock must always be taken in order declared here.
|
||||
pub struct Protocol<B: BlockT, H: ExHashT> {
|
||||
/// Interval at which we call `tick`.
|
||||
@@ -163,6 +263,8 @@ pub struct Protocol<B: BlockT, H: ExHashT> {
|
||||
protocol_name_by_engine: HashMap<ConsensusEngineId, Cow<'static, [u8]>>,
|
||||
/// For each protocol name, the legacy gossiping engine ID.
|
||||
protocol_engine_by_name: HashMap<Cow<'static, [u8]>, ConsensusEngineId>,
|
||||
/// Prometheus metrics.
|
||||
metrics: Option<Metrics>,
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
@@ -371,7 +473,8 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
|
||||
finality_proof_request_builder: Option<BoxFinalityProofRequestBuilder<B>>,
|
||||
protocol_id: ProtocolId,
|
||||
peerset_config: sc_peerset::PeersetConfig,
|
||||
block_announce_validator: Box<dyn BlockAnnounceValidator<B> + Send>
|
||||
block_announce_validator: Box<dyn BlockAnnounceValidator<B> + Send>,
|
||||
metrics_registry: Option<&Registry>
|
||||
) -> error::Result<(Protocol<B, H>, sc_peerset::PeersetHandle)> {
|
||||
let info = chain.info();
|
||||
let sync = ChainSync::new(
|
||||
@@ -416,6 +519,11 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
|
||||
behaviour,
|
||||
protocol_name_by_engine: HashMap::new(),
|
||||
protocol_engine_by_name: HashMap::new(),
|
||||
metrics: if let Some(r) = metrics_registry {
|
||||
Some(Metrics::register(r)?)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
};
|
||||
|
||||
Ok((protocol, peerset_handle))
|
||||
@@ -859,6 +967,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
|
||||
behaviour: &mut self.behaviour,
|
||||
peerset: self.peerset_handle.clone(),
|
||||
});
|
||||
self.report_metrics()
|
||||
}
|
||||
|
||||
fn maintain_peers(&mut self) {
|
||||
@@ -1767,6 +1876,40 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
|
||||
}
|
||||
out
|
||||
}
|
||||
|
||||
fn report_metrics(&self) {
|
||||
use std::convert::TryInto;
|
||||
|
||||
if let Some(metrics) = &self.metrics {
|
||||
let mut obsolete_requests: u64 = 0;
|
||||
for peer in self.context_data.peers.values() {
|
||||
let n = peer.obsolete_requests.len().try_into().unwrap_or(std::u64::MAX);
|
||||
obsolete_requests = obsolete_requests.saturating_add(n);
|
||||
}
|
||||
metrics.obsolete_requests.set(obsolete_requests);
|
||||
|
||||
let n = self.handshaking_peers.len().try_into().unwrap_or(std::u64::MAX);
|
||||
metrics.handshaking_peers.set(n);
|
||||
|
||||
let n = self.context_data.peers.len().try_into().unwrap_or(std::u64::MAX);
|
||||
metrics.peers.set(n);
|
||||
|
||||
let m = self.sync.metrics();
|
||||
|
||||
metrics.fork_targets.set(m.fork_targets.into());
|
||||
metrics.queued_blocks.set(m.queued_blocks.into());
|
||||
|
||||
metrics.justifications_pending.set(m.justifications.pending_requests.into());
|
||||
metrics.justifications_active.set(m.justifications.active_requests.into());
|
||||
metrics.justifications_failed.set(m.justifications.failed_requests.into());
|
||||
metrics.justifications_importing.set(m.justifications.importing_requests.into());
|
||||
|
||||
metrics.finality_proofs_pending.set(m.finality_proofs.pending_requests.into());
|
||||
metrics.finality_proofs_active.set(m.finality_proofs.active_requests.into());
|
||||
metrics.finality_proofs_failed.set(m.finality_proofs.failed_requests.into());
|
||||
metrics.finality_proofs_importing.set(m.finality_proofs.importing_requests.into());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Outcome of an incoming custom message.
|
||||
|
||||
@@ -1202,6 +1202,27 @@ impl<B: BlockT> ChainSync<B> {
|
||||
fn is_already_downloading(&self, hash: &B::Hash) -> bool {
|
||||
self.peers.iter().any(|(_, p)| p.state == PeerSyncState::DownloadingStale(*hash))
|
||||
}
|
||||
|
||||
/// Return some key metrics.
|
||||
pub(crate) fn metrics(&self) -> Metrics {
|
||||
use std::convert::TryInto;
|
||||
Metrics {
|
||||
queued_blocks: self.queue_blocks.len().try_into().unwrap_or(std::u32::MAX),
|
||||
fork_targets: self.fork_targets.len().try_into().unwrap_or(std::u32::MAX),
|
||||
finality_proofs: self.extra_finality_proofs.metrics(),
|
||||
justifications: self.extra_justifications.metrics(),
|
||||
_priv: ()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct Metrics {
|
||||
pub(crate) queued_blocks: u32,
|
||||
pub(crate) fork_targets: u32,
|
||||
pub(crate) finality_proofs: extra_requests::Metrics,
|
||||
pub(crate) justifications: extra_requests::Metrics,
|
||||
_priv: ()
|
||||
}
|
||||
|
||||
/// Request the ancestry for a block. Sends a request for header and justification for the given
|
||||
|
||||
@@ -53,6 +53,15 @@ pub(crate) struct ExtraRequests<B: BlockT> {
|
||||
request_type_name: &'static str,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct Metrics {
|
||||
pub(crate) pending_requests: u32,
|
||||
pub(crate) active_requests: u32,
|
||||
pub(crate) importing_requests: u32,
|
||||
pub(crate) failed_requests: u32,
|
||||
_priv: ()
|
||||
}
|
||||
|
||||
impl<B: BlockT> ExtraRequests<B> {
|
||||
pub(crate) fn new(request_type_name: &'static str) -> Self {
|
||||
ExtraRequests {
|
||||
@@ -240,6 +249,18 @@ impl<B: BlockT> ExtraRequests<B> {
|
||||
pub(crate) fn pending_requests(&self) -> impl Iterator<Item = &ExtraRequest<B>> {
|
||||
self.pending_requests.iter()
|
||||
}
|
||||
|
||||
/// Get some key metrics.
|
||||
pub(crate) fn metrics(&self) -> Metrics {
|
||||
use std::convert::TryInto;
|
||||
Metrics {
|
||||
pending_requests: self.pending_requests.len().try_into().unwrap_or(std::u32::MAX),
|
||||
active_requests: self.active_requests.len().try_into().unwrap_or(std::u32::MAX),
|
||||
failed_requests: self.failed_requests.len().try_into().unwrap_or(std::u32::MAX),
|
||||
importing_requests: self.importing_requests.len().try_into().unwrap_or(std::u32::MAX),
|
||||
_priv: ()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Matches peers with pending extra requests.
|
||||
|
||||
@@ -210,7 +210,8 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> {
|
||||
params.finality_proof_request_builder,
|
||||
params.protocol_id.clone(),
|
||||
peerset_config,
|
||||
params.block_announce_validator
|
||||
params.block_announce_validator,
|
||||
params.metrics_registry.as_ref()
|
||||
)?;
|
||||
|
||||
// Build the swarm.
|
||||
@@ -858,7 +859,7 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
|
||||
};
|
||||
|
||||
this.is_major_syncing.store(is_major_syncing, Ordering::Relaxed);
|
||||
|
||||
|
||||
if let Some(metrics) = this.metrics.as_ref() {
|
||||
metrics.is_major_syncing.set(is_major_syncing as u64);
|
||||
metrics.peers_count.set(num_connected_peers as u64);
|
||||
|
||||
Reference in New Issue
Block a user