Move a bunch of things around and flatten common crate

This commit is contained in:
James Wilson
2021-06-30 17:27:10 +01:00
parent 4308359feb
commit f7ab3292c2
21 changed files with 217 additions and 228 deletions
@@ -4,7 +4,7 @@ pub struct DenseMap<Id, T> {
/// All items
items: Vec<Option<T>>,
/// Our ID type
_id_ty: std::marker::PhantomData<Id>
_id_type: std::marker::PhantomData<Id>
}
impl<Id, T> DenseMap<Id, T>
@@ -16,7 +16,7 @@ where
DenseMap {
retired: Vec::new(),
items: Vec::new(),
_id_ty: std::marker::PhantomData
_id_type: std::marker::PhantomData
}
}
+2 -2
View File
@@ -1,7 +1,7 @@
use std::net::IpAddr;
use crate::node::Payload;
use crate::types::{NodeDetails, BlockHash};
use crate::node_message::Payload;
use crate::node_types::{NodeDetails, BlockHash};
use crate::id_type;
use serde::{Deserialize, Serialize};
+19 -8
View File
@@ -1,9 +1,20 @@
pub mod node;
pub mod node_message;
pub mod internal_messages;
pub mod types;
pub mod util;
pub mod json;
pub mod log_level;
pub mod assign_id;
pub mod most_seen;
pub mod id_type;
pub mod node_types;
pub mod id_type;
pub mod time;
mod log_level;
mod assign_id;
mod most_seen;
mod dense_map;
mod mean_list;
mod num_stats;
// Export a bunch of common bits at the top level for ease of import:
pub use assign_id::AssignId;
pub use dense_map::DenseMap;
pub use mean_list::MeanList;
pub use num_stats::NumStats;
pub use most_seen::MostSeen;
pub use log_level::LogLevel;
@@ -1,5 +1,4 @@
use crate::types::{Block, BlockHash, BlockNumber, NodeDetails};
use crate::json;
use crate::node_types::{Block, BlockHash, BlockNumber, NodeDetails};
use serde::{Deserialize, Serialize};
pub type NodeMessageId = u64;
@@ -39,19 +38,6 @@ impl From<NodeMessage> for Payload {
}
}
impl From<json::NodeMessage> for NodeMessage {
fn from(msg: json::NodeMessage) -> Self {
match msg {
json::NodeMessage::V1 { payload } => {
NodeMessage::V1 { payload: payload.into() }
},
json::NodeMessage::V2 { id, payload } => {
NodeMessage::V2 { id, payload: payload.into() }
},
}
}
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum Payload {
SystemConnected(SystemConnected),
@@ -69,67 +55,12 @@ pub enum Payload {
PreparedBlockForProposing,
}
impl From<json::Payload> for Payload {
fn from(msg: json::Payload) -> Self {
match msg {
json::Payload::SystemConnected(m) => {
Payload::SystemConnected(m.into())
},
json::Payload::SystemInterval(m) => {
Payload::SystemInterval(m.into())
},
json::Payload::BlockImport(m) => {
Payload::BlockImport(m.into())
},
json::Payload::NotifyFinalized(m) => {
Payload::NotifyFinalized(m.into())
},
json::Payload::TxPoolImport => {
Payload::TxPoolImport
},
json::Payload::AfgFinalized(m) => {
Payload::AfgFinalized(m.into())
},
json::Payload::AfgReceivedPrecommit(m) => {
Payload::AfgReceivedPrecommit(m.into())
},
json::Payload::AfgReceivedPrevote(m) => {
Payload::AfgReceivedPrevote(m.into())
},
json::Payload::AfgReceivedCommit(m) => {
Payload::AfgReceivedCommit(m.into())
},
json::Payload::AfgAuthoritySet(m) => {
Payload::AfgAuthoritySet(m.into())
},
json::Payload::AfgFinalizedBlocksUpTo => {
Payload::AfgFinalizedBlocksUpTo
},
json::Payload::AuraPreSealedBlock => {
Payload::AuraPreSealedBlock
},
json::Payload::PreparedBlockForProposing => {
Payload::PreparedBlockForProposing
},
}
}
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct SystemConnected {
pub genesis_hash: BlockHash,
pub node: NodeDetails,
}
impl From<json::SystemConnected> for SystemConnected {
fn from(msg: json::SystemConnected) -> Self {
SystemConnected {
genesis_hash: msg.genesis_hash.into(),
node: msg.node.into()
}
}
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct SystemInterval {
pub peers: Option<u64>,
@@ -142,51 +73,18 @@ pub struct SystemInterval {
pub used_state_cache_size: Option<f32>,
}
impl From<json::SystemInterval> for SystemInterval {
fn from(msg: json::SystemInterval) -> Self {
SystemInterval {
peers: msg.peers,
txcount: msg.txcount,
bandwidth_upload: msg.bandwidth_upload,
bandwidth_download: msg.bandwidth_download,
finalized_height: msg.finalized_height,
finalized_hash: msg.finalized_hash.map(|h| h.into()),
block: msg.block.map(|b| b.into()),
used_state_cache_size: msg.used_state_cache_size,
}
}
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Finalized {
pub hash: BlockHash,
pub height: Box<str>,
}
impl From<json::Finalized> for Finalized {
fn from(msg: json::Finalized) -> Self {
Finalized {
hash: msg.hash.into(),
height: msg.height,
}
}
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct AfgFinalized {
pub finalized_hash: BlockHash,
pub finalized_number: Box<str>,
}
impl From<json::AfgFinalized> for AfgFinalized {
fn from(msg: json::AfgFinalized) -> Self {
AfgFinalized {
finalized_hash: msg.finalized_hash.into(),
finalized_number: msg.finalized_number,
}
}
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct AfgReceived {
pub target_hash: BlockHash,
@@ -194,16 +92,6 @@ pub struct AfgReceived {
pub voter: Option<Box<str>>,
}
impl From<json::AfgReceived> for AfgReceived {
fn from(msg: json::AfgReceived) -> Self {
AfgReceived {
target_hash: msg.target_hash.into(),
target_number: msg.target_number,
voter: msg.voter,
}
}
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct AfgAuthoritySet {
pub authority_id: Box<str>,
@@ -211,16 +99,6 @@ pub struct AfgAuthoritySet {
pub authority_set_id: Box<str>,
}
impl From<json::AfgAuthoritySet> for AfgAuthoritySet {
fn from(msg: json::AfgAuthoritySet) -> Self {
AfgAuthoritySet {
authority_id: msg.authority_id,
authorities: msg.authorities,
authority_set_id: msg.authority_set_id,
}
}
}
impl Payload {
pub fn best_block(&self) -> Option<&Block> {
match self {
@@ -1,8 +1,7 @@
use serde::ser::{SerializeTuple, Serializer};
use serde::{Deserialize, Serialize};
use crate::util::{now, MeanList};
use crate::json;
use crate::{time, MeanList};
pub type BlockNumber = u64;
pub type Timestamp = u64;
@@ -19,20 +18,6 @@ pub struct NodeDetails {
pub startup_time: Option<Box<str>>,
}
impl From<json::NodeDetails> for NodeDetails {
fn from(details: json::NodeDetails) -> Self {
NodeDetails {
chain: details.chain,
name: details.name,
implementation: details.implementation,
version: details.version,
validator: details.validator,
network_id: details.network_id,
startup_time: details.startup_time,
}
}
}
#[derive(Deserialize, Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct NodeStats {
pub peers: u64,
@@ -50,15 +35,6 @@ pub struct Block {
pub height: BlockNumber,
}
impl From<json::Block> for Block {
fn from(block: json::Block) -> Self {
Block {
hash: block.hash.into(),
height: block.height
}
}
}
impl Block {
pub fn zero() -> Self {
Block {
@@ -80,7 +56,7 @@ impl Default for BlockDetails {
fn default() -> Self {
BlockDetails {
block: Block::zero(),
block_timestamp: now(),
block_timestamp: time::now(),
block_time: 0,
propagation_time: None,
}
+9
View File
@@ -0,0 +1,9 @@
/// Returns current unix time in ms (compatible with JS Date.now())
pub fn now() -> u64 {
use std::time::SystemTime;
SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.expect("System time must be configured to be post Unix Epoch start; qed")
.as_millis() as u64
}
-27
View File
@@ -1,27 +0,0 @@
mod dense_map;
mod mean_list;
mod num_stats;
pub use dense_map::DenseMap;
pub use mean_list::MeanList;
pub use num_stats::NumStats;
pub fn fnv<D: AsRef<[u8]>>(data: D) -> u64 {
use fnv::FnvHasher;
use std::hash::Hasher;
let mut hasher = FnvHasher::default();
hasher.write(data.as_ref());
hasher.finish()
}
/// Returns current unix time in ms (compatible with JS Date.now())
pub fn now() -> u64 {
use std::time::SystemTime;
SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.expect("System time must be configured to be post Unix Epoch start; qed")
.as_millis() as u64
}
+5 -5
View File
@@ -1,4 +1,4 @@
use common::{internal_messages::{self, ShardNodeId}, node, assign_id::AssignId, types::BlockHash};
use common::{internal_messages::{self, ShardNodeId}, node_message, AssignId, node_types::BlockHash};
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use futures::{channel::mpsc, future};
@@ -39,15 +39,15 @@ pub enum FromWebsocket {
},
/// Tell the aggregator about a new node.
Add {
message_id: node::NodeMessageId,
message_id: node_message::NodeMessageId,
ip: Option<std::net::IpAddr>,
node: common::types::NodeDetails,
node: common::node_types::NodeDetails,
genesis_hash: BlockHash
},
/// Update/pass through details about a node.
Update {
message_id: node::NodeMessageId,
payload: node::Payload
message_id: node_message::NodeMessageId,
payload: node_message::Payload
},
/// Make a note when the node disconnects.
Disconnected
@@ -10,14 +10,14 @@ const HASH_BYTES: usize = 32;
#[derive(Hash, PartialEq, Eq, Clone, Copy)]
pub struct Hash([u8; HASH_BYTES]);
impl From<Hash> for crate::types::BlockHash {
impl From<Hash> for common::node_types::BlockHash {
fn from(hash: Hash) -> Self {
hash.0.into()
}
}
impl From<crate::types::BlockHash> for Hash {
fn from(hash: crate::types::BlockHash) -> Self {
impl From<common::node_types::BlockHash> for Hash {
fn from(hash: common::node_types::BlockHash) -> Self {
Hash(hash.0)
}
}
@@ -5,6 +5,8 @@
//! structures (for example, to support bincode better).
use super::hash::Hash;
use serde::{Deserialize};
use common::node_message as internal;
use common::node_types;
/// This struct represents a telemetry message sent from a node as
/// a JSON payload. Since JSON is self describing, we can use attributes
@@ -31,6 +33,19 @@ pub enum NodeMessage {
},
}
impl From<NodeMessage> for internal::NodeMessage {
fn from(msg: NodeMessage) -> Self {
match msg {
NodeMessage::V1 { payload } => {
internal::NodeMessage::V1 { payload: payload.into() }
},
NodeMessage::V2 { id, payload } => {
internal::NodeMessage::V2 { id, payload: payload.into() }
},
}
}
}
#[derive(Deserialize, Debug)]
#[serde(tag = "msg")]
pub enum Payload {
@@ -62,6 +77,52 @@ pub enum Payload {
PreparedBlockForProposing,
}
impl From<Payload> for internal::Payload {
fn from(msg: Payload) -> Self {
match msg {
Payload::SystemConnected(m) => {
internal::Payload::SystemConnected(m.into())
},
Payload::SystemInterval(m) => {
internal::Payload::SystemInterval(m.into())
},
Payload::BlockImport(m) => {
internal::Payload::BlockImport(m.into())
},
Payload::NotifyFinalized(m) => {
internal::Payload::NotifyFinalized(m.into())
},
Payload::TxPoolImport => {
internal::Payload::TxPoolImport
},
Payload::AfgFinalized(m) => {
internal::Payload::AfgFinalized(m.into())
},
Payload::AfgReceivedPrecommit(m) => {
internal::Payload::AfgReceivedPrecommit(m.into())
},
Payload::AfgReceivedPrevote(m) => {
internal::Payload::AfgReceivedPrevote(m.into())
},
Payload::AfgReceivedCommit(m) => {
internal::Payload::AfgReceivedCommit(m.into())
},
Payload::AfgAuthoritySet(m) => {
internal::Payload::AfgAuthoritySet(m.into())
},
Payload::AfgFinalizedBlocksUpTo => {
internal::Payload::AfgFinalizedBlocksUpTo
},
Payload::AuraPreSealedBlock => {
internal::Payload::AuraPreSealedBlock
},
Payload::PreparedBlockForProposing => {
internal::Payload::PreparedBlockForProposing
},
}
}
}
#[derive(Deserialize, Debug)]
pub struct SystemConnected {
pub genesis_hash: Hash,
@@ -69,6 +130,15 @@ pub struct SystemConnected {
pub node: NodeDetails,
}
impl From<SystemConnected> for internal::SystemConnected {
fn from(msg: SystemConnected) -> Self {
internal::SystemConnected {
genesis_hash: msg.genesis_hash.into(),
node: msg.node.into()
}
}
}
#[derive(Deserialize, Debug)]
pub struct SystemInterval {
pub peers: Option<u64>,
@@ -82,6 +152,21 @@ pub struct SystemInterval {
pub used_state_cache_size: Option<f32>,
}
impl From<SystemInterval> for internal::SystemInterval {
fn from(msg: SystemInterval) -> Self {
internal::SystemInterval {
peers: msg.peers,
txcount: msg.txcount,
bandwidth_upload: msg.bandwidth_upload,
bandwidth_download: msg.bandwidth_download,
finalized_height: msg.finalized_height,
finalized_hash: msg.finalized_hash.map(|h| h.into()),
block: msg.block.map(|b| b.into()),
used_state_cache_size: msg.used_state_cache_size,
}
}
}
#[derive(Deserialize, Debug)]
pub struct Finalized {
#[serde(rename = "best")]
@@ -89,6 +174,15 @@ pub struct Finalized {
pub height: Box<str>,
}
impl From<Finalized> for internal::Finalized {
fn from(msg: Finalized) -> Self {
internal::Finalized {
hash: msg.hash.into(),
height: msg.height,
}
}
}
#[derive(Deserialize, Debug)]
pub struct AfgAuthoritySet {
pub authority_id: Box<str>,
@@ -96,12 +190,31 @@ pub struct AfgAuthoritySet {
pub authority_set_id: Box<str>,
}
impl From<AfgAuthoritySet> for internal::AfgAuthoritySet {
fn from(msg: AfgAuthoritySet) -> Self {
internal::AfgAuthoritySet {
authority_id: msg.authority_id,
authorities: msg.authorities,
authority_set_id: msg.authority_set_id,
}
}
}
#[derive(Deserialize, Debug, Clone)]
pub struct AfgFinalized {
pub finalized_hash: Hash,
pub finalized_number: Box<str>,
}
impl From<AfgFinalized> for internal::AfgFinalized {
fn from(msg: AfgFinalized) -> Self {
internal::AfgFinalized {
finalized_hash: msg.finalized_hash.into(),
finalized_number: msg.finalized_number,
}
}
}
#[derive(Deserialize, Debug, Clone)]
pub struct AfgReceived {
pub target_hash: Hash,
@@ -109,6 +222,16 @@ pub struct AfgReceived {
pub voter: Option<Box<str>>,
}
impl From<AfgReceived> for internal::AfgReceived {
fn from(msg: AfgReceived) -> Self {
internal::AfgReceived {
target_hash: msg.target_hash.into(),
target_number: msg.target_number,
voter: msg.voter,
}
}
}
#[derive(Deserialize, Debug, Clone, Copy)]
pub struct Block {
#[serde(rename = "best")]
@@ -116,6 +239,15 @@ pub struct Block {
pub height: BlockNumber,
}
impl From<Block> for node_types::Block {
fn from(block: Block) -> Self {
node_types::Block {
hash: block.hash.into(),
height: block.height
}
}
}
#[derive(Deserialize, Debug, Clone)]
pub struct NodeDetails {
pub chain: Box<str>,
@@ -127,6 +259,20 @@ pub struct NodeDetails {
pub startup_time: Option<Box<str>>,
}
impl From<NodeDetails> for node_types::NodeDetails {
fn from(details: NodeDetails) -> Self {
node_types::NodeDetails {
chain: details.chain,
name: details.name,
implementation: details.implementation,
version: details.version,
validator: details.validator,
network_id: details.network_id,
startup_time: details.startup_time,
}
}
}
type NodeMessageId = u64;
type BlockNumber = u64;
+6 -5
View File
@@ -1,6 +1,7 @@
mod aggregator;
mod connection;
mod real_ip;
mod json_message;
use std::net::IpAddr;
@@ -10,7 +11,7 @@ use simple_logger::SimpleLogger;
use futures::{StreamExt, SinkExt, channel::mpsc};
use warp::Filter;
use warp::filters::ws;
use common::{json, node, log_level::LogLevel};
use common::{node_message, LogLevel};
use aggregator::{ Aggregator, FromWebsocket };
use real_ip::real_ip;
@@ -154,7 +155,7 @@ async fn handle_websocket_connection<S>(mut websocket: ws::WebSocket, mut tx_to_
// Until the aggregator receives an `Add` message, which we can create once
// we see one of these SystemConnected ones, it will ignore messages with
// the corresponding message_id.
if let node::Payload::SystemConnected(info) = payload {
if let node_message::Payload::SystemConnected(info) = payload {
let _ = tx_to_aggregator.send(FromWebsocket::Add {
message_id,
ip: addr,
@@ -179,7 +180,7 @@ async fn handle_websocket_connection<S>(mut websocket: ws::WebSocket, mut tx_to_
/// Deserialize an incoming websocket message, returning an error if something
/// fatal went wrong, [`Some`] message if all went well, and [`None`] if a non-fatal
/// issue was encountered and the message should simply be ignored.
fn deserialize_ws_message(msg: Result<ws::Message, warp::Error>) -> anyhow::Result<Option<node::NodeMessage>> {
fn deserialize_ws_message(msg: Result<ws::Message, warp::Error>) -> anyhow::Result<Option<node_message::NodeMessage>> {
// If we see any errors, log them and end our loop:
let msg = match msg {
Err(e) => {
@@ -196,7 +197,7 @@ fn deserialize_ws_message(msg: Result<ws::Message, warp::Error>) -> anyhow::Resu
// Deserialize from JSON, warning if deserialization fails:
let bytes = msg.as_bytes();
let node_message: json::NodeMessage = match serde_json::from_slice(bytes) {
let node_message: json_message::NodeMessage = match serde_json::from_slice(bytes) {
Ok(node_message) => node_message,
Err(_e) => {
// let bytes: &[u8] = bytes.get(..512).unwrap_or_else(|| &bytes);
@@ -207,6 +208,6 @@ fn deserialize_ws_message(msg: Result<ws::Message, warp::Error>) -> anyhow::Resu
};
// Pull relevant details from the message:
let node_message: node::NodeMessage = node_message.into();
let node_message: node_message::NodeMessage = node_message.into();
Ok(Some(node_message))
}
@@ -4,9 +4,9 @@ use common::{
ShardNodeId,
MuteReason
},
types::BlockHash,
node,
util::now
node_types::BlockHash,
node_message,
time
};
use bimap::BiMap;
use std::{net::{IpAddr, Ipv4Addr}, str::FromStr};
@@ -38,13 +38,13 @@ pub enum FromShardWebsocket {
Add {
local_id: ShardNodeId,
ip: Option<std::net::IpAddr>,
node: common::types::NodeDetails,
genesis_hash: common::types::BlockHash
node: common::node_types::NodeDetails,
genesis_hash: common::node_types::BlockHash
},
/// Update/pass through details about a node.
Update {
local_id: ShardNodeId,
payload: node::Payload
payload: node_message::Payload
},
/// Tell the aggregator that a node has been removed when it disconnects.
Remove {
@@ -379,7 +379,7 @@ impl InnerLoop {
feed_serializer.push(feed_message::UnsubscribedFrom(old_chain.label()));
}
feed_serializer.push(feed_message::SubscribedTo(new_chain.label()));
feed_serializer.push(feed_message::TimeSync(now()));
feed_serializer.push(feed_message::TimeSync(time::now()));
feed_serializer.push(feed_message::BestBlock (
new_chain.best_block().height,
new_chain.timestamp(),
+1 -1
View File
@@ -6,7 +6,7 @@ use std::mem;
use crate::state::Node;
use serde_json::to_writer;
use common::types::{
use common::node_types::{
BlockDetails, BlockHash, BlockNumber, NodeHardware, NodeIO, NodeStats,
Timestamp
};
+1 -1
View File
@@ -7,7 +7,7 @@ use serde::Deserialize;
use futures::{Sink, SinkExt, StreamExt};
use futures::channel::mpsc;
use common::types::NodeLocation;
use common::node_types::NodeLocation;
use tokio::sync::Semaphore;
/// The returned location is optional; it may be None if not found.
+1 -2
View File
@@ -7,13 +7,12 @@ use std::net::SocketAddr;
use std::str::FromStr;
use bincode::Options;
use common::internal_messages;
use structopt::StructOpt;
use simple_logger::SimpleLogger;
use futures::{StreamExt, SinkExt, channel::mpsc};
use warp::Filter;
use warp::filters::ws;
use common::{log_level::LogLevel};
use common::{ internal_messages, LogLevel };
use aggregator::{ Aggregator, FromFeedWebsocket, ToFeedWebsocket, FromShardWebsocket, ToShardWebsocket };
const VERSION: &str = env!("CARGO_PKG_VERSION");
+5 -7
View File
@@ -1,11 +1,9 @@
use std::collections::{ HashSet };
use common::types::{ BlockHash, BlockNumber };
use common::types::{Block, Timestamp};
use common::util::{now, DenseMap, NumStats};
use common::most_seen::MostSeen;
use common::node::Payload;
use common::node_types::{ BlockHash, BlockNumber };
use common::node_types::{Block, Timestamp};
use common::node_message::Payload;
use common::{time, id_type, DenseMap, MostSeen, NumStats};
use once_cell::sync::Lazy;
use common::id_type;
use crate::feed_message::{self, FeedMessageSerializer};
use crate::find_location;
@@ -217,7 +215,7 @@ impl Chain {
fn handle_block(&mut self, block: &Block, nid: ChainNodeId, feed: &mut FeedMessageSerializer) {
let mut propagation_time = None;
let now = now();
let now = time::now();
let nodes_len = self.nodes.len();
self.update_stale_nodes(now, feed);
+4 -4
View File
@@ -1,9 +1,9 @@
use common::types::{
use common::node_types::{
Block, BlockDetails, NodeDetails, NodeHardware, NodeIO, NodeLocation, NodeStats,
Timestamp,
};
use common::util::now;
use common::node::SystemInterval;
use common::time;
use common::node_message::SystemInterval;
use crate::find_location;
/// Minimum time between block below broadcasting updates to the browser gets throttled, in ms.
@@ -135,7 +135,7 @@ impl Node {
if let Some(download) = interval.bandwidth_download {
changed |= self.hardware.download.push(download);
}
self.hardware.chart_stamps.push(now() as f64);
self.hardware.chart_stamps.push(time::now() as f64);
changed
}
+3 -5
View File
@@ -1,13 +1,11 @@
use std::collections::{ HashSet, HashMap };
use common::types::{ BlockHash };
use super::node::Node;
use common::types::{Block, NodeDetails, Timestamp};
use common::util::{DenseMap};
use common::node::Payload;
use common::node_types::{Block, BlockHash, NodeDetails, Timestamp};
use common::node_message::Payload;
use common::{ id_type, DenseMap };
use std::iter::IntoIterator;
use crate::feed_message::FeedMessageSerializer;
use crate::find_location;
use common::id_type;
use super::chain::{ self, Chain, ChainNodeId };