diff --git a/substrate/client/grafana-data-source/src/database.rs b/substrate/client/grafana-data-source/src/database.rs
new file mode 100644
index 0000000000..21c6ed5b0b
--- /dev/null
+++ b/substrate/client/grafana-data-source/src/database.rs
@@ -0,0 +1,170 @@
+// Copyright 2019 Parity Technologies (UK) Ltd.
+// This file is part of Substrate.
+
+// Substrate is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Substrate is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Substrate. If not, see .
+
+use std::collections::HashMap;
+use std::convert::TryFrom;
+use crate::Error;
+
+pub struct Database {
+ base_timestamp: i64,
+ storage: HashMap>
+}
+
+impl Database {
+ /// Create a new Database.
+ pub fn new() -> Self {
+ Self {
+ base_timestamp: now_millis(),
+ storage: HashMap::new()
+ }
+ }
+
+ /// Produce an iterator for keys starting with a base string.
+ pub fn keys_starting_with<'a>(&'a self, base: &'a str) -> impl Iterator- + 'a {
+ self.storage.keys()
+ .filter(move |key| key.starts_with(base))
+ .cloned()
+ }
+
+ /// Select `max_datapoints` datapoints that have been added between `from` and `to`.
+ pub fn datapoints_between(&self, key: &str, from: i64, to: i64, max_datapoints: usize) -> Option> {
+ self.storage.get(key)
+ .map(|vec| {
+ let from = find_index(vec, self.base_timestamp, from);
+ let to = find_index(vec, self.base_timestamp, to);
+ let slice = &vec[from .. to];
+
+ if max_datapoints == 0 {
+ Vec::new()
+ } else if max_datapoints >= slice.len() {
+ // Just convert the slice as-is
+ slice.iter()
+ .map(|dp| dp.make_absolute(self.base_timestamp))
+ .collect()
+ } else {
+ // We have more datapoints than we need, so we need to skip some
+ (0 .. max_datapoints - 1)
+ .map(|i| &slice[i * slice.len() / (max_datapoints - 1)])
+ .chain(slice.last())
+ .map(|dp| dp.make_absolute(self.base_timestamp))
+ .collect()
+ }
+ })
+ }
+
+ /// Push a new datapoint. Will error if the base timestamp hasn't been updated in `2^32`
+ /// milliseconds (49 days).
+ pub fn push(&mut self, key: &str, value: f32) -> Result<(), Error> {
+ self.storage.entry(key.into())
+ .or_insert_with(Vec::new)
+ .push(Datapoint::new(self.base_timestamp, value)?);
+
+ Ok(())
+ }
+
+ /// Set a new base timestamp, and remove metrics older than this new timestamp. Errors if the
+ /// difference between timestamps is greater than `2^32` milliseconds (49 days).
+ pub fn truncate(&mut self, new_base_timestamp: i64) -> Result<(), Error> {
+ // Ensure that the new base is older.
+ if self.base_timestamp >= new_base_timestamp {
+ return Ok(());
+ }
+
+ // If the old base timestamp was too long ago, the
+ let delta = u32::try_from(new_base_timestamp - self.base_timestamp)
+ .map_err(Error::Timestamp)?;
+
+ for metric in self.storage.values_mut() {
+ // Find the index of the oldest allowed timestamp and cut out all those before it.
+ let index = find_index(&metric, self.base_timestamp, new_base_timestamp);
+
+ *metric = metric.iter_mut()
+ .skip(index)
+ .map(|dp| {
+ dp.delta_timestamp -= delta;
+ *dp
+ })
+ .collect();
+ }
+
+ self.base_timestamp = new_base_timestamp;
+
+ Ok(())
+ }
+}
+
+#[derive(Clone, Copy)]
+struct Datapoint {
+ delta_timestamp: u32,
+ value: f32
+}
+
+impl Datapoint {
+ fn new(base_timestamp: i64, value: f32) -> Result {
+ Ok(Self {
+ delta_timestamp: u32::try_from(now_millis() - base_timestamp)
+ .map_err(Error::Timestamp)?,
+ value
+ })
+ }
+
+ fn make_absolute(&self, base_timestamp: i64) -> (f32, i64) {
+ (self.value, base_timestamp + self.delta_timestamp as i64)
+ }
+}
+
+fn find_index(slice: &[Datapoint], base_timestamp: i64, timestamp: i64) -> usize {
+ slice.binary_search_by_key(×tamp, |datapoint| {
+ base_timestamp + datapoint.delta_timestamp as i64
+ }).unwrap_or_else(|index| index)
+}
+
+/// Get the current unix timestamp in milliseconds.
+fn now_millis() -> i64 {
+ chrono::Utc::now().timestamp_millis()
+}
+
+#[test]
+fn test() {
+ let mut database = Database::new();
+ let start = now_millis();
+
+ database.push("test", 1.0).unwrap();
+ database.push("test", 2.5).unwrap();
+ database.push("test", 2.0).unwrap();
+ database.push("test 2", 1.0).unwrap();
+
+ let mut keys: Vec<_> = database.keys_starting_with("test").collect();
+ keys.sort();
+
+ assert_eq!(keys, ["test", "test 2"]);
+ assert_eq!(database.keys_starting_with("test ").collect::>(), ["test 2"]);
+
+ assert_eq!(
+ database.datapoints_between("test", start - 1000, start + 1000, 4),
+ Some(vec![(1.0, start), (2.5, start), (2.0, start)])
+ );
+
+ assert_eq!(
+ database.datapoints_between("test", start - 1000, start + 1000, 3),
+ Some(vec![(1.0, start), (2.5, start), (2.0, start)])
+ );
+
+ assert_eq!(
+ database.datapoints_between("test", start - 1000, start + 1000, 2),
+ Some(vec![(1.0, start), (2.0, start)])
+ );
+}
diff --git a/substrate/client/grafana-data-source/src/lib.rs b/substrate/client/grafana-data-source/src/lib.rs
index 09822adb58..a1841178e9 100644
--- a/substrate/client/grafana-data-source/src/lib.rs
+++ b/substrate/client/grafana-data-source/src/lib.rs
@@ -23,36 +23,65 @@
//! [Grafana]: https://grafana.com/
//! [`grafana-json-data-source`]: https://github.com/simPod/grafana-json-datasource
+#![warn(missing_docs)]
+
use lazy_static::lazy_static;
-use std::collections::HashMap;
use parking_lot::RwLock;
mod types;
mod server;
-mod util;
#[cfg(not(target_os = "unknown"))]
mod networking;
+mod database;
+use database::Database;
pub use server::run_server;
-pub use util::now_millis;
-
-type Metrics = HashMap>;
+use std::num::TryFromIntError;
lazy_static! {
- /// The `RwLock` wrapping the metrics. Not intended to be used directly.
- #[doc(hidden)]
- pub static ref METRICS: RwLock = RwLock::new(Metrics::new());
+ // The `RwLock` wrapping the metrics database.
+ static ref DATABASE: RwLock = RwLock::new(Database::new());
}
/// Write metrics to `METRICS`.
#[macro_export]
macro_rules! record_metrics(
- ($($key:expr => $value:expr),*) => {
- use $crate::{METRICS, now_millis};
- let mut metrics = METRICS.write();
- let now = now_millis();
- $(
- metrics.entry($key).or_insert_with(Vec::new).push(($value as f32, now));
- )*
+ ($($key:expr => $value:expr,)*) => {
+ $crate::record_metrics_slice(&[
+ $( ($key, $value as f32), )*
+ ]);
}
);
+
+/// Write metrics to `METRICS` as a slice. Intended to be only used via `record_metrics!`.
+pub fn record_metrics_slice(metrics: &[(&str, f32)]) -> Result<(), Error> {
+ let mut database = crate::DATABASE.write();
+
+ for &(key, value) in metrics.iter() {
+ database.push(key, value)?;
+ }
+
+ Ok(())
+}
+
+/// Error type that can be returned by either `record_metrics` or `run_server`.
+#[derive(Debug, derive_more::Display, derive_more::From)]
+pub enum Error {
+ Hyper(hyper::Error),
+ Serde(serde_json::Error),
+ Http(hyper::http::Error),
+ Timestamp(TryFromIntError),
+ Io(std::io::Error)
+}
+
+impl std::error::Error for Error {
+ fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
+ match self {
+ Error::Hyper(error) => Some(error),
+ Error::Serde(error) => Some(error),
+ Error::Http(error) => Some(error),
+ Error::Timestamp(error) => Some(error),
+ Error::Io(error) => Some(error)
+ }
+ }
+}
diff --git a/substrate/client/grafana-data-source/src/server.rs b/substrate/client/grafana-data-source/src/server.rs
index cb1efd67bd..37717fa244 100644
--- a/substrate/client/grafana-data-source/src/server.rs
+++ b/substrate/client/grafana-data-source/src/server.rs
@@ -19,45 +19,31 @@ use hyper::{Body, Request, Response, header, service::{service_fn, make_service_
use chrono::{Duration, Utc};
use futures_util::{FutureExt, future::{Future, select, Either}};
use futures_timer::Delay;
-use crate::{METRICS, util, types::{Target, Query, TimeseriesData}};
-
-#[derive(Debug, derive_more::Display)]
-enum Error {
- Hyper(hyper::Error),
- Serde(serde_json::Error),
- Http(hyper::http::Error)
-}
-
-impl std::error::Error for Error {}
+use crate::{DATABASE, Error, types::{Target, Query, TimeseriesData, Range}};
async fn api_response(req: Request) -> Result, Error> {
match req.uri().path() {
"/search" => {
map_request_to_response(req, |target: Target| {
// Filter and return metrics relating to the target
- METRICS.read()
- .keys()
- .filter(|key| key.starts_with(&target.target))
- .cloned()
+ DATABASE.read()
+ .keys_starting_with(&target.target)
.collect::>()
}).await
},
"/query" => {
map_request_to_response(req, |query: Query| {
- let metrics = METRICS.read();
+ let metrics = DATABASE.read();
+
+ let Query {
+ range: Range { from, to },
+ max_datapoints, ..
+ } = query;
// Return timeseries data related to the specified metrics
query.targets.iter()
.map(|target| {
- let datapoints = metrics.get(target.target.as_str())
- .map(|metric| {
- let from = util::find_index(&metric, query.range.from);
- let to = util::find_index(&metric, query.range.to);
-
- // Avoid returning more than `max_datapoints` (mostly to stop
- // the web browser from having to do a ton of work)
- util::select_points(&metric[from .. to], query.max_datapoints)
- })
+ let datapoints = metrics.datapoints_between(&target.target, from, to, max_datapoints)
.unwrap_or_else(Vec::new);
TimeseriesData {
@@ -110,27 +96,9 @@ impl tokio_executor::TypedExecutor for Executor
}
}
-/// An error that may occur during server runtime.
-#[derive(Debug, derive_more::Display, derive_more::From)]
-pub enum RunError {
- /// Propagated hyper server error.
- Hyper(hyper::Error),
- /// Initial bind IO error.
- Io(std::io::Error),
-}
-
-impl std::error::Error for RunError {
- fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
- match *self {
- Self::Hyper(ref e) => Some(e),
- Self::Io(ref e) => Some(e),
- }
- }
-}
-
/// Start the data source server.
#[cfg(not(target_os = "unknown"))]
-pub async fn run_server(mut address: std::net::SocketAddr) -> Result<(), RunError> {
+pub async fn run_server(mut address: std::net::SocketAddr) -> Result<(), Error> {
use async_std::{net, io};
use crate::networking::Incoming;
@@ -172,33 +140,23 @@ pub async fn run_server(mut address: std::net::SocketAddr) -> Result<(), RunErro
let result = match select(server, clean).await {
Either::Left((result, _)) => result.map_err(Into::into),
- Either::Right(_) => Ok(())
+ Either::Right((result, _)) => result
};
result
}
#[cfg(target_os = "unknown")]
-pub async fn run_server(_: std::net::SocketAddr) -> Result<(), RunError> {
+pub async fn run_server(_: std::net::SocketAddr) -> Result<(), Error> {
Ok(())
}
/// Periodically remove old metrics.
-async fn clean_up(every: std::time::Duration, before: Duration) {
+async fn clean_up(every: std::time::Duration, before: Duration) -> Result<(), Error> {
loop {
Delay::new(every).await;
let oldest_allowed = (Utc::now() - before).timestamp_millis();
-
- let mut metrics = METRICS.write();
-
- for metric in metrics.values_mut() {
- // Find the index of the oldest allowed timestamp and cut out all those before it.
- let index = util::find_index(&metric, oldest_allowed);
-
- if index > 0 {
- *metric = metric[index..].to_vec();
- }
- }
+ DATABASE.write().truncate(oldest_allowed)?;
}
}
diff --git a/substrate/client/grafana-data-source/src/util.rs b/substrate/client/grafana-data-source/src/util.rs
deleted file mode 100644
index cd27c440f7..0000000000
--- a/substrate/client/grafana-data-source/src/util.rs
+++ /dev/null
@@ -1,52 +0,0 @@
-// Copyright 2019 Parity Technologies (UK) Ltd.
-// This file is part of Substrate.
-
-// Substrate is free software: you can redistribute it and/or modify
-// it under the terms of the GNU General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-
-// Substrate is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU General Public License for more details.
-
-// You should have received a copy of the GNU General Public License
-// along with Substrate. If not, see .
-
-/// Get the current unix timestamp in milliseconds.
-pub fn now_millis() -> i64 {
- chrono::Utc::now().timestamp_millis()
-}
-
-// find the index of a timestamp
-pub fn find_index(slice: &[(f32, i64)], timestamp: i64) -> usize {
- slice.binary_search_by_key(×tamp, |&(_, timestamp)| timestamp)
- .unwrap_or_else(|index| index)
-}
-
-// Evenly select up to `num_points` points from a slice
-pub fn select_points(slice: &[T], num_points: usize) -> Vec {
- if num_points == 0 {
- return Vec::new();
- } else if num_points >= slice.len() {
- return slice.to_owned();
- }
-
- (0 .. num_points - 1)
- .map(|i| slice[i * slice.len() / (num_points - 1)])
- .chain(slice.last().cloned())
- .collect()
-}
-
-#[test]
-fn test_select_points() {
- let array = [1, 2, 3, 4, 5];
- assert_eq!(select_points(&array, 0), Vec::::new());
- assert_eq!(select_points(&array, 1), vec![5]);
- assert_eq!(select_points(&array, 2), vec![1, 5]);
- assert_eq!(select_points(&array, 3), vec![1, 3, 5]);
- assert_eq!(select_points(&array, 4), vec![1, 2, 4, 5]);
- assert_eq!(select_points(&array, 5), vec![1, 2, 3, 4, 5]);
- assert_eq!(select_points(&array, 6), vec![1, 2, 3, 4, 5]);
-}
diff --git a/substrate/client/grafana-data-source/test/src/main.rs b/substrate/client/grafana-data-source/test/src/main.rs
index a4769e2bbb..a723e7e476 100644
--- a/substrate/client/grafana-data-source/test/src/main.rs
+++ b/substrate/client/grafana-data-source/test/src/main.rs
@@ -25,10 +25,14 @@ async fn randomness() {
let random = rand::thread_rng().gen_range(0.0, 1000.0);
- record_metrics!(
- "random data".to_owned() => random,
- "random^2".to_owned() => random * random
+ let result = record_metrics!(
+ "random data" => random,
+ "random^2" => random * random,
);
+
+ if let Err(error) = result {
+ eprintln!("{}", error);
+ }
}
}
diff --git a/substrate/client/service/src/builder.rs b/substrate/client/service/src/builder.rs
index 0ca842e4bc..157af7d296 100644
--- a/substrate/client/service/src/builder.rs
+++ b/substrate/client/service/src/builder.rs
@@ -954,16 +954,16 @@ ServiceBuilder<
"bandwidth_upload" => bandwidth_upload,
"used_state_cache_size" => used_state_cache_size,
);
- record_metrics!(
- "peers".to_owned() => num_peers,
- "height".to_owned() => best_number,
- "txcount".to_owned() => txpool_status.ready,
- "cpu".to_owned() => cpu_usage,
- "memory".to_owned() => memory,
- "finalized_height".to_owned() => finalized_number,
- "bandwidth_download".to_owned() => bandwidth_download,
- "bandwidth_upload".to_owned() => bandwidth_upload,
- "used_state_cache_size".to_owned() => used_state_cache_size
+ let _ = record_metrics!(
+ "peers" => num_peers,
+ "height" => best_number,
+ "txcount" => txpool_status.ready,
+ "cpu" => cpu_usage,
+ "memory" => memory,
+ "finalized_height" => finalized_number,
+ "bandwidth_download" => bandwidth_download,
+ "bandwidth_upload" => bandwidth_upload,
+ "used_state_cache_size" => used_state_cache_size,
);
Ok(())
diff --git a/substrate/client/tracing/src/lib.rs b/substrate/client/tracing/src/lib.rs
index b87273bdaa..4be87bc2f7 100644
--- a/substrate/client/tracing/src/lib.rs
+++ b/substrate/client/tracing/src/lib.rs
@@ -293,5 +293,5 @@ fn send_telemetry(span_datum: SpanDatum) {
fn send_grafana(span_datum: SpanDatum) {
let name = format!("{}::{}", span_datum.target, span_datum.name);
- record_metrics!(name => span_datum.overall_time.as_nanos());
+ record_metrics!(&name => span_datum.overall_time.as_nanos(),);
}