diff --git a/backend/Cargo.lock b/backend/Cargo.lock index a9e2efe..2746e9a 100644 --- a/backend/Cargo.lock +++ b/backend/Cargo.lock @@ -2147,6 +2147,7 @@ version = "0.2.1" dependencies = [ "actix 0.10.0 (registry+https://github.com/rust-lang/crates.io-index)", "actix-http 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)", + "actix-rt 1.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "actix-web 3.0.1 (registry+https://github.com/rust-lang/crates.io-index)", "actix-web-actors 3.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "bytes 0.5.6 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/backend/Cargo.toml b/backend/Cargo.toml index 7e05620..2f2380b 100644 --- a/backend/Cargo.toml +++ b/backend/Cargo.toml @@ -7,6 +7,7 @@ license = "GPL-3.0" [dependencies] actix = "0.10.0" +actix-rt = "1.1.1" actix-web = "3.0.1" actix-web-actors = "3.0.0" actix-http = "2.0.0" diff --git a/backend/src/aggregator.rs b/backend/src/aggregator.rs index 1126b43..ff8ab47 100644 --- a/backend/src/aggregator.rs +++ b/backend/src/aggregator.rs @@ -121,15 +121,13 @@ pub struct DropChain(pub ChainId); pub struct RenameChain(pub ChainId, pub Label); /// Message sent from the FeedConnector to the Aggregator when subscribing to a new chain +#[derive(Message)] +#[rtype(result = "bool")] pub struct Subscribe { pub chain: Label, pub feed: Addr, } -impl Message for Subscribe { - type Result = bool; -} - /// Message sent from the FeedConnector to the Aggregator consensus requested #[derive(Message)] #[rtype(result = "()")] @@ -162,18 +160,14 @@ pub struct Disconnect(pub FeedId); pub struct NodeCount(pub ChainId, pub usize); /// Message sent to the Aggregator to get the network state of a particular node -// pub struct GetNetworkState(pub Box, pub NodeId); +#[derive(Message)] +#[rtype(result = "Option>")] +pub struct GetNetworkState(pub Box, pub NodeId); /// Message sent to the Aggregator to get a health check -// pub struct GetHealth; - -// impl Message for GetNetworkState { -// type Result = Option>; -// } - -// impl Message for GetHealth { -// type Result = usize; -// } +#[derive(Message)] +#[rtype(result = "usize")] +pub struct GetHealth; impl Handler for Aggregator { type Result = (); @@ -213,7 +207,7 @@ impl Handler for Aggregator { } self.serializer.push(feed::RemovedChain(label)); - info!("Dropped chain [{}] from the aggregator", label); + log::info!("Dropped chain [{}] from the aggregator", label); self.broadcast(); } @@ -292,7 +286,7 @@ impl Handler for Aggregator { let fid = self.feeds.add(connector.clone()); - info!("Feed #{} connected", fid); + log::info!("Feed #{} connected", fid); connector.do_send(Connected(fid)); @@ -315,7 +309,7 @@ impl Handler for Aggregator { fn handle(&mut self, msg: Disconnect, _: &mut Self::Context) { let Disconnect(fid) = msg; - info!("Feed #{} disconnected", fid); + log::info!("Feed #{} disconnected", fid); self.feeds.remove(fid); } @@ -338,20 +332,20 @@ impl Handler for Aggregator { } } -// impl Handler for Aggregator { -// type Result = ::Result; +impl Handler for Aggregator { + type Result = ::Result; -// fn handle(&mut self, msg: GetNetworkState, _: &mut Self::Context) -> Self::Result { -// let GetNetworkState(chain, nid) = msg; + fn handle(&mut self, msg: GetNetworkState, _: &mut Self::Context) -> Self::Result { + let GetNetworkState(chain, nid) = msg; -// Some(self.get_chain(&*chain)?.addr.send(GetNodeNetworkState(nid))) -// } -// } + Some(self.get_chain(&*chain)?.addr.send(GetNodeNetworkState(nid))) + } +} -// impl Handler for Aggregator { -// type Result = usize; +impl Handler for Aggregator { + type Result = usize; -// fn handle(&mut self, _: GetHealth, _: &mut Self::Context) -> Self::Result { -// self.chains.len() -// } -// } + fn handle(&mut self, _: GetHealth, _: &mut Self::Context) -> Self::Result { + self.chains.len() + } +} diff --git a/backend/src/chain.rs b/backend/src/chain.rs index 7e54858..0317282 100644 --- a/backend/src/chain.rs +++ b/backend/src/chain.rs @@ -46,7 +46,7 @@ pub struct Chain { impl Chain { pub fn new(cid: ChainId, aggregator: Addr, label: Label) -> Self { - info!("[{}] Created", label); + log::info!("[{}] Created", label); Chain { cid, @@ -279,7 +279,7 @@ impl Chain { if node.update_block(*block) { if block.height > self.best.height { self.best = *block; - info!( + log::info!( "[{}] [{}/{}] new best block ({}) {:?}", self.label.0, nodes_len, @@ -426,7 +426,7 @@ impl Handler for Chain { } if self.nodes.is_empty() { - info!("[{}] Lost all nodes, dropping...", self.label.0); + log::info!("[{}] Lost all nodes, dropping...", self.label.0); ctx.stop(); } diff --git a/backend/src/feed/connector.rs b/backend/src/feed/connector.rs index 40eaf88..2756eed 100644 --- a/backend/src/feed/connector.rs +++ b/backend/src/feed/connector.rs @@ -88,14 +88,12 @@ impl FeedConnector { feed: ctx.address(), }) .into_actor(self) - .then(|res, actor, _| { + .then(|res, act, _| { match res { Ok(true) => (), - // Chain not found, reset hash - _ => actor.chain_hash = 0, + _ => act.chain_hash = 0, } - - fut::ok(()) + async {}.into_actor(act) }) .wait(ctx); } @@ -145,25 +143,29 @@ pub struct Connected(pub FeedId); pub struct Serialized(pub Bytes); impl StreamHandler> for FeedConnector { - fn handle(&mut self, msg: ws::Message, ctx: &mut Self::Context) { + fn handle(&mut self, msg: Result, ctx: &mut Self::Context) { match msg { - ws::Message::Ping(msg) => { + Ok(ws::Message::Ping(msg)) => { self.hb = Instant::now(); ctx.pong(&msg); } - ws::Message::Pong(_) => self.hb = Instant::now(), - ws::Message::Text(text) => { + Ok(ws::Message::Pong(_)) => self.hb = Instant::now(), + Ok(ws::Message::Text(text)) => { if let Some(idx) = text.find(':') { let cmd = &text[..idx]; let payload = &text[idx+1..]; - info!("New FEED message: {}", cmd); + log::info!("New FEED message: {}", cmd); self.handle_cmd(cmd, payload, ctx); } } - ws::Message::Close(_) => ctx.stop(), - _ => (), + Ok(ws::Message::Close(_)) => ctx.stop(), + Ok(_) => (), + Err(error) => { + log::error!("{:?}", error); + ctx.stop(); + } } } } diff --git a/backend/src/main.rs b/backend/src/main.rs index a6523be..9b27add 100644 --- a/backend/src/main.rs +++ b/backend/src/main.rs @@ -1,5 +1,5 @@ -#[macro_use] -extern crate log; +// #[macro_use] +// extern crate log; use std::net::Ipv4Addr; @@ -8,6 +8,7 @@ use actix_http::ws::Codec; use actix_web::{web, App, Error, HttpRequest, HttpResponse, HttpServer}; use actix_web_actors::ws; use clap::Clap; +use simple_logger::SimpleLogger; mod aggregator; mod chain; @@ -40,7 +41,7 @@ struct Opts { } /// Entry point for connecting nodes -fn node_route( +async fn node_route( req: HttpRequest, stream: web::Payload, aggregator: web::Data>, @@ -65,7 +66,7 @@ fn node_route( } /// Entry point for connecting feeds -fn feed_route( +async fn feed_route( req: HttpRequest, stream: web::Payload, aggregator: web::Data>, @@ -77,44 +78,59 @@ fn feed_route( ) } -// fn state_route( -// path: web::Path<(Box, NodeId)>, -// aggregator: web::Data>, -// ) -> impl Future> { -// let (chain, nid) = path.into_inner(); +/// Entry point for network state dump +async fn state_route( + path: web::Path<(Box, NodeId)>, + aggregator: web::Data>, +) -> Result { + let (chain, nid) = path.into_inner(); -// aggregator -// .send(GetNetworkState(chain, nid)) -// .flatten() -// .from_err() -// .and_then(|data| match data.and_then(|nested| nested) { -// Some(body) => HttpResponse::Ok() -// .content_type("application/json") -// .body(body), -// None => HttpResponse::Ok() -// .body("Node has disconnected or has not submitted its network state yet"), -// }) -// } + let res = match aggregator.send(GetNetworkState(chain, nid)).await { + Ok(Some(res)) => res.await, + Ok(None) => Ok(None), + Err(error) => Err(error) + }; -// fn health( -// aggregator: web::Data>, -// ) -> impl Future> { -// aggregator.send(GetHealth).from_err().and_then(|count| { -// let body = format!("Connected chains: {}", count); + match res { + Ok(Some(body)) => { + HttpResponse::Ok().content_type("application/json").body(body).await + }, + Ok(None) => { + HttpResponse::Ok().body("Node has disconnected or has not submitted its network state yet").await + }, + Err(error) => { + log::error!("Network state mailbox error: {:?}", error); -// HttpResponse::Ok().body(body) -// }) -// } + HttpResponse::InternalServerError().await + } + } +} + +/// Entry check for health check monitoring bots +async fn health(aggregator: web::Data>) -> Result { + match aggregator.send(GetHealth).await { + Ok(count) => { + let body = format!("Connected chains: {}", count); + + HttpResponse::Ok().body(body).await + }, + Err(error) => { + log::error!("Health check mailbox error: {:?}", error); + + HttpResponse::InternalServerError().await + } + } +} /// Telemetry entry point. Listening by default on 127.0.0.1:8000. /// This can be changed using the `PORT` and `BIND` ENV variables. -fn main() -> std::io::Result<()> { +#[actix_rt::main] +async fn main() -> std::io::Result<()> { use web::{get, resource}; - simple_logger::init_with_level(log::Level::Info).expect("Must be able to start a logger"); + SimpleLogger::new().with_level(log::LevelFilter::Info).init().expect("Must be able to start a logger"); let opts: Opts = Opts::parse(); - let sys = System::new("substrate-telemetry"); let aggregator = Aggregator::new().start(); let factory = LocatorFactory::new(); let locator = SyncArbiter::start(4, move || factory.create()); @@ -127,13 +143,12 @@ fn main() -> std::io::Result<()> { .service(resource("/submit/").route(get().to(node_route))) .service(resource("/feed").route(get().to(feed_route))) .service(resource("/feed/").route(get().to(feed_route))) - // .service(resource("/network_state/{chain}/{nid}").route(get().to_async(state_route))) - // .service(resource("/network_state/{chain}/{nid}/").route(get().to_async(state_route))) - // .service(resource("/health").route(get().to_async(health))) - // .service(resource("/health/").route(get().to_async(health))) + .service(resource("/network_state/{chain}/{nid}").route(get().to(state_route))) + .service(resource("/network_state/{chain}/{nid}/").route(get().to(state_route))) + .service(resource("/health").route(get().to(health))) + .service(resource("/health/").route(get().to(health))) }) .bind(format!("{}", opts.socket))? - .start(); - - sys.run() + .run() + .await } diff --git a/backend/src/node/connector.rs b/backend/src/node/connector.rs index 7e9b94c..d6f09bb 100644 --- a/backend/src/node/connector.rs +++ b/backend/src/node/connector.rs @@ -148,7 +148,15 @@ impl StreamHandler> for NodeConnector { return; } Ok(ws::Message::Nop) => return, - Err(error) => error!("{:?}", error), + Ok(ws::Message::Continuation(_)) => { + log::error!("Continuation not supported"); + return; + } + Err(error) => { + log::error!("{:?}", error); + ctx.stop(); + return; + } }; match serde_json::from_slice(&data) { diff --git a/backend/src/util/location.rs b/backend/src/util/location.rs index 09ab871..d4bad3d 100644 --- a/backend/src/util/location.rs +++ b/backend/src/util/location.rs @@ -98,7 +98,7 @@ impl Handler for Locator { let location = match self.iplocate(ip) { Ok(location) => location, - Err(err) => return debug!("GET error for ip location: {:?}", err), + Err(err) => return log::debug!("GET error for ip location: {:?}", err), }; self.cache.write().insert(ip, location.clone()); @@ -140,7 +140,7 @@ impl Locator { match self.client.get(url).send()?.json::() { Ok(result) => Ok(Some(result)), Err(err) => { - debug!("JSON error for ip location: {:?}", err); + log::debug!("JSON error for ip location: {:?}", err); Ok(None) } }