Merge commit '114f487fd9daef4b4cd791446372a9a690c137ac' into update-bridges-subtree-r/w

This commit is contained in:
antonio-dropulic
2021-12-01 16:34:30 +01:00
183 changed files with 1017 additions and 21238 deletions
+21 -17
View File
@@ -46,28 +46,38 @@ pub struct MetricsParams {
/// Interface and TCP port to be used when exposing Prometheus metrics.
pub address: Option<MetricsAddress>,
/// Metrics registry. May be `Some(_)` if several components share the same endpoint.
pub registry: Option<Registry>,
/// Prefix that must be used in metric names.
pub metrics_prefix: Option<String>,
pub registry: Registry,
}
/// Metrics API.
pub trait Metrics: Clone + Send + Sync + 'static {}
/// Metric API.
pub trait Metric: Clone + Send + Sync + 'static {
fn register(&self, registry: &Registry) -> Result<(), PrometheusError>;
}
impl<T: Clone + Send + Sync + 'static> Metrics for T {}
/// Standalone metrics API.
/// Standalone metric API.
///
/// Metrics of this kind know how to update themselves, so we may just spawn and forget the
/// asynchronous self-update task.
#[async_trait]
pub trait StandaloneMetrics: Metrics {
pub trait StandaloneMetric: Metric {
/// Update metric values.
async fn update(&self);
/// Metrics update interval.
fn update_interval(&self) -> Duration;
/// Register and spawn metric. Metric is only spawned if it is registered for the first time.
fn register_and_spawn(self, registry: &Registry) -> Result<(), PrometheusError> {
match self.register(registry) {
Ok(()) => {
self.spawn();
Ok(())
},
Err(PrometheusError::AlreadyReg) => Ok(()),
Err(e) => Err(e),
}
}
/// Spawn the self update task that will keep update metric value at given intervals.
fn spawn(self) {
async_std::task::spawn(async move {
@@ -89,7 +99,7 @@ impl Default for MetricsAddress {
impl MetricsParams {
/// Creates metrics params so that metrics are not exposed.
pub fn disabled() -> Self {
MetricsParams { address: None, registry: None, metrics_prefix: None }
MetricsParams { address: None, registry: Registry::new() }
}
/// Do not expose metrics.
@@ -97,17 +107,11 @@ impl MetricsParams {
self.address = None;
self
}
/// Set prefix to use in metric names.
pub fn metrics_prefix(mut self, prefix: String) -> Self {
self.metrics_prefix = Some(prefix);
self
}
}
impl From<Option<MetricsAddress>> for MetricsParams {
fn from(address: Option<MetricsAddress>) -> Self {
MetricsParams { address, registry: None, metrics_prefix: None }
MetricsParams { address, registry: Registry::new() }
}
}
@@ -17,8 +17,8 @@
use crate::{
error::{self, Error},
metrics::{
metric_name, register, F64SharedRef, Gauge, PrometheusError, Registry, StandaloneMetrics,
F64,
metric_name, register, F64SharedRef, Gauge, Metric, PrometheusError, Registry,
StandaloneMetric, F64,
},
};
@@ -44,8 +44,6 @@ pub struct FloatJsonValueMetric {
impl FloatJsonValueMetric {
/// Create new metric instance with given name and help.
pub fn new(
registry: &Registry,
prefix: Option<&str>,
url: String,
json_path: String,
name: String,
@@ -55,7 +53,7 @@ impl FloatJsonValueMetric {
Ok(FloatJsonValueMetric {
url,
json_path,
metric: register(Gauge::new(metric_name(prefix, &name), help)?, registry)?,
metric: Gauge::new(metric_name(None, &name), help)?,
shared_value_ref,
})
}
@@ -81,8 +79,14 @@ impl FloatJsonValueMetric {
}
}
impl Metric for FloatJsonValueMetric {
fn register(&self, registry: &Registry) -> Result<(), PrometheusError> {
register(self.metric.clone(), registry).map(drop)
}
}
#[async_trait]
impl StandaloneMetrics for FloatJsonValueMetric {
impl StandaloneMetric for FloatJsonValueMetric {
fn update_interval(&self) -> Duration {
UPDATE_INTERVAL
}
@@ -17,8 +17,8 @@
//! Global system-wide Prometheus metrics exposed by relays.
use crate::metrics::{
metric_name, register, Gauge, GaugeVec, Opts, PrometheusError, Registry, StandaloneMetrics,
F64, U64,
metric_name, register, Gauge, GaugeVec, Metric, Opts, PrometheusError, Registry,
StandaloneMetric, F64, U64,
};
use async_std::sync::{Arc, Mutex};
@@ -40,36 +40,36 @@ pub struct GlobalMetrics {
impl GlobalMetrics {
/// Create and register global metrics.
pub fn new(registry: &Registry, prefix: Option<&str>) -> Result<Self, PrometheusError> {
pub fn new() -> Result<Self, PrometheusError> {
Ok(GlobalMetrics {
system: Arc::new(Mutex::new(System::new_with_specifics(RefreshKind::everything()))),
system_average_load: register(
GaugeVec::new(
Opts::new(metric_name(prefix, "system_average_load"), "System load average"),
&["over"],
)?,
registry,
system_average_load: GaugeVec::new(
Opts::new(metric_name(None, "system_average_load"), "System load average"),
&["over"],
)?,
process_cpu_usage_percentage: register(
Gauge::new(
metric_name(prefix, "process_cpu_usage_percentage"),
"Process CPU usage",
)?,
registry,
process_cpu_usage_percentage: Gauge::new(
metric_name(None, "process_cpu_usage_percentage"),
"Process CPU usage",
)?,
process_memory_usage_bytes: register(
Gauge::new(
metric_name(prefix, "process_memory_usage_bytes"),
"Process memory (resident set size) usage",
)?,
registry,
process_memory_usage_bytes: Gauge::new(
metric_name(None, "process_memory_usage_bytes"),
"Process memory (resident set size) usage",
)?,
})
}
}
impl Metric for GlobalMetrics {
fn register(&self, registry: &Registry) -> Result<(), PrometheusError> {
register(self.system_average_load.clone(), registry)?;
register(self.process_cpu_usage_percentage.clone(), registry)?;
register(self.process_memory_usage_bytes.clone(), registry)?;
Ok(())
}
}
#[async_trait]
impl StandaloneMetrics for GlobalMetrics {
impl StandaloneMetric for GlobalMetrics {
async fn update(&self) {
// update system-wide metrics
let mut system = self.system.lock().await;
+10 -50
View File
@@ -16,7 +16,7 @@
use crate::{
error::Error,
metrics::{Metrics, MetricsAddress, MetricsParams, PrometheusError, StandaloneMetrics},
metrics::{Metric, MetricsAddress, MetricsParams},
FailedClient, MaybeConnectionError,
};
@@ -53,7 +53,7 @@ pub fn relay_loop<SC, TC>(source_client: SC, target_client: TC) -> Loop<SC, TC,
/// Returns generic relay loop metrics that may be customized and used in one or several relay
/// loops.
pub fn relay_metrics(prefix: Option<String>, params: MetricsParams) -> LoopMetrics<(), (), ()> {
pub fn relay_metrics(params: MetricsParams) -> LoopMetrics<(), (), ()> {
LoopMetrics {
relay_loop: Loop {
reconnect_delay: RECONNECT_DELAY,
@@ -62,8 +62,7 @@ pub fn relay_metrics(prefix: Option<String>, params: MetricsParams) -> LoopMetri
loop_metric: None,
},
address: params.address,
registry: params.registry.unwrap_or_else(|| create_metrics_registry(prefix)),
metrics_prefix: params.metrics_prefix,
registry: params.registry,
loop_metric: None,
}
}
@@ -81,7 +80,6 @@ pub struct LoopMetrics<SC, TC, LM> {
relay_loop: Loop<SC, TC, ()>,
address: Option<MetricsAddress>,
registry: Registry,
metrics_prefix: Option<String>,
loop_metric: Option<LM>,
}
@@ -93,11 +91,7 @@ impl<SC, TC, LM> Loop<SC, TC, LM> {
}
/// Start building loop metrics using given prefix.
pub fn with_metrics(
self,
prefix: Option<String>,
params: MetricsParams,
) -> LoopMetrics<SC, TC, ()> {
pub fn with_metrics(self, params: MetricsParams) -> LoopMetrics<SC, TC, ()> {
LoopMetrics {
relay_loop: Loop {
reconnect_delay: self.reconnect_delay,
@@ -106,8 +100,7 @@ impl<SC, TC, LM> Loop<SC, TC, LM> {
loop_metric: None,
},
address: params.address,
registry: params.registry.unwrap_or_else(|| create_metrics_registry(prefix)),
metrics_prefix: params.metrics_prefix,
registry: params.registry,
loop_metric: None,
}
}
@@ -160,44 +153,23 @@ impl<SC, TC, LM> LoopMetrics<SC, TC, LM> {
/// Add relay loop metrics.
///
/// Loop metrics will be passed to the loop callback.
pub fn loop_metric<NewLM: Metrics>(
pub fn loop_metric<NewLM: Metric>(
self,
create_metric: impl FnOnce(&Registry, Option<&str>) -> Result<NewLM, PrometheusError>,
metric: NewLM,
) -> Result<LoopMetrics<SC, TC, NewLM>, Error> {
let loop_metric = create_metric(&self.registry, self.metrics_prefix.as_deref())?;
metric.register(&self.registry)?;
Ok(LoopMetrics {
relay_loop: self.relay_loop,
address: self.address,
registry: self.registry,
metrics_prefix: self.metrics_prefix,
loop_metric: Some(loop_metric),
loop_metric: Some(metric),
})
}
/// Add standalone metrics.
pub fn standalone_metric<M: StandaloneMetrics>(
self,
create_metric: impl FnOnce(&Registry, Option<&str>) -> Result<M, PrometheusError>,
) -> Result<Self, Error> {
// since standalone metrics are updating themselves, we may just ignore the fact that the
// same standalone metric is exposed by several loops && only spawn single metric
match create_metric(&self.registry, self.metrics_prefix.as_deref()) {
Ok(standalone_metrics) => standalone_metrics.spawn(),
Err(PrometheusError::AlreadyReg) => (),
Err(e) => return Err(e.into()),
}
Ok(self)
}
/// Convert into `MetricsParams` structure so that metrics registry may be extended later.
pub fn into_params(self) -> MetricsParams {
MetricsParams {
address: self.address,
registry: Some(self.registry),
metrics_prefix: self.metrics_prefix,
}
MetricsParams { address: self.address, registry: self.registry }
}
/// Expose metrics using address passed at creation.
@@ -274,15 +246,3 @@ pub async fn reconnect_failed_client(
break
}
}
/// Create new registry with global metrics.
fn create_metrics_registry(prefix: Option<String>) -> Registry {
match prefix {
Some(prefix) => {
assert!(!prefix.is_empty(), "Metrics prefix can not be empty");
Registry::new_custom(Some(prefix), None)
.expect("only fails if prefix is empty; prefix is not empty; qed")
},
None => Registry::new(),
}
}