From 87e3c52b35fc9473271752142c4e61e541b22d88 Mon Sep 17 00:00:00 2001 From: Maciej Hirsz Date: Fri, 21 May 2021 12:39:35 +0200 Subject: [PATCH] Revert "Preparing backend to receive data from shards (#337)" This reverts commit f8b7128dca49f3959d2c746432627bef437a7bb1. --- backend/Cargo.lock | 31 ----- backend/Cargo.toml | 32 ++++- backend/core/Cargo.toml | 29 ----- backend/core/src/shard.rs | 13 -- backend/core/src/shard/connector.rs | 156 ----------------------- backend/{core => }/src/aggregator.rs | 0 backend/{core => }/src/chain.rs | 0 backend/{core => }/src/feed.rs | 0 backend/{core => }/src/feed/connector.rs | 0 backend/{core => }/src/main.rs | 23 ---- backend/{core => }/src/node.rs | 0 backend/{core => }/src/node/connector.rs | 0 backend/{core => }/src/node/message.rs | 0 backend/{core => }/src/types.rs | 0 backend/{core => }/src/util.rs | 0 backend/{core => }/src/util/dense_map.rs | 0 backend/{core => }/src/util/hash.rs | 43 +------ backend/{core => }/src/util/location.rs | 0 backend/{core => }/src/util/mean_list.rs | 0 backend/{core => }/src/util/num_stats.rs | 0 20 files changed, 32 insertions(+), 295 deletions(-) delete mode 100644 backend/core/Cargo.toml delete mode 100644 backend/core/src/shard.rs delete mode 100644 backend/core/src/shard/connector.rs rename backend/{core => }/src/aggregator.rs (100%) rename backend/{core => }/src/chain.rs (100%) rename backend/{core => }/src/feed.rs (100%) rename backend/{core => }/src/feed/connector.rs (100%) rename backend/{core => }/src/main.rs (90%) rename backend/{core => }/src/node.rs (100%) rename backend/{core => }/src/node/connector.rs (100%) rename backend/{core => }/src/node/message.rs (100%) rename backend/{core => }/src/types.rs (100%) rename backend/{core => }/src/util.rs (100%) rename backend/{core => }/src/util/dense_map.rs (100%) rename backend/{core => }/src/util/hash.rs (63%) rename backend/{core => }/src/util/location.rs (100%) rename backend/{core => }/src/util/mean_list.rs (100%) rename backend/{core => }/src/util/num_stats.rs (100%) diff --git a/backend/Cargo.lock b/backend/Cargo.lock index f3fca55..36ffa80 100644 --- a/backend/Cargo.lock +++ b/backend/Cargo.lock @@ -340,15 +340,6 @@ version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "904dfeac50f3cdaba28fc6f57fdcddb75f49ed61346676a78c4ffe55877802fd" -[[package]] -name = "bincode" -version = "1.3.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1f45e9417d87227c7a56d22e471c6206462cba514c7590c09aff4cf6d1ddcad" -dependencies = [ - "serde", -] - [[package]] name = "bitflags" version = "1.2.1" @@ -1697,7 +1688,6 @@ dependencies = [ "actix-http", "actix-web", "actix-web-actors", - "bincode", "bytes", "chrono", "clap", @@ -1713,7 +1703,6 @@ dependencies = [ "serde", "serde_json", "simple_logger", - "thiserror", ] [[package]] @@ -1748,26 +1737,6 @@ dependencies = [ "unicode-width", ] -[[package]] -name = "thiserror" -version = "1.0.24" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e0f4a65597094d4483ddaed134f409b2cb7c1beccf25201a9f73c719254fa98e" -dependencies = [ - "thiserror-impl", -] - -[[package]] -name = "thiserror-impl" -version = "1.0.24" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7765189610d8241a44529806d6fd1f2e0a08734313a35d5b3a556f92b381f3c0" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "thread_local" version = "1.0.1" diff --git a/backend/Cargo.toml b/backend/Cargo.toml index 2a6589f..739d66b 100644 --- a/backend/Cargo.toml +++ b/backend/Cargo.toml @@ -1,10 +1,30 @@ -[workspace] -members = [ - "core", -] +[package] +name = "telemetry" +version = "0.3.0" +authors = ["Parity Technologies Ltd. "] +edition = "2018" +license = "GPL-3.0" -[profile.dev] -opt-level = 3 +[dependencies] +actix = "0.11.1" +actix-web = { version = "4.0.0-beta.4", default-features = false } +actix-web-actors = "4.0.0-beta.3" +actix-http = "3.0.0-beta.4" +bytes = "1.0.1" +chrono = { version = "0.4", features = ["serde"] } +fnv = "1.0.7" +hex = "0.4.3" +serde = { version = "1.0", features = ["derive"] } +serde_json = { version = "1.0", features = ["raw_value"] } +primitive-types = { version = "0.9.0", features = ["serde"] } +log = "0.4" +simple_logger = "1.11.0" +num-traits = "0.2" +parking_lot = "0.11" +reqwest = { version = "0.11.1", features = ["blocking", "json"] } +rustc-hash = "1.1.0" +clap = "3.0.0-beta.2" +ctor = "0.1.20" [profile.release] lto = true diff --git a/backend/core/Cargo.toml b/backend/core/Cargo.toml deleted file mode 100644 index 8ea9c50..0000000 --- a/backend/core/Cargo.toml +++ /dev/null @@ -1,29 +0,0 @@ -[package] -name = "telemetry" -version = "0.3.0" -authors = ["Parity Technologies Ltd. "] -edition = "2018" -license = "GPL-3.0" - -[dependencies] -actix = "0.11.1" -actix-web = { version = "4.0.0-beta.4", default-features = false } -actix-web-actors = "4.0.0-beta.3" -actix-http = "3.0.0-beta.4" -bincode = "1.3.3" -bytes = "1.0.1" -chrono = { version = "0.4", features = ["serde"] } -fnv = "1.0.7" -hex = "0.4.3" -serde = { version = "1.0", features = ["derive"] } -serde_json = { version = "1.0", features = ["raw_value"] } -thiserror = "1.0.24" -primitive-types = { version = "0.9.0", features = ["serde"] } -log = "0.4" -simple_logger = "1.11.0" -num-traits = "0.2" -parking_lot = "0.11" -reqwest = { version = "0.11.1", features = ["blocking", "json"] } -rustc-hash = "1.1.0" -clap = "3.0.0-beta.2" -ctor = "0.1.20" diff --git a/backend/core/src/shard.rs b/backend/core/src/shard.rs deleted file mode 100644 index 30c7678..0000000 --- a/backend/core/src/shard.rs +++ /dev/null @@ -1,13 +0,0 @@ -use crate::node::message::Payload; -use serde::Deserialize; - -pub mod connector; - -/// Alias for the ID of the node connection -type ShardConnId = usize; - -#[derive(Deserialize)] -pub struct ShardMessage { - pub conn_id: ShardConnId, - pub payload: Payload, -} diff --git a/backend/core/src/shard/connector.rs b/backend/core/src/shard/connector.rs deleted file mode 100644 index 111ad1f..0000000 --- a/backend/core/src/shard/connector.rs +++ /dev/null @@ -1,156 +0,0 @@ -use std::mem; -use std::time::{Duration, Instant}; - -use crate::aggregator::{AddNode, Aggregator}; -use crate::chain::{Chain, RemoveNode, UpdateNode}; -use crate::shard::ShardMessage; -use crate::types::NodeId; -use crate::util::{DenseMap, Hash}; -use actix::prelude::*; -use actix_http::ws::Item; -use actix_web_actors::ws::{self, CloseReason}; -use bincode::Options; -use bytes::{Bytes, BytesMut}; - -/// How often heartbeat pings are sent -const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(20); -/// How long before lack of client response causes a timeout -const CLIENT_TIMEOUT: Duration = Duration::from_secs(60); -/// Continuation buffer limit, 10mb -const CONT_BUF_LIMIT: usize = 10 * 1024 * 1024; - -pub struct ShardConnector { - /// Client must send ping at least once every 60 seconds (CLIENT_TIMEOUT), - hb: Instant, - /// Aggregator actor address - aggregator: Addr, - /// Genesis hash of the chain this connection will be submitting data for - genesis_hash: Hash, - /// Chain address to which this multiplex connector is delegating messages - chain: Option>, - /// Mapping `ShardConnId` to `NodeId` - nodes: DenseMap, - /// Buffer for constructing continuation messages - contbuf: BytesMut, -} - -impl Actor for ShardConnector { - type Context = ws::WebsocketContext; - - fn started(&mut self, ctx: &mut Self::Context) { - self.heartbeat(ctx); - } - - fn stopped(&mut self, _: &mut Self::Context) { - if let Some(ref chain) = self.chain { - for (_, nid) in self.nodes.iter() { - chain.do_send(RemoveNode(*nid)) - } - } - } -} - -impl ShardConnector { - pub fn new(aggregator: Addr, genesis_hash: Hash) -> Self { - Self { - hb: Instant::now(), - aggregator, - genesis_hash, - chain: None, - nodes: DenseMap::new(), - contbuf: BytesMut::new(), - } - } - - fn heartbeat(&self, ctx: &mut ::Context) { - ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| { - // check client heartbeats - if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT { - // stop actor - ctx.close(Some(CloseReason { - code: ws::CloseCode::Abnormal, - description: Some("Missed heartbeat".into()), - })); - ctx.stop(); - } - }); - } - - fn handle_message(&mut self, msg: ShardMessage, ctx: &mut ::Context) { - let ShardMessage { conn_id, payload } = msg; - - // TODO: get `NodeId` for `ShardConnId` and proxy payload to `self.chain`. - } - - fn start_frame(&mut self, bytes: &[u8]) { - if !self.contbuf.is_empty() { - log::error!("Unused continuation buffer"); - self.contbuf.clear(); - } - self.continue_frame(bytes); - } - - fn continue_frame(&mut self, bytes: &[u8]) { - if self.contbuf.len() + bytes.len() <= CONT_BUF_LIMIT { - self.contbuf.extend_from_slice(&bytes); - } else { - log::error!("Continuation buffer overflow"); - self.contbuf = BytesMut::new(); - } - } - - fn finish_frame(&mut self) -> Bytes { - mem::replace(&mut self.contbuf, BytesMut::new()).freeze() - } -} - -impl StreamHandler> for ShardConnector { - fn handle(&mut self, msg: Result, ctx: &mut Self::Context) { - self.hb = Instant::now(); - - let data = match msg { - Ok(ws::Message::Ping(msg)) => { - ctx.pong(&msg); - return; - } - Ok(ws::Message::Pong(_)) => return, - Ok(ws::Message::Text(text)) => text.into_bytes(), - Ok(ws::Message::Binary(data)) => data, - Ok(ws::Message::Close(reason)) => { - ctx.close(reason); - ctx.stop(); - return; - } - Ok(ws::Message::Nop) => return, - Ok(ws::Message::Continuation(cont)) => match cont { - Item::FirstText(bytes) | Item::FirstBinary(bytes) => { - self.start_frame(&bytes); - return; - } - Item::Continue(bytes) => { - self.continue_frame(&bytes); - return; - } - Item::Last(bytes) => { - self.continue_frame(&bytes); - self.finish_frame() - } - }, - Err(error) => { - log::error!("{:?}", error); - ctx.stop(); - return; - } - }; - - match bincode::options().deserialize(&data) { - Ok(msg) => self.handle_message(msg, ctx), - #[cfg(debug)] - Err(err) => { - log::warn!("Failed to parse shard message: {}", err,) - } - #[cfg(not(debug))] - Err(_) => (), - } - } -} diff --git a/backend/core/src/aggregator.rs b/backend/src/aggregator.rs similarity index 100% rename from backend/core/src/aggregator.rs rename to backend/src/aggregator.rs diff --git a/backend/core/src/chain.rs b/backend/src/chain.rs similarity index 100% rename from backend/core/src/chain.rs rename to backend/src/chain.rs diff --git a/backend/core/src/feed.rs b/backend/src/feed.rs similarity index 100% rename from backend/core/src/feed.rs rename to backend/src/feed.rs diff --git a/backend/core/src/feed/connector.rs b/backend/src/feed/connector.rs similarity index 100% rename from backend/core/src/feed/connector.rs rename to backend/src/feed/connector.rs diff --git a/backend/core/src/main.rs b/backend/src/main.rs similarity index 90% rename from backend/core/src/main.rs rename to backend/src/main.rs index 3064cf1..c2a762e 100644 --- a/backend/core/src/main.rs +++ b/backend/src/main.rs @@ -13,14 +13,12 @@ mod aggregator; mod chain; mod feed; mod node; -mod shard; mod types; mod util; use aggregator::{Aggregator, GetHealth, GetNetworkState}; use feed::connector::FeedConnector; use node::connector::NodeConnector; -use shard::connector::ShardConnector; use types::NodeId; use util::{Locator, LocatorFactory}; @@ -105,27 +103,6 @@ async fn node_route( ))) } -#[get("/shard_submit/{chain_hash}")] -async fn shard_route( - req: HttpRequest, - stream: web::Payload, - aggregator: web::Data>, - path: web::Path>, -) -> Result { - let hash_str = path.into_inner(); - let genesis_hash = hash_str.parse()?; - - let mut res = ws::handshake(&req)?; - - let aggregator = aggregator.get_ref().clone(); - - Ok(res.streaming(ws::WebsocketContext::with_codec( - ShardConnector::new(aggregator, genesis_hash), - stream, - Codec::new().max_size(10 * 1024 * 1024), // 10mb frame limit - ))) -} - /// Entry point for connecting feeds #[get("/feed")] async fn feed_route( diff --git a/backend/core/src/node.rs b/backend/src/node.rs similarity index 100% rename from backend/core/src/node.rs rename to backend/src/node.rs diff --git a/backend/core/src/node/connector.rs b/backend/src/node/connector.rs similarity index 100% rename from backend/core/src/node/connector.rs rename to backend/src/node/connector.rs diff --git a/backend/core/src/node/message.rs b/backend/src/node/message.rs similarity index 100% rename from backend/core/src/node/message.rs rename to backend/src/node/message.rs diff --git a/backend/core/src/types.rs b/backend/src/types.rs similarity index 100% rename from backend/core/src/types.rs rename to backend/src/types.rs diff --git a/backend/core/src/util.rs b/backend/src/util.rs similarity index 100% rename from backend/core/src/util.rs rename to backend/src/util.rs diff --git a/backend/core/src/util/dense_map.rs b/backend/src/util/dense_map.rs similarity index 100% rename from backend/core/src/util/dense_map.rs rename to backend/src/util/dense_map.rs diff --git a/backend/core/src/util/hash.rs b/backend/src/util/hash.rs similarity index 63% rename from backend/core/src/util/hash.rs rename to backend/src/util/hash.rs index 727602d..db9a557 100644 --- a/backend/core/src/util/hash.rs +++ b/backend/src/util/hash.rs @@ -1,13 +1,11 @@ -use std::fmt::{self, Debug, Display}; -use std::str::FromStr; +use std::fmt::{self, Debug}; -use actix_web::error::ResponseError; use serde::de::{self, Deserialize, Deserializer, Unexpected, Visitor}; const HASH_BYTES: usize = 32; /// Newtype wrapper for 32-byte hash values, implementing readable `Debug` and `serde::Deserialize`. -// We could use primitive_types::H256 here, but opted for a custom type to avoid more dependencies. +// We could use primitive_types::H256 here, but opted for a custom type to aboid more dependencies. #[derive(Hash, PartialEq, Eq, Clone, Copy)] pub struct Hash([u8; HASH_BYTES]); @@ -24,23 +22,14 @@ impl<'de> Visitor<'de> for HashVisitor { where E: de::Error, { - value - .parse() - .map_err(|_| de::Error::invalid_value(Unexpected::Str(value), &self)) - } -} - -impl FromStr for Hash { - type Err = HashParseError; - - fn from_str(value: &str) -> Result { if !value.starts_with("0x") { - return Err(HashParseError::InvalidPrefix); + return Err(de::Error::invalid_value(Unexpected::Str(value), &self)); } let mut hash = [0; HASH_BYTES]; - hex::decode_to_slice(&value[2..], &mut hash).map_err(HashParseError::HexError)?; + hex::decode_to_slice(&value[2..], &mut hash) + .map_err(|_| de::Error::invalid_value(Unexpected::Str(value), &self))?; Ok(Hash(hash)) } @@ -55,7 +44,7 @@ impl<'de> Deserialize<'de> for Hash { } } -impl Display for Hash { +impl Debug for Hash { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.write_str("0x")?; @@ -67,23 +56,3 @@ impl Display for Hash { f.write_str(std::str::from_utf8(&ascii).expect("ASCII hex encoded bytes canot fail; qed")) } } - -impl Debug for Hash { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - Display::fmt(self, f) - } -} - -#[derive(thiserror::Error, Debug)] -pub enum HashParseError { - HexError(hex::FromHexError), - InvalidPrefix, -} - -impl Display for HashParseError { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - Debug::fmt(self, f) - } -} - -impl ResponseError for HashParseError {} diff --git a/backend/core/src/util/location.rs b/backend/src/util/location.rs similarity index 100% rename from backend/core/src/util/location.rs rename to backend/src/util/location.rs diff --git a/backend/core/src/util/mean_list.rs b/backend/src/util/mean_list.rs similarity index 100% rename from backend/core/src/util/mean_list.rs rename to backend/src/util/mean_list.rs diff --git a/backend/core/src/util/num_stats.rs b/backend/src/util/num_stats.rs similarity index 100% rename from backend/core/src/util/num_stats.rs rename to backend/src/util/num_stats.rs