This commit is contained in:
Maciej Hirsz
2020-09-25 14:03:35 +02:00
parent b98c816a84
commit 4a5bafcd41
11 changed files with 1469 additions and 1040 deletions
+1363 -962
View File
File diff suppressed because it is too large Load Diff
+14 -9
View File
@@ -6,11 +6,16 @@ edition = "2018"
license = "GPL-3.0" license = "GPL-3.0"
[dependencies] [dependencies]
actix = "0.8" actix = "0.10.0"
actix-web = { git = "https://github.com/maciejhirsz/actix-web" } actix-web = "3.0.1"
actix-web-actors = { git = "https://github.com/maciejhirsz/actix-web" } actix-web-actors = "3.0.0"
actix-http = { git = "https://github.com/maciejhirsz/actix-web" } actix-http = "2.0.0"
bytes = "0.4" # actix = "0.8"
# actix-web = { git = "https://github.com/maciejhirsz/actix-web" }
# actix-web-actors = { git = "https://github.com/maciejhirsz/actix-web" }
# actix-http = { git = "https://github.com/maciejhirsz/actix-web" }
# bytes = "0.4"
bytes = "0.5.6"
chrono = { version = "0.4", features = ["serde"] } chrono = { version = "0.4", features = ["serde"] }
fnv = "1.0.6" fnv = "1.0.6"
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
@@ -28,7 +33,7 @@ clap = "3.0.0-beta.1"
lto = true lto = true
panic = "abort" panic = "abort"
[patch.crates-io] # [patch.crates-io]
actix-web = { git = "https://github.com/maciejhirsz/actix-web" } # actix-web = { git = "https://github.com/maciejhirsz/actix-web" }
actix-web-actors = { git = "https://github.com/maciejhirsz/actix-web" } # actix-web-actors = { git = "https://github.com/maciejhirsz/actix-web" }
actix-http = { git = "https://github.com/maciejhirsz/actix-web" } # actix-http = { git = "https://github.com/maciejhirsz/actix-web" }
+29 -21
View File
@@ -104,6 +104,7 @@ impl Actor for Aggregator {
/// Message sent from the NodeConnector to the Aggregator upon getting all node details /// Message sent from the NodeConnector to the Aggregator upon getting all node details
#[derive(Message)] #[derive(Message)]
#[rtype(result = "()")]
pub struct AddNode { pub struct AddNode {
pub node: NodeDetails, pub node: NodeDetails,
pub network_id: Option<Label>, pub network_id: Option<Label>,
@@ -112,9 +113,11 @@ pub struct AddNode {
/// Message sent from the Chain to the Aggregator when the Chain loses all nodes /// Message sent from the Chain to the Aggregator when the Chain loses all nodes
#[derive(Message)] #[derive(Message)]
#[rtype(result = "()")]
pub struct DropChain(pub ChainId); pub struct DropChain(pub ChainId);
#[derive(Message)] #[derive(Message)]
#[rtype(result = "()")]
pub struct RenameChain(pub ChainId, pub Label); pub struct RenameChain(pub ChainId, pub Label);
/// Message sent from the FeedConnector to the Aggregator when subscribing to a new chain /// Message sent from the FeedConnector to the Aggregator when subscribing to a new chain
@@ -129,6 +132,7 @@ impl Message for Subscribe {
/// Message sent from the FeedConnector to the Aggregator consensus requested /// Message sent from the FeedConnector to the Aggregator consensus requested
#[derive(Message)] #[derive(Message)]
#[rtype(result = "()")]
pub struct SendFinality { pub struct SendFinality {
pub chain: Label, pub chain: Label,
pub fid: FeedId, pub fid: FeedId,
@@ -136,6 +140,7 @@ pub struct SendFinality {
/// Message sent from the FeedConnector to the Aggregator no more consensus required /// Message sent from the FeedConnector to the Aggregator no more consensus required
#[derive(Message)] #[derive(Message)]
#[rtype(result = "()")]
pub struct NoMoreFinality { pub struct NoMoreFinality {
pub chain: Label, pub chain: Label,
pub fid: FeedId, pub fid: FeedId,
@@ -143,29 +148,32 @@ pub struct NoMoreFinality {
/// Message sent from the FeedConnector to the Aggregator when first connected /// Message sent from the FeedConnector to the Aggregator when first connected
#[derive(Message)] #[derive(Message)]
#[rtype(result = "()")]
pub struct Connect(pub Addr<FeedConnector>); pub struct Connect(pub Addr<FeedConnector>);
/// Message sent from the FeedConnector to the Aggregator when disconnecting /// Message sent from the FeedConnector to the Aggregator when disconnecting
#[derive(Message)] #[derive(Message)]
#[rtype(result = "()")]
pub struct Disconnect(pub FeedId); pub struct Disconnect(pub FeedId);
/// Message sent from the Chain to the Aggergator when the node count on the chain changes /// Message sent from the Chain to the Aggergator when the node count on the chain changes
#[derive(Message)] #[derive(Message)]
#[rtype(result = "()")]
pub struct NodeCount(pub ChainId, pub usize); pub struct NodeCount(pub ChainId, pub usize);
/// Message sent to the Aggregator to get the network state of a particular node /// Message sent to the Aggregator to get the network state of a particular node
pub struct GetNetworkState(pub Box<str>, pub NodeId); // pub struct GetNetworkState(pub Box<str>, pub NodeId);
/// Message sent to the Aggregator to get a health check /// Message sent to the Aggregator to get a health check
pub struct GetHealth; // pub struct GetHealth;
impl Message for GetNetworkState { // impl Message for GetNetworkState {
type Result = Option<Request<Chain, GetNodeNetworkState>>; // type Result = Option<Request<Chain, GetNodeNetworkState>>;
} // }
impl Message for GetHealth { // impl Message for GetHealth {
type Result = usize; // type Result = usize;
} // }
impl Handler<AddNode> for Aggregator { impl Handler<AddNode> for Aggregator {
type Result = (); type Result = ();
@@ -330,20 +338,20 @@ impl Handler<NodeCount> for Aggregator {
} }
} }
impl Handler<GetNetworkState> for Aggregator { // impl Handler<GetNetworkState> for Aggregator {
type Result = <GetNetworkState as Message>::Result; // type Result = <GetNetworkState as Message>::Result;
fn handle(&mut self, msg: GetNetworkState, _: &mut Self::Context) -> Self::Result { // fn handle(&mut self, msg: GetNetworkState, _: &mut Self::Context) -> Self::Result {
let GetNetworkState(chain, nid) = msg; // let GetNetworkState(chain, nid) = msg;
Some(self.get_chain(&*chain)?.addr.send(GetNodeNetworkState(nid))) // Some(self.get_chain(&*chain)?.addr.send(GetNodeNetworkState(nid)))
} // }
} // }
impl Handler<GetHealth> for Aggregator { // impl Handler<GetHealth> for Aggregator {
type Result = usize; // type Result = usize;
fn handle(&mut self, _: GetHealth, _: &mut Self::Context) -> Self::Result { // fn handle(&mut self, _: GetHealth, _: &mut Self::Context) -> Self::Result {
self.chains.len() // self.chains.len()
} // }
} // }
+8
View File
@@ -192,6 +192,7 @@ impl Actor for Chain {
/// Message sent from the Aggregator to the Chain when new Node is connected /// Message sent from the Aggregator to the Chain when new Node is connected
#[derive(Message)] #[derive(Message)]
#[rtype(result = "()")]
pub struct AddNode { pub struct AddNode {
pub node: NodeDetails, pub node: NodeDetails,
pub rec: Recipient<Initialize>, pub rec: Recipient<Initialize>,
@@ -199,6 +200,7 @@ pub struct AddNode {
/// Message sent from the NodeConnector to the Chain when it receives new telemetry data /// Message sent from the NodeConnector to the Chain when it receives new telemetry data
#[derive(Message)] #[derive(Message)]
#[rtype(result = "()")]
pub struct UpdateNode { pub struct UpdateNode {
pub nid: NodeId, pub nid: NodeId,
pub msg: NodeMessage, pub msg: NodeMessage,
@@ -207,24 +209,30 @@ pub struct UpdateNode {
/// Message sent from the NodeConnector to the Chain when the connector disconnects /// Message sent from the NodeConnector to the Chain when the connector disconnects
#[derive(Message)] #[derive(Message)]
#[rtype(result = "()")]
pub struct RemoveNode(pub NodeId); pub struct RemoveNode(pub NodeId);
/// Message sent from the Aggregator to the Chain when the connector wants to subscribe to that chain /// Message sent from the Aggregator to the Chain when the connector wants to subscribe to that chain
#[derive(Message)] #[derive(Message)]
#[rtype(result = "()")]
pub struct Subscribe(pub Addr<FeedConnector>); pub struct Subscribe(pub Addr<FeedConnector>);
/// Message sent from the FeedConnector before it subscribes to a new chain, or if it disconnects /// Message sent from the FeedConnector before it subscribes to a new chain, or if it disconnects
#[derive(Message)] #[derive(Message)]
#[rtype(result = "()")]
pub struct Unsubscribe(pub FeedId); pub struct Unsubscribe(pub FeedId);
#[derive(Message)] #[derive(Message)]
#[rtype(result = "()")]
pub struct SendFinality(pub FeedId); pub struct SendFinality(pub FeedId);
#[derive(Message)] #[derive(Message)]
#[rtype(result = "()")]
pub struct NoMoreFinality(pub FeedId); pub struct NoMoreFinality(pub FeedId);
/// Message sent from the NodeConnector to the Chain when it receives location data /// Message sent from the NodeConnector to the Chain when it receives location data
#[derive(Message)] #[derive(Message)]
#[rtype(result = "()")]
pub struct LocateNode { pub struct LocateNode {
pub nid: NodeId, pub nid: NodeId,
pub location: Arc<NodeLocation>, pub location: Arc<NodeLocation>,
+6 -7
View File
@@ -1,3 +1,4 @@
use std::mem;
use serde::Serialize; use serde::Serialize;
use serde::ser::{Serializer, SerializeTuple}; use serde::ser::{Serializer, SerializeTuple};
@@ -20,17 +21,15 @@ pub struct FeedMessageSerializer {
buffer: Vec<u8>, buffer: Vec<u8>,
} }
const BUFCAP: usize = 128;
impl FeedMessageSerializer { impl FeedMessageSerializer {
pub fn new() -> Self { pub fn new() -> Self {
Self { Self {
buffer: Vec::new(), buffer: Vec::with_capacity(BUFCAP),
} }
} }
pub fn clear(&mut self) {
self.buffer.clear();
}
pub fn push<Message>(&mut self, msg: Message) pub fn push<Message>(&mut self, msg: Message)
where where
Message: FeedMessage, Message: FeedMessage,
@@ -52,8 +51,8 @@ impl FeedMessageSerializer {
} }
self.buffer.push(b']'); self.buffer.push(b']');
let bytes = self.buffer[..].into();
self.clear(); let bytes = mem::replace(&mut self.buffer, Vec::with_capacity(BUFCAP)).into();
Some(Serialized(bytes)) Some(Serialized(bytes))
} }
+6 -2
View File
@@ -70,7 +70,7 @@ impl FeedConnector {
// stop actor // stop actor
ctx.stop(); ctx.stop();
} else { } else {
ctx.ping("") ctx.ping(b"")
} }
}); });
} }
@@ -124,13 +124,16 @@ impl FeedConnector {
/// Message sent form Chain to the FeedConnector upon successful subscription /// Message sent form Chain to the FeedConnector upon successful subscription
#[derive(Message)] #[derive(Message)]
#[rtype(result = "()")]
pub struct Subscribed(pub FeedId, pub Recipient<Unsubscribe>); pub struct Subscribed(pub FeedId, pub Recipient<Unsubscribe>);
#[derive(Message)] #[derive(Message)]
#[rtype(result = "()")]
pub struct Unsubscribed; pub struct Unsubscribed;
/// Message sent from Aggregator to FeedConnector upon successful connection /// Message sent from Aggregator to FeedConnector upon successful connection
#[derive(Message)] #[derive(Message)]
#[rtype(result = "()")]
pub struct Connected(pub FeedId); pub struct Connected(pub FeedId);
/// Message sent from either Aggregator or Chain to FeedConnector containing /// Message sent from either Aggregator or Chain to FeedConnector containing
@@ -138,9 +141,10 @@ pub struct Connected(pub FeedId);
/// ///
/// Since Bytes is ARC'ed, this is cheap to clone /// Since Bytes is ARC'ed, this is cheap to clone
#[derive(Message, Clone)] #[derive(Message, Clone)]
#[rtype(result = "()")]
pub struct Serialized(pub Bytes); pub struct Serialized(pub Bytes);
impl StreamHandler<ws::Message, ws::ProtocolError> for FeedConnector { impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for FeedConnector {
fn handle(&mut self, msg: ws::Message, ctx: &mut Self::Context) { fn handle(&mut self, msg: ws::Message, ctx: &mut Self::Context) {
match msg { match msg {
ws::Message::Ping(msg) => { ws::Message::Ping(msg) => {
+30 -30
View File
@@ -46,7 +46,7 @@ fn node_route(
aggregator: web::Data<Addr<Aggregator>>, aggregator: web::Data<Addr<Aggregator>>,
locator: web::Data<Addr<Locator>>, locator: web::Data<Addr<Locator>>,
) -> Result<HttpResponse, Error> { ) -> Result<HttpResponse, Error> {
let ip = req.connection_info().remote().and_then(|mut addr| { let ip = req.connection_info().realip_remote_addr().and_then(|mut addr| {
if let Some(port_idx) = addr.find(":") { if let Some(port_idx) = addr.find(":") {
addr = &addr[..port_idx]; addr = &addr[..port_idx];
} }
@@ -77,34 +77,34 @@ fn feed_route(
) )
} }
fn state_route( // fn state_route(
path: web::Path<(Box<str>, NodeId)>, // path: web::Path<(Box<str>, NodeId)>,
aggregator: web::Data<Addr<Aggregator>>, // aggregator: web::Data<Addr<Aggregator>>,
) -> impl Future<Item = HttpResponse, Error = Error> { // ) -> impl Future<Output = Result<HttpResponse, Error>> {
let (chain, nid) = path.into_inner(); // let (chain, nid) = path.into_inner();
aggregator // aggregator
.send(GetNetworkState(chain, nid)) // .send(GetNetworkState(chain, nid))
.flatten() // .flatten()
.from_err() // .from_err()
.and_then(|data| match data.and_then(|nested| nested) { // .and_then(|data| match data.and_then(|nested| nested) {
Some(body) => HttpResponse::Ok() // Some(body) => HttpResponse::Ok()
.content_type("application/json") // .content_type("application/json")
.body(body), // .body(body),
None => HttpResponse::Ok() // None => HttpResponse::Ok()
.body("Node has disconnected or has not submitted its network state yet"), // .body("Node has disconnected or has not submitted its network state yet"),
}) // })
} // }
fn health( // fn health(
aggregator: web::Data<Addr<Aggregator>>, // aggregator: web::Data<Addr<Aggregator>>,
) -> impl Future<Item = HttpResponse, Error = Error> { // ) -> impl Future<Output = Result<HttpResponse, Error>> {
aggregator.send(GetHealth).from_err().and_then(|count| { // aggregator.send(GetHealth).from_err().and_then(|count| {
let body = format!("Connected chains: {}", count); // let body = format!("Connected chains: {}", count);
HttpResponse::Ok().body(body) // HttpResponse::Ok().body(body)
}) // })
} // }
/// Telemetry entry point. Listening by default on 127.0.0.1:8000. /// Telemetry entry point. Listening by default on 127.0.0.1:8000.
/// This can be changed using the `PORT` and `BIND` ENV variables. /// This can be changed using the `PORT` and `BIND` ENV variables.
@@ -127,10 +127,10 @@ fn main() -> std::io::Result<()> {
.service(resource("/submit/").route(get().to(node_route))) .service(resource("/submit/").route(get().to(node_route)))
.service(resource("/feed").route(get().to(feed_route))) .service(resource("/feed").route(get().to(feed_route)))
.service(resource("/feed/").route(get().to(feed_route))) .service(resource("/feed/").route(get().to(feed_route)))
.service(resource("/network_state/{chain}/{nid}").route(get().to_async(state_route))) // .service(resource("/network_state/{chain}/{nid}").route(get().to_async(state_route)))
.service(resource("/network_state/{chain}/{nid}/").route(get().to_async(state_route))) // .service(resource("/network_state/{chain}/{nid}/").route(get().to_async(state_route)))
.service(resource("/health").route(get().to_async(health))) // .service(resource("/health").route(get().to_async(health)))
.service(resource("/health/").route(get().to_async(health))) // .service(resource("/health/").route(get().to_async(health)))
}) })
.bind(format!("{}", opts.socket))? .bind(format!("{}", opts.socket))?
.start(); .start();
+1 -1
View File
@@ -226,7 +226,7 @@ impl Node {
if let Ok(stringified) = serde_json::from_str::<String>(json) { if let Ok(stringified) = serde_json::from_str::<String>(json) {
Some(stringified.into()) Some(stringified.into())
} else { } else {
Some(json.into()) Some(json.to_owned().into())
} }
} }
+10 -8
View File
@@ -107,6 +107,7 @@ impl NodeConnector {
} }
#[derive(Message)] #[derive(Message)]
#[rtype(result = "()")]
pub struct Initialize(pub NodeId, pub Addr<Chain>); pub struct Initialize(pub NodeId, pub Addr<Chain>);
impl Handler<Initialize> for NodeConnector { impl Handler<Initialize> for NodeConnector {
@@ -130,23 +131,24 @@ impl Handler<Initialize> for NodeConnector {
} }
} }
impl StreamHandler<ws::Message, ws::ProtocolError> for NodeConnector { impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for NodeConnector {
fn handle(&mut self, msg: ws::Message, ctx: &mut Self::Context) { fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
self.hb = Instant::now(); self.hb = Instant::now();
let data = match msg { let data = match msg {
ws::Message::Ping(msg) => { Ok(ws::Message::Ping(msg)) => {
ctx.pong(&msg); ctx.pong(&msg);
return; return;
} }
ws::Message::Pong(_) => return, Ok(ws::Message::Pong(_)) => return,
ws::Message::Text(text) => text.into(), Ok(ws::Message::Text(text)) => text.into(),
ws::Message::Binary(data) => data, Ok(ws::Message::Binary(data)) => data,
ws::Message::Close(_) => { Ok(ws::Message::Close(_)) => {
ctx.stop(); ctx.stop();
return; return;
} }
ws::Message::Nop => return, Ok(ws::Message::Nop) => return,
Err(error) => error!("{:?}", error),
}; };
match serde_json::from_slice(&data) { match serde_json::from_slice(&data) {
+1
View File
@@ -6,6 +6,7 @@ use crate::node::NodeDetails;
use crate::types::{Block, BlockNumber, BlockHash}; use crate::types::{Block, BlockNumber, BlockHash};
#[derive(Deserialize, Debug, Message)] #[derive(Deserialize, Debug, Message)]
#[rtype(result = "()")]
pub struct NodeMessage { pub struct NodeMessage {
pub level: Level, pub level: Level,
pub ts: DateTime<Utc>, pub ts: DateTime<Utc>,
+1
View File
@@ -47,6 +47,7 @@ impl Actor for Locator {
} }
#[derive(Message)] #[derive(Message)]
#[rtype(result = "()")]
pub struct LocateRequest { pub struct LocateRequest {
pub ip: Ipv4Addr, pub ip: Ipv4Addr,
pub nid: NodeId, pub nid: NodeId,