Bucket nodes by genesis hash (#335)

* Send unwrapped Payload to Chain + cargo fmt

* Read genesis_hash when connecting

* Group chains by genesis hashes

* Fix typo

Co-authored-by: Niklas Adolfsson <niklasadolfsson1@gmail.com>

* Fix grumbles

* Link up `Hash` for docs

* `hashes` -> `genesis_hashes`

* Typo :)

Co-authored-by: Niklas Adolfsson <niklasadolfsson1@gmail.com>

* Added a doc comment link

* Add comment about why H256 is not used

Co-authored-by: Niklas Adolfsson <niklasadolfsson1@gmail.com>
This commit is contained in:
Maciej Hirsz
2021-04-27 13:05:58 +02:00
committed by GitHub
parent 93e33fd3f7
commit 05b0afefd3
17 changed files with 439 additions and 237 deletions
+1
View File
@@ -1693,6 +1693,7 @@ dependencies = [
"clap", "clap",
"ctor", "ctor",
"fnv", "fnv",
"hex",
"log", "log",
"num-traits", "num-traits",
"parking_lot", "parking_lot",
+1
View File
@@ -13,6 +13,7 @@ actix-http = "3.0.0-beta.4"
bytes = "1.0.1" bytes = "1.0.1"
chrono = { version = "0.4", features = ["serde"] } chrono = { version = "0.4", features = ["serde"] }
fnv = "1.0.7" fnv = "1.0.7"
hex = "0.4.3"
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
serde_json = { version = "1.0", features = ["raw_value"] } serde_json = { version = "1.0", features = ["raw_value"] }
primitive-types = { version = "0.9.0", features = ["serde"] } primitive-types = { version = "0.9.0", features = ["serde"] }
+45 -38
View File
@@ -1,29 +1,32 @@
use std::collections::{HashMap, HashSet};
use actix::prelude::*; use actix::prelude::*;
use actix_web_actors::ws::{CloseReason, CloseCode}; use actix_web_actors::ws::{CloseCode, CloseReason};
use ctor::ctor; use ctor::ctor;
use std::collections::{HashMap, HashSet};
use crate::node::connector::{Mute, NodeConnector}; use crate::chain::{self, Chain, ChainId, GetNodeNetworkState, Label};
use crate::feed::connector::{FeedConnector, Connected, FeedId}; use crate::feed::connector::{Connected, FeedConnector, FeedId};
use crate::util::DenseMap;
use crate::feed::{self, FeedMessageSerializer}; use crate::feed::{self, FeedMessageSerializer};
use crate::chain::{self, Chain, ChainId, Label, GetNodeNetworkState}; use crate::node::connector::{Mute, NodeConnector};
use crate::types::{ConnId, NodeDetails, NodeId}; use crate::types::{ConnId, NodeDetails, NodeId};
use crate::util::{DenseMap, Hash};
pub struct Aggregator { pub struct Aggregator {
genesis_hashes: HashMap<Hash, ChainId>,
labels: HashMap<Label, ChainId>, labels: HashMap<Label, ChainId>,
networks: HashMap<Label, ChainId>,
chains: DenseMap<ChainEntry>, chains: DenseMap<ChainEntry>,
feeds: DenseMap<Addr<FeedConnector>>, feeds: DenseMap<Addr<FeedConnector>>,
serializer: FeedMessageSerializer, serializer: FeedMessageSerializer,
/// Denylist for networks we do not want to allow connecting. /// Denylist for networks we do not want to allow connecting.
denylist: HashSet<String> denylist: HashSet<String>,
} }
pub struct ChainEntry { pub struct ChainEntry {
/// Address to the `Chain` agent
addr: Addr<Chain>, addr: Addr<Chain>,
/// Genesis [`Hash`] of the chain
genesis_hash: Hash,
/// String name of the chain
label: Label, label: Label,
network_id: Option<Label>,
/// Node count /// Node count
nodes: usize, nodes: usize,
/// Maximum allowed nodes /// Maximum allowed nodes
@@ -48,8 +51,8 @@ const THIRD_PARTY_NETWORKS_MAX_NODES: usize = 500;
impl Aggregator { impl Aggregator {
pub fn new(denylist: HashSet<String>) -> Self { pub fn new(denylist: HashSet<String>) -> Self {
Aggregator { Aggregator {
genesis_hashes: HashMap::new(),
labels: HashMap::new(), labels: HashMap::new(),
networks: HashMap::new(),
chains: DenseMap::new(), chains: DenseMap::new(),
feeds: DenseMap::new(), feeds: DenseMap::new(),
serializer: FeedMessageSerializer::new(), serializer: FeedMessageSerializer::new(),
@@ -61,10 +64,11 @@ impl Aggregator {
/// or the address is disconnected (actor dropped), create a new one. /// or the address is disconnected (actor dropped), create a new one.
pub fn lazy_chain( pub fn lazy_chain(
&mut self, &mut self,
genesis_hash: Hash,
label: &str, label: &str,
ctx: &mut <Self as Actor>::Context, ctx: &mut <Self as Actor>::Context,
) -> ChainId { ) -> ChainId {
let cid = match self.get_chain_id(label, None.as_ref()) { let cid = match self.genesis_hashes.get(&genesis_hash).copied() {
Some(cid) => cid, Some(cid) => cid,
None => { None => {
self.serializer.push(feed::AddedChain(&label, 1)); self.serializer.push(feed::AddedChain(&label, 1));
@@ -72,17 +76,16 @@ impl Aggregator {
let addr = ctx.address(); let addr = ctx.address();
let max_nodes = max_nodes(label); let max_nodes = max_nodes(label);
let label: Label = label.into(); let label: Label = label.into();
let cid = self.chains.add_with(|cid| { let cid = self.chains.add_with(|cid| ChainEntry {
ChainEntry {
addr: Chain::new(cid, addr, label.clone()).start(), addr: Chain::new(cid, addr, label.clone()).start(),
genesis_hash,
label: label.clone(), label: label.clone(),
network_id: None, // TODO: this doesn't seem to be used anywhere. Can it be removed?
nodes: 1, nodes: 1,
max_nodes, max_nodes,
}
}); });
self.labels.insert(label, cid); self.labels.insert(label, cid);
self.genesis_hashes.insert(genesis_hash, cid);
self.broadcast(); self.broadcast();
@@ -93,20 +96,11 @@ impl Aggregator {
cid cid
} }
fn get_chain_id(&self, label: &str, network: Option<&Label>) -> Option<ChainId> {
let labels = &self.labels;
let networks = &self.networks;
if let Some(network) = network {
networks.get(&**network).or_else(|| labels.get(label)).copied()
} else {
labels.get(label).copied()
}
}
fn get_chain(&mut self, label: &str) -> Option<&mut ChainEntry> { fn get_chain(&mut self, label: &str) -> Option<&mut ChainEntry> {
let chains = &mut self.chains; let chains = &mut self.chains;
self.labels.get(label).and_then(move |&cid| chains.get_mut(cid)) self.labels
.get(label)
.and_then(move |&cid| chains.get_mut(cid))
} }
fn broadcast(&mut self) { fn broadcast(&mut self) {
@@ -128,6 +122,8 @@ impl Actor for Aggregator {
pub struct AddNode { pub struct AddNode {
/// Details of the node being added to the aggregator /// Details of the node being added to the aggregator
pub node: NodeDetails, pub node: NodeDetails,
/// Genesis [`Hash`] of the chain the node is being added to.
pub genesis_hash: Hash,
/// Connection id used by the node connector for multiplexing parachains /// Connection id used by the node connector for multiplexing parachains
pub conn_id: ConnId, pub conn_id: ConnId,
/// Address of the NodeConnector actor /// Address of the NodeConnector actor
@@ -199,15 +195,26 @@ impl Handler<AddNode> for Aggregator {
if self.denylist.contains(&*msg.node.chain) { if self.denylist.contains(&*msg.node.chain) {
log::warn!(target: "Aggregator::AddNode", "'{}' is on the denylist.", msg.node.chain); log::warn!(target: "Aggregator::AddNode", "'{}' is on the denylist.", msg.node.chain);
let AddNode { node_connector, .. } = msg; let AddNode { node_connector, .. } = msg;
let reason = CloseReason{ code: CloseCode::Abnormal, description: Some("Denied".into()) }; let reason = CloseReason {
code: CloseCode::Abnormal,
description: Some("Denied".into()),
};
node_connector.do_send(Mute { reason }); node_connector.do_send(Mute { reason });
return; return;
} }
let AddNode { node, conn_id, node_connector } = msg; let AddNode {
node,
genesis_hash,
conn_id,
node_connector,
} = msg;
log::trace!(target: "Aggregator::AddNode", "New node connected. Chain '{}'", node.chain); log::trace!(target: "Aggregator::AddNode", "New node connected. Chain '{}'", node.chain);
let cid = self.lazy_chain(&node.chain, ctx); let cid = self.lazy_chain(genesis_hash, &node.chain, ctx);
let chain = self.chains.get_mut(cid).expect("Entry just created above; qed"); let chain = self
.chains
.get_mut(cid)
.expect("Entry just created above; qed");
if chain.nodes < chain.max_nodes { if chain.nodes < chain.max_nodes {
chain.addr.do_send(chain::AddNode { chain.addr.do_send(chain::AddNode {
node, node,
@@ -216,7 +223,10 @@ impl Handler<AddNode> for Aggregator {
}); });
} else { } else {
log::warn!(target: "Aggregator::AddNode", "Chain {} is over quota ({})", chain.label, chain.max_nodes); log::warn!(target: "Aggregator::AddNode", "Chain {} is over quota ({})", chain.label, chain.max_nodes);
let reason = CloseReason{ code: CloseCode::Again, description: Some("Overquota".into()) }; let reason = CloseReason {
code: CloseCode::Again,
description: Some("Overquota".into()),
};
node_connector.do_send(Mute { reason }); node_connector.do_send(Mute { reason });
} }
} }
@@ -230,16 +240,12 @@ impl Handler<DropChain> for Aggregator {
if let Some(entry) = self.chains.remove(cid) { if let Some(entry) = self.chains.remove(cid) {
let label = &entry.label; let label = &entry.label;
self.genesis_hashes.remove(&entry.genesis_hash);
self.labels.remove(label); self.labels.remove(label);
if let Some(network) = entry.network_id {
self.networks.remove(&network);
}
self.serializer.push(feed::RemovedChain(label)); self.serializer.push(feed::RemovedChain(label));
log::info!("Dropped chain [{}] from the aggregator", label); log::info!("Dropped chain [{}] from the aggregator", label);
self.broadcast(); self.broadcast();
} }
} }
} }
@@ -323,7 +329,8 @@ impl Handler<Connect> for Aggregator {
// TODO: keep track on number of nodes connected to each chain // TODO: keep track on number of nodes connected to each chain
for (_, entry) in self.chains.iter() { for (_, entry) in self.chains.iter() {
self.serializer.push(feed::AddedChain(&entry.label, entry.nodes)); self.serializer
.push(feed::AddedChain(&entry.label, entry.nodes));
} }
if let Some(msg) = self.serializer.finalize() { if let Some(msg) = self.serializer.finalize() {
+96 -38
View File
@@ -1,15 +1,19 @@
use std::collections::HashMap;
use actix::prelude::*; use actix::prelude::*;
use std::sync::Arc;
use bytes::Bytes; use bytes::Bytes;
use rustc_hash::FxHashMap; use rustc_hash::FxHashMap;
use std::collections::HashMap;
use std::sync::Arc;
use crate::aggregator::{Aggregator, DropChain, RenameChain, NodeCount}; use crate::aggregator::{Aggregator, DropChain, NodeCount, RenameChain};
use crate::node::{Node, connector::{Initialize, NodeConnector}, message::{NodeMessage, Payload}}; use crate::feed::connector::{FeedConnector, FeedId, Subscribed, Unsubscribed};
use crate::feed::connector::{FeedId, FeedConnector, Subscribed, Unsubscribed};
use crate::feed::{self, FeedMessageSerializer}; use crate::feed::{self, FeedMessageSerializer};
use crate::util::{DenseMap, NumStats, now}; use crate::node::{
use crate::types::{ConnId, NodeId, NodeDetails, NodeLocation, Block, Timestamp, BlockNumber}; connector::{Initialize, NodeConnector},
message::Payload,
Node,
};
use crate::types::{Block, BlockNumber, ConnId, NodeDetails, NodeId, NodeLocation, Timestamp};
use crate::util::{now, DenseMap, NumStats};
const STALE_TIMEOUT: u64 = 2 * 60 * 1000; // 2 minutes const STALE_TIMEOUT: u64 = 2 * 60 * 1000; // 2 minutes
@@ -70,11 +74,11 @@ impl Chain {
Some(count) => { Some(count) => {
*count += 1; *count += 1;
*count *count
}, }
None => { None => {
self.labels.insert(label.into(), 1); self.labels.insert(label.into(), 1);
1 1
}, }
}; };
if &*self.label.0 == label { if &*self.label.0 == label {
@@ -106,7 +110,8 @@ impl Chain {
fn rename(&mut self, label: Label, count: usize) { fn rename(&mut self, label: Label, count: usize) {
self.label = (label, count); self.label = (label, count);
self.aggregator.do_send(RenameChain(self.cid, self.label.0.clone())); self.aggregator
.do_send(RenameChain(self.cid, self.label.0.clone()));
} }
fn broadcast(&mut self) { fn broadcast(&mut self) {
@@ -128,7 +133,8 @@ impl Chain {
/// Triggered when the number of nodes in this chain has changed, Aggregator will /// Triggered when the number of nodes in this chain has changed, Aggregator will
/// propagate new counts to all connected feeds /// propagate new counts to all connected feeds
fn update_count(&self) { fn update_count(&self) {
self.aggregator.do_send(NodeCount(self.cid, self.nodes.len())); self.aggregator
.do_send(NodeCount(self.cid, self.nodes.len()));
} }
/// Check if the chain is stale (has not received a new best block in a while). /// Check if the chain is stale (has not received a new best block in a while).
@@ -170,8 +176,13 @@ impl Chain {
self.block_times.reset(); self.block_times.reset();
self.timestamp = timestamp; self.timestamp = timestamp;
self.serializer.push(feed::BestBlock(self.best.height, timestamp.unwrap_or(now), None)); self.serializer.push(feed::BestBlock(
self.serializer.push(feed::BestFinalized(finalized.height, finalized.hash)); self.best.height,
timestamp.unwrap_or(now),
None,
));
self.serializer
.push(feed::BestFinalized(finalized.height, finalized.hash));
} }
} }
} }
@@ -205,8 +216,8 @@ pub struct AddNode {
#[rtype(result = "()")] #[rtype(result = "()")]
pub struct UpdateNode { pub struct UpdateNode {
pub nid: NodeId, pub nid: NodeId,
pub msg: NodeMessage,
pub raw: Option<Bytes>, pub raw: Option<Bytes>,
pub payload: Payload,
} }
/// Message sent from the NodeConnector to the Chain when the connector disconnects /// Message sent from the NodeConnector to the Chain when the connector disconnects
@@ -250,14 +261,25 @@ impl Handler<AddNode> for Chain {
type Result = (); type Result = ();
fn handle(&mut self, msg: AddNode, ctx: &mut Self::Context) { fn handle(&mut self, msg: AddNode, ctx: &mut Self::Context) {
let AddNode { node, conn_id, node_connector } = msg; let AddNode {
node,
conn_id,
node_connector,
} = msg;
log::trace!(target: "Chain::AddNode", "New node connected. Chain '{}', node count goes from {} to {}", node.chain, self.nodes.len(), self.nodes.len() + 1); log::trace!(target: "Chain::AddNode", "New node connected. Chain '{}', node count goes from {} to {}", node.chain, self.nodes.len(), self.nodes.len() + 1);
self.increment_label_count(&node.chain); self.increment_label_count(&node.chain);
let nid = self.nodes.add(Node::new(node)); let nid = self.nodes.add(Node::new(node));
let chain = ctx.address(); let chain = ctx.address();
if node_connector.try_send(Initialize { nid, conn_id, chain }).is_err() { if node_connector
.try_send(Initialize {
nid,
conn_id,
chain,
})
.is_err()
{
self.nodes.remove(nid); self.nodes.remove(nid);
} else if let Some(node) = self.nodes.get(nid) { } else if let Some(node) = self.nodes.get(nid) {
self.serializer.push(feed::AddedNode(nid, node)); self.serializer.push(feed::AddedNode(nid, node));
@@ -297,7 +319,11 @@ impl Chain {
self.average_block_time = Some(self.block_times.average()); self.average_block_time = Some(self.block_times.average());
} }
self.timestamp = Some(now); self.timestamp = Some(now);
self.serializer.push(feed::BestBlock(self.best.height, now, self.average_block_time)); self.serializer.push(feed::BestBlock(
self.best.height,
now,
self.average_block_time,
));
propagation_time = Some(0); propagation_time = Some(0);
} else if block.height == self.best.height { } else if block.height == self.best.height {
if let Some(timestamp) = self.timestamp { if let Some(timestamp) = self.timestamp {
@@ -316,14 +342,14 @@ impl Handler<UpdateNode> for Chain {
type Result = (); type Result = ();
fn handle(&mut self, msg: UpdateNode, _: &mut Self::Context) { fn handle(&mut self, msg: UpdateNode, _: &mut Self::Context) {
let UpdateNode { nid, msg, raw } = msg; let UpdateNode { nid, payload, raw } = msg;
if let Some(block) = msg.payload().best_block() { if let Some(block) = payload.best_block() {
self.handle_block(block, nid); self.handle_block(block, nid);
} }
if let Some(node) = self.nodes.get_mut(nid) { if let Some(node) = self.nodes.get_mut(nid) {
match msg.payload() { match payload {
Payload::SystemInterval(ref interval) => { Payload::SystemInterval(ref interval) => {
if interval.network_state.is_some() { if interval.network_state.is_some() {
if let Some(raw) = raw { if let Some(raw) = raw {
@@ -354,49 +380,69 @@ impl Handler<UpdateNode> for Chain {
return; return;
} }
Payload::AfgFinalized(finalized) => { Payload::AfgFinalized(finalized) => {
if let Ok(finalized_number) = finalized.finalized_number.parse::<BlockNumber>() { if let Ok(finalized_number) = finalized.finalized_number.parse::<BlockNumber>()
{
if let Some(addr) = node.details().validator.clone() { if let Some(addr) = node.details().validator.clone() {
self.serializer.push(feed::AfgFinalized(addr, finalized_number, self.serializer.push(feed::AfgFinalized(
finalized.finalized_hash)); addr,
finalized_number,
finalized.finalized_hash,
));
self.broadcast_finality(); self.broadcast_finality();
} }
} }
return; return;
} }
Payload::AfgReceivedPrecommit(precommit) => { Payload::AfgReceivedPrecommit(precommit) => {
if let Ok(finalized_number) = precommit.received.target_number.parse::<BlockNumber>() { if let Ok(finalized_number) =
precommit.received.target_number.parse::<BlockNumber>()
{
if let Some(addr) = node.details().validator.clone() { if let Some(addr) = node.details().validator.clone() {
let voter = precommit.received.voter.clone(); let voter = precommit.received.voter.clone();
self.serializer.push(feed::AfgReceivedPrecommit(addr, finalized_number, self.serializer.push(feed::AfgReceivedPrecommit(
precommit.received.target_hash, voter)); addr,
finalized_number,
precommit.received.target_hash,
voter,
));
self.broadcast_finality(); self.broadcast_finality();
} }
} }
return; return;
} }
Payload::AfgReceivedPrevote(prevote) => { Payload::AfgReceivedPrevote(prevote) => {
if let Ok(finalized_number) = prevote.received.target_number.parse::<BlockNumber>() { if let Ok(finalized_number) =
prevote.received.target_number.parse::<BlockNumber>()
{
if let Some(addr) = node.details().validator.clone() { if let Some(addr) = node.details().validator.clone() {
let voter = prevote.received.voter.clone(); let voter = prevote.received.voter.clone();
self.serializer.push(feed::AfgReceivedPrevote(addr, finalized_number, self.serializer.push(feed::AfgReceivedPrevote(
prevote.received.target_hash, voter)); addr,
finalized_number,
prevote.received.target_hash,
voter,
));
self.broadcast_finality(); self.broadcast_finality();
} }
} }
return; return;
} }
Payload::AfgReceivedCommit(_) => { Payload::AfgReceivedCommit(_) => {}
}
_ => (), _ => (),
} }
if let Some(block) = msg.payload().finalized_block() { if let Some(block) = payload.finalized_block() {
if let Some(finalized) = node.update_finalized(block) { if let Some(finalized) = node.update_finalized(block) {
self.serializer.push(feed::FinalizedBlock(nid, finalized.height, finalized.hash)); self.serializer.push(feed::FinalizedBlock(
nid,
finalized.height,
finalized.hash,
));
if finalized.height > self.finalized.height { if finalized.height > self.finalized.height {
self.finalized = *finalized; self.finalized = *finalized;
self.serializer.push(feed::BestFinalized(finalized.height, finalized.hash)); self.serializer
.push(feed::BestFinalized(finalized.height, finalized.hash));
} }
} }
} }
@@ -413,7 +459,12 @@ impl Handler<LocateNode> for Chain {
let LocateNode { nid, location } = msg; let LocateNode { nid, location } = msg;
if let Some(node) = self.nodes.get_mut(nid) { if let Some(node) = self.nodes.get_mut(nid) {
self.serializer.push(feed::LocatedNode(nid, location.latitude, location.longitude, &location.city)); self.serializer.push(feed::LocatedNode(
nid,
location.latitude,
location.longitude,
&location.city,
));
node.update_location(location); node.update_location(location);
} }
@@ -457,10 +508,13 @@ impl Handler<Subscribe> for Chain {
self.timestamp.unwrap_or(0), self.timestamp.unwrap_or(0),
self.average_block_time, self.average_block_time,
)); ));
self.serializer.push(feed::BestFinalized(self.finalized.height, self.finalized.hash)); self.serializer.push(feed::BestFinalized(
self.finalized.height,
self.finalized.hash,
));
for (idx, (nid, node)) in self.nodes.iter().enumerate() { for (idx, (nid, node)) in self.nodes.iter().enumerate() {
// Send subscribtion confirmation and chain head before doing all the nodes, // Send subscription confirmation and chain head before doing all the nodes,
// and continue sending batches of 32 nodes a time over the wire subsequently // and continue sending batches of 32 nodes a time over the wire subsequently
if idx % 32 == 0 { if idx % 32 == 0 {
if let Some(serialized) = self.serializer.finalize() { if let Some(serialized) = self.serializer.finalize() {
@@ -469,7 +523,11 @@ impl Handler<Subscribe> for Chain {
} }
self.serializer.push(feed::AddedNode(nid, node)); self.serializer.push(feed::AddedNode(nid, node));
self.serializer.push(feed::FinalizedBlock(nid, node.finalized().height, node.finalized().hash)); self.serializer.push(feed::FinalizedBlock(
nid,
node.finalized().height,
node.finalized().hash,
));
if node.stale() { if node.stale() {
self.serializer.push(feed::StaleNode(nid)); self.serializer.push(feed::StaleNode(nid));
} }
+24 -7
View File
@@ -1,12 +1,13 @@
use std::mem; use serde::ser::{SerializeTuple, Serializer};
use serde::Serialize; use serde::Serialize;
use serde::ser::{Serializer, SerializeTuple}; use std::mem;
use serde_json::to_writer;
use crate::node::Node; use crate::node::Node;
use crate::types::{ use crate::types::{
NodeId, NodeStats, NodeHardware, NodeIO, BlockNumber, BlockHash, BlockDetails, Timestamp, Address, Address, BlockDetails, BlockHash, BlockNumber, NodeHardware, NodeIO, NodeId, NodeStats,
Timestamp,
}; };
use serde_json::to_writer;
pub mod connector; pub mod connector;
@@ -147,13 +148,29 @@ pub struct Pong<'a>(pub &'a str);
pub struct AfgFinalized(pub Address, pub BlockNumber, pub BlockHash); pub struct AfgFinalized(pub Address, pub BlockNumber, pub BlockHash);
#[derive(Serialize)] #[derive(Serialize)]
pub struct AfgReceivedPrevote(pub Address, pub BlockNumber, pub BlockHash, pub Option<Address>); pub struct AfgReceivedPrevote(
pub Address,
pub BlockNumber,
pub BlockHash,
pub Option<Address>,
);
#[derive(Serialize)] #[derive(Serialize)]
pub struct AfgReceivedPrecommit(pub Address, pub BlockNumber, pub BlockHash, pub Option<Address>); pub struct AfgReceivedPrecommit(
pub Address,
pub BlockNumber,
pub BlockHash,
pub Option<Address>,
);
#[derive(Serialize)] #[derive(Serialize)]
pub struct AfgAuthoritySet(pub Address, pub Address, pub Address, pub BlockNumber, pub BlockHash); pub struct AfgAuthoritySet(
pub Address,
pub Address,
pub Address,
pub BlockNumber,
pub BlockHash,
);
#[derive(Serialize)] #[derive(Serialize)]
pub struct StaleNode(pub NodeId); pub struct StaleNode(pub NodeId);
+14 -13
View File
@@ -1,11 +1,11 @@
use std::time::{Duration, Instant}; use crate::aggregator::{Aggregator, Connect, Disconnect, NoMoreFinality, SendFinality, Subscribe};
use bytes::Bytes;
use actix::prelude::*;
use actix_web_actors::ws;
use crate::aggregator::{Aggregator, Connect, Disconnect, Subscribe, SendFinality, NoMoreFinality};
use crate::chain::Unsubscribe; use crate::chain::Unsubscribe;
use crate::feed::{FeedMessageSerializer, Pong}; use crate::feed::{FeedMessageSerializer, Pong};
use crate::util::fnv; use crate::util::fnv;
use actix::prelude::*;
use actix_web_actors::ws;
use bytes::Bytes;
use std::time::{Duration, Instant};
pub type FeedId = usize; pub type FeedId = usize;
@@ -26,7 +26,7 @@ pub struct FeedConnector {
/// Chain actor address /// Chain actor address
chain: Option<Recipient<Unsubscribe>>, chain: Option<Recipient<Unsubscribe>>,
/// FNV hash of the chain label, optimization to avoid double-subscribing /// FNV hash of the chain label, optimization to avoid double-subscribing
chain_hash: u64, chain_label_hash: u64,
/// Message serializer /// Message serializer
serializer: FeedMessageSerializer, serializer: FeedMessageSerializer,
} }
@@ -58,7 +58,7 @@ impl FeedConnector {
hb: Instant::now(), hb: Instant::now(),
aggregator, aggregator,
chain: None, chain: None,
chain_hash: 0, chain_label_hash: 0,
serializer: FeedMessageSerializer::new(), serializer: FeedMessageSerializer::new(),
} }
} }
@@ -79,11 +79,12 @@ impl FeedConnector {
match cmd { match cmd {
"subscribe" => { "subscribe" => {
match fnv(payload) { match fnv(payload) {
hash if hash == self.chain_hash => return, hash if hash == self.chain_label_hash => return,
hash => self.chain_hash = hash, hash => self.chain_label_hash = hash,
} }
self.aggregator.send(Subscribe { self.aggregator
.send(Subscribe {
chain: payload.into(), chain: payload.into(),
feed: ctx.address(), feed: ctx.address(),
}) })
@@ -92,7 +93,7 @@ impl FeedConnector {
match res { match res {
Ok(true) => (), Ok(true) => (),
// Chain not found, reset hash // Chain not found, reset hash
_ => actor.chain_hash = 0, _ => actor.chain_label_hash = 0,
} }
async {}.into_actor(actor) async {}.into_actor(actor)
}) })
@@ -154,7 +155,7 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for FeedConnector {
Ok(ws::Message::Text(text)) => { Ok(ws::Message::Text(text)) => {
if let Some(idx) = text.find(':') { if let Some(idx) = text.find(':') {
let cmd = &text[..idx]; let cmd = &text[..idx];
let payload = &text[idx+1..]; let payload = &text[idx + 1..];
log::info!("New FEED message: {}", cmd); log::info!("New FEED message: {}", cmd);
@@ -191,7 +192,7 @@ impl Handler<Unsubscribed> for FeedConnector {
fn handle(&mut self, _: Unsubscribed, _: &mut Self::Context) { fn handle(&mut self, _: Unsubscribed, _: &mut Self::Context) {
self.chain = None; self.chain = None;
self.chain_hash = 0; self.chain_label_hash = 0;
} }
} }
+21 -10
View File
@@ -1,10 +1,10 @@
use std::net::Ipv4Addr;
use std::collections::HashSet; use std::collections::HashSet;
use std::iter::FromIterator; use std::iter::FromIterator;
use std::net::Ipv4Addr;
use actix::prelude::*; use actix::prelude::*;
use actix_http::ws::Codec; use actix_http::ws::Codec;
use actix_web::{web, get, middleware, App, Error, HttpRequest, HttpResponse, HttpServer}; use actix_web::{get, middleware, web, App, Error, HttpRequest, HttpResponse, HttpServer};
use actix_web_actors::ws; use actix_web_actors::ws;
use clap::Clap; use clap::Clap;
use simple_logger::SimpleLogger; use simple_logger::SimpleLogger;
@@ -82,7 +82,10 @@ async fn node_route(
aggregator: web::Data<Addr<Aggregator>>, aggregator: web::Data<Addr<Aggregator>>,
locator: web::Data<Addr<Locator>>, locator: web::Data<Addr<Locator>>,
) -> Result<HttpResponse, Error> { ) -> Result<HttpResponse, Error> {
let ip = req.connection_info().realip_remote_addr().and_then(|mut addr| { let ip = req
.connection_info()
.realip_remote_addr()
.and_then(|mut addr| {
if let Some(port_idx) = addr.find(':') { if let Some(port_idx) = addr.find(':') {
addr = &addr[..port_idx]; addr = &addr[..port_idx];
} }
@@ -125,16 +128,21 @@ async fn state_route(
let res = match aggregator.send(GetNetworkState(chain, nid)).await { let res = match aggregator.send(GetNetworkState(chain, nid)).await {
Ok(Some(res)) => res.await, Ok(Some(res)) => res.await,
Ok(None) => Ok(None), Ok(None) => Ok(None),
Err(error) => Err(error) Err(error) => Err(error),
}; };
match res { match res {
Ok(Some(body)) => { Ok(Some(body)) => {
HttpResponse::Ok().content_type("application/json").body(body).await HttpResponse::Ok()
}, .content_type("application/json")
.body(body)
.await
}
Ok(None) => { Ok(None) => {
HttpResponse::Ok().body("Node has disconnected or has not submitted its network state yet").await HttpResponse::Ok()
}, .body("Node has disconnected or has not submitted its network state yet")
.await
}
Err(error) => { Err(error) => {
log::error!("Network state mailbox error: {:?}", error); log::error!("Network state mailbox error: {:?}", error);
@@ -151,7 +159,7 @@ async fn health(aggregator: web::Data<Addr<Aggregator>>) -> Result<HttpResponse,
let body = format!("Connected chains: {}", count); let body = format!("Connected chains: {}", count);
HttpResponse::Ok().body(body).await HttpResponse::Ok().body(body).await
}, }
Err(error) => { Err(error) => {
log::error!("Health check mailbox error: {:?}", error); log::error!("Health check mailbox error: {:?}", error);
@@ -166,7 +174,10 @@ async fn health(aggregator: web::Data<Addr<Aggregator>>) -> Result<HttpResponse,
async fn main() -> std::io::Result<()> { async fn main() -> std::io::Result<()> {
let opts = Opts::parse(); let opts = Opts::parse();
let log_level = &opts.log_level; let log_level = &opts.log_level;
SimpleLogger::new().with_level(log_level.into()).init().expect("Must be able to start a logger"); SimpleLogger::new()
.with_level(log_level.into())
.init()
.expect("Must be able to start a logger");
let denylist = HashSet::from_iter(opts.denylist); let denylist = HashSet::from_iter(opts.denylist);
let aggregator = Aggregator::new(denylist).start(); let aggregator = Aggregator::new(denylist).start();
+12 -4
View File
@@ -1,11 +1,14 @@
use bytes::Bytes; use bytes::Bytes;
use std::sync::Arc; use std::sync::Arc;
use crate::types::{NodeId, NodeDetails, NodeStats, NodeIO, NodeHardware, NodeLocation, BlockDetails, Block, Timestamp}; use crate::types::{
Block, BlockDetails, NodeDetails, NodeHardware, NodeIO, NodeId, NodeLocation, NodeStats,
Timestamp,
};
use crate::util::now; use crate::util::now;
pub mod message;
pub mod connector; pub mod connector;
pub mod message;
use message::SystemInterval; use message::SystemInterval;
@@ -41,7 +44,8 @@ pub struct Node {
impl Node { impl Node {
pub fn new(mut details: NodeDetails) -> Self { pub fn new(mut details: NodeDetails) -> Self {
let startup_time = details.startup_time let startup_time = details
.startup_time
.take() .take()
.and_then(|time| time.parse().ok()); .and_then(|time| time.parse().ok());
@@ -111,7 +115,11 @@ impl Node {
} }
} }
pub fn update_details(&mut self, timestamp: u64, propagation_time: Option<u64>) -> Option<&BlockDetails> { pub fn update_details(
&mut self,
timestamp: u64,
propagation_time: Option<u64>,
) -> Option<&BlockDetails> {
self.best.block_time = timestamp - self.best.block_timestamp; self.best.block_time = timestamp - self.best.block_timestamp;
self.best.block_timestamp = timestamp; self.best.block_timestamp = timestamp;
self.best.propagation_time = propagation_time; self.best.propagation_time = propagation_time;
+56 -36
View File
@@ -1,18 +1,18 @@
use std::collections::BTreeMap; use std::collections::BTreeMap;
use std::time::{Duration, Instant};
use std::net::Ipv4Addr;
use std::mem; use std::mem;
use std::net::Ipv4Addr;
use std::time::{Duration, Instant};
use bytes::{Bytes, BytesMut}; use crate::aggregator::{AddNode, Aggregator};
use actix::prelude::*; use crate::chain::{Chain, RemoveNode, UpdateNode};
use actix_web_actors::ws::{self, CloseReason};
use actix_http::ws::Item;
use crate::aggregator::{Aggregator, AddNode};
use crate::chain::{Chain, UpdateNode, RemoveNode};
use crate::node::NodeId;
use crate::node::message::{NodeMessage, Payload}; use crate::node::message::{NodeMessage, Payload};
use crate::util::LocateRequest; use crate::node::NodeId;
use crate::types::ConnId; use crate::types::ConnId;
use crate::util::LocateRequest;
use actix::prelude::*;
use actix_http::ws::Item;
use actix_web_actors::ws::{self, CloseReason};
use bytes::{Bytes, BytesMut};
/// How often heartbeat pings are sent /// How often heartbeat pings are sent
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(20); const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(20);
@@ -45,8 +45,8 @@ enum ConnMultiplex {
}, },
Waiting { Waiting {
/// Backlog of messages to be sent once we get a recipient handle to the chain /// Backlog of messages to be sent once we get a recipient handle to the chain
backlog: Vec<NodeMessage>, backlog: Vec<Payload>,
} },
} }
impl Default for ConnMultiplex { impl Default for ConnMultiplex {
@@ -74,7 +74,11 @@ impl Actor for NodeConnector {
} }
impl NodeConnector { impl NodeConnector {
pub fn new(aggregator: Addr<Aggregator>, locator: Recipient<LocateRequest>, ip: Option<Ipv4Addr>) -> Self { pub fn new(
aggregator: Addr<Aggregator>,
locator: Recipient<LocateRequest>,
ip: Option<Ipv4Addr>,
) -> Self {
Self { Self {
multiplex: BTreeMap::new(), multiplex: BTreeMap::new(),
hb: Instant::now(), hb: Instant::now(),
@@ -90,40 +94,46 @@ impl NodeConnector {
// check client heartbeats // check client heartbeats
if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT { if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT {
// stop actor // stop actor
ctx.close(Some(CloseReason { code: ws::CloseCode::Abnormal, description: Some("Missed heartbeat".into())})); ctx.close(Some(CloseReason {
code: ws::CloseCode::Abnormal,
description: Some("Missed heartbeat".into()),
}));
ctx.stop(); ctx.stop();
} }
}); });
} }
fn handle_message(&mut self, msg: NodeMessage, data: Bytes, ctx: &mut <Self as Actor>::Context) { fn handle_message(
&mut self,
msg: NodeMessage,
data: Bytes,
ctx: &mut <Self as Actor>::Context,
) {
let conn_id = msg.id(); let conn_id = msg.id();
let payload = msg.into();
match self.multiplex.entry(conn_id).or_default() { match self.multiplex.entry(conn_id).or_default() {
ConnMultiplex::Connected { nid, chain } => { ConnMultiplex::Connected { nid, chain } => {
chain.do_send(UpdateNode { chain.do_send(UpdateNode {
nid: *nid, nid: *nid,
msg,
raw: Some(data), raw: Some(data),
payload,
}); });
} }
ConnMultiplex::Waiting { backlog } => { ConnMultiplex::Waiting { backlog } => {
if let Payload::SystemConnected(connected) = msg.payload() { if let Payload::SystemConnected(connected) = payload {
let mut node = connected.node.clone(); self.aggregator.do_send(AddNode {
// FIXME: Use genesis hash instead of names to avoid this mess node: connected.node,
match &*node.chain { genesis_hash: connected.genesis_hash,
"Kusama CC3" => node.chain = "Kusama".into(), conn_id,
"Polkadot CC1" => node.chain = "Polkadot".into(), node_connector: ctx.address(),
_ => () });
}
self.aggregator.do_send(AddNode { node, conn_id, node_connector: ctx.address() });
} else { } else {
if backlog.len() >= 10 { if backlog.len() >= 10 {
backlog.remove(0); backlog.remove(0);
} }
backlog.push(msg); backlog.push(payload);
} }
} }
} }
@@ -180,13 +190,21 @@ impl Handler<Initialize> for NodeConnector {
type Result = (); type Result = ();
fn handle(&mut self, msg: Initialize, _: &mut Self::Context) { fn handle(&mut self, msg: Initialize, _: &mut Self::Context) {
let Initialize { nid, conn_id, chain } = msg; let Initialize {
nid,
conn_id,
chain,
} = msg;
log::trace!(target: "NodeConnector::Initialize", "Initializing a node, nid={}, on conn_id={}", nid, conn_id); log::trace!(target: "NodeConnector::Initialize", "Initializing a node, nid={}, on conn_id={}", nid, conn_id);
let mx = self.multiplex.entry(conn_id).or_default(); let mx = self.multiplex.entry(conn_id).or_default();
if let ConnMultiplex::Waiting { backlog } = mx { if let ConnMultiplex::Waiting { backlog } = mx {
for msg in backlog.drain(..) { for payload in backlog.drain(..) {
chain.do_send(UpdateNode { nid, msg, raw: None }); chain.do_send(UpdateNode {
nid,
raw: None,
payload,
});
} }
*mx = ConnMultiplex::Connected { *mx = ConnMultiplex::Connected {
@@ -233,7 +251,7 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for NodeConnector {
self.continue_frame(&bytes); self.continue_frame(&bytes);
self.finish_frame() self.finish_frame()
} }
} },
Err(error) => { Err(error) => {
log::error!("{:?}", error); log::error!("{:?}", error);
ctx.stop(); ctx.stop();
@@ -242,14 +260,16 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for NodeConnector {
}; };
match serde_json::from_slice(&data) { match serde_json::from_slice(&data) {
Ok(msg) => { Ok(msg) => self.handle_message(msg, data, ctx),
self.handle_message(msg, data, ctx)
},
#[cfg(debug)] #[cfg(debug)]
Err(err) => { Err(err) => {
let data: &[u8] = data.get(..512).unwrap_or_else(|| &data); let data: &[u8] = data.get(..512).unwrap_or_else(|| &data);
log::warn!("Failed to parse node message: {} {}", err, std::str::from_utf8(data).unwrap_or_else(|_| "INVALID UTF8")) log::warn!(
}, "Failed to parse node message: {} {}",
err,
std::str::from_utf8(data).unwrap_or_else(|_| "INVALID UTF8")
)
}
#[cfg(not(debug))] #[cfg(not(debug))]
Err(_) => (), Err(_) => (),
} }
+20 -22
View File
@@ -1,8 +1,9 @@
use actix::prelude::*;
use serde::Deserialize;
use serde::de::IgnoredAny;
use crate::node::NodeDetails; use crate::node::NodeDetails;
use crate::types::{Block, BlockNumber, BlockHash, ConnId}; use crate::types::{Block, BlockHash, BlockNumber, ConnId};
use crate::util::Hash;
use actix::prelude::*;
use serde::de::IgnoredAny;
use serde::Deserialize;
#[derive(Deserialize, Debug, Message)] #[derive(Deserialize, Debug, Message)]
#[rtype(result = "()")] #[rtype(result = "()")]
@@ -19,13 +20,6 @@ pub enum NodeMessage {
} }
impl NodeMessage { impl NodeMessage {
/// Returns a reference to the payload.
pub fn payload(&self) -> &Payload {
match self {
NodeMessage::V1 { payload, .. } | NodeMessage::V2 { payload, .. } => payload,
}
}
/// Returns the connection ID or 0 if there is no ID. /// Returns the connection ID or 0 if there is no ID.
pub fn id(&self) -> ConnId { pub fn id(&self) -> ConnId {
match self { match self {
@@ -35,6 +29,14 @@ impl NodeMessage {
} }
} }
impl From<NodeMessage> for Payload {
fn from(msg: NodeMessage) -> Payload {
match msg {
NodeMessage::V1 { payload, .. } | NodeMessage::V2 { payload, .. } => payload,
}
}
}
#[derive(Deserialize, Debug)] #[derive(Deserialize, Debug)]
#[serde(tag = "msg")] #[serde(tag = "msg")]
pub enum Payload { pub enum Payload {
@@ -70,7 +72,7 @@ pub enum Payload {
#[derive(Deserialize, Debug)] #[derive(Deserialize, Debug)]
pub struct SystemConnected { pub struct SystemConnected {
pub network_id: Option<Box<str>>, pub genesis_hash: Hash,
#[serde(flatten)] #[serde(flatten)]
pub node: NodeDetails, pub node: NodeDetails,
} }
@@ -154,19 +156,15 @@ impl Payload {
pub fn finalized_block(&self) -> Option<Block> { pub fn finalized_block(&self) -> Option<Block> {
match self { match self {
Payload::SystemInterval(ref interval) => { Payload::SystemInterval(ref interval) => Some(Block {
Some(Block {
hash: interval.finalized_hash?, hash: interval.finalized_hash?,
height: interval.finalized_height?, height: interval.finalized_height?,
}) }),
}, Payload::NotifyFinalized(ref finalized) => Some(Block {
Payload::NotifyFinalized(ref finalized) => {
Some(Block {
hash: finalized.hash, hash: finalized.hash,
height: finalized.height.parse().ok()? height: finalized.height.parse().ok()?,
}) }),
}, _ => None,
_ => None
} }
} }
} }
+2 -2
View File
@@ -1,7 +1,7 @@
use serde::ser::{Serialize, Serializer, SerializeTuple}; use serde::ser::{Serialize, SerializeTuple, Serializer};
use serde::Deserialize; use serde::Deserialize;
use crate::util::{MeanList, now}; use crate::util::{now, MeanList};
pub type NodeId = usize; pub type NodeId = usize;
pub type ConnId = u64; pub type ConnId = u64;
+5 -3
View File
@@ -1,11 +1,13 @@
mod dense_map; mod dense_map;
mod hash;
mod location;
mod mean_list; mod mean_list;
mod num_stats; mod num_stats;
mod location;
pub use mean_list::MeanList;
pub use location::{Locator, LocatorFactory, LocateRequest};
pub use dense_map::DenseMap; pub use dense_map::DenseMap;
pub use hash::Hash;
pub use location::{LocateRequest, Locator, LocatorFactory};
pub use mean_list::MeanList;
pub use num_stats::NumStats; pub use num_stats::NumStats;
pub fn fnv<D: AsRef<[u8]>>(data: D) -> u64 { pub fn fnv<D: AsRef<[u8]>>(data: D) -> u64 {
+10 -8
View File
@@ -27,12 +27,12 @@ impl<T> DenseMap<T> {
Some(id) => { Some(id) => {
self.items[id] = Some(f(id)); self.items[id] = Some(f(id));
id id
}, }
None => { None => {
let id = self.items.len(); let id = self.items.len();
self.items.push(Some(f(id))); self.items.push(Some(f(id)));
id id
}, }
} }
} }
@@ -57,15 +57,17 @@ impl<T> DenseMap<T> {
} }
pub fn iter(&self) -> impl Iterator<Item = (Id, &T)> + '_ { pub fn iter(&self) -> impl Iterator<Item = (Id, &T)> + '_ {
self.items.iter().enumerate().filter_map(|(id, item)| { self.items
Some((id, item.as_ref()?)) .iter()
}) .enumerate()
.filter_map(|(id, item)| Some((id, item.as_ref()?)))
} }
pub fn iter_mut(&mut self) -> impl Iterator<Item = (Id, &mut T)> + '_ { pub fn iter_mut(&mut self) -> impl Iterator<Item = (Id, &mut T)> + '_ {
self.items.iter_mut().enumerate().filter_map(|(id, item)| { self.items
Some((id, item.as_mut()?)) .iter_mut()
}) .enumerate()
.filter_map(|(id, item)| Some((id, item.as_mut()?)))
} }
pub fn len(&self) -> usize { pub fn len(&self) -> usize {
+58
View File
@@ -0,0 +1,58 @@
use std::fmt::{self, Debug};
use serde::de::{self, Deserialize, Deserializer, Unexpected, Visitor};
const HASH_BYTES: usize = 32;
/// Newtype wrapper for 32-byte hash values, implementing readable `Debug` and `serde::Deserialize`.
// We could use primitive_types::H256 here, but opted for a custom type to aboid more dependencies.
#[derive(Hash, PartialEq, Eq, Clone, Copy)]
pub struct Hash([u8; HASH_BYTES]);
struct HashVisitor;
impl<'de> Visitor<'de> for HashVisitor {
type Value = Hash;
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("hexidecimal string of 32 bytes beginning with 0x")
}
fn visit_str<E>(self, value: &str) -> Result<Self::Value, E>
where
E: de::Error,
{
if !value.starts_with("0x") {
return Err(de::Error::invalid_value(Unexpected::Str(value), &self));
}
let mut hash = [0; HASH_BYTES];
hex::decode_to_slice(&value[2..], &mut hash)
.map_err(|_| de::Error::invalid_value(Unexpected::Str(value), &self))?;
Ok(Hash(hash))
}
}
impl<'de> Deserialize<'de> for Hash {
fn deserialize<D>(deserializer: D) -> Result<Hash, D::Error>
where
D: Deserializer<'de>,
{
deserializer.deserialize_str(HashVisitor)
}
}
impl Debug for Hash {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.write_str("0x")?;
let mut ascii = [0; HASH_BYTES * 2];
hex::encode_to_slice(self.0, &mut ascii)
.expect("Encoding 32 bytes into 64 bytes of ascii; qed");
f.write_str(std::str::from_utf8(&ascii).expect("ASCII hex encoded bytes canot fail; qed"))
}
}
+21 -9
View File
@@ -2,8 +2,8 @@ use std::net::Ipv4Addr;
use std::sync::Arc; use std::sync::Arc;
use actix::prelude::*; use actix::prelude::*;
use rustc_hash::FxHashMap;
use parking_lot::RwLock; use parking_lot::RwLock;
use rustc_hash::FxHashMap;
use serde::Deserialize; use serde::Deserialize;
use crate::chain::{Chain, LocateNode}; use crate::chain::{Chain, LocateNode};
@@ -26,7 +26,11 @@ impl LocatorFactory {
// Default entry for localhost // Default entry for localhost
cache.insert( cache.insert(
Ipv4Addr::new(127, 0, 0, 1), Ipv4Addr::new(127, 0, 0, 1),
Some(Arc::new(NodeLocation { latitude: 52.516_6667, longitude: 13.4, city: "Berlin".into() })), Some(Arc::new(NodeLocation {
latitude: 52.516_6667,
longitude: 13.4,
city: "Berlin".into(),
})),
); );
LocatorFactory { LocatorFactory {
@@ -90,10 +94,13 @@ impl Handler<LocateRequest> for Locator {
if let Some(item) = self.cache.read().get(&ip) { if let Some(item) = self.cache.read().get(&ip) {
if let Some(location) = item { if let Some(location) = item {
return chain.do_send(LocateNode { nid, location: location.clone() }); return chain.do_send(LocateNode {
nid,
location: location.clone(),
});
} }
return return;
} }
let location = match self.iplocate(ip) { let location = match self.iplocate(ip) {
@@ -120,15 +127,20 @@ impl Locator {
} }
fn iplocate_ipapi_co(&self, ip: Ipv4Addr) -> Result<Option<Arc<NodeLocation>>, reqwest::Error> { fn iplocate_ipapi_co(&self, ip: Ipv4Addr) -> Result<Option<Arc<NodeLocation>>, reqwest::Error> {
let location = self.query(&format!("https://ipapi.co/{}/json", ip))?.map(Arc::new); let location = self
.query(&format!("https://ipapi.co/{}/json", ip))?
.map(Arc::new);
Ok(location) Ok(location)
} }
fn iplocate_ipinfo_io(&self, ip: Ipv4Addr) -> Result<Option<Arc<NodeLocation>>, reqwest::Error> { fn iplocate_ipinfo_io(
let location = self.query(&format!("https://ipinfo.io/{}/json", ip))?.and_then(|loc: IPApiLocate| { &self,
loc.into_node_location().map(Arc::new) ip: Ipv4Addr,
}); ) -> Result<Option<Arc<NodeLocation>>, reqwest::Error> {
let location = self
.query(&format!("https://ipinfo.io/{}/json", ip))?
.and_then(|loc: IPApiLocate| loc.into_node_location().map(Arc::new));
Ok(location) Ok(location)
} }
+8 -2
View File
@@ -1,7 +1,10 @@
use num_traits::{Float, Zero}; use num_traits::{Float, Zero};
use std::ops::AddAssign; use std::ops::AddAssign;
pub struct MeanList<T> where T: Float + AddAssign + Zero + From<u8> { pub struct MeanList<T>
where
T: Float + AddAssign + Zero + From<u8>,
{
period_sum: T, period_sum: T,
period_count: u8, period_count: u8,
mean_index: u8, mean_index: u8,
@@ -24,7 +27,10 @@ where
} }
} }
impl<T> MeanList<T> where T: Float + AddAssign + Zero + From<u8> { impl<T> MeanList<T>
where
T: Float + AddAssign + Zero + From<u8>,
{
pub fn slice(&self) -> &[T] { pub fn slice(&self) -> &[T] {
&self.means[..usize::from(self.mean_index)] &self.means[..usize::from(self.mean_index)]
} }
+2 -2
View File
@@ -1,6 +1,6 @@
use num_traits::{Zero, NumOps, Bounded}; use num_traits::{Bounded, NumOps, Zero};
use std::iter::Sum;
use std::convert::TryFrom; use std::convert::TryFrom;
use std::iter::Sum;
/// Keep track of last N numbers pushed onto internal stack. /// Keep track of last N numbers pushed onto internal stack.
/// Provides means to get an average of said numbers. /// Provides means to get an average of said numbers.