mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-15 19:41:05 +00:00
Use timestamp deltas in grafana-data-store (#4199)
* Optimize grafana storage slightly * Test on grafana test binary * Cast to f32 * Silence record_metrics warning * Add source for error * More changes * Fix record_metrics_slice * Test Database * Docs * Update client/grafana-data-source/src/lib.rs Co-Authored-By: Niklas Adolfsson <niklasadolfsson1@gmail.com> * Fix macro * Update server.rs
This commit is contained in:
@@ -0,0 +1,170 @@
|
||||
// Copyright 2019 Parity Technologies (UK) Ltd.
|
||||
// This file is part of Substrate.
|
||||
|
||||
// Substrate is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// Substrate is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::convert::TryFrom;
|
||||
use crate::Error;
|
||||
|
||||
pub struct Database {
|
||||
base_timestamp: i64,
|
||||
storage: HashMap<String, Vec<Datapoint>>
|
||||
}
|
||||
|
||||
impl Database {
|
||||
/// Create a new Database.
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
base_timestamp: now_millis(),
|
||||
storage: HashMap::new()
|
||||
}
|
||||
}
|
||||
|
||||
/// Produce an iterator for keys starting with a base string.
|
||||
pub fn keys_starting_with<'a>(&'a self, base: &'a str) -> impl Iterator<Item = String> + 'a {
|
||||
self.storage.keys()
|
||||
.filter(move |key| key.starts_with(base))
|
||||
.cloned()
|
||||
}
|
||||
|
||||
/// Select `max_datapoints` datapoints that have been added between `from` and `to`.
|
||||
pub fn datapoints_between(&self, key: &str, from: i64, to: i64, max_datapoints: usize) -> Option<Vec<(f32, i64)>> {
|
||||
self.storage.get(key)
|
||||
.map(|vec| {
|
||||
let from = find_index(vec, self.base_timestamp, from);
|
||||
let to = find_index(vec, self.base_timestamp, to);
|
||||
let slice = &vec[from .. to];
|
||||
|
||||
if max_datapoints == 0 {
|
||||
Vec::new()
|
||||
} else if max_datapoints >= slice.len() {
|
||||
// Just convert the slice as-is
|
||||
slice.iter()
|
||||
.map(|dp| dp.make_absolute(self.base_timestamp))
|
||||
.collect()
|
||||
} else {
|
||||
// We have more datapoints than we need, so we need to skip some
|
||||
(0 .. max_datapoints - 1)
|
||||
.map(|i| &slice[i * slice.len() / (max_datapoints - 1)])
|
||||
.chain(slice.last())
|
||||
.map(|dp| dp.make_absolute(self.base_timestamp))
|
||||
.collect()
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
/// Push a new datapoint. Will error if the base timestamp hasn't been updated in `2^32`
|
||||
/// milliseconds (49 days).
|
||||
pub fn push(&mut self, key: &str, value: f32) -> Result<(), Error> {
|
||||
self.storage.entry(key.into())
|
||||
.or_insert_with(Vec::new)
|
||||
.push(Datapoint::new(self.base_timestamp, value)?);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Set a new base timestamp, and remove metrics older than this new timestamp. Errors if the
|
||||
/// difference between timestamps is greater than `2^32` milliseconds (49 days).
|
||||
pub fn truncate(&mut self, new_base_timestamp: i64) -> Result<(), Error> {
|
||||
// Ensure that the new base is older.
|
||||
if self.base_timestamp >= new_base_timestamp {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// If the old base timestamp was too long ago, the
|
||||
let delta = u32::try_from(new_base_timestamp - self.base_timestamp)
|
||||
.map_err(Error::Timestamp)?;
|
||||
|
||||
for metric in self.storage.values_mut() {
|
||||
// Find the index of the oldest allowed timestamp and cut out all those before it.
|
||||
let index = find_index(&metric, self.base_timestamp, new_base_timestamp);
|
||||
|
||||
*metric = metric.iter_mut()
|
||||
.skip(index)
|
||||
.map(|dp| {
|
||||
dp.delta_timestamp -= delta;
|
||||
*dp
|
||||
})
|
||||
.collect();
|
||||
}
|
||||
|
||||
self.base_timestamp = new_base_timestamp;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy)]
|
||||
struct Datapoint {
|
||||
delta_timestamp: u32,
|
||||
value: f32
|
||||
}
|
||||
|
||||
impl Datapoint {
|
||||
fn new(base_timestamp: i64, value: f32) -> Result<Self, Error> {
|
||||
Ok(Self {
|
||||
delta_timestamp: u32::try_from(now_millis() - base_timestamp)
|
||||
.map_err(Error::Timestamp)?,
|
||||
value
|
||||
})
|
||||
}
|
||||
|
||||
fn make_absolute(&self, base_timestamp: i64) -> (f32, i64) {
|
||||
(self.value, base_timestamp + self.delta_timestamp as i64)
|
||||
}
|
||||
}
|
||||
|
||||
fn find_index(slice: &[Datapoint], base_timestamp: i64, timestamp: i64) -> usize {
|
||||
slice.binary_search_by_key(×tamp, |datapoint| {
|
||||
base_timestamp + datapoint.delta_timestamp as i64
|
||||
}).unwrap_or_else(|index| index)
|
||||
}
|
||||
|
||||
/// Get the current unix timestamp in milliseconds.
|
||||
fn now_millis() -> i64 {
|
||||
chrono::Utc::now().timestamp_millis()
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test() {
|
||||
let mut database = Database::new();
|
||||
let start = now_millis();
|
||||
|
||||
database.push("test", 1.0).unwrap();
|
||||
database.push("test", 2.5).unwrap();
|
||||
database.push("test", 2.0).unwrap();
|
||||
database.push("test 2", 1.0).unwrap();
|
||||
|
||||
let mut keys: Vec<_> = database.keys_starting_with("test").collect();
|
||||
keys.sort();
|
||||
|
||||
assert_eq!(keys, ["test", "test 2"]);
|
||||
assert_eq!(database.keys_starting_with("test ").collect::<Vec<_>>(), ["test 2"]);
|
||||
|
||||
assert_eq!(
|
||||
database.datapoints_between("test", start - 1000, start + 1000, 4),
|
||||
Some(vec![(1.0, start), (2.5, start), (2.0, start)])
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
database.datapoints_between("test", start - 1000, start + 1000, 3),
|
||||
Some(vec![(1.0, start), (2.5, start), (2.0, start)])
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
database.datapoints_between("test", start - 1000, start + 1000, 2),
|
||||
Some(vec![(1.0, start), (2.0, start)])
|
||||
);
|
||||
}
|
||||
@@ -23,36 +23,65 @@
|
||||
//! [Grafana]: https://grafana.com/
|
||||
//! [`grafana-json-data-source`]: https://github.com/simPod/grafana-json-datasource
|
||||
|
||||
#![warn(missing_docs)]
|
||||
|
||||
use lazy_static::lazy_static;
|
||||
use std::collections::HashMap;
|
||||
use parking_lot::RwLock;
|
||||
|
||||
mod types;
|
||||
mod server;
|
||||
mod util;
|
||||
#[cfg(not(target_os = "unknown"))]
|
||||
mod networking;
|
||||
mod database;
|
||||
|
||||
use database::Database;
|
||||
pub use server::run_server;
|
||||
pub use util::now_millis;
|
||||
|
||||
type Metrics = HashMap<String, Vec<(f32, i64)>>;
|
||||
use std::num::TryFromIntError;
|
||||
|
||||
lazy_static! {
|
||||
/// The `RwLock` wrapping the metrics. Not intended to be used directly.
|
||||
#[doc(hidden)]
|
||||
pub static ref METRICS: RwLock<Metrics> = RwLock::new(Metrics::new());
|
||||
// The `RwLock` wrapping the metrics database.
|
||||
static ref DATABASE: RwLock<Database> = RwLock::new(Database::new());
|
||||
}
|
||||
|
||||
/// Write metrics to `METRICS`.
|
||||
#[macro_export]
|
||||
macro_rules! record_metrics(
|
||||
($($key:expr => $value:expr),*) => {
|
||||
use $crate::{METRICS, now_millis};
|
||||
let mut metrics = METRICS.write();
|
||||
let now = now_millis();
|
||||
$(
|
||||
metrics.entry($key).or_insert_with(Vec::new).push(($value as f32, now));
|
||||
)*
|
||||
($($key:expr => $value:expr,)*) => {
|
||||
$crate::record_metrics_slice(&[
|
||||
$( ($key, $value as f32), )*
|
||||
]);
|
||||
}
|
||||
);
|
||||
|
||||
/// Write metrics to `METRICS` as a slice. Intended to be only used via `record_metrics!`.
|
||||
pub fn record_metrics_slice(metrics: &[(&str, f32)]) -> Result<(), Error> {
|
||||
let mut database = crate::DATABASE.write();
|
||||
|
||||
for &(key, value) in metrics.iter() {
|
||||
database.push(key, value)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Error type that can be returned by either `record_metrics` or `run_server`.
|
||||
#[derive(Debug, derive_more::Display, derive_more::From)]
|
||||
pub enum Error {
|
||||
Hyper(hyper::Error),
|
||||
Serde(serde_json::Error),
|
||||
Http(hyper::http::Error),
|
||||
Timestamp(TryFromIntError),
|
||||
Io(std::io::Error)
|
||||
}
|
||||
|
||||
impl std::error::Error for Error {
|
||||
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
|
||||
match self {
|
||||
Error::Hyper(error) => Some(error),
|
||||
Error::Serde(error) => Some(error),
|
||||
Error::Http(error) => Some(error),
|
||||
Error::Timestamp(error) => Some(error),
|
||||
Error::Io(error) => Some(error)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -19,45 +19,31 @@ use hyper::{Body, Request, Response, header, service::{service_fn, make_service_
|
||||
use chrono::{Duration, Utc};
|
||||
use futures_util::{FutureExt, future::{Future, select, Either}};
|
||||
use futures_timer::Delay;
|
||||
use crate::{METRICS, util, types::{Target, Query, TimeseriesData}};
|
||||
|
||||
#[derive(Debug, derive_more::Display)]
|
||||
enum Error {
|
||||
Hyper(hyper::Error),
|
||||
Serde(serde_json::Error),
|
||||
Http(hyper::http::Error)
|
||||
}
|
||||
|
||||
impl std::error::Error for Error {}
|
||||
use crate::{DATABASE, Error, types::{Target, Query, TimeseriesData, Range}};
|
||||
|
||||
async fn api_response(req: Request<Body>) -> Result<Response<Body>, Error> {
|
||||
match req.uri().path() {
|
||||
"/search" => {
|
||||
map_request_to_response(req, |target: Target| {
|
||||
// Filter and return metrics relating to the target
|
||||
METRICS.read()
|
||||
.keys()
|
||||
.filter(|key| key.starts_with(&target.target))
|
||||
.cloned()
|
||||
DATABASE.read()
|
||||
.keys_starting_with(&target.target)
|
||||
.collect::<Vec<_>>()
|
||||
}).await
|
||||
},
|
||||
"/query" => {
|
||||
map_request_to_response(req, |query: Query| {
|
||||
let metrics = METRICS.read();
|
||||
let metrics = DATABASE.read();
|
||||
|
||||
let Query {
|
||||
range: Range { from, to },
|
||||
max_datapoints, ..
|
||||
} = query;
|
||||
|
||||
// Return timeseries data related to the specified metrics
|
||||
query.targets.iter()
|
||||
.map(|target| {
|
||||
let datapoints = metrics.get(target.target.as_str())
|
||||
.map(|metric| {
|
||||
let from = util::find_index(&metric, query.range.from);
|
||||
let to = util::find_index(&metric, query.range.to);
|
||||
|
||||
// Avoid returning more than `max_datapoints` (mostly to stop
|
||||
// the web browser from having to do a ton of work)
|
||||
util::select_points(&metric[from .. to], query.max_datapoints)
|
||||
})
|
||||
let datapoints = metrics.datapoints_between(&target.target, from, to, max_datapoints)
|
||||
.unwrap_or_else(Vec::new);
|
||||
|
||||
TimeseriesData {
|
||||
@@ -110,27 +96,9 @@ impl<T> tokio_executor::TypedExecutor<T> for Executor
|
||||
}
|
||||
}
|
||||
|
||||
/// An error that may occur during server runtime.
|
||||
#[derive(Debug, derive_more::Display, derive_more::From)]
|
||||
pub enum RunError {
|
||||
/// Propagated hyper server error.
|
||||
Hyper(hyper::Error),
|
||||
/// Initial bind IO error.
|
||||
Io(std::io::Error),
|
||||
}
|
||||
|
||||
impl std::error::Error for RunError {
|
||||
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
|
||||
match *self {
|
||||
Self::Hyper(ref e) => Some(e),
|
||||
Self::Io(ref e) => Some(e),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Start the data source server.
|
||||
#[cfg(not(target_os = "unknown"))]
|
||||
pub async fn run_server(mut address: std::net::SocketAddr) -> Result<(), RunError> {
|
||||
pub async fn run_server(mut address: std::net::SocketAddr) -> Result<(), Error> {
|
||||
use async_std::{net, io};
|
||||
use crate::networking::Incoming;
|
||||
|
||||
@@ -172,33 +140,23 @@ pub async fn run_server(mut address: std::net::SocketAddr) -> Result<(), RunErro
|
||||
|
||||
let result = match select(server, clean).await {
|
||||
Either::Left((result, _)) => result.map_err(Into::into),
|
||||
Either::Right(_) => Ok(())
|
||||
Either::Right((result, _)) => result
|
||||
};
|
||||
|
||||
result
|
||||
}
|
||||
|
||||
#[cfg(target_os = "unknown")]
|
||||
pub async fn run_server(_: std::net::SocketAddr) -> Result<(), RunError> {
|
||||
pub async fn run_server(_: std::net::SocketAddr) -> Result<(), Error> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Periodically remove old metrics.
|
||||
async fn clean_up(every: std::time::Duration, before: Duration) {
|
||||
async fn clean_up(every: std::time::Duration, before: Duration) -> Result<(), Error> {
|
||||
loop {
|
||||
Delay::new(every).await;
|
||||
|
||||
let oldest_allowed = (Utc::now() - before).timestamp_millis();
|
||||
|
||||
let mut metrics = METRICS.write();
|
||||
|
||||
for metric in metrics.values_mut() {
|
||||
// Find the index of the oldest allowed timestamp and cut out all those before it.
|
||||
let index = util::find_index(&metric, oldest_allowed);
|
||||
|
||||
if index > 0 {
|
||||
*metric = metric[index..].to_vec();
|
||||
}
|
||||
}
|
||||
DATABASE.write().truncate(oldest_allowed)?;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,52 +0,0 @@
|
||||
// Copyright 2019 Parity Technologies (UK) Ltd.
|
||||
// This file is part of Substrate.
|
||||
|
||||
// Substrate is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// Substrate is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
/// Get the current unix timestamp in milliseconds.
|
||||
pub fn now_millis() -> i64 {
|
||||
chrono::Utc::now().timestamp_millis()
|
||||
}
|
||||
|
||||
// find the index of a timestamp
|
||||
pub fn find_index(slice: &[(f32, i64)], timestamp: i64) -> usize {
|
||||
slice.binary_search_by_key(×tamp, |&(_, timestamp)| timestamp)
|
||||
.unwrap_or_else(|index| index)
|
||||
}
|
||||
|
||||
// Evenly select up to `num_points` points from a slice
|
||||
pub fn select_points<T: Copy>(slice: &[T], num_points: usize) -> Vec<T> {
|
||||
if num_points == 0 {
|
||||
return Vec::new();
|
||||
} else if num_points >= slice.len() {
|
||||
return slice.to_owned();
|
||||
}
|
||||
|
||||
(0 .. num_points - 1)
|
||||
.map(|i| slice[i * slice.len() / (num_points - 1)])
|
||||
.chain(slice.last().cloned())
|
||||
.collect()
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_select_points() {
|
||||
let array = [1, 2, 3, 4, 5];
|
||||
assert_eq!(select_points(&array, 0), Vec::<u8>::new());
|
||||
assert_eq!(select_points(&array, 1), vec![5]);
|
||||
assert_eq!(select_points(&array, 2), vec![1, 5]);
|
||||
assert_eq!(select_points(&array, 3), vec![1, 3, 5]);
|
||||
assert_eq!(select_points(&array, 4), vec![1, 2, 4, 5]);
|
||||
assert_eq!(select_points(&array, 5), vec![1, 2, 3, 4, 5]);
|
||||
assert_eq!(select_points(&array, 6), vec![1, 2, 3, 4, 5]);
|
||||
}
|
||||
@@ -25,10 +25,14 @@ async fn randomness() {
|
||||
|
||||
let random = rand::thread_rng().gen_range(0.0, 1000.0);
|
||||
|
||||
record_metrics!(
|
||||
"random data".to_owned() => random,
|
||||
"random^2".to_owned() => random * random
|
||||
let result = record_metrics!(
|
||||
"random data" => random,
|
||||
"random^2" => random * random,
|
||||
);
|
||||
|
||||
if let Err(error) = result {
|
||||
eprintln!("{}", error);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -954,16 +954,16 @@ ServiceBuilder<
|
||||
"bandwidth_upload" => bandwidth_upload,
|
||||
"used_state_cache_size" => used_state_cache_size,
|
||||
);
|
||||
record_metrics!(
|
||||
"peers".to_owned() => num_peers,
|
||||
"height".to_owned() => best_number,
|
||||
"txcount".to_owned() => txpool_status.ready,
|
||||
"cpu".to_owned() => cpu_usage,
|
||||
"memory".to_owned() => memory,
|
||||
"finalized_height".to_owned() => finalized_number,
|
||||
"bandwidth_download".to_owned() => bandwidth_download,
|
||||
"bandwidth_upload".to_owned() => bandwidth_upload,
|
||||
"used_state_cache_size".to_owned() => used_state_cache_size
|
||||
let _ = record_metrics!(
|
||||
"peers" => num_peers,
|
||||
"height" => best_number,
|
||||
"txcount" => txpool_status.ready,
|
||||
"cpu" => cpu_usage,
|
||||
"memory" => memory,
|
||||
"finalized_height" => finalized_number,
|
||||
"bandwidth_download" => bandwidth_download,
|
||||
"bandwidth_upload" => bandwidth_upload,
|
||||
"used_state_cache_size" => used_state_cache_size,
|
||||
);
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -293,5 +293,5 @@ fn send_telemetry(span_datum: SpanDatum) {
|
||||
|
||||
fn send_grafana(span_datum: SpanDatum) {
|
||||
let name = format!("{}::{}", span_datum.target, span_datum.name);
|
||||
record_metrics!(name => span_datum.overall_time.as_nanos());
|
||||
record_metrics!(&name => span_datum.overall_time.as_nanos(),);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user