Node Uptime (#196)

* fix: node stats updating live
* fix: Propagation time for first node to hit a block
* chore: Leaner feed serialization
* fix: Handle old nodes with stringified network_state
* feat: Add Node Uptime to the list
* chore: Remove old backend from test pipeline
This commit is contained in:
Maciej Hirsz
2019-11-09 12:16:39 +01:00
committed by GitHub
parent b69adbb096
commit 3e34720f66
20 changed files with 126 additions and 85 deletions
+1 -1
View File
@@ -177,7 +177,7 @@ impl Handler<Connect> for Aggregator {
connector.do_send(Connected(fid));
self.serializer.push(feed::Version(27));
self.serializer.push(feed::Version(28));
// TODO: keep track on number of nodes connected to each chain
for (_, entry) in self.chains.iter() {
+3 -16
View File
@@ -182,14 +182,7 @@ impl Handler<AddNode> for Chain {
if let Err(_) = msg.rec.do_send(Initialize(nid, ctx.address())) {
self.nodes.remove(nid);
} else if let Some(node) = self.nodes.get(nid) {
self.serializer.push(feed::AddedNode(
nid,
node.details(),
node.stats(),
node.hardware(),
node.block_details(),
node.location(),
));
self.serializer.push(feed::AddedNode(nid, node));
self.broadcast();
}
@@ -222,6 +215,7 @@ impl Handler<UpdateNode> for Chain {
self.update_average_block_time(now);
self.timestamp = Some(now);
self.serializer.push(feed::BestBlock(self.best.height, now, self.average_block_time));
propagation_time = Some(0);
} else if block.height == self.best.height {
if let Some(timestamp) = self.timestamp {
propagation_time = Some(now - timestamp);
@@ -333,14 +327,7 @@ impl Handler<Subscribe> for Chain {
self.serializer.push(feed::BestFinalized(self.finalized.height, self.finalized.hash));
for (nid, node) in self.nodes.iter() {
self.serializer.push(feed::AddedNode(
nid,
node.details(),
node.stats(),
node.hardware(),
node.block_details(),
node.location(),
));
self.serializer.push(feed::AddedNode(nid, node));
self.serializer.push(feed::FinalizedBlock(nid, node.finalized().height, node.finalized().hash));
if node.stale() {
self.serializer.push(feed::StaleNode(nid));
+24 -6
View File
@@ -1,8 +1,10 @@
use serde::Serialize;
use serde::ser::{Serializer, SerializeTuple};
use serde_json::to_writer;
use crate::node::Node;
use crate::types::{
NodeId, NodeDetails, NodeStats, NodeHardware, NodeLocation,
BlockNumber, BlockHash, BlockDetails, Timestamp,
NodeId, NodeStats, NodeHardware, BlockNumber, BlockHash, BlockDetails, Timestamp,
};
pub mod connector;
@@ -96,9 +98,7 @@ pub struct BestBlock(pub BlockNumber, pub Timestamp, pub Option<u64>);
#[derive(Serialize)]
pub struct BestFinalized(pub BlockNumber, pub BlockHash);
#[derive(Serialize)]
pub struct AddedNode<'a>(pub NodeId, pub &'a NodeDetails, pub &'a NodeStats, pub NodeHardware<'a>,
pub &'a BlockDetails, pub Option<&'a NodeLocation>);
pub struct AddedNode<'a>(pub NodeId, pub &'a Node);
#[derive(Serialize)]
pub struct RemovedNode(pub NodeId);
@@ -116,7 +116,7 @@ pub struct FinalizedBlock(pub NodeId, pub BlockNumber, pub BlockHash);
pub struct NodeStatsUpdate<'a>(pub NodeId, pub &'a NodeStats);
#[derive(Serialize)]
pub struct Hardware<'a>(pub NodeId, pub NodeHardware<'a>);
pub struct Hardware<'a>(pub NodeId, pub &'a NodeHardware);
#[derive(Serialize)]
pub struct TimeSync(pub u64);
@@ -138,3 +138,21 @@ pub struct Pong<'a>(pub &'a str);
#[derive(Serialize)]
pub struct StaleNode(pub NodeId);
impl Serialize for AddedNode<'_> {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let AddedNode(nid, node) = self;
let mut tup = serializer.serialize_tuple(7)?;
tup.serialize_element(nid)?;
tup.serialize_element(node.details())?;
tup.serialize_element(node.stats())?;
tup.serialize_element(node.hardware())?;
tup.serialize_element(node.block_details())?;
tup.serialize_element(&node.location())?;
tup.serialize_element(&node.connected_at())?;
tup.end()
}
}
+30 -36
View File
@@ -1,8 +1,8 @@
use bytes::Bytes;
use std::sync::Arc;
use crate::types::{NodeId, NodeDetails, NodeStats, NodeHardware, NodeLocation, BlockDetails, Block};
use crate::util::{MeanList, now};
use crate::types::{NodeId, NodeDetails, NodeStats, NodeHardware, NodeLocation, BlockDetails, Block, Timestamp};
use crate::util::now;
pub mod message;
pub mod connector;
@@ -25,27 +25,22 @@ pub struct Node {
finalized: Block,
/// Timer for throttling block updates
throttle: u64,
/// CPU use means
cpu: MeanList<f32>,
/// Memory use means
memory: MeanList<f32>,
/// Upload uses means
upload: MeanList<f64>,
/// Download uses means
download: MeanList<f64>,
/// Stampchange uses means
chart_stamps: MeanList<f64>,
/// Hardware stats over time
hardware: NodeHardware,
/// Physical location details
location: Option<Arc<NodeLocation>>,
/// Flag marking if the node is stale (not syncing or producing blocks)
stale: bool,
/// Connected at timestamp
connected_at: Timestamp,
/// Network state
pub network_state: Option<Bytes>,
network_state: Option<Bytes>,
}
impl Node {
pub fn new(details: NodeDetails) -> Self {
Node {
details,
stats: NodeStats {
txcount: 0,
@@ -59,13 +54,10 @@ impl Node {
},
finalized: Block::zero(),
throttle: 0,
cpu: MeanList::new(),
memory: MeanList::new(),
upload: MeanList::new(),
download: MeanList::new(),
chart_stamps: MeanList::new(),
hardware: NodeHardware::default(),
location: None,
stale: false,
connected_at: now(),
network_state: None,
}
}
@@ -86,14 +78,8 @@ impl Node {
&self.finalized
}
pub fn hardware(&self) -> NodeHardware {
(
self.memory.slice(),
self.cpu.slice(),
self.upload.slice(),
self.download.slice(),
self.chart_stamps.slice(),
)
pub fn hardware(&self) -> &NodeHardware {
&self.hardware
}
pub fn location(&self) -> Option<&NodeLocation> {
@@ -135,18 +121,18 @@ impl Node {
let mut changed = false;
if let Some(cpu) = interval.cpu {
changed |= self.cpu.push(cpu);
changed |= self.hardware.cpu.push(cpu);
}
if let Some(memory) = interval.memory {
changed |= self.memory.push(memory);
changed |= self.hardware.memory.push(memory);
}
if let Some(upload) = interval.bandwidth_upload {
changed |= self.upload.push(upload);
changed |= self.hardware.upload.push(upload);
}
if let Some(download) = interval.bandwidth_download {
changed |= self.download.push(download);
changed |= self.hardware.download.push(download);
}
self.chart_stamps.push(now() as f64);
self.hardware.chart_stamps.push(now() as f64);
changed
}
@@ -196,15 +182,23 @@ impl Node {
#[derive(Deserialize)]
struct Wrapper<'a> {
#[serde(borrow)]
state: Option<&'a RawValue>,
#[serde(borrow)]
network_state: Option<&'a RawValue>,
#[serde(alias = "network_state")]
state: &'a RawValue,
}
let raw = self.network_state.as_ref()?;
let wrap: Wrapper = serde_json::from_slice(raw).ok()?;
let state = wrap.state.or(wrap.network_state)?;
let json = wrap.state.get();
Some(state.get().into())
// Handle old nodes that exposed network_state as stringified JSON
if let Ok(stringified) = serde_json::from_str::<String>(json) {
Some(stringified.into())
} else {
Some(json.into())
}
}
pub fn connected_at(&self) -> Timestamp {
self.connected_at
}
}
+32 -4
View File
@@ -1,6 +1,8 @@
use serde::ser::{Serialize, Serializer, SerializeTuple};
use serde::Deserialize;
use crate::util::MeanList;
pub type NodeId = usize;
pub type BlockNumber = u64;
pub type Timestamp = u64;
@@ -36,7 +38,19 @@ pub struct BlockDetails {
pub propagation_time: Option<u64>,
}
pub type NodeHardware<'a> = (&'a [f32], &'a [f32], &'a [f64], &'a [f64], &'a [f64]);
#[derive(Default)]
pub struct NodeHardware {
/// CPU use means
pub cpu: MeanList<f32>,
/// Memory use means
pub memory: MeanList<f32>,
/// Upload uses means
pub upload: MeanList<f64>,
/// Download uses means
pub download: MeanList<f64>,
/// Stampchange uses means
pub chart_stamps: MeanList<f64>,
}
#[derive(Deserialize, Debug, Clone)]
pub struct NodeLocation {
@@ -54,9 +68,8 @@ impl Serialize for NodeDetails {
tup.serialize_element(&self.name)?;
tup.serialize_element(&self.implementation)?;
tup.serialize_element(&self.version)?;
tup.serialize_element(&self.validator)?; // TODO Maybe<Address>
tup.serialize_element(&self.network_id)?; // TODO Maybe<NetworkId>
tup.serialize_element("")?; // TODO Address
tup.serialize_element(&self.validator)?;
tup.serialize_element(&self.network_id)?;
tup.end()
}
}
@@ -100,3 +113,18 @@ impl Serialize for NodeLocation {
tup.end()
}
}
impl Serialize for NodeHardware {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let mut tup = serializer.serialize_tuple(5)?;
tup.serialize_element(self.memory.slice())?;
tup.serialize_element(self.cpu.slice())?;
tup.serialize_element(self.upload.slice())?;
tup.serialize_element(self.download.slice())?;
tup.serialize_element(self.chart_stamps.slice())?;
tup.end()
}
}
+7 -2
View File
@@ -9,8 +9,11 @@ pub struct MeanList<T> where T: Float + AddAssign + Zero + From<u8> {
ticks_per_mean: u8,
}
impl<T> MeanList<T> where T: Float + AddAssign + Zero + From<u8> {
pub fn new() -> MeanList<T> {
impl<T> Default for MeanList<T>
where
T: Float + AddAssign + Zero + From<u8>,
{
fn default() -> MeanList<T> {
MeanList {
period_sum: T::zero(),
period_count: 0,
@@ -19,7 +22,9 @@ impl<T> MeanList<T> where T: Float + AddAssign + Zero + From<u8> {
ticks_per_mean: 1,
}
}
}
impl<T> MeanList<T> where T: Float + AddAssign + Zero + From<u8> {
pub fn slice(&self) -> &[T] {
&self.means[..usize::from(self.mean_index)]
}