mirror of
https://github.com/pezkuwichain/pezkuwi-telemetry.git
synced 2026-06-12 21:31:01 +00:00
Revert "Preparing backend to receive data from shards (#337)"
This reverts commit f8b7128dca.
This commit is contained in:
@@ -1,398 +0,0 @@
|
||||
use actix::prelude::*;
|
||||
use actix_web_actors::ws::{CloseCode, CloseReason};
|
||||
use ctor::ctor;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
|
||||
use crate::chain::{self, Chain, ChainId, GetNodeNetworkState, Label};
|
||||
use crate::feed::connector::{Connected, FeedConnector, FeedId};
|
||||
use crate::feed::{self, FeedMessageSerializer};
|
||||
use crate::node::connector::{Mute, NodeConnector};
|
||||
use crate::types::{ConnId, NodeDetails, NodeId};
|
||||
use crate::util::{DenseMap, Hash};
|
||||
|
||||
pub struct Aggregator {
|
||||
genesis_hashes: HashMap<Hash, ChainId>,
|
||||
labels: HashMap<Label, ChainId>,
|
||||
chains: DenseMap<ChainEntry>,
|
||||
feeds: DenseMap<Addr<FeedConnector>>,
|
||||
serializer: FeedMessageSerializer,
|
||||
/// Denylist for networks we do not want to allow connecting.
|
||||
denylist: HashSet<String>,
|
||||
}
|
||||
|
||||
pub struct ChainEntry {
|
||||
/// Address to the `Chain` agent
|
||||
addr: Addr<Chain>,
|
||||
/// Genesis [`Hash`] of the chain
|
||||
genesis_hash: Hash,
|
||||
/// String name of the chain
|
||||
label: Label,
|
||||
/// Node count
|
||||
nodes: usize,
|
||||
/// Maximum allowed nodes
|
||||
max_nodes: usize,
|
||||
}
|
||||
|
||||
#[ctor]
|
||||
/// Labels of chains we consider "first party". These chains allow any
|
||||
/// number of nodes to connect.
|
||||
static FIRST_PARTY_NETWORKS: HashSet<&'static str> = {
|
||||
let mut set = HashSet::new();
|
||||
set.insert("Polkadot");
|
||||
set.insert("Kusama");
|
||||
set.insert("Westend");
|
||||
set.insert("Rococo");
|
||||
set
|
||||
};
|
||||
|
||||
/// Max number of nodes allowed to connect to the telemetry server.
|
||||
const THIRD_PARTY_NETWORKS_MAX_NODES: usize = 500;
|
||||
|
||||
impl Aggregator {
|
||||
pub fn new(denylist: HashSet<String>) -> Self {
|
||||
Aggregator {
|
||||
genesis_hashes: HashMap::new(),
|
||||
labels: HashMap::new(),
|
||||
chains: DenseMap::new(),
|
||||
feeds: DenseMap::new(),
|
||||
serializer: FeedMessageSerializer::new(),
|
||||
denylist,
|
||||
}
|
||||
}
|
||||
|
||||
/// Get an address to the chain actor by name. If the address is not found,
|
||||
/// or the address is disconnected (actor dropped), create a new one.
|
||||
pub fn lazy_chain(
|
||||
&mut self,
|
||||
genesis_hash: Hash,
|
||||
label: &str,
|
||||
ctx: &mut <Self as Actor>::Context,
|
||||
) -> ChainId {
|
||||
let cid = match self.genesis_hashes.get(&genesis_hash).copied() {
|
||||
Some(cid) => cid,
|
||||
None => {
|
||||
self.serializer.push(feed::AddedChain(&label, 1));
|
||||
|
||||
let addr = ctx.address();
|
||||
let max_nodes = max_nodes(label);
|
||||
let label: Label = label.into();
|
||||
let cid = self.chains.add_with(|cid| ChainEntry {
|
||||
addr: Chain::new(cid, addr, label.clone()).start(),
|
||||
genesis_hash,
|
||||
label: label.clone(),
|
||||
nodes: 1,
|
||||
max_nodes,
|
||||
});
|
||||
|
||||
self.labels.insert(label, cid);
|
||||
self.genesis_hashes.insert(genesis_hash, cid);
|
||||
|
||||
self.broadcast();
|
||||
|
||||
cid
|
||||
}
|
||||
};
|
||||
|
||||
cid
|
||||
}
|
||||
|
||||
fn get_chain(&mut self, label: &str) -> Option<&mut ChainEntry> {
|
||||
let chains = &mut self.chains;
|
||||
self.labels
|
||||
.get(label)
|
||||
.and_then(move |&cid| chains.get_mut(cid))
|
||||
}
|
||||
|
||||
fn broadcast(&mut self) {
|
||||
if let Some(msg) = self.serializer.finalize() {
|
||||
for (_, feed) in self.feeds.iter() {
|
||||
feed.do_send(msg.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Actor for Aggregator {
|
||||
type Context = Context<Self>;
|
||||
}
|
||||
|
||||
/// Message sent from the NodeConnector to the Aggregator upon getting all node details
|
||||
#[derive(Message)]
|
||||
#[rtype(result = "()")]
|
||||
pub struct AddNode {
|
||||
/// Details of the node being added to the aggregator
|
||||
pub node: NodeDetails,
|
||||
/// Genesis [`Hash`] of the chain the node is being added to.
|
||||
pub genesis_hash: Hash,
|
||||
/// Connection id used by the node connector for multiplexing parachains
|
||||
pub conn_id: ConnId,
|
||||
/// Address of the NodeConnector actor
|
||||
pub node_connector: Addr<NodeConnector>,
|
||||
}
|
||||
|
||||
/// Message sent from the Chain to the Aggregator when the Chain loses all nodes
|
||||
#[derive(Message)]
|
||||
#[rtype(result = "()")]
|
||||
pub struct DropChain(pub ChainId);
|
||||
|
||||
#[derive(Message)]
|
||||
#[rtype(result = "()")]
|
||||
pub struct RenameChain(pub ChainId, pub Label);
|
||||
|
||||
/// Message sent from the FeedConnector to the Aggregator when subscribing to a new chain
|
||||
#[derive(Message)]
|
||||
#[rtype(result = "bool")]
|
||||
pub struct Subscribe {
|
||||
pub chain: Label,
|
||||
pub feed: Addr<FeedConnector>,
|
||||
}
|
||||
|
||||
/// Message sent from the FeedConnector to the Aggregator consensus requested
|
||||
#[derive(Message)]
|
||||
#[rtype(result = "()")]
|
||||
pub struct SendFinality {
|
||||
pub chain: Label,
|
||||
pub fid: FeedId,
|
||||
}
|
||||
|
||||
/// Message sent from the FeedConnector to the Aggregator no more consensus required
|
||||
#[derive(Message)]
|
||||
#[rtype(result = "()")]
|
||||
pub struct NoMoreFinality {
|
||||
pub chain: Label,
|
||||
pub fid: FeedId,
|
||||
}
|
||||
|
||||
/// Message sent from the FeedConnector to the Aggregator when first connected
|
||||
#[derive(Message)]
|
||||
#[rtype(result = "()")]
|
||||
pub struct Connect(pub Addr<FeedConnector>);
|
||||
|
||||
/// Message sent from the FeedConnector to the Aggregator when disconnecting
|
||||
#[derive(Message)]
|
||||
#[rtype(result = "()")]
|
||||
pub struct Disconnect(pub FeedId);
|
||||
|
||||
/// Message sent from the Chain to the Aggergator when the node count on the chain changes
|
||||
#[derive(Message)]
|
||||
#[rtype(result = "()")]
|
||||
pub struct NodeCount(pub ChainId, pub usize);
|
||||
|
||||
/// Message sent to the Aggregator to get the network state of a particular node
|
||||
#[derive(Message)]
|
||||
#[rtype(result = "Option<Request<Chain, GetNodeNetworkState>>")]
|
||||
pub struct GetNetworkState(pub Box<str>, pub NodeId);
|
||||
|
||||
/// Message sent to the Aggregator to get a health check
|
||||
#[derive(Message)]
|
||||
#[rtype(result = "usize")]
|
||||
pub struct GetHealth;
|
||||
|
||||
impl Handler<AddNode> for Aggregator {
|
||||
type Result = ();
|
||||
|
||||
fn handle(&mut self, msg: AddNode, ctx: &mut Self::Context) {
|
||||
if self.denylist.contains(&*msg.node.chain) {
|
||||
log::warn!(target: "Aggregator::AddNode", "'{}' is on the denylist.", msg.node.chain);
|
||||
let AddNode { node_connector, .. } = msg;
|
||||
let reason = CloseReason {
|
||||
code: CloseCode::Abnormal,
|
||||
description: Some("Denied".into()),
|
||||
};
|
||||
node_connector.do_send(Mute { reason });
|
||||
return;
|
||||
}
|
||||
let AddNode {
|
||||
node,
|
||||
genesis_hash,
|
||||
conn_id,
|
||||
node_connector,
|
||||
} = msg;
|
||||
log::trace!(target: "Aggregator::AddNode", "New node connected. Chain '{}'", node.chain);
|
||||
|
||||
let cid = self.lazy_chain(genesis_hash, &node.chain, ctx);
|
||||
let chain = self
|
||||
.chains
|
||||
.get_mut(cid)
|
||||
.expect("Entry just created above; qed");
|
||||
if chain.nodes < chain.max_nodes {
|
||||
chain.addr.do_send(chain::AddNode {
|
||||
node,
|
||||
conn_id,
|
||||
node_connector,
|
||||
});
|
||||
} else {
|
||||
log::warn!(target: "Aggregator::AddNode", "Chain {} is over quota ({})", chain.label, chain.max_nodes);
|
||||
let reason = CloseReason {
|
||||
code: CloseCode::Again,
|
||||
description: Some("Overquota".into()),
|
||||
};
|
||||
node_connector.do_send(Mute { reason });
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Handler<DropChain> for Aggregator {
|
||||
type Result = ();
|
||||
|
||||
fn handle(&mut self, msg: DropChain, _: &mut Self::Context) {
|
||||
let DropChain(cid) = msg;
|
||||
|
||||
if let Some(entry) = self.chains.remove(cid) {
|
||||
let label = &entry.label;
|
||||
self.genesis_hashes.remove(&entry.genesis_hash);
|
||||
self.labels.remove(label);
|
||||
self.serializer.push(feed::RemovedChain(label));
|
||||
log::info!("Dropped chain [{}] from the aggregator", label);
|
||||
self.broadcast();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Handler<RenameChain> for Aggregator {
|
||||
type Result = ();
|
||||
|
||||
fn handle(&mut self, msg: RenameChain, _: &mut Self::Context) {
|
||||
let RenameChain(cid, new) = msg;
|
||||
|
||||
if let Some(entry) = self.chains.get_mut(cid) {
|
||||
if entry.label == new {
|
||||
return;
|
||||
}
|
||||
|
||||
// Update UI
|
||||
self.serializer.push(feed::RemovedChain(&entry.label));
|
||||
self.serializer.push(feed::AddedChain(&new, entry.nodes));
|
||||
|
||||
// Update labels -> cid map
|
||||
self.labels.remove(&entry.label);
|
||||
self.labels.insert(new.clone(), cid);
|
||||
|
||||
// Update entry
|
||||
entry.label = new;
|
||||
|
||||
self.broadcast();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Handler<Subscribe> for Aggregator {
|
||||
type Result = bool;
|
||||
|
||||
fn handle(&mut self, msg: Subscribe, _: &mut Self::Context) -> bool {
|
||||
let Subscribe { chain, feed } = msg;
|
||||
|
||||
if let Some(chain) = self.get_chain(&chain) {
|
||||
chain.addr.do_send(chain::Subscribe(feed));
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Handler<SendFinality> for Aggregator {
|
||||
type Result = ();
|
||||
|
||||
fn handle(&mut self, msg: SendFinality, _: &mut Self::Context) {
|
||||
let SendFinality { chain, fid } = msg;
|
||||
if let Some(chain) = self.get_chain(&chain) {
|
||||
chain.addr.do_send(chain::SendFinality(fid));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Handler<NoMoreFinality> for Aggregator {
|
||||
type Result = ();
|
||||
|
||||
fn handle(&mut self, msg: NoMoreFinality, _: &mut Self::Context) {
|
||||
let NoMoreFinality { chain, fid } = msg;
|
||||
if let Some(chain) = self.get_chain(&chain) {
|
||||
chain.addr.do_send(chain::NoMoreFinality(fid));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Handler<Connect> for Aggregator {
|
||||
type Result = ();
|
||||
|
||||
fn handle(&mut self, msg: Connect, _: &mut Self::Context) {
|
||||
let Connect(connector) = msg;
|
||||
|
||||
let fid = self.feeds.add(connector.clone());
|
||||
|
||||
log::info!("Feed #{} connected", fid);
|
||||
|
||||
connector.do_send(Connected(fid));
|
||||
|
||||
self.serializer.push(feed::Version(31));
|
||||
|
||||
// TODO: keep track on number of nodes connected to each chain
|
||||
for (_, entry) in self.chains.iter() {
|
||||
self.serializer
|
||||
.push(feed::AddedChain(&entry.label, entry.nodes));
|
||||
}
|
||||
|
||||
if let Some(msg) = self.serializer.finalize() {
|
||||
connector.do_send(msg);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Handler<Disconnect> for Aggregator {
|
||||
type Result = ();
|
||||
|
||||
fn handle(&mut self, msg: Disconnect, _: &mut Self::Context) {
|
||||
let Disconnect(fid) = msg;
|
||||
|
||||
log::info!("Feed #{} disconnected", fid);
|
||||
|
||||
self.feeds.remove(fid);
|
||||
}
|
||||
}
|
||||
|
||||
impl Handler<NodeCount> for Aggregator {
|
||||
type Result = ();
|
||||
|
||||
fn handle(&mut self, msg: NodeCount, _: &mut Self::Context) {
|
||||
let NodeCount(cid, count) = msg;
|
||||
|
||||
if let Some(entry) = self.chains.get_mut(cid) {
|
||||
entry.nodes = count;
|
||||
|
||||
if count != 0 {
|
||||
self.serializer.push(feed::AddedChain(&entry.label, count));
|
||||
self.broadcast();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Handler<GetNetworkState> for Aggregator {
|
||||
type Result = <GetNetworkState as Message>::Result;
|
||||
|
||||
fn handle(&mut self, msg: GetNetworkState, _: &mut Self::Context) -> Self::Result {
|
||||
let GetNetworkState(chain, nid) = msg;
|
||||
|
||||
Some(self.get_chain(&*chain)?.addr.send(GetNodeNetworkState(nid)))
|
||||
}
|
||||
}
|
||||
|
||||
impl Handler<GetHealth> for Aggregator {
|
||||
type Result = usize;
|
||||
|
||||
fn handle(&mut self, _: GetHealth, _: &mut Self::Context) -> Self::Result {
|
||||
self.chains.len()
|
||||
}
|
||||
}
|
||||
|
||||
/// First party networks (Polkadot, Kusama etc) are allowed any number of nodes.
|
||||
/// Third party networks are allowed `THIRD_PARTY_NETWORKS_MAX_NODES` nodes and
|
||||
/// no more.
|
||||
fn max_nodes(label: &str) -> usize {
|
||||
if FIRST_PARTY_NETWORKS.contains(label) {
|
||||
usize::MAX
|
||||
} else {
|
||||
THIRD_PARTY_NETWORKS_MAX_NODES
|
||||
}
|
||||
}
|
||||
@@ -1,593 +0,0 @@
|
||||
use actix::prelude::*;
|
||||
use bytes::Bytes;
|
||||
use rustc_hash::FxHashMap;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::aggregator::{Aggregator, DropChain, NodeCount, RenameChain};
|
||||
use crate::feed::connector::{FeedConnector, FeedId, Subscribed, Unsubscribed};
|
||||
use crate::feed::{self, FeedMessageSerializer};
|
||||
use crate::node::{
|
||||
connector::{Initialize, NodeConnector},
|
||||
message::Payload,
|
||||
Node,
|
||||
};
|
||||
use crate::types::{Block, BlockNumber, ConnId, NodeDetails, NodeId, NodeLocation, Timestamp};
|
||||
use crate::util::{now, DenseMap, NumStats};
|
||||
|
||||
const STALE_TIMEOUT: u64 = 2 * 60 * 1000; // 2 minutes
|
||||
|
||||
pub type ChainId = usize;
|
||||
pub type Label = Arc<str>;
|
||||
|
||||
pub struct Chain {
|
||||
cid: ChainId,
|
||||
/// Who to inform if the Chain drops itself
|
||||
aggregator: Addr<Aggregator>,
|
||||
/// Label of this chain, along with count of nodes that use this label
|
||||
label: (Label, usize),
|
||||
/// Dense mapping of NodeId -> Node
|
||||
nodes: DenseMap<Node>,
|
||||
/// Dense mapping of FeedId -> Addr<FeedConnector>,
|
||||
feeds: DenseMap<Addr<FeedConnector>>,
|
||||
/// Mapping of FeedId -> Addr<FeedConnector> for feeds requiring finality info,
|
||||
finality_feeds: FxHashMap<FeedId, Addr<FeedConnector>>,
|
||||
/// Best block
|
||||
best: Block,
|
||||
/// Finalized block
|
||||
finalized: Block,
|
||||
/// Block times history, stored so we can calculate averages
|
||||
block_times: NumStats<u64>,
|
||||
/// Calculated average block time
|
||||
average_block_time: Option<u64>,
|
||||
/// Message serializer
|
||||
serializer: FeedMessageSerializer,
|
||||
/// When the best block first arrived
|
||||
timestamp: Option<Timestamp>,
|
||||
/// Some nodes might manifest a different label, note them here
|
||||
labels: HashMap<Label, usize>,
|
||||
}
|
||||
|
||||
impl Chain {
|
||||
pub fn new(cid: ChainId, aggregator: Addr<Aggregator>, label: Label) -> Self {
|
||||
log::info!("[{}] Created", label);
|
||||
|
||||
Chain {
|
||||
cid,
|
||||
aggregator,
|
||||
label: (label, 0),
|
||||
nodes: DenseMap::new(),
|
||||
feeds: DenseMap::new(),
|
||||
finality_feeds: FxHashMap::default(),
|
||||
best: Block::zero(),
|
||||
finalized: Block::zero(),
|
||||
block_times: NumStats::new(50),
|
||||
average_block_time: None,
|
||||
serializer: FeedMessageSerializer::new(),
|
||||
timestamp: None,
|
||||
labels: HashMap::default(),
|
||||
}
|
||||
}
|
||||
|
||||
fn increment_label_count(&mut self, label: &str) {
|
||||
let count = match self.labels.get_mut(label) {
|
||||
Some(count) => {
|
||||
*count += 1;
|
||||
*count
|
||||
}
|
||||
None => {
|
||||
self.labels.insert(label.into(), 1);
|
||||
1
|
||||
}
|
||||
};
|
||||
|
||||
if &*self.label.0 == label {
|
||||
self.label.1 += 1;
|
||||
} else if count > self.label.1 {
|
||||
self.rename(label.into(), count);
|
||||
}
|
||||
}
|
||||
|
||||
fn decrement_label_count(&mut self, label: &str) {
|
||||
match self.labels.get_mut(label) {
|
||||
Some(count) => *count -= 1,
|
||||
None => return,
|
||||
};
|
||||
|
||||
if &*self.label.0 == label {
|
||||
self.label.1 -= 1;
|
||||
|
||||
for (label, &count) in self.labels.iter() {
|
||||
if count > self.label.1 {
|
||||
let label: Arc<_> = label.clone();
|
||||
self.rename(label, count);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn rename(&mut self, label: Label, count: usize) {
|
||||
self.label = (label, count);
|
||||
|
||||
self.aggregator
|
||||
.do_send(RenameChain(self.cid, self.label.0.clone()));
|
||||
}
|
||||
|
||||
fn broadcast(&mut self) {
|
||||
if let Some(msg) = self.serializer.finalize() {
|
||||
for (_, feed) in self.feeds.iter() {
|
||||
feed.do_send(msg.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn broadcast_finality(&mut self) {
|
||||
if let Some(msg) = self.serializer.finalize() {
|
||||
for feed in self.finality_feeds.values() {
|
||||
feed.do_send(msg.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Triggered when the number of nodes in this chain has changed, Aggregator will
|
||||
/// propagate new counts to all connected feeds
|
||||
fn update_count(&self) {
|
||||
self.aggregator
|
||||
.do_send(NodeCount(self.cid, self.nodes.len()));
|
||||
}
|
||||
|
||||
/// Check if the chain is stale (has not received a new best block in a while).
|
||||
/// If so, find a new best block, ignoring any stale nodes and marking them as such.
|
||||
fn update_stale_nodes(&mut self, now: u64) {
|
||||
let threshold = now - STALE_TIMEOUT;
|
||||
let timestamp = match self.timestamp {
|
||||
Some(ts) => ts,
|
||||
None => return,
|
||||
};
|
||||
|
||||
if timestamp > threshold {
|
||||
// Timestamp is in range, nothing to do
|
||||
return;
|
||||
}
|
||||
|
||||
let mut best = Block::zero();
|
||||
let mut finalized = Block::zero();
|
||||
let mut timestamp = None;
|
||||
|
||||
for (nid, node) in self.nodes.iter_mut() {
|
||||
if !node.update_stale(threshold) {
|
||||
if node.best().height > best.height {
|
||||
best = *node.best();
|
||||
timestamp = Some(node.best_timestamp());
|
||||
}
|
||||
|
||||
if node.finalized().height > finalized.height {
|
||||
finalized = *node.finalized();
|
||||
}
|
||||
} else {
|
||||
self.serializer.push(feed::StaleNode(nid));
|
||||
}
|
||||
}
|
||||
|
||||
if self.best.height != 0 || self.finalized.height != 0 {
|
||||
self.best = best;
|
||||
self.finalized = finalized;
|
||||
self.block_times.reset();
|
||||
self.timestamp = timestamp;
|
||||
|
||||
self.serializer.push(feed::BestBlock(
|
||||
self.best.height,
|
||||
timestamp.unwrap_or(now),
|
||||
None,
|
||||
));
|
||||
self.serializer
|
||||
.push(feed::BestFinalized(finalized.height, finalized.hash));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Actor for Chain {
|
||||
type Context = Context<Self>;
|
||||
|
||||
fn stopped(&mut self, _: &mut Self::Context) {
|
||||
self.aggregator.do_send(DropChain(self.cid));
|
||||
|
||||
for (_, feed) in self.feeds.iter() {
|
||||
feed.do_send(Unsubscribed)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Message sent from the Aggregator to the Chain when new Node is connected
|
||||
#[derive(Message)]
|
||||
#[rtype(result = "()")]
|
||||
pub struct AddNode {
|
||||
/// Details of the node being added to the aggregator
|
||||
pub node: NodeDetails,
|
||||
/// Connection id used by the node connector for multiplexing parachains
|
||||
pub conn_id: ConnId,
|
||||
/// Address of the NodeConnector actor to which we send [`Initialize`] or [`Mute`] messages.
|
||||
pub node_connector: Addr<NodeConnector>,
|
||||
}
|
||||
|
||||
/// Message sent from the NodeConnector to the Chain when it receives new telemetry data
|
||||
#[derive(Message)]
|
||||
#[rtype(result = "()")]
|
||||
pub struct UpdateNode {
|
||||
pub nid: NodeId,
|
||||
pub raw: Option<Bytes>,
|
||||
pub payload: Payload,
|
||||
}
|
||||
|
||||
/// Message sent from the NodeConnector to the Chain when the connector disconnects
|
||||
#[derive(Message)]
|
||||
#[rtype(result = "()")]
|
||||
pub struct RemoveNode(pub NodeId);
|
||||
|
||||
/// Message sent from the Aggregator to the Chain when the connector wants to subscribe to that chain
|
||||
#[derive(Message)]
|
||||
#[rtype(result = "()")]
|
||||
pub struct Subscribe(pub Addr<FeedConnector>);
|
||||
|
||||
/// Message sent from the FeedConnector before it subscribes to a new chain, or if it disconnects
|
||||
#[derive(Message)]
|
||||
#[rtype(result = "()")]
|
||||
pub struct Unsubscribe(pub FeedId);
|
||||
|
||||
#[derive(Message)]
|
||||
#[rtype(result = "()")]
|
||||
pub struct SendFinality(pub FeedId);
|
||||
|
||||
#[derive(Message)]
|
||||
#[rtype(result = "()")]
|
||||
pub struct NoMoreFinality(pub FeedId);
|
||||
|
||||
/// Message sent from the NodeConnector to the Chain when it receives location data
|
||||
#[derive(Message)]
|
||||
#[rtype(result = "()")]
|
||||
pub struct LocateNode {
|
||||
pub nid: NodeId,
|
||||
pub location: Arc<NodeLocation>,
|
||||
}
|
||||
|
||||
pub struct GetNodeNetworkState(pub NodeId);
|
||||
|
||||
impl Message for GetNodeNetworkState {
|
||||
type Result = Option<Bytes>;
|
||||
}
|
||||
|
||||
impl Handler<AddNode> for Chain {
|
||||
type Result = ();
|
||||
|
||||
fn handle(&mut self, msg: AddNode, ctx: &mut Self::Context) {
|
||||
let AddNode {
|
||||
node,
|
||||
conn_id,
|
||||
node_connector,
|
||||
} = msg;
|
||||
log::trace!(target: "Chain::AddNode", "New node connected. Chain '{}', node count goes from {} to {}", node.chain, self.nodes.len(), self.nodes.len() + 1);
|
||||
self.increment_label_count(&node.chain);
|
||||
|
||||
let nid = self.nodes.add(Node::new(node));
|
||||
let chain = ctx.address();
|
||||
|
||||
if node_connector
|
||||
.try_send(Initialize {
|
||||
nid,
|
||||
conn_id,
|
||||
chain,
|
||||
})
|
||||
.is_err()
|
||||
{
|
||||
self.nodes.remove(nid);
|
||||
} else if let Some(node) = self.nodes.get(nid) {
|
||||
self.serializer.push(feed::AddedNode(nid, node));
|
||||
self.broadcast();
|
||||
}
|
||||
|
||||
self.update_count();
|
||||
}
|
||||
}
|
||||
|
||||
impl Chain {
|
||||
fn handle_block(&mut self, block: &Block, nid: NodeId) {
|
||||
let mut propagation_time = None;
|
||||
let now = now();
|
||||
let nodes_len = self.nodes.len();
|
||||
|
||||
self.update_stale_nodes(now);
|
||||
|
||||
let node = match self.nodes.get_mut(nid) {
|
||||
Some(node) => node,
|
||||
None => return,
|
||||
};
|
||||
|
||||
if node.update_block(*block) {
|
||||
if block.height > self.best.height {
|
||||
self.best = *block;
|
||||
log::debug!(
|
||||
"[{}] [nodes={}/feeds={}] new best block={}/{:?}",
|
||||
self.label.0,
|
||||
nodes_len,
|
||||
self.feeds.len(),
|
||||
self.best.height,
|
||||
self.best.hash,
|
||||
);
|
||||
if let Some(timestamp) = self.timestamp {
|
||||
self.block_times.push(now - timestamp);
|
||||
self.average_block_time = Some(self.block_times.average());
|
||||
}
|
||||
self.timestamp = Some(now);
|
||||
self.serializer.push(feed::BestBlock(
|
||||
self.best.height,
|
||||
now,
|
||||
self.average_block_time,
|
||||
));
|
||||
propagation_time = Some(0);
|
||||
} else if block.height == self.best.height {
|
||||
if let Some(timestamp) = self.timestamp {
|
||||
propagation_time = Some(now - timestamp);
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(details) = node.update_details(now, propagation_time) {
|
||||
self.serializer.push(feed::ImportedBlock(nid, details));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Handler<UpdateNode> for Chain {
|
||||
type Result = ();
|
||||
|
||||
fn handle(&mut self, msg: UpdateNode, _: &mut Self::Context) {
|
||||
let UpdateNode { nid, payload, raw } = msg;
|
||||
|
||||
if let Some(block) = payload.best_block() {
|
||||
self.handle_block(block, nid);
|
||||
}
|
||||
|
||||
if let Some(node) = self.nodes.get_mut(nid) {
|
||||
match payload {
|
||||
Payload::SystemInterval(ref interval) => {
|
||||
if interval.network_state.is_some() {
|
||||
if let Some(raw) = raw {
|
||||
node.set_network_state(raw);
|
||||
}
|
||||
}
|
||||
|
||||
if node.update_hardware(interval) {
|
||||
self.serializer.push(feed::Hardware(nid, node.hardware()));
|
||||
}
|
||||
|
||||
if let Some(stats) = node.update_stats(interval) {
|
||||
self.serializer.push(feed::NodeStatsUpdate(nid, stats));
|
||||
}
|
||||
|
||||
if let Some(io) = node.update_io(interval) {
|
||||
self.serializer.push(feed::NodeIOUpdate(nid, io));
|
||||
}
|
||||
}
|
||||
Payload::SystemNetworkState(_) => {
|
||||
if let Some(raw) = raw {
|
||||
node.set_network_state(raw);
|
||||
}
|
||||
}
|
||||
Payload::AfgAuthoritySet(authority) => {
|
||||
node.set_validator_address(authority.authority_id.clone());
|
||||
self.broadcast();
|
||||
return;
|
||||
}
|
||||
Payload::AfgFinalized(finalized) => {
|
||||
if let Ok(finalized_number) = finalized.finalized_number.parse::<BlockNumber>()
|
||||
{
|
||||
if let Some(addr) = node.details().validator.clone() {
|
||||
self.serializer.push(feed::AfgFinalized(
|
||||
addr,
|
||||
finalized_number,
|
||||
finalized.finalized_hash,
|
||||
));
|
||||
self.broadcast_finality();
|
||||
}
|
||||
}
|
||||
return;
|
||||
}
|
||||
Payload::AfgReceivedPrecommit(precommit) => {
|
||||
if let Ok(finalized_number) =
|
||||
precommit.received.target_number.parse::<BlockNumber>()
|
||||
{
|
||||
if let Some(addr) = node.details().validator.clone() {
|
||||
let voter = precommit.received.voter.clone();
|
||||
self.serializer.push(feed::AfgReceivedPrecommit(
|
||||
addr,
|
||||
finalized_number,
|
||||
precommit.received.target_hash,
|
||||
voter,
|
||||
));
|
||||
self.broadcast_finality();
|
||||
}
|
||||
}
|
||||
return;
|
||||
}
|
||||
Payload::AfgReceivedPrevote(prevote) => {
|
||||
if let Ok(finalized_number) =
|
||||
prevote.received.target_number.parse::<BlockNumber>()
|
||||
{
|
||||
if let Some(addr) = node.details().validator.clone() {
|
||||
let voter = prevote.received.voter.clone();
|
||||
self.serializer.push(feed::AfgReceivedPrevote(
|
||||
addr,
|
||||
finalized_number,
|
||||
prevote.received.target_hash,
|
||||
voter,
|
||||
));
|
||||
self.broadcast_finality();
|
||||
}
|
||||
}
|
||||
return;
|
||||
}
|
||||
Payload::AfgReceivedCommit(_) => {}
|
||||
_ => (),
|
||||
}
|
||||
|
||||
if let Some(block) = payload.finalized_block() {
|
||||
if let Some(finalized) = node.update_finalized(block) {
|
||||
self.serializer.push(feed::FinalizedBlock(
|
||||
nid,
|
||||
finalized.height,
|
||||
finalized.hash,
|
||||
));
|
||||
|
||||
if finalized.height > self.finalized.height {
|
||||
self.finalized = *finalized;
|
||||
self.serializer
|
||||
.push(feed::BestFinalized(finalized.height, finalized.hash));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
self.broadcast();
|
||||
}
|
||||
}
|
||||
|
||||
impl Handler<LocateNode> for Chain {
|
||||
type Result = ();
|
||||
|
||||
fn handle(&mut self, msg: LocateNode, _: &mut Self::Context) {
|
||||
let LocateNode { nid, location } = msg;
|
||||
|
||||
if let Some(node) = self.nodes.get_mut(nid) {
|
||||
self.serializer.push(feed::LocatedNode(
|
||||
nid,
|
||||
location.latitude,
|
||||
location.longitude,
|
||||
&location.city,
|
||||
));
|
||||
|
||||
node.update_location(location);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Handler<RemoveNode> for Chain {
|
||||
type Result = ();
|
||||
|
||||
fn handle(&mut self, msg: RemoveNode, ctx: &mut Self::Context) {
|
||||
let RemoveNode(nid) = msg;
|
||||
|
||||
if let Some(node) = self.nodes.remove(nid) {
|
||||
self.decrement_label_count(&node.details().chain);
|
||||
}
|
||||
|
||||
if self.nodes.is_empty() {
|
||||
log::info!("[{}] Lost all nodes, dropping...", self.label.0);
|
||||
ctx.stop();
|
||||
}
|
||||
|
||||
self.serializer.push(feed::RemovedNode(nid));
|
||||
self.broadcast();
|
||||
self.update_count();
|
||||
}
|
||||
}
|
||||
|
||||
impl Handler<Subscribe> for Chain {
|
||||
type Result = ();
|
||||
|
||||
fn handle(&mut self, msg: Subscribe, ctx: &mut Self::Context) {
|
||||
let Subscribe(feed) = msg;
|
||||
let fid = self.feeds.add(feed.clone());
|
||||
|
||||
feed.do_send(Subscribed(fid, ctx.address().recipient()));
|
||||
|
||||
self.serializer.push(feed::SubscribedTo(&self.label.0));
|
||||
self.serializer.push(feed::TimeSync(now()));
|
||||
self.serializer.push(feed::BestBlock(
|
||||
self.best.height,
|
||||
self.timestamp.unwrap_or(0),
|
||||
self.average_block_time,
|
||||
));
|
||||
self.serializer.push(feed::BestFinalized(
|
||||
self.finalized.height,
|
||||
self.finalized.hash,
|
||||
));
|
||||
|
||||
for (idx, (nid, node)) in self.nodes.iter().enumerate() {
|
||||
// Send subscription confirmation and chain head before doing all the nodes,
|
||||
// and continue sending batches of 32 nodes a time over the wire subsequently
|
||||
if idx % 32 == 0 {
|
||||
if let Some(serialized) = self.serializer.finalize() {
|
||||
feed.do_send(serialized);
|
||||
}
|
||||
}
|
||||
|
||||
self.serializer.push(feed::AddedNode(nid, node));
|
||||
self.serializer.push(feed::FinalizedBlock(
|
||||
nid,
|
||||
node.finalized().height,
|
||||
node.finalized().hash,
|
||||
));
|
||||
if node.stale() {
|
||||
self.serializer.push(feed::StaleNode(nid));
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(serialized) = self.serializer.finalize() {
|
||||
feed.do_send(serialized);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Handler<SendFinality> for Chain {
|
||||
type Result = ();
|
||||
|
||||
fn handle(&mut self, msg: SendFinality, _ctx: &mut Self::Context) {
|
||||
let SendFinality(fid) = msg;
|
||||
if let Some(feed) = self.feeds.get(fid) {
|
||||
self.finality_feeds.insert(fid, feed.clone());
|
||||
}
|
||||
|
||||
// info!("Added new finality feed {}", fid);
|
||||
}
|
||||
}
|
||||
|
||||
impl Handler<NoMoreFinality> for Chain {
|
||||
type Result = ();
|
||||
|
||||
fn handle(&mut self, msg: NoMoreFinality, _: &mut Self::Context) {
|
||||
let NoMoreFinality(fid) = msg;
|
||||
|
||||
// info!("Removed finality feed {}", fid);
|
||||
self.finality_feeds.remove(&fid);
|
||||
}
|
||||
}
|
||||
|
||||
impl Handler<Unsubscribe> for Chain {
|
||||
type Result = ();
|
||||
|
||||
fn handle(&mut self, msg: Unsubscribe, _: &mut Self::Context) {
|
||||
let Unsubscribe(fid) = msg;
|
||||
|
||||
if let Some(feed) = self.feeds.get(fid) {
|
||||
self.serializer.push(feed::UnsubscribedFrom(&self.label.0));
|
||||
|
||||
if let Some(serialized) = self.serializer.finalize() {
|
||||
feed.do_send(serialized);
|
||||
}
|
||||
}
|
||||
|
||||
self.feeds.remove(fid);
|
||||
self.finality_feeds.remove(&fid);
|
||||
}
|
||||
}
|
||||
|
||||
impl Handler<GetNodeNetworkState> for Chain {
|
||||
type Result = <GetNodeNetworkState as Message>::Result;
|
||||
|
||||
fn handle(&mut self, msg: GetNodeNetworkState, _: &mut Self::Context) -> Self::Result {
|
||||
let GetNodeNetworkState(nid) = msg;
|
||||
|
||||
self.nodes.get(nid)?.network_state()
|
||||
}
|
||||
}
|
||||
@@ -1,195 +0,0 @@
|
||||
use serde::ser::{SerializeTuple, Serializer};
|
||||
use serde::Serialize;
|
||||
use std::mem;
|
||||
|
||||
use crate::node::Node;
|
||||
use crate::types::{
|
||||
Address, BlockDetails, BlockHash, BlockNumber, NodeHardware, NodeIO, NodeId, NodeStats,
|
||||
Timestamp,
|
||||
};
|
||||
use serde_json::to_writer;
|
||||
|
||||
pub mod connector;
|
||||
|
||||
use connector::Serialized;
|
||||
|
||||
pub trait FeedMessage: Serialize {
|
||||
const ACTION: u8;
|
||||
}
|
||||
|
||||
pub struct FeedMessageSerializer {
|
||||
/// Current buffer,
|
||||
buffer: Vec<u8>,
|
||||
}
|
||||
|
||||
const BUFCAP: usize = 128;
|
||||
|
||||
impl FeedMessageSerializer {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
buffer: Vec::with_capacity(BUFCAP),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn push<Message>(&mut self, msg: Message)
|
||||
where
|
||||
Message: FeedMessage,
|
||||
{
|
||||
let glue = match self.buffer.len() {
|
||||
0 => b'[',
|
||||
_ => b',',
|
||||
};
|
||||
|
||||
self.buffer.push(glue);
|
||||
let _ = to_writer(&mut self.buffer, &Message::ACTION);
|
||||
self.buffer.push(b',');
|
||||
let _ = to_writer(&mut self.buffer, &msg);
|
||||
}
|
||||
|
||||
pub fn finalize(&mut self) -> Option<Serialized> {
|
||||
if self.buffer.is_empty() {
|
||||
return None;
|
||||
}
|
||||
|
||||
self.buffer.push(b']');
|
||||
|
||||
let bytes = mem::replace(&mut self.buffer, Vec::with_capacity(BUFCAP)).into();
|
||||
|
||||
Some(Serialized(bytes))
|
||||
}
|
||||
}
|
||||
|
||||
macro_rules! actions {
|
||||
($($action:literal: $t:ty,)*) => {
|
||||
$(
|
||||
impl FeedMessage for $t {
|
||||
const ACTION: u8 = $action;
|
||||
}
|
||||
)*
|
||||
}
|
||||
}
|
||||
|
||||
actions! {
|
||||
0x00: Version,
|
||||
0x01: BestBlock,
|
||||
0x02: BestFinalized,
|
||||
0x03: AddedNode<'_>,
|
||||
0x04: RemovedNode,
|
||||
0x05: LocatedNode<'_>,
|
||||
0x06: ImportedBlock<'_>,
|
||||
0x07: FinalizedBlock,
|
||||
0x08: NodeStatsUpdate<'_>,
|
||||
0x09: Hardware<'_>,
|
||||
0x0A: TimeSync,
|
||||
0x0B: AddedChain<'_>,
|
||||
0x0C: RemovedChain<'_>,
|
||||
0x0D: SubscribedTo<'_>,
|
||||
0x0E: UnsubscribedFrom<'_>,
|
||||
0x0F: Pong<'_>,
|
||||
0x10: AfgFinalized,
|
||||
0x11: AfgReceivedPrevote,
|
||||
0x12: AfgReceivedPrecommit,
|
||||
0x13: AfgAuthoritySet,
|
||||
0x14: StaleNode,
|
||||
0x15: NodeIOUpdate<'_>,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub struct Version(pub usize);
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub struct BestBlock(pub BlockNumber, pub Timestamp, pub Option<u64>);
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub struct BestFinalized(pub BlockNumber, pub BlockHash);
|
||||
|
||||
pub struct AddedNode<'a>(pub NodeId, pub &'a Node);
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub struct RemovedNode(pub NodeId);
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub struct LocatedNode<'a>(pub NodeId, pub f32, pub f32, pub &'a str);
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub struct ImportedBlock<'a>(pub NodeId, pub &'a BlockDetails);
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub struct FinalizedBlock(pub NodeId, pub BlockNumber, pub BlockHash);
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub struct NodeStatsUpdate<'a>(pub NodeId, pub &'a NodeStats);
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub struct NodeIOUpdate<'a>(pub NodeId, pub &'a NodeIO);
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub struct Hardware<'a>(pub NodeId, pub &'a NodeHardware);
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub struct TimeSync(pub u64);
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub struct AddedChain<'a>(pub &'a str, pub usize);
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub struct RemovedChain<'a>(pub &'a str);
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub struct SubscribedTo<'a>(pub &'a str);
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub struct UnsubscribedFrom<'a>(pub &'a str);
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub struct Pong<'a>(pub &'a str);
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub struct AfgFinalized(pub Address, pub BlockNumber, pub BlockHash);
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub struct AfgReceivedPrevote(
|
||||
pub Address,
|
||||
pub BlockNumber,
|
||||
pub BlockHash,
|
||||
pub Option<Address>,
|
||||
);
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub struct AfgReceivedPrecommit(
|
||||
pub Address,
|
||||
pub BlockNumber,
|
||||
pub BlockHash,
|
||||
pub Option<Address>,
|
||||
);
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub struct AfgAuthoritySet(
|
||||
pub Address,
|
||||
pub Address,
|
||||
pub Address,
|
||||
pub BlockNumber,
|
||||
pub BlockHash,
|
||||
);
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub struct StaleNode(pub NodeId);
|
||||
|
||||
impl Serialize for AddedNode<'_> {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: Serializer,
|
||||
{
|
||||
let AddedNode(nid, node) = self;
|
||||
let mut tup = serializer.serialize_tuple(8)?;
|
||||
tup.serialize_element(nid)?;
|
||||
tup.serialize_element(node.details())?;
|
||||
tup.serialize_element(node.stats())?;
|
||||
tup.serialize_element(node.io())?;
|
||||
tup.serialize_element(node.hardware())?;
|
||||
tup.serialize_element(node.block_details())?;
|
||||
tup.serialize_element(&node.location())?;
|
||||
tup.serialize_element(&node.startup_time())?;
|
||||
tup.end()
|
||||
}
|
||||
}
|
||||
@@ -1,217 +0,0 @@
|
||||
use crate::aggregator::{Aggregator, Connect, Disconnect, NoMoreFinality, SendFinality, Subscribe};
|
||||
use crate::chain::Unsubscribe;
|
||||
use crate::feed::{FeedMessageSerializer, Pong};
|
||||
use crate::util::fnv;
|
||||
use actix::prelude::*;
|
||||
use actix_web_actors::ws;
|
||||
use bytes::Bytes;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
pub type FeedId = usize;
|
||||
|
||||
/// How often heartbeat pings are sent
|
||||
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(20);
|
||||
/// How long before lack of client response causes a timeout
|
||||
const CLIENT_TIMEOUT: Duration = Duration::from_secs(60);
|
||||
|
||||
pub struct FeedConnector {
|
||||
/// FeedId that Aggregator holds of this actor
|
||||
fid_aggregator: FeedId,
|
||||
/// FeedId that Chain holds of this actor
|
||||
fid_chain: FeedId,
|
||||
/// Client must send ping at least once per 10 seconds (CLIENT_TIMEOUT),
|
||||
hb: Instant,
|
||||
/// Aggregator actor address
|
||||
aggregator: Addr<Aggregator>,
|
||||
/// Chain actor address
|
||||
chain: Option<Recipient<Unsubscribe>>,
|
||||
/// FNV hash of the chain label, optimization to avoid double-subscribing
|
||||
chain_label_hash: u64,
|
||||
/// Message serializer
|
||||
serializer: FeedMessageSerializer,
|
||||
}
|
||||
|
||||
impl Actor for FeedConnector {
|
||||
type Context = ws::WebsocketContext<Self>;
|
||||
|
||||
fn started(&mut self, ctx: &mut Self::Context) {
|
||||
self.heartbeat(ctx);
|
||||
self.aggregator.do_send(Connect(ctx.address()));
|
||||
}
|
||||
|
||||
fn stopped(&mut self, _: &mut Self::Context) {
|
||||
if let Some(chain) = self.chain.take() {
|
||||
let _ = chain.do_send(Unsubscribe(self.fid_chain));
|
||||
}
|
||||
|
||||
self.aggregator.do_send(Disconnect(self.fid_aggregator));
|
||||
}
|
||||
}
|
||||
|
||||
impl FeedConnector {
|
||||
pub fn new(aggregator: Addr<Aggregator>) -> Self {
|
||||
Self {
|
||||
// Garbage id, will be replaced by the Connected message
|
||||
fid_aggregator: !0,
|
||||
// Garbage id, will be replaced by the Subscribed message
|
||||
fid_chain: !0,
|
||||
hb: Instant::now(),
|
||||
aggregator,
|
||||
chain: None,
|
||||
chain_label_hash: 0,
|
||||
serializer: FeedMessageSerializer::new(),
|
||||
}
|
||||
}
|
||||
|
||||
fn heartbeat(&self, ctx: &mut <Self as Actor>::Context) {
|
||||
ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| {
|
||||
// check client heartbeats
|
||||
if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT {
|
||||
// stop actor
|
||||
ctx.stop();
|
||||
} else {
|
||||
ctx.ping(b"")
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
fn handle_cmd(&mut self, cmd: &str, payload: &str, ctx: &mut <Self as Actor>::Context) {
|
||||
match cmd {
|
||||
"subscribe" => {
|
||||
match fnv(payload) {
|
||||
hash if hash == self.chain_label_hash => return,
|
||||
hash => self.chain_label_hash = hash,
|
||||
}
|
||||
|
||||
self.aggregator
|
||||
.send(Subscribe {
|
||||
chain: payload.into(),
|
||||
feed: ctx.address(),
|
||||
})
|
||||
.into_actor(self)
|
||||
.then(|res, actor, _| {
|
||||
match res {
|
||||
Ok(true) => (),
|
||||
// Chain not found, reset hash
|
||||
_ => actor.chain_label_hash = 0,
|
||||
}
|
||||
async {}.into_actor(actor)
|
||||
})
|
||||
.wait(ctx);
|
||||
}
|
||||
"send-finality" => {
|
||||
self.aggregator.do_send(SendFinality {
|
||||
chain: payload.into(),
|
||||
fid: self.fid_chain,
|
||||
});
|
||||
}
|
||||
"no-more-finality" => {
|
||||
self.aggregator.do_send(NoMoreFinality {
|
||||
chain: payload.into(),
|
||||
fid: self.fid_chain,
|
||||
});
|
||||
}
|
||||
"ping" => {
|
||||
self.serializer.push(Pong(payload));
|
||||
if let Some(serialized) = self.serializer.finalize() {
|
||||
ctx.binary(serialized.0);
|
||||
}
|
||||
}
|
||||
_ => (),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Message sent form Chain to the FeedConnector upon successful subscription
|
||||
#[derive(Message)]
|
||||
#[rtype(result = "()")]
|
||||
pub struct Subscribed(pub FeedId, pub Recipient<Unsubscribe>);
|
||||
|
||||
#[derive(Message)]
|
||||
#[rtype(result = "()")]
|
||||
pub struct Unsubscribed;
|
||||
|
||||
/// Message sent from Aggregator to FeedConnector upon successful connection
|
||||
#[derive(Message)]
|
||||
#[rtype(result = "()")]
|
||||
pub struct Connected(pub FeedId);
|
||||
|
||||
/// Message sent from either Aggregator or Chain to FeedConnector containing
|
||||
/// serialized message(s) for the frontend
|
||||
///
|
||||
/// Since Bytes is ARC'ed, this is cheap to clone
|
||||
#[derive(Message, Clone)]
|
||||
#[rtype(result = "()")]
|
||||
pub struct Serialized(pub Bytes);
|
||||
|
||||
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for FeedConnector {
|
||||
fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
|
||||
match msg {
|
||||
Ok(ws::Message::Ping(msg)) => {
|
||||
self.hb = Instant::now();
|
||||
ctx.pong(&msg);
|
||||
}
|
||||
Ok(ws::Message::Pong(_)) => self.hb = Instant::now(),
|
||||
Ok(ws::Message::Text(text)) => {
|
||||
if let Some(idx) = text.find(':') {
|
||||
let cmd = &text[..idx];
|
||||
let payload = &text[idx + 1..];
|
||||
|
||||
log::info!("New FEED message: {}", cmd);
|
||||
|
||||
self.handle_cmd(cmd, payload, ctx);
|
||||
}
|
||||
}
|
||||
Ok(ws::Message::Close(_)) => ctx.stop(),
|
||||
Ok(_) => (),
|
||||
Err(error) => {
|
||||
log::error!("{:?}", error);
|
||||
ctx.stop();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Handler<Subscribed> for FeedConnector {
|
||||
type Result = ();
|
||||
|
||||
fn handle(&mut self, msg: Subscribed, _: &mut Self::Context) {
|
||||
let Subscribed(fid_chain, chain) = msg;
|
||||
|
||||
if let Some(current) = self.chain.take() {
|
||||
let _ = current.do_send(Unsubscribe(self.fid_chain));
|
||||
}
|
||||
|
||||
self.fid_chain = fid_chain;
|
||||
self.chain = Some(chain);
|
||||
}
|
||||
}
|
||||
|
||||
impl Handler<Unsubscribed> for FeedConnector {
|
||||
type Result = ();
|
||||
|
||||
fn handle(&mut self, _: Unsubscribed, _: &mut Self::Context) {
|
||||
self.chain = None;
|
||||
self.chain_label_hash = 0;
|
||||
}
|
||||
}
|
||||
|
||||
impl Handler<Connected> for FeedConnector {
|
||||
type Result = ();
|
||||
|
||||
fn handle(&mut self, msg: Connected, _: &mut Self::Context) {
|
||||
let Connected(fid_aggregator) = msg;
|
||||
|
||||
self.fid_aggregator = fid_aggregator;
|
||||
}
|
||||
}
|
||||
|
||||
impl Handler<Serialized> for FeedConnector {
|
||||
type Result = ();
|
||||
|
||||
fn handle(&mut self, msg: Serialized, ctx: &mut Self::Context) {
|
||||
let Serialized(bytes) = msg;
|
||||
|
||||
ctx.binary(bytes);
|
||||
}
|
||||
}
|
||||
@@ -1,223 +0,0 @@
|
||||
use std::collections::HashSet;
|
||||
use std::iter::FromIterator;
|
||||
use std::net::Ipv4Addr;
|
||||
|
||||
use actix::prelude::*;
|
||||
use actix_http::ws::Codec;
|
||||
use actix_web::{get, middleware, web, App, Error, HttpRequest, HttpResponse, HttpServer};
|
||||
use actix_web_actors::ws;
|
||||
use clap::Clap;
|
||||
use simple_logger::SimpleLogger;
|
||||
|
||||
mod aggregator;
|
||||
mod chain;
|
||||
mod feed;
|
||||
mod node;
|
||||
mod shard;
|
||||
mod types;
|
||||
mod util;
|
||||
|
||||
use aggregator::{Aggregator, GetHealth, GetNetworkState};
|
||||
use feed::connector::FeedConnector;
|
||||
use node::connector::NodeConnector;
|
||||
use shard::connector::ShardConnector;
|
||||
use types::NodeId;
|
||||
use util::{Locator, LocatorFactory};
|
||||
|
||||
const VERSION: &str = env!("CARGO_PKG_VERSION");
|
||||
const AUTHORS: &str = env!("CARGO_PKG_AUTHORS");
|
||||
const NAME: &str = "Substrate Telemetry Backend";
|
||||
const ABOUT: &str = "This is the Telemetry Backend that injects and provide the data sent by Substrate/Polkadot nodes";
|
||||
|
||||
#[derive(Clap, Debug)]
|
||||
#[clap(name = NAME, version = VERSION, author = AUTHORS, about = ABOUT)]
|
||||
struct Opts {
|
||||
#[clap(
|
||||
short = 'l',
|
||||
long = "listen",
|
||||
default_value = "127.0.0.1:8000",
|
||||
about = "This is the socket address Telemetry is listening to. This is restricted to localhost (127.0.0.1) by default and should be fine for most use cases. If you are using Telemetry in a container, you likely want to set this to '0.0.0.0:8000'"
|
||||
)]
|
||||
socket: std::net::SocketAddr,
|
||||
#[clap(
|
||||
required = false,
|
||||
long = "denylist",
|
||||
about = "Space delimited list of chains that are not allowed to connect to telemetry. Case sensitive."
|
||||
)]
|
||||
denylist: Vec<String>,
|
||||
#[clap(
|
||||
arg_enum,
|
||||
required = false,
|
||||
long = "log",
|
||||
default_value = "info",
|
||||
about = "Log level."
|
||||
)]
|
||||
log_level: LogLevel,
|
||||
}
|
||||
|
||||
#[derive(Clap, Debug, PartialEq)]
|
||||
enum LogLevel {
|
||||
Error,
|
||||
Warn,
|
||||
Info,
|
||||
Debug,
|
||||
Trace,
|
||||
}
|
||||
|
||||
impl From<&LogLevel> for log::LevelFilter {
|
||||
fn from(log_level: &LogLevel) -> Self {
|
||||
match log_level {
|
||||
LogLevel::Error => log::LevelFilter::Error,
|
||||
LogLevel::Warn => log::LevelFilter::Warn,
|
||||
LogLevel::Info => log::LevelFilter::Info,
|
||||
LogLevel::Debug => log::LevelFilter::Debug,
|
||||
LogLevel::Trace => log::LevelFilter::Trace,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Entry point for connecting nodes
|
||||
#[get("/submit")]
|
||||
async fn node_route(
|
||||
req: HttpRequest,
|
||||
stream: web::Payload,
|
||||
aggregator: web::Data<Addr<Aggregator>>,
|
||||
locator: web::Data<Addr<Locator>>,
|
||||
) -> Result<HttpResponse, Error> {
|
||||
let ip = req
|
||||
.connection_info()
|
||||
.realip_remote_addr()
|
||||
.and_then(|mut addr| {
|
||||
if let Some(port_idx) = addr.find(':') {
|
||||
addr = &addr[..port_idx];
|
||||
}
|
||||
addr.parse::<Ipv4Addr>().ok()
|
||||
});
|
||||
|
||||
let mut res = ws::handshake(&req)?;
|
||||
let aggregator = aggregator.get_ref().clone();
|
||||
let locator = locator.get_ref().clone().recipient();
|
||||
|
||||
Ok(res.streaming(ws::WebsocketContext::with_codec(
|
||||
NodeConnector::new(aggregator, locator, ip),
|
||||
stream,
|
||||
Codec::new().max_size(10 * 1024 * 1024), // 10mb frame limit
|
||||
)))
|
||||
}
|
||||
|
||||
#[get("/shard_submit/{chain_hash}")]
|
||||
async fn shard_route(
|
||||
req: HttpRequest,
|
||||
stream: web::Payload,
|
||||
aggregator: web::Data<Addr<Aggregator>>,
|
||||
path: web::Path<Box<str>>,
|
||||
) -> Result<HttpResponse, Error> {
|
||||
let hash_str = path.into_inner();
|
||||
let genesis_hash = hash_str.parse()?;
|
||||
|
||||
let mut res = ws::handshake(&req)?;
|
||||
|
||||
let aggregator = aggregator.get_ref().clone();
|
||||
|
||||
Ok(res.streaming(ws::WebsocketContext::with_codec(
|
||||
ShardConnector::new(aggregator, genesis_hash),
|
||||
stream,
|
||||
Codec::new().max_size(10 * 1024 * 1024), // 10mb frame limit
|
||||
)))
|
||||
}
|
||||
|
||||
/// Entry point for connecting feeds
|
||||
#[get("/feed")]
|
||||
async fn feed_route(
|
||||
req: HttpRequest,
|
||||
stream: web::Payload,
|
||||
aggregator: web::Data<Addr<Aggregator>>,
|
||||
) -> Result<HttpResponse, Error> {
|
||||
ws::start(
|
||||
FeedConnector::new(aggregator.get_ref().clone()),
|
||||
&req,
|
||||
stream,
|
||||
)
|
||||
}
|
||||
|
||||
/// Entry point for network state dump
|
||||
#[get("/network_state/{chain}/{nid}")]
|
||||
async fn state_route(
|
||||
path: web::Path<(Box<str>, NodeId)>,
|
||||
aggregator: web::Data<Addr<Aggregator>>,
|
||||
) -> Result<HttpResponse, Error> {
|
||||
let (chain, nid) = path.into_inner();
|
||||
|
||||
let res = match aggregator.send(GetNetworkState(chain, nid)).await {
|
||||
Ok(Some(res)) => res.await,
|
||||
Ok(None) => Ok(None),
|
||||
Err(error) => Err(error),
|
||||
};
|
||||
|
||||
match res {
|
||||
Ok(Some(body)) => {
|
||||
HttpResponse::Ok()
|
||||
.content_type("application/json")
|
||||
.body(body)
|
||||
.await
|
||||
}
|
||||
Ok(None) => {
|
||||
HttpResponse::Ok()
|
||||
.body("Node has disconnected or has not submitted its network state yet")
|
||||
.await
|
||||
}
|
||||
Err(error) => {
|
||||
log::error!("Network state mailbox error: {:?}", error);
|
||||
|
||||
HttpResponse::InternalServerError().await
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Entry point for health check monitoring bots
|
||||
#[get("/health")]
|
||||
async fn health(aggregator: web::Data<Addr<Aggregator>>) -> Result<HttpResponse, Error> {
|
||||
match aggregator.send(GetHealth).await {
|
||||
Ok(count) => {
|
||||
let body = format!("Connected chains: {}", count);
|
||||
|
||||
HttpResponse::Ok().body(body).await
|
||||
}
|
||||
Err(error) => {
|
||||
log::error!("Health check mailbox error: {:?}", error);
|
||||
|
||||
HttpResponse::InternalServerError().await
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Telemetry entry point. Listening by default on 127.0.0.1:8000.
|
||||
/// This can be changed using the `PORT` and `BIND` ENV variables.
|
||||
#[actix_web::main]
|
||||
async fn main() -> std::io::Result<()> {
|
||||
let opts = Opts::parse();
|
||||
let log_level = &opts.log_level;
|
||||
SimpleLogger::new()
|
||||
.with_level(log_level.into())
|
||||
.init()
|
||||
.expect("Must be able to start a logger");
|
||||
|
||||
let denylist = HashSet::from_iter(opts.denylist);
|
||||
let aggregator = Aggregator::new(denylist).start();
|
||||
let factory = LocatorFactory::new();
|
||||
let locator = SyncArbiter::start(4, move || factory.create());
|
||||
log::info!("Starting telemetry version: {}", env!("CARGO_PKG_VERSION"));
|
||||
HttpServer::new(move || {
|
||||
App::new()
|
||||
.wrap(middleware::NormalizePath::default())
|
||||
.data(aggregator.clone())
|
||||
.data(locator.clone())
|
||||
.service(node_route)
|
||||
.service(feed_route)
|
||||
.service(state_route)
|
||||
.service(health)
|
||||
})
|
||||
.bind(opts.socket)?
|
||||
.run()
|
||||
.await
|
||||
}
|
||||
@@ -1,244 +0,0 @@
|
||||
use bytes::Bytes;
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::types::{
|
||||
Block, BlockDetails, NodeDetails, NodeHardware, NodeIO, NodeId, NodeLocation, NodeStats,
|
||||
Timestamp,
|
||||
};
|
||||
use crate::util::now;
|
||||
|
||||
pub mod connector;
|
||||
pub mod message;
|
||||
|
||||
use message::SystemInterval;
|
||||
|
||||
/// Minimum time between block below broadcasting updates to the browser gets throttled, in ms.
|
||||
const THROTTLE_THRESHOLD: u64 = 100;
|
||||
/// Minimum time of intervals for block updates sent to the browser when throttled, in ms.
|
||||
const THROTTLE_INTERVAL: u64 = 1000;
|
||||
|
||||
pub struct Node {
|
||||
/// Static details
|
||||
details: NodeDetails,
|
||||
/// Basic stats
|
||||
stats: NodeStats,
|
||||
/// Node IO stats
|
||||
io: NodeIO,
|
||||
/// Best block
|
||||
best: BlockDetails,
|
||||
/// Finalized block
|
||||
finalized: Block,
|
||||
/// Timer for throttling block updates
|
||||
throttle: u64,
|
||||
/// Hardware stats over time
|
||||
hardware: NodeHardware,
|
||||
/// Physical location details
|
||||
location: Option<Arc<NodeLocation>>,
|
||||
/// Flag marking if the node is stale (not syncing or producing blocks)
|
||||
stale: bool,
|
||||
/// Unix timestamp for when node started up (falls back to connection time)
|
||||
startup_time: Option<Timestamp>,
|
||||
/// Network state
|
||||
network_state: Option<Bytes>,
|
||||
}
|
||||
|
||||
impl Node {
|
||||
pub fn new(mut details: NodeDetails) -> Self {
|
||||
let startup_time = details
|
||||
.startup_time
|
||||
.take()
|
||||
.and_then(|time| time.parse().ok());
|
||||
|
||||
Node {
|
||||
details,
|
||||
stats: NodeStats::default(),
|
||||
io: NodeIO::default(),
|
||||
best: BlockDetails::default(),
|
||||
finalized: Block::zero(),
|
||||
throttle: 0,
|
||||
hardware: NodeHardware::default(),
|
||||
location: None,
|
||||
stale: false,
|
||||
startup_time,
|
||||
network_state: None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn details(&self) -> &NodeDetails {
|
||||
&self.details
|
||||
}
|
||||
|
||||
pub fn stats(&self) -> &NodeStats {
|
||||
&self.stats
|
||||
}
|
||||
|
||||
pub fn io(&self) -> &NodeIO {
|
||||
&self.io
|
||||
}
|
||||
|
||||
pub fn best(&self) -> &Block {
|
||||
&self.best.block
|
||||
}
|
||||
|
||||
pub fn best_timestamp(&self) -> u64 {
|
||||
self.best.block_timestamp
|
||||
}
|
||||
|
||||
pub fn finalized(&self) -> &Block {
|
||||
&self.finalized
|
||||
}
|
||||
|
||||
pub fn hardware(&self) -> &NodeHardware {
|
||||
&self.hardware
|
||||
}
|
||||
|
||||
pub fn location(&self) -> Option<&NodeLocation> {
|
||||
self.location.as_deref()
|
||||
}
|
||||
|
||||
pub fn update_location(&mut self, location: Arc<NodeLocation>) {
|
||||
self.location = Some(location);
|
||||
}
|
||||
|
||||
pub fn block_details(&self) -> &BlockDetails {
|
||||
&self.best
|
||||
}
|
||||
|
||||
pub fn update_block(&mut self, block: Block) -> bool {
|
||||
if block.height > self.best.block.height {
|
||||
self.stale = false;
|
||||
self.best.block = block;
|
||||
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
pub fn update_details(
|
||||
&mut self,
|
||||
timestamp: u64,
|
||||
propagation_time: Option<u64>,
|
||||
) -> Option<&BlockDetails> {
|
||||
self.best.block_time = timestamp - self.best.block_timestamp;
|
||||
self.best.block_timestamp = timestamp;
|
||||
self.best.propagation_time = propagation_time;
|
||||
|
||||
if self.throttle < timestamp {
|
||||
if self.best.block_time <= THROTTLE_THRESHOLD {
|
||||
self.throttle = timestamp + THROTTLE_INTERVAL;
|
||||
}
|
||||
|
||||
Some(&self.best)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
pub fn update_hardware(&mut self, interval: &SystemInterval) -> bool {
|
||||
let mut changed = false;
|
||||
|
||||
if let Some(upload) = interval.bandwidth_upload {
|
||||
changed |= self.hardware.upload.push(upload);
|
||||
}
|
||||
if let Some(download) = interval.bandwidth_download {
|
||||
changed |= self.hardware.download.push(download);
|
||||
}
|
||||
self.hardware.chart_stamps.push(now() as f64);
|
||||
|
||||
changed
|
||||
}
|
||||
|
||||
pub fn update_stats(&mut self, interval: &SystemInterval) -> Option<&NodeStats> {
|
||||
let mut changed = false;
|
||||
|
||||
if let Some(peers) = interval.peers {
|
||||
if peers != self.stats.peers {
|
||||
self.stats.peers = peers;
|
||||
changed = true;
|
||||
}
|
||||
}
|
||||
if let Some(txcount) = interval.txcount {
|
||||
if txcount != self.stats.txcount {
|
||||
self.stats.txcount = txcount;
|
||||
changed = true;
|
||||
}
|
||||
}
|
||||
|
||||
if changed {
|
||||
Some(&self.stats)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
pub fn update_io(&mut self, interval: &SystemInterval) -> Option<&NodeIO> {
|
||||
let mut changed = false;
|
||||
|
||||
if let Some(size) = interval.used_state_cache_size {
|
||||
changed |= self.io.used_state_cache_size.push(size);
|
||||
}
|
||||
|
||||
if changed {
|
||||
Some(&self.io)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
pub fn update_finalized(&mut self, block: Block) -> Option<&Block> {
|
||||
if block.height > self.finalized.height {
|
||||
self.finalized = block;
|
||||
Some(self.finalized())
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
pub fn update_stale(&mut self, threshold: u64) -> bool {
|
||||
if self.best.block_timestamp < threshold {
|
||||
self.stale = true;
|
||||
}
|
||||
|
||||
self.stale
|
||||
}
|
||||
|
||||
pub fn stale(&self) -> bool {
|
||||
self.stale
|
||||
}
|
||||
|
||||
pub fn set_validator_address(&mut self, addr: Box<str>) {
|
||||
self.details.validator = Some(addr);
|
||||
}
|
||||
|
||||
pub fn set_network_state(&mut self, state: Bytes) {
|
||||
self.network_state = Some(state);
|
||||
}
|
||||
|
||||
pub fn network_state(&self) -> Option<Bytes> {
|
||||
use serde::Deserialize;
|
||||
use serde_json::value::RawValue;
|
||||
|
||||
#[derive(Deserialize)]
|
||||
struct Wrapper<'a> {
|
||||
#[serde(borrow)]
|
||||
#[serde(alias = "network_state")]
|
||||
state: &'a RawValue,
|
||||
}
|
||||
|
||||
let raw = self.network_state.as_ref()?;
|
||||
let wrap: Wrapper = serde_json::from_slice(raw).ok()?;
|
||||
let json = wrap.state.get();
|
||||
|
||||
// Handle old nodes that exposed network_state as stringified JSON
|
||||
if let Ok(stringified) = serde_json::from_str::<String>(json) {
|
||||
Some(stringified.into())
|
||||
} else {
|
||||
Some(json.to_owned().into())
|
||||
}
|
||||
}
|
||||
|
||||
pub fn startup_time(&self) -> Option<Timestamp> {
|
||||
self.startup_time
|
||||
}
|
||||
}
|
||||
@@ -1,277 +0,0 @@
|
||||
use std::collections::BTreeMap;
|
||||
use std::mem;
|
||||
use std::net::Ipv4Addr;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use crate::aggregator::{AddNode, Aggregator};
|
||||
use crate::chain::{Chain, RemoveNode, UpdateNode};
|
||||
use crate::node::message::{NodeMessage, Payload};
|
||||
use crate::node::NodeId;
|
||||
use crate::types::ConnId;
|
||||
use crate::util::LocateRequest;
|
||||
use actix::prelude::*;
|
||||
use actix_http::ws::Item;
|
||||
use actix_web_actors::ws::{self, CloseReason};
|
||||
use bytes::{Bytes, BytesMut};
|
||||
|
||||
/// How often heartbeat pings are sent
|
||||
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(20);
|
||||
/// How long before lack of client response causes a timeout
|
||||
const CLIENT_TIMEOUT: Duration = Duration::from_secs(60);
|
||||
/// Continuation buffer limit, 10mb
|
||||
const CONT_BUF_LIMIT: usize = 10 * 1024 * 1024;
|
||||
|
||||
pub struct NodeConnector {
|
||||
/// Multiplexing connections by id
|
||||
multiplex: BTreeMap<ConnId, ConnMultiplex>,
|
||||
/// Client must send ping at least once every 60 seconds (CLIENT_TIMEOUT),
|
||||
hb: Instant,
|
||||
/// Aggregator actor address
|
||||
aggregator: Addr<Aggregator>,
|
||||
/// IP address of the node this connector is responsible for
|
||||
ip: Option<Ipv4Addr>,
|
||||
/// Actix address of location services
|
||||
locator: Recipient<LocateRequest>,
|
||||
/// Buffer for constructing continuation messages
|
||||
contbuf: BytesMut,
|
||||
}
|
||||
|
||||
enum ConnMultiplex {
|
||||
Connected {
|
||||
/// Id of the node this multiplex connector is responsible for handling
|
||||
nid: NodeId,
|
||||
/// Chain address to which this multiplex connector is delegating messages
|
||||
chain: Addr<Chain>,
|
||||
},
|
||||
Waiting {
|
||||
/// Backlog of messages to be sent once we get a recipient handle to the chain
|
||||
backlog: Vec<Payload>,
|
||||
},
|
||||
}
|
||||
|
||||
impl Default for ConnMultiplex {
|
||||
fn default() -> Self {
|
||||
ConnMultiplex::Waiting {
|
||||
backlog: Vec::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Actor for NodeConnector {
|
||||
type Context = ws::WebsocketContext<Self>;
|
||||
|
||||
fn started(&mut self, ctx: &mut Self::Context) {
|
||||
self.heartbeat(ctx);
|
||||
}
|
||||
|
||||
fn stopped(&mut self, _: &mut Self::Context) {
|
||||
for mx in self.multiplex.values() {
|
||||
if let ConnMultiplex::Connected { chain, nid } = mx {
|
||||
chain.do_send(RemoveNode(*nid));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl NodeConnector {
|
||||
pub fn new(
|
||||
aggregator: Addr<Aggregator>,
|
||||
locator: Recipient<LocateRequest>,
|
||||
ip: Option<Ipv4Addr>,
|
||||
) -> Self {
|
||||
Self {
|
||||
multiplex: BTreeMap::new(),
|
||||
hb: Instant::now(),
|
||||
aggregator,
|
||||
ip,
|
||||
locator,
|
||||
contbuf: BytesMut::new(),
|
||||
}
|
||||
}
|
||||
|
||||
fn heartbeat(&self, ctx: &mut <Self as Actor>::Context) {
|
||||
ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| {
|
||||
// check client heartbeats
|
||||
if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT {
|
||||
// stop actor
|
||||
ctx.close(Some(CloseReason {
|
||||
code: ws::CloseCode::Abnormal,
|
||||
description: Some("Missed heartbeat".into()),
|
||||
}));
|
||||
ctx.stop();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
fn handle_message(
|
||||
&mut self,
|
||||
msg: NodeMessage,
|
||||
data: Bytes,
|
||||
ctx: &mut <Self as Actor>::Context,
|
||||
) {
|
||||
let conn_id = msg.id();
|
||||
let payload = msg.into();
|
||||
|
||||
match self.multiplex.entry(conn_id).or_default() {
|
||||
ConnMultiplex::Connected { nid, chain } => {
|
||||
chain.do_send(UpdateNode {
|
||||
nid: *nid,
|
||||
raw: Some(data),
|
||||
payload,
|
||||
});
|
||||
}
|
||||
ConnMultiplex::Waiting { backlog } => {
|
||||
if let Payload::SystemConnected(connected) = payload {
|
||||
self.aggregator.do_send(AddNode {
|
||||
node: connected.node,
|
||||
genesis_hash: connected.genesis_hash,
|
||||
conn_id,
|
||||
node_connector: ctx.address(),
|
||||
});
|
||||
} else {
|
||||
if backlog.len() >= 10 {
|
||||
backlog.remove(0);
|
||||
}
|
||||
|
||||
backlog.push(payload);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn start_frame(&mut self, bytes: &[u8]) {
|
||||
if !self.contbuf.is_empty() {
|
||||
log::error!("Unused continuation buffer");
|
||||
self.contbuf.clear();
|
||||
}
|
||||
self.continue_frame(bytes);
|
||||
}
|
||||
|
||||
fn continue_frame(&mut self, bytes: &[u8]) {
|
||||
if self.contbuf.len() + bytes.len() <= CONT_BUF_LIMIT {
|
||||
self.contbuf.extend_from_slice(&bytes);
|
||||
} else {
|
||||
log::error!("Continuation buffer overflow");
|
||||
self.contbuf = BytesMut::new();
|
||||
}
|
||||
}
|
||||
|
||||
fn finish_frame(&mut self) -> Bytes {
|
||||
mem::replace(&mut self.contbuf, BytesMut::new()).freeze()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Message)]
|
||||
#[rtype(result = "()")]
|
||||
pub struct Mute {
|
||||
pub reason: CloseReason,
|
||||
}
|
||||
|
||||
impl Handler<Mute> for NodeConnector {
|
||||
type Result = ();
|
||||
fn handle(&mut self, msg: Mute, ctx: &mut Self::Context) {
|
||||
let Mute { reason } = msg;
|
||||
log::debug!(target: "NodeConnector::Mute", "Muting a node. Reason: {:?}", reason.description);
|
||||
|
||||
ctx.close(Some(reason));
|
||||
ctx.stop();
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Message)]
|
||||
#[rtype(result = "()")]
|
||||
pub struct Initialize {
|
||||
pub nid: NodeId,
|
||||
pub conn_id: ConnId,
|
||||
pub chain: Addr<Chain>,
|
||||
}
|
||||
|
||||
impl Handler<Initialize> for NodeConnector {
|
||||
type Result = ();
|
||||
|
||||
fn handle(&mut self, msg: Initialize, _: &mut Self::Context) {
|
||||
let Initialize {
|
||||
nid,
|
||||
conn_id,
|
||||
chain,
|
||||
} = msg;
|
||||
log::trace!(target: "NodeConnector::Initialize", "Initializing a node, nid={}, on conn_id={}", nid, conn_id);
|
||||
let mx = self.multiplex.entry(conn_id).or_default();
|
||||
|
||||
if let ConnMultiplex::Waiting { backlog } = mx {
|
||||
for payload in backlog.drain(..) {
|
||||
chain.do_send(UpdateNode {
|
||||
nid,
|
||||
raw: None,
|
||||
payload,
|
||||
});
|
||||
}
|
||||
|
||||
*mx = ConnMultiplex::Connected {
|
||||
nid,
|
||||
chain: chain.clone(),
|
||||
};
|
||||
};
|
||||
|
||||
// Acquire the node's physical location
|
||||
if let Some(ip) = self.ip {
|
||||
let _ = self.locator.do_send(LocateRequest { ip, nid, chain });
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for NodeConnector {
|
||||
fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
|
||||
self.hb = Instant::now();
|
||||
|
||||
let data = match msg {
|
||||
Ok(ws::Message::Ping(msg)) => {
|
||||
ctx.pong(&msg);
|
||||
return;
|
||||
}
|
||||
Ok(ws::Message::Pong(_)) => return,
|
||||
Ok(ws::Message::Text(text)) => text.into_bytes(),
|
||||
Ok(ws::Message::Binary(data)) => data,
|
||||
Ok(ws::Message::Close(reason)) => {
|
||||
ctx.close(reason);
|
||||
ctx.stop();
|
||||
return;
|
||||
}
|
||||
Ok(ws::Message::Nop) => return,
|
||||
Ok(ws::Message::Continuation(cont)) => match cont {
|
||||
Item::FirstText(bytes) | Item::FirstBinary(bytes) => {
|
||||
self.start_frame(&bytes);
|
||||
return;
|
||||
}
|
||||
Item::Continue(bytes) => {
|
||||
self.continue_frame(&bytes);
|
||||
return;
|
||||
}
|
||||
Item::Last(bytes) => {
|
||||
self.continue_frame(&bytes);
|
||||
self.finish_frame()
|
||||
}
|
||||
},
|
||||
Err(error) => {
|
||||
log::error!("{:?}", error);
|
||||
ctx.stop();
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
match serde_json::from_slice(&data) {
|
||||
Ok(msg) => self.handle_message(msg, data, ctx),
|
||||
#[cfg(debug)]
|
||||
Err(err) => {
|
||||
let data: &[u8] = data.get(..512).unwrap_or_else(|| &data);
|
||||
log::warn!(
|
||||
"Failed to parse node message: {} {}",
|
||||
err,
|
||||
std::str::from_utf8(data).unwrap_or_else(|_| "INVALID UTF8")
|
||||
)
|
||||
}
|
||||
#[cfg(not(debug))]
|
||||
Err(_) => (),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,199 +0,0 @@
|
||||
use crate::node::NodeDetails;
|
||||
use crate::types::{Block, BlockHash, BlockNumber, ConnId};
|
||||
use crate::util::Hash;
|
||||
use actix::prelude::*;
|
||||
use serde::de::IgnoredAny;
|
||||
use serde::Deserialize;
|
||||
|
||||
#[derive(Deserialize, Debug, Message)]
|
||||
#[rtype(result = "()")]
|
||||
#[serde(untagged)]
|
||||
pub enum NodeMessage {
|
||||
V1 {
|
||||
#[serde(flatten)]
|
||||
payload: Payload,
|
||||
},
|
||||
V2 {
|
||||
id: ConnId,
|
||||
payload: Payload,
|
||||
},
|
||||
}
|
||||
|
||||
impl NodeMessage {
|
||||
/// Returns the connection ID or 0 if there is no ID.
|
||||
pub fn id(&self) -> ConnId {
|
||||
match self {
|
||||
NodeMessage::V1 { .. } => 0,
|
||||
NodeMessage::V2 { id, .. } => *id,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<NodeMessage> for Payload {
|
||||
fn from(msg: NodeMessage) -> Payload {
|
||||
match msg {
|
||||
NodeMessage::V1 { payload, .. } | NodeMessage::V2 { payload, .. } => payload,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Debug)]
|
||||
#[serde(tag = "msg")]
|
||||
pub enum Payload {
|
||||
#[serde(rename = "system.connected")]
|
||||
SystemConnected(SystemConnected),
|
||||
#[serde(rename = "system.interval")]
|
||||
SystemInterval(SystemInterval),
|
||||
#[serde(rename = "system.network_state")]
|
||||
SystemNetworkState(IgnoredAny),
|
||||
#[serde(rename = "block.import")]
|
||||
BlockImport(Block),
|
||||
#[serde(rename = "notify.finalized")]
|
||||
NotifyFinalized(Finalized),
|
||||
#[serde(rename = "txpool.import")]
|
||||
TxPoolImport(IgnoredAny),
|
||||
#[serde(rename = "afg.finalized")]
|
||||
AfgFinalized(AfgFinalized),
|
||||
#[serde(rename = "afg.received_precommit")]
|
||||
AfgReceivedPrecommit(AfgReceivedPrecommit),
|
||||
#[serde(rename = "afg.received_prevote")]
|
||||
AfgReceivedPrevote(AfgReceivedPrevote),
|
||||
#[serde(rename = "afg.received_commit")]
|
||||
AfgReceivedCommit(AfgReceivedCommit),
|
||||
#[serde(rename = "afg.authority_set")]
|
||||
AfgAuthoritySet(AfgAuthoritySet),
|
||||
#[serde(rename = "afg.finalized_blocks_up_to")]
|
||||
AfgFinalizedBlocksUpTo(IgnoredAny),
|
||||
#[serde(rename = "aura.pre_sealed_block")]
|
||||
AuraPreSealedBlock(IgnoredAny),
|
||||
#[serde(rename = "prepared_block_for_proposing")]
|
||||
PreparedBlockForProposing(IgnoredAny),
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Debug)]
|
||||
pub struct SystemConnected {
|
||||
pub genesis_hash: Hash,
|
||||
#[serde(flatten)]
|
||||
pub node: NodeDetails,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Debug)]
|
||||
pub struct SystemInterval {
|
||||
pub peers: Option<u64>,
|
||||
pub txcount: Option<u64>,
|
||||
pub bandwidth_upload: Option<f64>,
|
||||
pub bandwidth_download: Option<f64>,
|
||||
pub finalized_height: Option<BlockNumber>,
|
||||
pub finalized_hash: Option<BlockHash>,
|
||||
#[serde(flatten)]
|
||||
pub block: Option<Block>,
|
||||
pub network_state: Option<IgnoredAny>,
|
||||
pub used_state_cache_size: Option<f32>,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Debug)]
|
||||
pub struct Finalized {
|
||||
#[serde(rename = "best")]
|
||||
pub hash: BlockHash,
|
||||
pub height: Box<str>,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Debug)]
|
||||
pub struct AfgAuthoritySet {
|
||||
pub authority_id: Box<str>,
|
||||
pub authorities: Box<str>,
|
||||
pub authority_set_id: Box<str>,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Debug, Clone)]
|
||||
pub struct AfgFinalized {
|
||||
pub finalized_hash: BlockHash,
|
||||
pub finalized_number: Box<str>,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Debug, Clone)]
|
||||
pub struct AfgReceived {
|
||||
pub target_hash: BlockHash,
|
||||
pub target_number: Box<str>,
|
||||
pub voter: Option<Box<str>>,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Debug, Clone)]
|
||||
pub struct AfgReceivedPrecommit {
|
||||
#[serde(flatten)]
|
||||
pub received: AfgReceived,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Debug, Clone)]
|
||||
pub struct AfgReceivedPrevote {
|
||||
#[serde(flatten)]
|
||||
pub received: AfgReceived,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Debug, Clone)]
|
||||
pub struct AfgReceivedCommit {
|
||||
#[serde(flatten)]
|
||||
pub received: AfgReceived,
|
||||
}
|
||||
|
||||
impl Block {
|
||||
pub fn zero() -> Self {
|
||||
Block {
|
||||
hash: BlockHash::from([0; 32]),
|
||||
height: 0,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Payload {
|
||||
pub fn best_block(&self) -> Option<&Block> {
|
||||
match self {
|
||||
Payload::BlockImport(block) => Some(block),
|
||||
Payload::SystemInterval(SystemInterval { block, .. }) => block.as_ref(),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn finalized_block(&self) -> Option<Block> {
|
||||
match self {
|
||||
Payload::SystemInterval(ref interval) => Some(Block {
|
||||
hash: interval.finalized_hash?,
|
||||
height: interval.finalized_height?,
|
||||
}),
|
||||
Payload::NotifyFinalized(ref finalized) => Some(Block {
|
||||
hash: finalized.hash,
|
||||
height: finalized.height.parse().ok()?,
|
||||
}),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn message_v1() {
|
||||
let json = r#"{"msg":"notify.finalized","level":"INFO","ts":"2021-01-13T12:38:25.410794650+01:00","best":"0x031c3521ca2f9c673812d692fc330b9a18e18a2781e3f9976992f861fd3ea0cb","height":"50"}"#;
|
||||
assert!(
|
||||
matches!(
|
||||
serde_json::from_str::<NodeMessage>(json).unwrap(),
|
||||
NodeMessage::V1 { .. },
|
||||
),
|
||||
"message did not match variant V1",
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn message_v2() {
|
||||
let json = r#"{"id":1,"ts":"2021-01-13T12:22:20.053527101+01:00","payload":{"best":"0xcc41708573f2acaded9dd75e07dac2d4163d136ca35b3061c558d7a35a09dd8d","height":"209","msg":"notify.finalized"}}"#;
|
||||
assert!(
|
||||
matches!(
|
||||
serde_json::from_str::<NodeMessage>(json).unwrap(),
|
||||
NodeMessage::V2 { .. },
|
||||
),
|
||||
"message did not match variant V2",
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -1,13 +0,0 @@
|
||||
use crate::node::message::Payload;
|
||||
use serde::Deserialize;
|
||||
|
||||
pub mod connector;
|
||||
|
||||
/// Alias for the ID of the node connection
|
||||
type ShardConnId = usize;
|
||||
|
||||
#[derive(Deserialize)]
|
||||
pub struct ShardMessage {
|
||||
pub conn_id: ShardConnId,
|
||||
pub payload: Payload,
|
||||
}
|
||||
@@ -1,156 +0,0 @@
|
||||
use std::mem;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use crate::aggregator::{AddNode, Aggregator};
|
||||
use crate::chain::{Chain, RemoveNode, UpdateNode};
|
||||
use crate::shard::ShardMessage;
|
||||
use crate::types::NodeId;
|
||||
use crate::util::{DenseMap, Hash};
|
||||
use actix::prelude::*;
|
||||
use actix_http::ws::Item;
|
||||
use actix_web_actors::ws::{self, CloseReason};
|
||||
use bincode::Options;
|
||||
use bytes::{Bytes, BytesMut};
|
||||
|
||||
/// How often heartbeat pings are sent
|
||||
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(20);
|
||||
/// How long before lack of client response causes a timeout
|
||||
const CLIENT_TIMEOUT: Duration = Duration::from_secs(60);
|
||||
/// Continuation buffer limit, 10mb
|
||||
const CONT_BUF_LIMIT: usize = 10 * 1024 * 1024;
|
||||
|
||||
pub struct ShardConnector {
|
||||
/// Client must send ping at least once every 60 seconds (CLIENT_TIMEOUT),
|
||||
hb: Instant,
|
||||
/// Aggregator actor address
|
||||
aggregator: Addr<Aggregator>,
|
||||
/// Genesis hash of the chain this connection will be submitting data for
|
||||
genesis_hash: Hash,
|
||||
/// Chain address to which this multiplex connector is delegating messages
|
||||
chain: Option<Addr<Chain>>,
|
||||
/// Mapping `ShardConnId` to `NodeId`
|
||||
nodes: DenseMap<NodeId>,
|
||||
/// Buffer for constructing continuation messages
|
||||
contbuf: BytesMut,
|
||||
}
|
||||
|
||||
impl Actor for ShardConnector {
|
||||
type Context = ws::WebsocketContext<Self>;
|
||||
|
||||
fn started(&mut self, ctx: &mut Self::Context) {
|
||||
self.heartbeat(ctx);
|
||||
}
|
||||
|
||||
fn stopped(&mut self, _: &mut Self::Context) {
|
||||
if let Some(ref chain) = self.chain {
|
||||
for (_, nid) in self.nodes.iter() {
|
||||
chain.do_send(RemoveNode(*nid))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ShardConnector {
|
||||
pub fn new(aggregator: Addr<Aggregator>, genesis_hash: Hash) -> Self {
|
||||
Self {
|
||||
hb: Instant::now(),
|
||||
aggregator,
|
||||
genesis_hash,
|
||||
chain: None,
|
||||
nodes: DenseMap::new(),
|
||||
contbuf: BytesMut::new(),
|
||||
}
|
||||
}
|
||||
|
||||
fn heartbeat(&self, ctx: &mut <Self as Actor>::Context) {
|
||||
ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| {
|
||||
// check client heartbeats
|
||||
if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT {
|
||||
// stop actor
|
||||
ctx.close(Some(CloseReason {
|
||||
code: ws::CloseCode::Abnormal,
|
||||
description: Some("Missed heartbeat".into()),
|
||||
}));
|
||||
ctx.stop();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
fn handle_message(&mut self, msg: ShardMessage, ctx: &mut <Self as Actor>::Context) {
|
||||
let ShardMessage { conn_id, payload } = msg;
|
||||
|
||||
// TODO: get `NodeId` for `ShardConnId` and proxy payload to `self.chain`.
|
||||
}
|
||||
|
||||
fn start_frame(&mut self, bytes: &[u8]) {
|
||||
if !self.contbuf.is_empty() {
|
||||
log::error!("Unused continuation buffer");
|
||||
self.contbuf.clear();
|
||||
}
|
||||
self.continue_frame(bytes);
|
||||
}
|
||||
|
||||
fn continue_frame(&mut self, bytes: &[u8]) {
|
||||
if self.contbuf.len() + bytes.len() <= CONT_BUF_LIMIT {
|
||||
self.contbuf.extend_from_slice(&bytes);
|
||||
} else {
|
||||
log::error!("Continuation buffer overflow");
|
||||
self.contbuf = BytesMut::new();
|
||||
}
|
||||
}
|
||||
|
||||
fn finish_frame(&mut self) -> Bytes {
|
||||
mem::replace(&mut self.contbuf, BytesMut::new()).freeze()
|
||||
}
|
||||
}
|
||||
|
||||
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for ShardConnector {
|
||||
fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
|
||||
self.hb = Instant::now();
|
||||
|
||||
let data = match msg {
|
||||
Ok(ws::Message::Ping(msg)) => {
|
||||
ctx.pong(&msg);
|
||||
return;
|
||||
}
|
||||
Ok(ws::Message::Pong(_)) => return,
|
||||
Ok(ws::Message::Text(text)) => text.into_bytes(),
|
||||
Ok(ws::Message::Binary(data)) => data,
|
||||
Ok(ws::Message::Close(reason)) => {
|
||||
ctx.close(reason);
|
||||
ctx.stop();
|
||||
return;
|
||||
}
|
||||
Ok(ws::Message::Nop) => return,
|
||||
Ok(ws::Message::Continuation(cont)) => match cont {
|
||||
Item::FirstText(bytes) | Item::FirstBinary(bytes) => {
|
||||
self.start_frame(&bytes);
|
||||
return;
|
||||
}
|
||||
Item::Continue(bytes) => {
|
||||
self.continue_frame(&bytes);
|
||||
return;
|
||||
}
|
||||
Item::Last(bytes) => {
|
||||
self.continue_frame(&bytes);
|
||||
self.finish_frame()
|
||||
}
|
||||
},
|
||||
Err(error) => {
|
||||
log::error!("{:?}", error);
|
||||
ctx.stop();
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
match bincode::options().deserialize(&data) {
|
||||
Ok(msg) => self.handle_message(msg, ctx),
|
||||
#[cfg(debug)]
|
||||
Err(err) => {
|
||||
log::warn!("Failed to parse shard message: {}", err,)
|
||||
}
|
||||
#[cfg(not(debug))]
|
||||
Err(_) => (),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,155 +0,0 @@
|
||||
use serde::ser::{Serialize, SerializeTuple, Serializer};
|
||||
use serde::Deserialize;
|
||||
|
||||
use crate::util::{now, MeanList};
|
||||
|
||||
pub type NodeId = usize;
|
||||
pub type ConnId = u64;
|
||||
pub type BlockNumber = u64;
|
||||
pub type Timestamp = u64;
|
||||
pub type Address = Box<str>;
|
||||
pub use primitive_types::H256 as BlockHash;
|
||||
|
||||
#[derive(Deserialize, Debug, Clone)]
|
||||
pub struct NodeDetails {
|
||||
pub chain: Box<str>,
|
||||
pub name: Box<str>,
|
||||
pub implementation: Box<str>,
|
||||
pub version: Box<str>,
|
||||
pub validator: Option<Box<str>>,
|
||||
pub network_id: Option<Box<str>>,
|
||||
pub startup_time: Option<Box<str>>,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Debug, Clone, Copy, PartialEq, Eq, Default)]
|
||||
pub struct NodeStats {
|
||||
pub peers: u64,
|
||||
pub txcount: u64,
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct NodeIO {
|
||||
pub used_state_cache_size: MeanList<f32>,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Debug, Clone, Copy)]
|
||||
pub struct Block {
|
||||
#[serde(rename = "best")]
|
||||
pub hash: BlockHash,
|
||||
pub height: BlockNumber,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub struct BlockDetails {
|
||||
pub block: Block,
|
||||
pub block_time: u64,
|
||||
pub block_timestamp: u64,
|
||||
pub propagation_time: Option<u64>,
|
||||
}
|
||||
|
||||
impl Default for BlockDetails {
|
||||
fn default() -> Self {
|
||||
BlockDetails {
|
||||
block: Block::zero(),
|
||||
block_timestamp: now(),
|
||||
block_time: 0,
|
||||
propagation_time: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct NodeHardware {
|
||||
/// Upload uses means
|
||||
pub upload: MeanList<f64>,
|
||||
/// Download uses means
|
||||
pub download: MeanList<f64>,
|
||||
/// Stampchange uses means
|
||||
pub chart_stamps: MeanList<f64>,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Debug, Clone)]
|
||||
pub struct NodeLocation {
|
||||
pub latitude: f32,
|
||||
pub longitude: f32,
|
||||
pub city: Box<str>,
|
||||
}
|
||||
|
||||
impl Serialize for NodeDetails {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: Serializer,
|
||||
{
|
||||
let mut tup = serializer.serialize_tuple(6)?;
|
||||
tup.serialize_element(&self.name)?;
|
||||
tup.serialize_element(&self.implementation)?;
|
||||
tup.serialize_element(&self.version)?;
|
||||
tup.serialize_element(&self.validator)?;
|
||||
tup.serialize_element(&self.network_id)?;
|
||||
tup.end()
|
||||
}
|
||||
}
|
||||
|
||||
impl Serialize for NodeStats {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: Serializer,
|
||||
{
|
||||
let mut tup = serializer.serialize_tuple(2)?;
|
||||
tup.serialize_element(&self.peers)?;
|
||||
tup.serialize_element(&self.txcount)?;
|
||||
tup.end()
|
||||
}
|
||||
}
|
||||
|
||||
impl Serialize for NodeIO {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: Serializer,
|
||||
{
|
||||
let mut tup = serializer.serialize_tuple(1)?;
|
||||
tup.serialize_element(self.used_state_cache_size.slice())?;
|
||||
tup.end()
|
||||
}
|
||||
}
|
||||
|
||||
impl Serialize for BlockDetails {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: Serializer,
|
||||
{
|
||||
let mut tup = serializer.serialize_tuple(5)?;
|
||||
tup.serialize_element(&self.block.height)?;
|
||||
tup.serialize_element(&self.block.hash)?;
|
||||
tup.serialize_element(&self.block_time)?;
|
||||
tup.serialize_element(&self.block_timestamp)?;
|
||||
tup.serialize_element(&self.propagation_time)?;
|
||||
tup.end()
|
||||
}
|
||||
}
|
||||
|
||||
impl Serialize for NodeLocation {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: Serializer,
|
||||
{
|
||||
let mut tup = serializer.serialize_tuple(3)?;
|
||||
tup.serialize_element(&self.latitude)?;
|
||||
tup.serialize_element(&self.longitude)?;
|
||||
tup.serialize_element(&&*self.city)?;
|
||||
tup.end()
|
||||
}
|
||||
}
|
||||
|
||||
impl Serialize for NodeHardware {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: Serializer,
|
||||
{
|
||||
let mut tup = serializer.serialize_tuple(3)?;
|
||||
tup.serialize_element(self.upload.slice())?;
|
||||
tup.serialize_element(self.download.slice())?;
|
||||
tup.serialize_element(self.chart_stamps.slice())?;
|
||||
tup.end()
|
||||
}
|
||||
}
|
||||
@@ -1,31 +0,0 @@
|
||||
mod dense_map;
|
||||
mod hash;
|
||||
mod location;
|
||||
mod mean_list;
|
||||
mod num_stats;
|
||||
|
||||
pub use dense_map::DenseMap;
|
||||
pub use hash::Hash;
|
||||
pub use location::{LocateRequest, Locator, LocatorFactory};
|
||||
pub use mean_list::MeanList;
|
||||
pub use num_stats::NumStats;
|
||||
|
||||
pub fn fnv<D: AsRef<[u8]>>(data: D) -> u64 {
|
||||
use fnv::FnvHasher;
|
||||
use std::hash::Hasher;
|
||||
|
||||
let mut hasher = FnvHasher::default();
|
||||
|
||||
hasher.write(data.as_ref());
|
||||
hasher.finish()
|
||||
}
|
||||
|
||||
/// Returns current unix time in ms (compatible with JS Date.now())
|
||||
pub fn now() -> u64 {
|
||||
use std::time::SystemTime;
|
||||
|
||||
SystemTime::now()
|
||||
.duration_since(SystemTime::UNIX_EPOCH)
|
||||
.expect("System time must be configured to be post Unix Epoch start; qed")
|
||||
.as_millis() as u64
|
||||
}
|
||||
@@ -1,80 +0,0 @@
|
||||
pub type Id = usize;
|
||||
|
||||
pub struct DenseMap<T> {
|
||||
/// List of retired indexes that can be re-used
|
||||
retired: Vec<Id>,
|
||||
/// All items
|
||||
items: Vec<Option<T>>,
|
||||
}
|
||||
|
||||
impl<T> DenseMap<T> {
|
||||
pub fn new() -> Self {
|
||||
DenseMap {
|
||||
retired: Vec::new(),
|
||||
items: Vec::new(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn add(&mut self, item: T) -> Id {
|
||||
self.add_with(|_| item)
|
||||
}
|
||||
|
||||
pub fn add_with<F>(&mut self, f: F) -> Id
|
||||
where
|
||||
F: FnOnce(Id) -> T,
|
||||
{
|
||||
match self.retired.pop() {
|
||||
Some(id) => {
|
||||
self.items[id] = Some(f(id));
|
||||
id
|
||||
}
|
||||
None => {
|
||||
let id = self.items.len();
|
||||
self.items.push(Some(f(id)));
|
||||
id
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get(&self, id: Id) -> Option<&T> {
|
||||
self.items.get(id).and_then(|item| item.as_ref())
|
||||
}
|
||||
|
||||
pub fn get_mut(&mut self, id: Id) -> Option<&mut T> {
|
||||
self.items.get_mut(id).and_then(|item| item.as_mut())
|
||||
}
|
||||
|
||||
pub fn remove(&mut self, id: Id) -> Option<T> {
|
||||
let old = self.items.get_mut(id).and_then(|item| item.take());
|
||||
|
||||
if old.is_some() {
|
||||
// something was actually removed, so lets add the id to
|
||||
// the list of retired ids!
|
||||
self.retired.push(id);
|
||||
}
|
||||
|
||||
old
|
||||
}
|
||||
|
||||
pub fn iter(&self) -> impl Iterator<Item = (Id, &T)> + '_ {
|
||||
self.items
|
||||
.iter()
|
||||
.enumerate()
|
||||
.filter_map(|(id, item)| Some((id, item.as_ref()?)))
|
||||
}
|
||||
|
||||
pub fn iter_mut(&mut self) -> impl Iterator<Item = (Id, &mut T)> + '_ {
|
||||
self.items
|
||||
.iter_mut()
|
||||
.enumerate()
|
||||
.filter_map(|(id, item)| Some((id, item.as_mut()?)))
|
||||
}
|
||||
|
||||
pub fn len(&self) -> usize {
|
||||
self.items.len() - self.retired.len()
|
||||
}
|
||||
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.len() == 0
|
||||
}
|
||||
}
|
||||
@@ -1,89 +0,0 @@
|
||||
use std::fmt::{self, Debug, Display};
|
||||
use std::str::FromStr;
|
||||
|
||||
use actix_web::error::ResponseError;
|
||||
use serde::de::{self, Deserialize, Deserializer, Unexpected, Visitor};
|
||||
|
||||
const HASH_BYTES: usize = 32;
|
||||
|
||||
/// Newtype wrapper for 32-byte hash values, implementing readable `Debug` and `serde::Deserialize`.
|
||||
// We could use primitive_types::H256 here, but opted for a custom type to avoid more dependencies.
|
||||
#[derive(Hash, PartialEq, Eq, Clone, Copy)]
|
||||
pub struct Hash([u8; HASH_BYTES]);
|
||||
|
||||
struct HashVisitor;
|
||||
|
||||
impl<'de> Visitor<'de> for HashVisitor {
|
||||
type Value = Hash;
|
||||
|
||||
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
|
||||
formatter.write_str("hexidecimal string of 32 bytes beginning with 0x")
|
||||
}
|
||||
|
||||
fn visit_str<E>(self, value: &str) -> Result<Self::Value, E>
|
||||
where
|
||||
E: de::Error,
|
||||
{
|
||||
value
|
||||
.parse()
|
||||
.map_err(|_| de::Error::invalid_value(Unexpected::Str(value), &self))
|
||||
}
|
||||
}
|
||||
|
||||
impl FromStr for Hash {
|
||||
type Err = HashParseError;
|
||||
|
||||
fn from_str(value: &str) -> Result<Self, Self::Err> {
|
||||
if !value.starts_with("0x") {
|
||||
return Err(HashParseError::InvalidPrefix);
|
||||
}
|
||||
|
||||
let mut hash = [0; HASH_BYTES];
|
||||
|
||||
hex::decode_to_slice(&value[2..], &mut hash).map_err(HashParseError::HexError)?;
|
||||
|
||||
Ok(Hash(hash))
|
||||
}
|
||||
}
|
||||
|
||||
impl<'de> Deserialize<'de> for Hash {
|
||||
fn deserialize<D>(deserializer: D) -> Result<Hash, D::Error>
|
||||
where
|
||||
D: Deserializer<'de>,
|
||||
{
|
||||
deserializer.deserialize_str(HashVisitor)
|
||||
}
|
||||
}
|
||||
|
||||
impl Display for Hash {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
f.write_str("0x")?;
|
||||
|
||||
let mut ascii = [0; HASH_BYTES * 2];
|
||||
|
||||
hex::encode_to_slice(self.0, &mut ascii)
|
||||
.expect("Encoding 32 bytes into 64 bytes of ascii; qed");
|
||||
|
||||
f.write_str(std::str::from_utf8(&ascii).expect("ASCII hex encoded bytes canot fail; qed"))
|
||||
}
|
||||
}
|
||||
|
||||
impl Debug for Hash {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
Display::fmt(self, f)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub enum HashParseError {
|
||||
HexError(hex::FromHexError),
|
||||
InvalidPrefix,
|
||||
}
|
||||
|
||||
impl Display for HashParseError {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
Debug::fmt(self, f)
|
||||
}
|
||||
}
|
||||
|
||||
impl ResponseError for HashParseError {}
|
||||
@@ -1,191 +0,0 @@
|
||||
use std::net::Ipv4Addr;
|
||||
use std::sync::Arc;
|
||||
|
||||
use actix::prelude::*;
|
||||
use parking_lot::RwLock;
|
||||
use rustc_hash::FxHashMap;
|
||||
use serde::Deserialize;
|
||||
|
||||
use crate::chain::{Chain, LocateNode};
|
||||
use crate::types::{NodeId, NodeLocation};
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Locator {
|
||||
client: reqwest::blocking::Client,
|
||||
cache: Arc<RwLock<FxHashMap<Ipv4Addr, Option<Arc<NodeLocation>>>>>,
|
||||
}
|
||||
|
||||
pub struct LocatorFactory {
|
||||
cache: Arc<RwLock<FxHashMap<Ipv4Addr, Option<Arc<NodeLocation>>>>>,
|
||||
}
|
||||
|
||||
impl LocatorFactory {
|
||||
pub fn new() -> Self {
|
||||
let mut cache = FxHashMap::default();
|
||||
|
||||
// Default entry for localhost
|
||||
cache.insert(
|
||||
Ipv4Addr::new(127, 0, 0, 1),
|
||||
Some(Arc::new(NodeLocation {
|
||||
latitude: 52.516_6667,
|
||||
longitude: 13.4,
|
||||
city: "Berlin".into(),
|
||||
})),
|
||||
);
|
||||
|
||||
LocatorFactory {
|
||||
cache: Arc::new(RwLock::new(cache)),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn create(&self) -> Locator {
|
||||
Locator {
|
||||
client: reqwest::blocking::Client::new(),
|
||||
cache: self.cache.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Actor for Locator {
|
||||
type Context = SyncContext<Self>;
|
||||
}
|
||||
|
||||
#[derive(Message)]
|
||||
#[rtype(result = "()")]
|
||||
pub struct LocateRequest {
|
||||
pub ip: Ipv4Addr,
|
||||
pub nid: NodeId,
|
||||
pub chain: Addr<Chain>,
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
pub struct IPApiLocate {
|
||||
city: Box<str>,
|
||||
loc: Box<str>,
|
||||
}
|
||||
|
||||
impl IPApiLocate {
|
||||
fn into_node_location(self) -> Option<NodeLocation> {
|
||||
let IPApiLocate { city, loc } = self;
|
||||
|
||||
let mut loc = loc.split(',').map(|n| n.parse());
|
||||
|
||||
let latitude = loc.next()?.ok()?;
|
||||
let longitude = loc.next()?.ok()?;
|
||||
|
||||
// Guarantee that the iterator has been exhausted
|
||||
if loc.next().is_some() {
|
||||
return None;
|
||||
}
|
||||
|
||||
Some(NodeLocation {
|
||||
latitude,
|
||||
longitude,
|
||||
city,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl Handler<LocateRequest> for Locator {
|
||||
type Result = ();
|
||||
|
||||
fn handle(&mut self, msg: LocateRequest, _: &mut Self::Context) {
|
||||
let LocateRequest { ip, nid, chain } = msg;
|
||||
|
||||
if let Some(item) = self.cache.read().get(&ip) {
|
||||
if let Some(location) = item {
|
||||
return chain.do_send(LocateNode {
|
||||
nid,
|
||||
location: location.clone(),
|
||||
});
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
let location = match self.iplocate(ip) {
|
||||
Ok(location) => location,
|
||||
Err(err) => return log::debug!("GET error for ip location: {:?}", err),
|
||||
};
|
||||
|
||||
self.cache.write().insert(ip, location.clone());
|
||||
|
||||
if let Some(location) = location {
|
||||
chain.do_send(LocateNode { nid, location });
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Locator {
|
||||
fn iplocate(&self, ip: Ipv4Addr) -> Result<Option<Arc<NodeLocation>>, reqwest::Error> {
|
||||
let location = self.iplocate_ipapi_co(ip)?;
|
||||
|
||||
match location {
|
||||
Some(location) => Ok(Some(location)),
|
||||
None => self.iplocate_ipinfo_io(ip),
|
||||
}
|
||||
}
|
||||
|
||||
fn iplocate_ipapi_co(&self, ip: Ipv4Addr) -> Result<Option<Arc<NodeLocation>>, reqwest::Error> {
|
||||
let location = self
|
||||
.query(&format!("https://ipapi.co/{}/json", ip))?
|
||||
.map(Arc::new);
|
||||
|
||||
Ok(location)
|
||||
}
|
||||
|
||||
fn iplocate_ipinfo_io(
|
||||
&self,
|
||||
ip: Ipv4Addr,
|
||||
) -> Result<Option<Arc<NodeLocation>>, reqwest::Error> {
|
||||
let location = self
|
||||
.query(&format!("https://ipinfo.io/{}/json", ip))?
|
||||
.and_then(|loc: IPApiLocate| loc.into_node_location().map(Arc::new));
|
||||
|
||||
Ok(location)
|
||||
}
|
||||
|
||||
fn query<T>(&self, url: &str) -> Result<Option<T>, reqwest::Error>
|
||||
where
|
||||
for<'de> T: Deserialize<'de>,
|
||||
{
|
||||
match self.client.get(url).send()?.json::<T>() {
|
||||
Ok(result) => Ok(Some(result)),
|
||||
Err(err) => {
|
||||
log::debug!("JSON error for ip location: {:?}", err);
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn ipapi_locate_to_node_location() {
|
||||
let ipapi = IPApiLocate {
|
||||
loc: "12.5,56.25".into(),
|
||||
city: "Foobar".into(),
|
||||
};
|
||||
|
||||
let location = ipapi.into_node_location().unwrap();
|
||||
|
||||
assert_eq!(location.latitude, 12.5);
|
||||
assert_eq!(location.longitude, 56.25);
|
||||
assert_eq!(&*location.city, "Foobar");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn ipapi_locate_to_node_location_too_many() {
|
||||
let ipapi = IPApiLocate {
|
||||
loc: "12.5,56.25,1.0".into(),
|
||||
city: "Foobar".into(),
|
||||
};
|
||||
|
||||
let location = ipapi.into_node_location();
|
||||
|
||||
assert!(location.is_none());
|
||||
}
|
||||
}
|
||||
@@ -1,79 +0,0 @@
|
||||
use num_traits::{Float, Zero};
|
||||
use std::ops::AddAssign;
|
||||
|
||||
pub struct MeanList<T>
|
||||
where
|
||||
T: Float + AddAssign + Zero + From<u8>,
|
||||
{
|
||||
period_sum: T,
|
||||
period_count: u8,
|
||||
mean_index: u8,
|
||||
means: [T; 20],
|
||||
ticks_per_mean: u8,
|
||||
}
|
||||
|
||||
impl<T> Default for MeanList<T>
|
||||
where
|
||||
T: Float + AddAssign + Zero + From<u8>,
|
||||
{
|
||||
fn default() -> MeanList<T> {
|
||||
MeanList {
|
||||
period_sum: T::zero(),
|
||||
period_count: 0,
|
||||
mean_index: 0,
|
||||
means: [T::zero(); 20],
|
||||
ticks_per_mean: 1,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> MeanList<T>
|
||||
where
|
||||
T: Float + AddAssign + Zero + From<u8>,
|
||||
{
|
||||
pub fn slice(&self) -> &[T] {
|
||||
&self.means[..usize::from(self.mean_index)]
|
||||
}
|
||||
|
||||
pub fn push(&mut self, val: T) -> bool {
|
||||
if self.mean_index == 20 && self.ticks_per_mean < 32 {
|
||||
self.squash_means();
|
||||
}
|
||||
|
||||
self.period_sum += val;
|
||||
self.period_count += 1;
|
||||
|
||||
if self.period_count == self.ticks_per_mean {
|
||||
self.push_mean();
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
fn push_mean(&mut self) {
|
||||
let mean = self.period_sum / std::convert::From::from(self.period_count);
|
||||
|
||||
if self.mean_index == 20 && self.ticks_per_mean == 32 {
|
||||
self.means.rotate_left(1);
|
||||
self.means[19] = mean;
|
||||
} else {
|
||||
self.means[usize::from(self.mean_index)] = mean;
|
||||
self.mean_index += 1;
|
||||
}
|
||||
|
||||
self.period_sum = T::zero();
|
||||
self.period_count = 0;
|
||||
}
|
||||
|
||||
fn squash_means(&mut self) {
|
||||
self.ticks_per_mean *= 2;
|
||||
self.mean_index = 10;
|
||||
|
||||
for i in 0..10 {
|
||||
let i2 = i * 2;
|
||||
|
||||
self.means[i] = (self.means[i2] + self.means[i2 + 1]) / std::convert::From::from(2)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,104 +0,0 @@
|
||||
use num_traits::{Bounded, NumOps, Zero};
|
||||
use std::convert::TryFrom;
|
||||
use std::iter::Sum;
|
||||
|
||||
/// Keep track of last N numbers pushed onto internal stack.
|
||||
/// Provides means to get an average of said numbers.
|
||||
pub struct NumStats<T> {
|
||||
stack: Box<[T]>,
|
||||
index: usize,
|
||||
sum: T,
|
||||
}
|
||||
|
||||
impl<T: NumOps + Zero + Bounded + Copy + Sum + TryFrom<usize>> NumStats<T> {
|
||||
pub fn new(size: usize) -> Self {
|
||||
NumStats {
|
||||
stack: vec![T::zero(); size].into_boxed_slice(),
|
||||
index: 0,
|
||||
sum: T::zero(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn push(&mut self, val: T) {
|
||||
let slot = &mut self.stack[self.index % self.stack.len()];
|
||||
|
||||
self.sum = (self.sum + val) - *slot;
|
||||
|
||||
*slot = val;
|
||||
|
||||
self.index += 1;
|
||||
}
|
||||
|
||||
pub fn average(&self) -> T {
|
||||
let cap = std::cmp::min(self.index, self.stack.len());
|
||||
|
||||
if cap == 0 {
|
||||
return T::zero();
|
||||
}
|
||||
|
||||
let cap = T::try_from(cap).unwrap_or_else(|_| T::max_value());
|
||||
|
||||
self.sum / cap
|
||||
}
|
||||
|
||||
pub fn reset(&mut self) {
|
||||
self.index = 0;
|
||||
self.sum = T::zero();
|
||||
|
||||
for val in self.stack.iter_mut() {
|
||||
*val = T::zero();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn calculates_correct_average() {
|
||||
let mut stats: NumStats<u64> = NumStats::new(10);
|
||||
|
||||
stats.push(3);
|
||||
stats.push(7);
|
||||
|
||||
assert_eq!(stats.average(), 5);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn calculates_correct_average_over_bounds() {
|
||||
let mut stats: NumStats<u64> = NumStats::new(10);
|
||||
|
||||
stats.push(100);
|
||||
|
||||
for _ in 0..9 {
|
||||
stats.push(0);
|
||||
}
|
||||
|
||||
assert_eq!(stats.average(), 10);
|
||||
|
||||
stats.push(0);
|
||||
|
||||
assert_eq!(stats.average(), 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn resets_properly() {
|
||||
let mut stats: NumStats<u64> = NumStats::new(10);
|
||||
|
||||
for _ in 0..10 {
|
||||
stats.push(100);
|
||||
}
|
||||
|
||||
assert_eq!(stats.average(), 100);
|
||||
|
||||
stats.reset();
|
||||
|
||||
assert_eq!(stats.average(), 0);
|
||||
|
||||
stats.push(7);
|
||||
stats.push(3);
|
||||
|
||||
assert_eq!(stats.average(), 5);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user