From e1daa07c51ef8c9963ac824a1b83539018397283 Mon Sep 17 00:00:00 2001 From: Maciej Hirsz Date: Fri, 4 Jun 2021 09:40:37 +0200 Subject: [PATCH] Re-commit "Preparing backend to receive data from shards (#337)" This reverts commit 87e3c52b35fc9473271752142c4e61e541b22d88. --- backend/Cargo.lock | 31 +++++ backend/Cargo.toml | 32 +---- backend/core/Cargo.toml | 29 +++++ backend/{ => core}/src/aggregator.rs | 0 backend/{ => core}/src/chain.rs | 0 backend/{ => core}/src/feed.rs | 0 backend/{ => core}/src/feed/connector.rs | 0 backend/{ => core}/src/main.rs | 23 ++++ backend/{ => core}/src/node.rs | 0 backend/{ => core}/src/node/connector.rs | 0 backend/{ => core}/src/node/message.rs | 0 backend/core/src/shard.rs | 13 ++ backend/core/src/shard/connector.rs | 156 +++++++++++++++++++++++ backend/{ => core}/src/types.rs | 0 backend/{ => core}/src/util.rs | 0 backend/{ => core}/src/util/dense_map.rs | 0 backend/{ => core}/src/util/hash.rs | 43 ++++++- backend/{ => core}/src/util/location.rs | 0 backend/{ => core}/src/util/mean_list.rs | 0 backend/{ => core}/src/util/num_stats.rs | 0 20 files changed, 295 insertions(+), 32 deletions(-) create mode 100644 backend/core/Cargo.toml rename backend/{ => core}/src/aggregator.rs (100%) rename backend/{ => core}/src/chain.rs (100%) rename backend/{ => core}/src/feed.rs (100%) rename backend/{ => core}/src/feed/connector.rs (100%) rename backend/{ => core}/src/main.rs (90%) rename backend/{ => core}/src/node.rs (100%) rename backend/{ => core}/src/node/connector.rs (100%) rename backend/{ => core}/src/node/message.rs (100%) create mode 100644 backend/core/src/shard.rs create mode 100644 backend/core/src/shard/connector.rs rename backend/{ => core}/src/types.rs (100%) rename backend/{ => core}/src/util.rs (100%) rename backend/{ => core}/src/util/dense_map.rs (100%) rename backend/{ => core}/src/util/hash.rs (63%) rename backend/{ => core}/src/util/location.rs (100%) rename backend/{ => core}/src/util/mean_list.rs (100%) rename backend/{ => core}/src/util/num_stats.rs (100%) diff --git a/backend/Cargo.lock b/backend/Cargo.lock index 36ffa80..f3fca55 100644 --- a/backend/Cargo.lock +++ b/backend/Cargo.lock @@ -340,6 +340,15 @@ version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "904dfeac50f3cdaba28fc6f57fdcddb75f49ed61346676a78c4ffe55877802fd" +[[package]] +name = "bincode" +version = "1.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1f45e9417d87227c7a56d22e471c6206462cba514c7590c09aff4cf6d1ddcad" +dependencies = [ + "serde", +] + [[package]] name = "bitflags" version = "1.2.1" @@ -1688,6 +1697,7 @@ dependencies = [ "actix-http", "actix-web", "actix-web-actors", + "bincode", "bytes", "chrono", "clap", @@ -1703,6 +1713,7 @@ dependencies = [ "serde", "serde_json", "simple_logger", + "thiserror", ] [[package]] @@ -1737,6 +1748,26 @@ dependencies = [ "unicode-width", ] +[[package]] +name = "thiserror" +version = "1.0.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e0f4a65597094d4483ddaed134f409b2cb7c1beccf25201a9f73c719254fa98e" +dependencies = [ + "thiserror-impl", +] + +[[package]] +name = "thiserror-impl" +version = "1.0.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7765189610d8241a44529806d6fd1f2e0a08734313a35d5b3a556f92b381f3c0" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "thread_local" version = "1.0.1" diff --git a/backend/Cargo.toml b/backend/Cargo.toml index 739d66b..2a6589f 100644 --- a/backend/Cargo.toml +++ b/backend/Cargo.toml @@ -1,30 +1,10 @@ -[package] -name = "telemetry" -version = "0.3.0" -authors = ["Parity Technologies Ltd. "] -edition = "2018" -license = "GPL-3.0" +[workspace] +members = [ + "core", +] -[dependencies] -actix = "0.11.1" -actix-web = { version = "4.0.0-beta.4", default-features = false } -actix-web-actors = "4.0.0-beta.3" -actix-http = "3.0.0-beta.4" -bytes = "1.0.1" -chrono = { version = "0.4", features = ["serde"] } -fnv = "1.0.7" -hex = "0.4.3" -serde = { version = "1.0", features = ["derive"] } -serde_json = { version = "1.0", features = ["raw_value"] } -primitive-types = { version = "0.9.0", features = ["serde"] } -log = "0.4" -simple_logger = "1.11.0" -num-traits = "0.2" -parking_lot = "0.11" -reqwest = { version = "0.11.1", features = ["blocking", "json"] } -rustc-hash = "1.1.0" -clap = "3.0.0-beta.2" -ctor = "0.1.20" +[profile.dev] +opt-level = 3 [profile.release] lto = true diff --git a/backend/core/Cargo.toml b/backend/core/Cargo.toml new file mode 100644 index 0000000..8ea9c50 --- /dev/null +++ b/backend/core/Cargo.toml @@ -0,0 +1,29 @@ +[package] +name = "telemetry" +version = "0.3.0" +authors = ["Parity Technologies Ltd. "] +edition = "2018" +license = "GPL-3.0" + +[dependencies] +actix = "0.11.1" +actix-web = { version = "4.0.0-beta.4", default-features = false } +actix-web-actors = "4.0.0-beta.3" +actix-http = "3.0.0-beta.4" +bincode = "1.3.3" +bytes = "1.0.1" +chrono = { version = "0.4", features = ["serde"] } +fnv = "1.0.7" +hex = "0.4.3" +serde = { version = "1.0", features = ["derive"] } +serde_json = { version = "1.0", features = ["raw_value"] } +thiserror = "1.0.24" +primitive-types = { version = "0.9.0", features = ["serde"] } +log = "0.4" +simple_logger = "1.11.0" +num-traits = "0.2" +parking_lot = "0.11" +reqwest = { version = "0.11.1", features = ["blocking", "json"] } +rustc-hash = "1.1.0" +clap = "3.0.0-beta.2" +ctor = "0.1.20" diff --git a/backend/src/aggregator.rs b/backend/core/src/aggregator.rs similarity index 100% rename from backend/src/aggregator.rs rename to backend/core/src/aggregator.rs diff --git a/backend/src/chain.rs b/backend/core/src/chain.rs similarity index 100% rename from backend/src/chain.rs rename to backend/core/src/chain.rs diff --git a/backend/src/feed.rs b/backend/core/src/feed.rs similarity index 100% rename from backend/src/feed.rs rename to backend/core/src/feed.rs diff --git a/backend/src/feed/connector.rs b/backend/core/src/feed/connector.rs similarity index 100% rename from backend/src/feed/connector.rs rename to backend/core/src/feed/connector.rs diff --git a/backend/src/main.rs b/backend/core/src/main.rs similarity index 90% rename from backend/src/main.rs rename to backend/core/src/main.rs index c2a762e..3064cf1 100644 --- a/backend/src/main.rs +++ b/backend/core/src/main.rs @@ -13,12 +13,14 @@ mod aggregator; mod chain; mod feed; mod node; +mod shard; mod types; mod util; use aggregator::{Aggregator, GetHealth, GetNetworkState}; use feed::connector::FeedConnector; use node::connector::NodeConnector; +use shard::connector::ShardConnector; use types::NodeId; use util::{Locator, LocatorFactory}; @@ -103,6 +105,27 @@ async fn node_route( ))) } +#[get("/shard_submit/{chain_hash}")] +async fn shard_route( + req: HttpRequest, + stream: web::Payload, + aggregator: web::Data>, + path: web::Path>, +) -> Result { + let hash_str = path.into_inner(); + let genesis_hash = hash_str.parse()?; + + let mut res = ws::handshake(&req)?; + + let aggregator = aggregator.get_ref().clone(); + + Ok(res.streaming(ws::WebsocketContext::with_codec( + ShardConnector::new(aggregator, genesis_hash), + stream, + Codec::new().max_size(10 * 1024 * 1024), // 10mb frame limit + ))) +} + /// Entry point for connecting feeds #[get("/feed")] async fn feed_route( diff --git a/backend/src/node.rs b/backend/core/src/node.rs similarity index 100% rename from backend/src/node.rs rename to backend/core/src/node.rs diff --git a/backend/src/node/connector.rs b/backend/core/src/node/connector.rs similarity index 100% rename from backend/src/node/connector.rs rename to backend/core/src/node/connector.rs diff --git a/backend/src/node/message.rs b/backend/core/src/node/message.rs similarity index 100% rename from backend/src/node/message.rs rename to backend/core/src/node/message.rs diff --git a/backend/core/src/shard.rs b/backend/core/src/shard.rs new file mode 100644 index 0000000..30c7678 --- /dev/null +++ b/backend/core/src/shard.rs @@ -0,0 +1,13 @@ +use crate::node::message::Payload; +use serde::Deserialize; + +pub mod connector; + +/// Alias for the ID of the node connection +type ShardConnId = usize; + +#[derive(Deserialize)] +pub struct ShardMessage { + pub conn_id: ShardConnId, + pub payload: Payload, +} diff --git a/backend/core/src/shard/connector.rs b/backend/core/src/shard/connector.rs new file mode 100644 index 0000000..111ad1f --- /dev/null +++ b/backend/core/src/shard/connector.rs @@ -0,0 +1,156 @@ +use std::mem; +use std::time::{Duration, Instant}; + +use crate::aggregator::{AddNode, Aggregator}; +use crate::chain::{Chain, RemoveNode, UpdateNode}; +use crate::shard::ShardMessage; +use crate::types::NodeId; +use crate::util::{DenseMap, Hash}; +use actix::prelude::*; +use actix_http::ws::Item; +use actix_web_actors::ws::{self, CloseReason}; +use bincode::Options; +use bytes::{Bytes, BytesMut}; + +/// How often heartbeat pings are sent +const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(20); +/// How long before lack of client response causes a timeout +const CLIENT_TIMEOUT: Duration = Duration::from_secs(60); +/// Continuation buffer limit, 10mb +const CONT_BUF_LIMIT: usize = 10 * 1024 * 1024; + +pub struct ShardConnector { + /// Client must send ping at least once every 60 seconds (CLIENT_TIMEOUT), + hb: Instant, + /// Aggregator actor address + aggregator: Addr, + /// Genesis hash of the chain this connection will be submitting data for + genesis_hash: Hash, + /// Chain address to which this multiplex connector is delegating messages + chain: Option>, + /// Mapping `ShardConnId` to `NodeId` + nodes: DenseMap, + /// Buffer for constructing continuation messages + contbuf: BytesMut, +} + +impl Actor for ShardConnector { + type Context = ws::WebsocketContext; + + fn started(&mut self, ctx: &mut Self::Context) { + self.heartbeat(ctx); + } + + fn stopped(&mut self, _: &mut Self::Context) { + if let Some(ref chain) = self.chain { + for (_, nid) in self.nodes.iter() { + chain.do_send(RemoveNode(*nid)) + } + } + } +} + +impl ShardConnector { + pub fn new(aggregator: Addr, genesis_hash: Hash) -> Self { + Self { + hb: Instant::now(), + aggregator, + genesis_hash, + chain: None, + nodes: DenseMap::new(), + contbuf: BytesMut::new(), + } + } + + fn heartbeat(&self, ctx: &mut ::Context) { + ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| { + // check client heartbeats + if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT { + // stop actor + ctx.close(Some(CloseReason { + code: ws::CloseCode::Abnormal, + description: Some("Missed heartbeat".into()), + })); + ctx.stop(); + } + }); + } + + fn handle_message(&mut self, msg: ShardMessage, ctx: &mut ::Context) { + let ShardMessage { conn_id, payload } = msg; + + // TODO: get `NodeId` for `ShardConnId` and proxy payload to `self.chain`. + } + + fn start_frame(&mut self, bytes: &[u8]) { + if !self.contbuf.is_empty() { + log::error!("Unused continuation buffer"); + self.contbuf.clear(); + } + self.continue_frame(bytes); + } + + fn continue_frame(&mut self, bytes: &[u8]) { + if self.contbuf.len() + bytes.len() <= CONT_BUF_LIMIT { + self.contbuf.extend_from_slice(&bytes); + } else { + log::error!("Continuation buffer overflow"); + self.contbuf = BytesMut::new(); + } + } + + fn finish_frame(&mut self) -> Bytes { + mem::replace(&mut self.contbuf, BytesMut::new()).freeze() + } +} + +impl StreamHandler> for ShardConnector { + fn handle(&mut self, msg: Result, ctx: &mut Self::Context) { + self.hb = Instant::now(); + + let data = match msg { + Ok(ws::Message::Ping(msg)) => { + ctx.pong(&msg); + return; + } + Ok(ws::Message::Pong(_)) => return, + Ok(ws::Message::Text(text)) => text.into_bytes(), + Ok(ws::Message::Binary(data)) => data, + Ok(ws::Message::Close(reason)) => { + ctx.close(reason); + ctx.stop(); + return; + } + Ok(ws::Message::Nop) => return, + Ok(ws::Message::Continuation(cont)) => match cont { + Item::FirstText(bytes) | Item::FirstBinary(bytes) => { + self.start_frame(&bytes); + return; + } + Item::Continue(bytes) => { + self.continue_frame(&bytes); + return; + } + Item::Last(bytes) => { + self.continue_frame(&bytes); + self.finish_frame() + } + }, + Err(error) => { + log::error!("{:?}", error); + ctx.stop(); + return; + } + }; + + match bincode::options().deserialize(&data) { + Ok(msg) => self.handle_message(msg, ctx), + #[cfg(debug)] + Err(err) => { + log::warn!("Failed to parse shard message: {}", err,) + } + #[cfg(not(debug))] + Err(_) => (), + } + } +} diff --git a/backend/src/types.rs b/backend/core/src/types.rs similarity index 100% rename from backend/src/types.rs rename to backend/core/src/types.rs diff --git a/backend/src/util.rs b/backend/core/src/util.rs similarity index 100% rename from backend/src/util.rs rename to backend/core/src/util.rs diff --git a/backend/src/util/dense_map.rs b/backend/core/src/util/dense_map.rs similarity index 100% rename from backend/src/util/dense_map.rs rename to backend/core/src/util/dense_map.rs diff --git a/backend/src/util/hash.rs b/backend/core/src/util/hash.rs similarity index 63% rename from backend/src/util/hash.rs rename to backend/core/src/util/hash.rs index db9a557..727602d 100644 --- a/backend/src/util/hash.rs +++ b/backend/core/src/util/hash.rs @@ -1,11 +1,13 @@ -use std::fmt::{self, Debug}; +use std::fmt::{self, Debug, Display}; +use std::str::FromStr; +use actix_web::error::ResponseError; use serde::de::{self, Deserialize, Deserializer, Unexpected, Visitor}; const HASH_BYTES: usize = 32; /// Newtype wrapper for 32-byte hash values, implementing readable `Debug` and `serde::Deserialize`. -// We could use primitive_types::H256 here, but opted for a custom type to aboid more dependencies. +// We could use primitive_types::H256 here, but opted for a custom type to avoid more dependencies. #[derive(Hash, PartialEq, Eq, Clone, Copy)] pub struct Hash([u8; HASH_BYTES]); @@ -22,14 +24,23 @@ impl<'de> Visitor<'de> for HashVisitor { where E: de::Error, { + value + .parse() + .map_err(|_| de::Error::invalid_value(Unexpected::Str(value), &self)) + } +} + +impl FromStr for Hash { + type Err = HashParseError; + + fn from_str(value: &str) -> Result { if !value.starts_with("0x") { - return Err(de::Error::invalid_value(Unexpected::Str(value), &self)); + return Err(HashParseError::InvalidPrefix); } let mut hash = [0; HASH_BYTES]; - hex::decode_to_slice(&value[2..], &mut hash) - .map_err(|_| de::Error::invalid_value(Unexpected::Str(value), &self))?; + hex::decode_to_slice(&value[2..], &mut hash).map_err(HashParseError::HexError)?; Ok(Hash(hash)) } @@ -44,7 +55,7 @@ impl<'de> Deserialize<'de> for Hash { } } -impl Debug for Hash { +impl Display for Hash { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.write_str("0x")?; @@ -56,3 +67,23 @@ impl Debug for Hash { f.write_str(std::str::from_utf8(&ascii).expect("ASCII hex encoded bytes canot fail; qed")) } } + +impl Debug for Hash { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + Display::fmt(self, f) + } +} + +#[derive(thiserror::Error, Debug)] +pub enum HashParseError { + HexError(hex::FromHexError), + InvalidPrefix, +} + +impl Display for HashParseError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + Debug::fmt(self, f) + } +} + +impl ResponseError for HashParseError {} diff --git a/backend/src/util/location.rs b/backend/core/src/util/location.rs similarity index 100% rename from backend/src/util/location.rs rename to backend/core/src/util/location.rs diff --git a/backend/src/util/mean_list.rs b/backend/core/src/util/mean_list.rs similarity index 100% rename from backend/src/util/mean_list.rs rename to backend/core/src/util/mean_list.rs diff --git a/backend/src/util/num_stats.rs b/backend/core/src/util/num_stats.rs similarity index 100% rename from backend/src/util/num_stats.rs rename to backend/core/src/util/num_stats.rs