Finally working

This commit is contained in:
Maciej Hirsz
2020-09-29 19:16:28 +02:00
parent 4a5bafcd41
commit 8bf1020c16
8 changed files with 108 additions and 87 deletions
+1
View File
@@ -2147,6 +2147,7 @@ version = "0.2.1"
dependencies = [ dependencies = [
"actix 0.10.0 (registry+https://github.com/rust-lang/crates.io-index)", "actix 0.10.0 (registry+https://github.com/rust-lang/crates.io-index)",
"actix-http 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "actix-http 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"actix-rt 1.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
"actix-web 3.0.1 (registry+https://github.com/rust-lang/crates.io-index)", "actix-web 3.0.1 (registry+https://github.com/rust-lang/crates.io-index)",
"actix-web-actors 3.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "actix-web-actors 3.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"bytes 0.5.6 (registry+https://github.com/rust-lang/crates.io-index)", "bytes 0.5.6 (registry+https://github.com/rust-lang/crates.io-index)",
+1
View File
@@ -7,6 +7,7 @@ license = "GPL-3.0"
[dependencies] [dependencies]
actix = "0.10.0" actix = "0.10.0"
actix-rt = "1.1.1"
actix-web = "3.0.1" actix-web = "3.0.1"
actix-web-actors = "3.0.0" actix-web-actors = "3.0.0"
actix-http = "2.0.0" actix-http = "2.0.0"
+24 -30
View File
@@ -121,15 +121,13 @@ pub struct DropChain(pub ChainId);
pub struct RenameChain(pub ChainId, pub Label); pub struct RenameChain(pub ChainId, pub Label);
/// Message sent from the FeedConnector to the Aggregator when subscribing to a new chain /// Message sent from the FeedConnector to the Aggregator when subscribing to a new chain
#[derive(Message)]
#[rtype(result = "bool")]
pub struct Subscribe { pub struct Subscribe {
pub chain: Label, pub chain: Label,
pub feed: Addr<FeedConnector>, pub feed: Addr<FeedConnector>,
} }
impl Message for Subscribe {
type Result = bool;
}
/// Message sent from the FeedConnector to the Aggregator consensus requested /// Message sent from the FeedConnector to the Aggregator consensus requested
#[derive(Message)] #[derive(Message)]
#[rtype(result = "()")] #[rtype(result = "()")]
@@ -162,18 +160,14 @@ pub struct Disconnect(pub FeedId);
pub struct NodeCount(pub ChainId, pub usize); pub struct NodeCount(pub ChainId, pub usize);
/// Message sent to the Aggregator to get the network state of a particular node /// Message sent to the Aggregator to get the network state of a particular node
// pub struct GetNetworkState(pub Box<str>, pub NodeId); #[derive(Message)]
#[rtype(result = "Option<Request<Chain, GetNodeNetworkState>>")]
pub struct GetNetworkState(pub Box<str>, pub NodeId);
/// Message sent to the Aggregator to get a health check /// Message sent to the Aggregator to get a health check
// pub struct GetHealth; #[derive(Message)]
#[rtype(result = "usize")]
// impl Message for GetNetworkState { pub struct GetHealth;
// type Result = Option<Request<Chain, GetNodeNetworkState>>;
// }
// impl Message for GetHealth {
// type Result = usize;
// }
impl Handler<AddNode> for Aggregator { impl Handler<AddNode> for Aggregator {
type Result = (); type Result = ();
@@ -213,7 +207,7 @@ impl Handler<DropChain> for Aggregator {
} }
self.serializer.push(feed::RemovedChain(label)); self.serializer.push(feed::RemovedChain(label));
info!("Dropped chain [{}] from the aggregator", label); log::info!("Dropped chain [{}] from the aggregator", label);
self.broadcast(); self.broadcast();
} }
@@ -292,7 +286,7 @@ impl Handler<Connect> for Aggregator {
let fid = self.feeds.add(connector.clone()); let fid = self.feeds.add(connector.clone());
info!("Feed #{} connected", fid); log::info!("Feed #{} connected", fid);
connector.do_send(Connected(fid)); connector.do_send(Connected(fid));
@@ -315,7 +309,7 @@ impl Handler<Disconnect> for Aggregator {
fn handle(&mut self, msg: Disconnect, _: &mut Self::Context) { fn handle(&mut self, msg: Disconnect, _: &mut Self::Context) {
let Disconnect(fid) = msg; let Disconnect(fid) = msg;
info!("Feed #{} disconnected", fid); log::info!("Feed #{} disconnected", fid);
self.feeds.remove(fid); self.feeds.remove(fid);
} }
@@ -338,20 +332,20 @@ impl Handler<NodeCount> for Aggregator {
} }
} }
// impl Handler<GetNetworkState> for Aggregator { impl Handler<GetNetworkState> for Aggregator {
// type Result = <GetNetworkState as Message>::Result; type Result = <GetNetworkState as Message>::Result;
// fn handle(&mut self, msg: GetNetworkState, _: &mut Self::Context) -> Self::Result { fn handle(&mut self, msg: GetNetworkState, _: &mut Self::Context) -> Self::Result {
// let GetNetworkState(chain, nid) = msg; let GetNetworkState(chain, nid) = msg;
// Some(self.get_chain(&*chain)?.addr.send(GetNodeNetworkState(nid))) Some(self.get_chain(&*chain)?.addr.send(GetNodeNetworkState(nid)))
// } }
// } }
// impl Handler<GetHealth> for Aggregator { impl Handler<GetHealth> for Aggregator {
// type Result = usize; type Result = usize;
// fn handle(&mut self, _: GetHealth, _: &mut Self::Context) -> Self::Result { fn handle(&mut self, _: GetHealth, _: &mut Self::Context) -> Self::Result {
// self.chains.len() self.chains.len()
// } }
// } }
+3 -3
View File
@@ -46,7 +46,7 @@ pub struct Chain {
impl Chain { impl Chain {
pub fn new(cid: ChainId, aggregator: Addr<Aggregator>, label: Label) -> Self { pub fn new(cid: ChainId, aggregator: Addr<Aggregator>, label: Label) -> Self {
info!("[{}] Created", label); log::info!("[{}] Created", label);
Chain { Chain {
cid, cid,
@@ -279,7 +279,7 @@ impl Chain {
if node.update_block(*block) { if node.update_block(*block) {
if block.height > self.best.height { if block.height > self.best.height {
self.best = *block; self.best = *block;
info!( log::info!(
"[{}] [{}/{}] new best block ({}) {:?}", "[{}] [{}/{}] new best block ({}) {:?}",
self.label.0, self.label.0,
nodes_len, nodes_len,
@@ -426,7 +426,7 @@ impl Handler<RemoveNode> for Chain {
} }
if self.nodes.is_empty() { if self.nodes.is_empty() {
info!("[{}] Lost all nodes, dropping...", self.label.0); log::info!("[{}] Lost all nodes, dropping...", self.label.0);
ctx.stop(); ctx.stop();
} }
+14 -12
View File
@@ -88,14 +88,12 @@ impl FeedConnector {
feed: ctx.address(), feed: ctx.address(),
}) })
.into_actor(self) .into_actor(self)
.then(|res, actor, _| { .then(|res, act, _| {
match res { match res {
Ok(true) => (), Ok(true) => (),
// Chain not found, reset hash _ => act.chain_hash = 0,
_ => actor.chain_hash = 0,
} }
async {}.into_actor(act)
fut::ok(())
}) })
.wait(ctx); .wait(ctx);
} }
@@ -145,25 +143,29 @@ pub struct Connected(pub FeedId);
pub struct Serialized(pub Bytes); pub struct Serialized(pub Bytes);
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for FeedConnector { impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for FeedConnector {
fn handle(&mut self, msg: ws::Message, ctx: &mut Self::Context) { fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
match msg { match msg {
ws::Message::Ping(msg) => { Ok(ws::Message::Ping(msg)) => {
self.hb = Instant::now(); self.hb = Instant::now();
ctx.pong(&msg); ctx.pong(&msg);
} }
ws::Message::Pong(_) => self.hb = Instant::now(), Ok(ws::Message::Pong(_)) => self.hb = Instant::now(),
ws::Message::Text(text) => { Ok(ws::Message::Text(text)) => {
if let Some(idx) = text.find(':') { if let Some(idx) = text.find(':') {
let cmd = &text[..idx]; let cmd = &text[..idx];
let payload = &text[idx+1..]; let payload = &text[idx+1..];
info!("New FEED message: {}", cmd); log::info!("New FEED message: {}", cmd);
self.handle_cmd(cmd, payload, ctx); self.handle_cmd(cmd, payload, ctx);
} }
} }
ws::Message::Close(_) => ctx.stop(), Ok(ws::Message::Close(_)) => ctx.stop(),
_ => (), Ok(_) => (),
Err(error) => {
log::error!("{:?}", error);
ctx.stop();
}
} }
} }
} }
+54 -39
View File
@@ -1,5 +1,5 @@
#[macro_use] // #[macro_use]
extern crate log; // extern crate log;
use std::net::Ipv4Addr; use std::net::Ipv4Addr;
@@ -8,6 +8,7 @@ use actix_http::ws::Codec;
use actix_web::{web, App, Error, HttpRequest, HttpResponse, HttpServer}; use actix_web::{web, App, Error, HttpRequest, HttpResponse, HttpServer};
use actix_web_actors::ws; use actix_web_actors::ws;
use clap::Clap; use clap::Clap;
use simple_logger::SimpleLogger;
mod aggregator; mod aggregator;
mod chain; mod chain;
@@ -40,7 +41,7 @@ struct Opts {
} }
/// Entry point for connecting nodes /// Entry point for connecting nodes
fn node_route( async fn node_route(
req: HttpRequest, req: HttpRequest,
stream: web::Payload, stream: web::Payload,
aggregator: web::Data<Addr<Aggregator>>, aggregator: web::Data<Addr<Aggregator>>,
@@ -65,7 +66,7 @@ fn node_route(
} }
/// Entry point for connecting feeds /// Entry point for connecting feeds
fn feed_route( async fn feed_route(
req: HttpRequest, req: HttpRequest,
stream: web::Payload, stream: web::Payload,
aggregator: web::Data<Addr<Aggregator>>, aggregator: web::Data<Addr<Aggregator>>,
@@ -77,44 +78,59 @@ fn feed_route(
) )
} }
// fn state_route( /// Entry point for network state dump
// path: web::Path<(Box<str>, NodeId)>, async fn state_route(
// aggregator: web::Data<Addr<Aggregator>>, path: web::Path<(Box<str>, NodeId)>,
// ) -> impl Future<Output = Result<HttpResponse, Error>> { aggregator: web::Data<Addr<Aggregator>>,
// let (chain, nid) = path.into_inner(); ) -> Result<HttpResponse, Error> {
let (chain, nid) = path.into_inner();
// aggregator let res = match aggregator.send(GetNetworkState(chain, nid)).await {
// .send(GetNetworkState(chain, nid)) Ok(Some(res)) => res.await,
// .flatten() Ok(None) => Ok(None),
// .from_err() Err(error) => Err(error)
// .and_then(|data| match data.and_then(|nested| nested) { };
// Some(body) => HttpResponse::Ok()
// .content_type("application/json")
// .body(body),
// None => HttpResponse::Ok()
// .body("Node has disconnected or has not submitted its network state yet"),
// })
// }
// fn health( match res {
// aggregator: web::Data<Addr<Aggregator>>, Ok(Some(body)) => {
// ) -> impl Future<Output = Result<HttpResponse, Error>> { HttpResponse::Ok().content_type("application/json").body(body).await
// aggregator.send(GetHealth).from_err().and_then(|count| { },
// let body = format!("Connected chains: {}", count); Ok(None) => {
HttpResponse::Ok().body("Node has disconnected or has not submitted its network state yet").await
},
Err(error) => {
log::error!("Network state mailbox error: {:?}", error);
// HttpResponse::Ok().body(body) HttpResponse::InternalServerError().await
// }) }
// } }
}
/// Entry check for health check monitoring bots
async fn health(aggregator: web::Data<Addr<Aggregator>>) -> Result<HttpResponse, Error> {
match aggregator.send(GetHealth).await {
Ok(count) => {
let body = format!("Connected chains: {}", count);
HttpResponse::Ok().body(body).await
},
Err(error) => {
log::error!("Health check mailbox error: {:?}", error);
HttpResponse::InternalServerError().await
}
}
}
/// Telemetry entry point. Listening by default on 127.0.0.1:8000. /// Telemetry entry point. Listening by default on 127.0.0.1:8000.
/// This can be changed using the `PORT` and `BIND` ENV variables. /// This can be changed using the `PORT` and `BIND` ENV variables.
fn main() -> std::io::Result<()> { #[actix_rt::main]
async fn main() -> std::io::Result<()> {
use web::{get, resource}; use web::{get, resource};
simple_logger::init_with_level(log::Level::Info).expect("Must be able to start a logger"); SimpleLogger::new().with_level(log::LevelFilter::Info).init().expect("Must be able to start a logger");
let opts: Opts = Opts::parse(); let opts: Opts = Opts::parse();
let sys = System::new("substrate-telemetry");
let aggregator = Aggregator::new().start(); let aggregator = Aggregator::new().start();
let factory = LocatorFactory::new(); let factory = LocatorFactory::new();
let locator = SyncArbiter::start(4, move || factory.create()); let locator = SyncArbiter::start(4, move || factory.create());
@@ -127,13 +143,12 @@ fn main() -> std::io::Result<()> {
.service(resource("/submit/").route(get().to(node_route))) .service(resource("/submit/").route(get().to(node_route)))
.service(resource("/feed").route(get().to(feed_route))) .service(resource("/feed").route(get().to(feed_route)))
.service(resource("/feed/").route(get().to(feed_route))) .service(resource("/feed/").route(get().to(feed_route)))
// .service(resource("/network_state/{chain}/{nid}").route(get().to_async(state_route))) .service(resource("/network_state/{chain}/{nid}").route(get().to(state_route)))
// .service(resource("/network_state/{chain}/{nid}/").route(get().to_async(state_route))) .service(resource("/network_state/{chain}/{nid}/").route(get().to(state_route)))
// .service(resource("/health").route(get().to_async(health))) .service(resource("/health").route(get().to(health)))
// .service(resource("/health/").route(get().to_async(health))) .service(resource("/health/").route(get().to(health)))
}) })
.bind(format!("{}", opts.socket))? .bind(format!("{}", opts.socket))?
.start(); .run()
.await
sys.run()
} }
+9 -1
View File
@@ -148,7 +148,15 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for NodeConnector {
return; return;
} }
Ok(ws::Message::Nop) => return, Ok(ws::Message::Nop) => return,
Err(error) => error!("{:?}", error), Ok(ws::Message::Continuation(_)) => {
log::error!("Continuation not supported");
return;
}
Err(error) => {
log::error!("{:?}", error);
ctx.stop();
return;
}
}; };
match serde_json::from_slice(&data) { match serde_json::from_slice(&data) {
+2 -2
View File
@@ -98,7 +98,7 @@ impl Handler<LocateRequest> for Locator {
let location = match self.iplocate(ip) { let location = match self.iplocate(ip) {
Ok(location) => location, Ok(location) => location,
Err(err) => return debug!("GET error for ip location: {:?}", err), Err(err) => return log::debug!("GET error for ip location: {:?}", err),
}; };
self.cache.write().insert(ip, location.clone()); self.cache.write().insert(ip, location.clone());
@@ -140,7 +140,7 @@ impl Locator {
match self.client.get(url).send()?.json::<T>() { match self.client.get(url).send()?.json::<T>() {
Ok(result) => Ok(Some(result)), Ok(result) => Ok(Some(result)),
Err(err) => { Err(err) => {
debug!("JSON error for ip location: {:?}", err); log::debug!("JSON error for ip location: {:?}", err);
Ok(None) Ok(None)
} }
} }