From 925b23a3cd134263568c78118f1485bf13057515 Mon Sep 17 00:00:00 2001 From: Ashley Date: Wed, 4 Dec 2019 18:49:12 +0100 Subject: [PATCH] Use timestamp deltas in grafana-data-store (#4199) * Optimize grafana storage slightly * Test on grafana test binary * Cast to f32 * Silence record_metrics warning * Add source for error * More changes * Fix record_metrics_slice * Test Database * Docs * Update client/grafana-data-source/src/lib.rs Co-Authored-By: Niklas Adolfsson * Fix macro * Update server.rs --- .../grafana-data-source/src/database.rs | 170 ++++++++++++++++++ .../client/grafana-data-source/src/lib.rs | 59 ++++-- .../client/grafana-data-source/src/server.rs | 72 ++------ .../client/grafana-data-source/src/util.rs | 52 ------ .../grafana-data-source/test/src/main.rs | 10 +- substrate/client/service/src/builder.rs | 20 +-- substrate/client/tracing/src/lib.rs | 2 +- 7 files changed, 247 insertions(+), 138 deletions(-) create mode 100644 substrate/client/grafana-data-source/src/database.rs delete mode 100644 substrate/client/grafana-data-source/src/util.rs diff --git a/substrate/client/grafana-data-source/src/database.rs b/substrate/client/grafana-data-source/src/database.rs new file mode 100644 index 0000000000..21c6ed5b0b --- /dev/null +++ b/substrate/client/grafana-data-source/src/database.rs @@ -0,0 +1,170 @@ +// Copyright 2019 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + +use std::collections::HashMap; +use std::convert::TryFrom; +use crate::Error; + +pub struct Database { + base_timestamp: i64, + storage: HashMap> +} + +impl Database { + /// Create a new Database. + pub fn new() -> Self { + Self { + base_timestamp: now_millis(), + storage: HashMap::new() + } + } + + /// Produce an iterator for keys starting with a base string. + pub fn keys_starting_with<'a>(&'a self, base: &'a str) -> impl Iterator + 'a { + self.storage.keys() + .filter(move |key| key.starts_with(base)) + .cloned() + } + + /// Select `max_datapoints` datapoints that have been added between `from` and `to`. + pub fn datapoints_between(&self, key: &str, from: i64, to: i64, max_datapoints: usize) -> Option> { + self.storage.get(key) + .map(|vec| { + let from = find_index(vec, self.base_timestamp, from); + let to = find_index(vec, self.base_timestamp, to); + let slice = &vec[from .. to]; + + if max_datapoints == 0 { + Vec::new() + } else if max_datapoints >= slice.len() { + // Just convert the slice as-is + slice.iter() + .map(|dp| dp.make_absolute(self.base_timestamp)) + .collect() + } else { + // We have more datapoints than we need, so we need to skip some + (0 .. max_datapoints - 1) + .map(|i| &slice[i * slice.len() / (max_datapoints - 1)]) + .chain(slice.last()) + .map(|dp| dp.make_absolute(self.base_timestamp)) + .collect() + } + }) + } + + /// Push a new datapoint. Will error if the base timestamp hasn't been updated in `2^32` + /// milliseconds (49 days). + pub fn push(&mut self, key: &str, value: f32) -> Result<(), Error> { + self.storage.entry(key.into()) + .or_insert_with(Vec::new) + .push(Datapoint::new(self.base_timestamp, value)?); + + Ok(()) + } + + /// Set a new base timestamp, and remove metrics older than this new timestamp. Errors if the + /// difference between timestamps is greater than `2^32` milliseconds (49 days). + pub fn truncate(&mut self, new_base_timestamp: i64) -> Result<(), Error> { + // Ensure that the new base is older. + if self.base_timestamp >= new_base_timestamp { + return Ok(()); + } + + // If the old base timestamp was too long ago, the + let delta = u32::try_from(new_base_timestamp - self.base_timestamp) + .map_err(Error::Timestamp)?; + + for metric in self.storage.values_mut() { + // Find the index of the oldest allowed timestamp and cut out all those before it. + let index = find_index(&metric, self.base_timestamp, new_base_timestamp); + + *metric = metric.iter_mut() + .skip(index) + .map(|dp| { + dp.delta_timestamp -= delta; + *dp + }) + .collect(); + } + + self.base_timestamp = new_base_timestamp; + + Ok(()) + } +} + +#[derive(Clone, Copy)] +struct Datapoint { + delta_timestamp: u32, + value: f32 +} + +impl Datapoint { + fn new(base_timestamp: i64, value: f32) -> Result { + Ok(Self { + delta_timestamp: u32::try_from(now_millis() - base_timestamp) + .map_err(Error::Timestamp)?, + value + }) + } + + fn make_absolute(&self, base_timestamp: i64) -> (f32, i64) { + (self.value, base_timestamp + self.delta_timestamp as i64) + } +} + +fn find_index(slice: &[Datapoint], base_timestamp: i64, timestamp: i64) -> usize { + slice.binary_search_by_key(×tamp, |datapoint| { + base_timestamp + datapoint.delta_timestamp as i64 + }).unwrap_or_else(|index| index) +} + +/// Get the current unix timestamp in milliseconds. +fn now_millis() -> i64 { + chrono::Utc::now().timestamp_millis() +} + +#[test] +fn test() { + let mut database = Database::new(); + let start = now_millis(); + + database.push("test", 1.0).unwrap(); + database.push("test", 2.5).unwrap(); + database.push("test", 2.0).unwrap(); + database.push("test 2", 1.0).unwrap(); + + let mut keys: Vec<_> = database.keys_starting_with("test").collect(); + keys.sort(); + + assert_eq!(keys, ["test", "test 2"]); + assert_eq!(database.keys_starting_with("test ").collect::>(), ["test 2"]); + + assert_eq!( + database.datapoints_between("test", start - 1000, start + 1000, 4), + Some(vec![(1.0, start), (2.5, start), (2.0, start)]) + ); + + assert_eq!( + database.datapoints_between("test", start - 1000, start + 1000, 3), + Some(vec![(1.0, start), (2.5, start), (2.0, start)]) + ); + + assert_eq!( + database.datapoints_between("test", start - 1000, start + 1000, 2), + Some(vec![(1.0, start), (2.0, start)]) + ); +} diff --git a/substrate/client/grafana-data-source/src/lib.rs b/substrate/client/grafana-data-source/src/lib.rs index 09822adb58..a1841178e9 100644 --- a/substrate/client/grafana-data-source/src/lib.rs +++ b/substrate/client/grafana-data-source/src/lib.rs @@ -23,36 +23,65 @@ //! [Grafana]: https://grafana.com/ //! [`grafana-json-data-source`]: https://github.com/simPod/grafana-json-datasource +#![warn(missing_docs)] + use lazy_static::lazy_static; -use std::collections::HashMap; use parking_lot::RwLock; mod types; mod server; -mod util; #[cfg(not(target_os = "unknown"))] mod networking; +mod database; +use database::Database; pub use server::run_server; -pub use util::now_millis; - -type Metrics = HashMap>; +use std::num::TryFromIntError; lazy_static! { - /// The `RwLock` wrapping the metrics. Not intended to be used directly. - #[doc(hidden)] - pub static ref METRICS: RwLock = RwLock::new(Metrics::new()); + // The `RwLock` wrapping the metrics database. + static ref DATABASE: RwLock = RwLock::new(Database::new()); } /// Write metrics to `METRICS`. #[macro_export] macro_rules! record_metrics( - ($($key:expr => $value:expr),*) => { - use $crate::{METRICS, now_millis}; - let mut metrics = METRICS.write(); - let now = now_millis(); - $( - metrics.entry($key).or_insert_with(Vec::new).push(($value as f32, now)); - )* + ($($key:expr => $value:expr,)*) => { + $crate::record_metrics_slice(&[ + $( ($key, $value as f32), )* + ]); } ); + +/// Write metrics to `METRICS` as a slice. Intended to be only used via `record_metrics!`. +pub fn record_metrics_slice(metrics: &[(&str, f32)]) -> Result<(), Error> { + let mut database = crate::DATABASE.write(); + + for &(key, value) in metrics.iter() { + database.push(key, value)?; + } + + Ok(()) +} + +/// Error type that can be returned by either `record_metrics` or `run_server`. +#[derive(Debug, derive_more::Display, derive_more::From)] +pub enum Error { + Hyper(hyper::Error), + Serde(serde_json::Error), + Http(hyper::http::Error), + Timestamp(TryFromIntError), + Io(std::io::Error) +} + +impl std::error::Error for Error { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + match self { + Error::Hyper(error) => Some(error), + Error::Serde(error) => Some(error), + Error::Http(error) => Some(error), + Error::Timestamp(error) => Some(error), + Error::Io(error) => Some(error) + } + } +} diff --git a/substrate/client/grafana-data-source/src/server.rs b/substrate/client/grafana-data-source/src/server.rs index cb1efd67bd..37717fa244 100644 --- a/substrate/client/grafana-data-source/src/server.rs +++ b/substrate/client/grafana-data-source/src/server.rs @@ -19,45 +19,31 @@ use hyper::{Body, Request, Response, header, service::{service_fn, make_service_ use chrono::{Duration, Utc}; use futures_util::{FutureExt, future::{Future, select, Either}}; use futures_timer::Delay; -use crate::{METRICS, util, types::{Target, Query, TimeseriesData}}; - -#[derive(Debug, derive_more::Display)] -enum Error { - Hyper(hyper::Error), - Serde(serde_json::Error), - Http(hyper::http::Error) -} - -impl std::error::Error for Error {} +use crate::{DATABASE, Error, types::{Target, Query, TimeseriesData, Range}}; async fn api_response(req: Request) -> Result, Error> { match req.uri().path() { "/search" => { map_request_to_response(req, |target: Target| { // Filter and return metrics relating to the target - METRICS.read() - .keys() - .filter(|key| key.starts_with(&target.target)) - .cloned() + DATABASE.read() + .keys_starting_with(&target.target) .collect::>() }).await }, "/query" => { map_request_to_response(req, |query: Query| { - let metrics = METRICS.read(); + let metrics = DATABASE.read(); + + let Query { + range: Range { from, to }, + max_datapoints, .. + } = query; // Return timeseries data related to the specified metrics query.targets.iter() .map(|target| { - let datapoints = metrics.get(target.target.as_str()) - .map(|metric| { - let from = util::find_index(&metric, query.range.from); - let to = util::find_index(&metric, query.range.to); - - // Avoid returning more than `max_datapoints` (mostly to stop - // the web browser from having to do a ton of work) - util::select_points(&metric[from .. to], query.max_datapoints) - }) + let datapoints = metrics.datapoints_between(&target.target, from, to, max_datapoints) .unwrap_or_else(Vec::new); TimeseriesData { @@ -110,27 +96,9 @@ impl tokio_executor::TypedExecutor for Executor } } -/// An error that may occur during server runtime. -#[derive(Debug, derive_more::Display, derive_more::From)] -pub enum RunError { - /// Propagated hyper server error. - Hyper(hyper::Error), - /// Initial bind IO error. - Io(std::io::Error), -} - -impl std::error::Error for RunError { - fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { - match *self { - Self::Hyper(ref e) => Some(e), - Self::Io(ref e) => Some(e), - } - } -} - /// Start the data source server. #[cfg(not(target_os = "unknown"))] -pub async fn run_server(mut address: std::net::SocketAddr) -> Result<(), RunError> { +pub async fn run_server(mut address: std::net::SocketAddr) -> Result<(), Error> { use async_std::{net, io}; use crate::networking::Incoming; @@ -172,33 +140,23 @@ pub async fn run_server(mut address: std::net::SocketAddr) -> Result<(), RunErro let result = match select(server, clean).await { Either::Left((result, _)) => result.map_err(Into::into), - Either::Right(_) => Ok(()) + Either::Right((result, _)) => result }; result } #[cfg(target_os = "unknown")] -pub async fn run_server(_: std::net::SocketAddr) -> Result<(), RunError> { +pub async fn run_server(_: std::net::SocketAddr) -> Result<(), Error> { Ok(()) } /// Periodically remove old metrics. -async fn clean_up(every: std::time::Duration, before: Duration) { +async fn clean_up(every: std::time::Duration, before: Duration) -> Result<(), Error> { loop { Delay::new(every).await; let oldest_allowed = (Utc::now() - before).timestamp_millis(); - - let mut metrics = METRICS.write(); - - for metric in metrics.values_mut() { - // Find the index of the oldest allowed timestamp and cut out all those before it. - let index = util::find_index(&metric, oldest_allowed); - - if index > 0 { - *metric = metric[index..].to_vec(); - } - } + DATABASE.write().truncate(oldest_allowed)?; } } diff --git a/substrate/client/grafana-data-source/src/util.rs b/substrate/client/grafana-data-source/src/util.rs deleted file mode 100644 index cd27c440f7..0000000000 --- a/substrate/client/grafana-data-source/src/util.rs +++ /dev/null @@ -1,52 +0,0 @@ -// Copyright 2019 Parity Technologies (UK) Ltd. -// This file is part of Substrate. - -// Substrate is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Substrate is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Substrate. If not, see . - -/// Get the current unix timestamp in milliseconds. -pub fn now_millis() -> i64 { - chrono::Utc::now().timestamp_millis() -} - -// find the index of a timestamp -pub fn find_index(slice: &[(f32, i64)], timestamp: i64) -> usize { - slice.binary_search_by_key(×tamp, |&(_, timestamp)| timestamp) - .unwrap_or_else(|index| index) -} - -// Evenly select up to `num_points` points from a slice -pub fn select_points(slice: &[T], num_points: usize) -> Vec { - if num_points == 0 { - return Vec::new(); - } else if num_points >= slice.len() { - return slice.to_owned(); - } - - (0 .. num_points - 1) - .map(|i| slice[i * slice.len() / (num_points - 1)]) - .chain(slice.last().cloned()) - .collect() -} - -#[test] -fn test_select_points() { - let array = [1, 2, 3, 4, 5]; - assert_eq!(select_points(&array, 0), Vec::::new()); - assert_eq!(select_points(&array, 1), vec![5]); - assert_eq!(select_points(&array, 2), vec![1, 5]); - assert_eq!(select_points(&array, 3), vec![1, 3, 5]); - assert_eq!(select_points(&array, 4), vec![1, 2, 4, 5]); - assert_eq!(select_points(&array, 5), vec![1, 2, 3, 4, 5]); - assert_eq!(select_points(&array, 6), vec![1, 2, 3, 4, 5]); -} diff --git a/substrate/client/grafana-data-source/test/src/main.rs b/substrate/client/grafana-data-source/test/src/main.rs index a4769e2bbb..a723e7e476 100644 --- a/substrate/client/grafana-data-source/test/src/main.rs +++ b/substrate/client/grafana-data-source/test/src/main.rs @@ -25,10 +25,14 @@ async fn randomness() { let random = rand::thread_rng().gen_range(0.0, 1000.0); - record_metrics!( - "random data".to_owned() => random, - "random^2".to_owned() => random * random + let result = record_metrics!( + "random data" => random, + "random^2" => random * random, ); + + if let Err(error) = result { + eprintln!("{}", error); + } } } diff --git a/substrate/client/service/src/builder.rs b/substrate/client/service/src/builder.rs index 0ca842e4bc..157af7d296 100644 --- a/substrate/client/service/src/builder.rs +++ b/substrate/client/service/src/builder.rs @@ -954,16 +954,16 @@ ServiceBuilder< "bandwidth_upload" => bandwidth_upload, "used_state_cache_size" => used_state_cache_size, ); - record_metrics!( - "peers".to_owned() => num_peers, - "height".to_owned() => best_number, - "txcount".to_owned() => txpool_status.ready, - "cpu".to_owned() => cpu_usage, - "memory".to_owned() => memory, - "finalized_height".to_owned() => finalized_number, - "bandwidth_download".to_owned() => bandwidth_download, - "bandwidth_upload".to_owned() => bandwidth_upload, - "used_state_cache_size".to_owned() => used_state_cache_size + let _ = record_metrics!( + "peers" => num_peers, + "height" => best_number, + "txcount" => txpool_status.ready, + "cpu" => cpu_usage, + "memory" => memory, + "finalized_height" => finalized_number, + "bandwidth_download" => bandwidth_download, + "bandwidth_upload" => bandwidth_upload, + "used_state_cache_size" => used_state_cache_size, ); Ok(()) diff --git a/substrate/client/tracing/src/lib.rs b/substrate/client/tracing/src/lib.rs index b87273bdaa..4be87bc2f7 100644 --- a/substrate/client/tracing/src/lib.rs +++ b/substrate/client/tracing/src/lib.rs @@ -293,5 +293,5 @@ fn send_telemetry(span_datum: SpanDatum) { fn send_grafana(span_datum: SpanDatum) { let name = format!("{}::{}", span_datum.target, span_datum.name); - record_metrics!(name => span_datum.overall_time.as_nanos()); + record_metrics!(&name => span_datum.overall_time.as_nanos(),); }