diff --git a/backend/Cargo.lock b/backend/Cargo.lock index d05e8e9..7c52f7c 100644 --- a/backend/Cargo.lock +++ b/backend/Cargo.lock @@ -96,6 +96,12 @@ dependencies = [ "safemem", ] +[[package]] +name = "bumpalo" +version = "3.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c59e7af012c713f529e7a3ee57ce9b31ddd858d4b512923602f74608b009631" + [[package]] name = "byte-slice-cast" version = "1.0.0" @@ -114,6 +120,12 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b700ce4376041dcd0a327fd0097c41095743c4c8af8887265942faf1100bd040" +[[package]] +name = "cc" +version = "1.0.68" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4a72c244c1ff497a746a7e1fb3d14bd08420ecda70c8f25c7112f2781652d787" + [[package]] name = "cfg-if" version = "1.0.0" @@ -177,6 +189,22 @@ dependencies = [ "thiserror", ] +[[package]] +name = "core-foundation" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0a89e2ae426ea83155dccf10c0fa6b1463ef6d5fcb44cee0b224a408fa640a62" +dependencies = [ + "core-foundation-sys", + "libc", +] + +[[package]] +name = "core-foundation-sys" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ea221b5284a47e40033bf9b66f35f984ec0ea2931eb03505246cd27a963f981b" + [[package]] name = "cpufeatures" version = "0.1.4" @@ -201,6 +229,15 @@ dependencies = [ "generic-array", ] +[[package]] +name = "encoding_rs" +version = "0.8.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "80df024fbc5ac80f87dfef0d9f5209a252f2a497f7f42944cff24d8253cac065" +dependencies = [ + "cfg-if", +] + [[package]] name = "fixed-hash" version = "0.7.0" @@ -219,6 +256,21 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "foreign-types" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" +dependencies = [ + "foreign-types-shared", +] + +[[package]] +name = "foreign-types-shared" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" + [[package]] name = "form_urlencoded" version = "1.0.1" @@ -493,6 +545,19 @@ dependencies = [ "want", ] +[[package]] +name = "hyper-tls" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905" +dependencies = [ + "bytes", + "hyper", + "native-tls", + "tokio", + "tokio-native-tls", +] + [[package]] name = "idna" version = "0.2.3" @@ -550,12 +615,27 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "ipnet" +version = "2.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68f2d64f2edebec4ce84ad108148e67e1064789bee435edc5b60ad398714a3a9" + [[package]] name = "itoa" version = "0.4.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dd25036021b0de88a0aff6b850051563c6516d0bf53f8638938edbb9de732736" +[[package]] +name = "js-sys" +version = "0.3.51" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "83bdfbace3a0e81a4253f73b49e960b053e396a11012cbd49b9b74d6a2b67062" +dependencies = [ + "wasm-bindgen", +] + [[package]] name = "lazy_static" version = "1.4.0" @@ -654,6 +734,24 @@ dependencies = [ "twoway", ] +[[package]] +name = "native-tls" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8d96b2e1c8da3957d58100b09f102c6d9cfdfced01b7ec5a8974044bb09dbd4" +dependencies = [ + "lazy_static", + "libc", + "log", + "openssl", + "openssl-probe", + "openssl-sys", + "schannel", + "security-framework", + "security-framework-sys", + "tempfile", +] + [[package]] name = "ntapi" version = "0.3.6" @@ -704,6 +802,39 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5" +[[package]] +name = "openssl" +version = "0.10.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "549430950c79ae24e6d02e0b7404534ecf311d94cc9f861e9e4020187d13d885" +dependencies = [ + "bitflags", + "cfg-if", + "foreign-types", + "libc", + "once_cell", + "openssl-sys", +] + +[[package]] +name = "openssl-probe" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28988d872ab76095a6e6ac88d99b54fd267702734fd7ffe610ca27f533ddb95a" + +[[package]] +name = "openssl-sys" +version = "0.9.65" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a7907e3bfa08bb85105209cdfcb6c63d109f8f6c1ed6ca318fff5c1853fbc1d" +dependencies = [ + "autocfg", + "cc", + "libc", + "pkg-config", + "vcpkg", +] + [[package]] name = "parity-scale-codec" version = "2.1.3" @@ -779,6 +910,12 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "pkg-config" +version = "0.3.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3831453b3449ceb48b6d9c7ad7c96d5ea673e9b470a1dc578c2ce6521230884c" + [[package]] name = "ppv-lite86" version = "0.2.10" @@ -962,6 +1099,41 @@ dependencies = [ "winapi", ] +[[package]] +name = "reqwest" +version = "0.11.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "246e9f61b9bb77df069a947682be06e31ac43ea37862e244a69f177694ea6d22" +dependencies = [ + "base64", + "bytes", + "encoding_rs", + "futures-core", + "futures-util", + "http", + "http-body", + "hyper", + "hyper-tls", + "ipnet", + "js-sys", + "lazy_static", + "log", + "mime", + "native-tls", + "percent-encoding", + "pin-project-lite", + "serde", + "serde_json", + "serde_urlencoded", + "tokio", + "tokio-native-tls", + "url", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", + "winreg", +] + [[package]] name = "rustc-hash" version = "1.1.0" @@ -986,6 +1158,16 @@ version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ef703b7cb59335eae2eb93ceb664c0eb7ea6bf567079d843e09420219668e072" +[[package]] +name = "schannel" +version = "0.1.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f05ba609c234e60bee0d547fe94a4c7e9da733d1c962cf6e59efa4cd9c8bc75" +dependencies = [ + "lazy_static", + "winapi", +] + [[package]] name = "scoped-tls" version = "1.0.0" @@ -998,6 +1180,29 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" +[[package]] +name = "security-framework" +version = "2.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23a2ac85147a3a11d77ecf1bc7166ec0b92febfa4461c37944e180f319ece467" +dependencies = [ + "bitflags", + "core-foundation", + "core-foundation-sys", + "libc", + "security-framework-sys", +] + +[[package]] +name = "security-framework-sys" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e4effb91b4b8b6fb7732e670b6cee160278ff8e6bf485c7805d9e319d76e284" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "serde" version = "1.0.126" @@ -1202,7 +1407,10 @@ dependencies = [ "http", "log", "once_cell", + "parking_lot", "primitive-types", + "reqwest", + "rustc-hash", "serde", "serde_json", "simple_logger", @@ -1314,6 +1522,16 @@ dependencies = [ "syn", ] +[[package]] +name = "tokio-native-tls" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7d995660bd2b7f8c1568414c1126076c13fbb725c40112dc0120b78eb9b717b" +dependencies = [ + "native-tls", + "tokio", +] + [[package]] name = "tokio-stream" version = "0.1.6" @@ -1495,6 +1713,12 @@ version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" +[[package]] +name = "vcpkg" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" + [[package]] name = "vec_map" version = "0.8.2" @@ -1558,6 +1782,84 @@ version = "0.10.2+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fd6fbd9a79829dd1ad0cc20627bf1ed606756a7f77edff7b66b7064f9cb327c6" +[[package]] +name = "wasm-bindgen" +version = "0.2.74" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d54ee1d4ed486f78874278e63e4069fc1ab9f6a18ca492076ffb90c5eb2997fd" +dependencies = [ + "cfg-if", + "serde", + "serde_json", + "wasm-bindgen-macro", +] + +[[package]] +name = "wasm-bindgen-backend" +version = "0.2.74" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b33f6a0694ccfea53d94db8b2ed1c3a8a4c86dd936b13b9f0a15ec4a451b900" +dependencies = [ + "bumpalo", + "lazy_static", + "log", + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-futures" +version = "0.4.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5fba7978c679d53ce2d0ac80c8c175840feb849a161664365d1287b41f2e67f1" +dependencies = [ + "cfg-if", + "js-sys", + "wasm-bindgen", + "web-sys", +] + +[[package]] +name = "wasm-bindgen-macro" +version = "0.2.74" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "088169ca61430fe1e58b8096c24975251700e7b1f6fd91cc9d59b04fb9b18bd4" +dependencies = [ + "quote", + "wasm-bindgen-macro-support", +] + +[[package]] +name = "wasm-bindgen-macro-support" +version = "0.2.74" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be2241542ff3d9f241f5e2cb6dd09b37efe786df8851c54957683a49f0987a97" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-backend", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-shared" +version = "0.2.74" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7cff876b8f18eed75a66cf49b65e7f967cb354a7aa16003fb55dbfd25b44b4f" + +[[package]] +name = "web-sys" +version = "0.3.51" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e828417b379f3df7111d3a2a9e5753706cae29c41f7c4029ee9fd77f3e09e582" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + [[package]] name = "winapi" version = "0.3.9" @@ -1580,6 +1882,15 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "winreg" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0120db82e8a1e0b9fb3345a539c478767c0048d842860994d96113d5b667bd69" +dependencies = [ + "winapi", +] + [[package]] name = "wyz" version = "0.2.0" diff --git a/backend/common/src/assign_id.rs b/backend/common/src/assign_id.rs index 9f4e0a7..e52c41f 100644 --- a/backend/common/src/assign_id.rs +++ b/backend/common/src/assign_id.rs @@ -16,7 +16,7 @@ impl std::convert::From for Id { } } -/// A struct that allows you to assign ID to an arbitrary set of +/// A struct that allows you to assign an ID to an arbitrary set of /// details (so long as they are Eq+Hash+Clone), and then access /// the assigned ID given those details or access the details given /// the ID. diff --git a/backend/common/src/internal_messages.rs b/backend/common/src/internal_messages.rs index b2b1b9b..a9c1d7d 100644 --- a/backend/common/src/internal_messages.rs +++ b/backend/common/src/internal_messages.rs @@ -9,9 +9,6 @@ use serde::{Deserialize, Serialize}; /// might send data on behalf of more than one chain. pub type LocalId = Id; -/// A global ID assigned to messages from each different pair of ConnId+LocalId. -pub type GlobalId = usize; - /// Message sent from the shard to the backend core #[derive(Deserialize, Serialize, Debug, Clone)] pub enum FromShardAggregator { @@ -37,6 +34,14 @@ pub enum FromShardAggregator { #[derive(Deserialize, Serialize, Debug, Clone)] pub enum FromTelemetryCore { Mute { - local_id: LocalId + local_id: LocalId, + reason: MuteReason } } + +/// Why is the thing being muted? +#[derive(Deserialize, Serialize, Debug, Clone)] +pub enum MuteReason { + Overquota, + ChainNotAllowed +} \ No newline at end of file diff --git a/backend/common/src/lib.rs b/backend/common/src/lib.rs index ec5b967..f0c068c 100644 --- a/backend/common/src/lib.rs +++ b/backend/common/src/lib.rs @@ -4,4 +4,5 @@ pub mod types; pub mod util; pub mod json; pub mod log_level; -pub mod assign_id; \ No newline at end of file +pub mod assign_id; +pub mod most_seen; \ No newline at end of file diff --git a/backend/common/src/most_seen.rs b/backend/common/src/most_seen.rs new file mode 100644 index 0000000..e2b0c64 --- /dev/null +++ b/backend/common/src/most_seen.rs @@ -0,0 +1,109 @@ +use std::collections::HashMap; +use std::hash::Hash; + +/// Add items to this, and it will keep track of what the item +/// seen the most is. +#[derive(Debug)] +pub struct MostSeen { + current_best: T, + current_count: usize, + others: HashMap +} + +impl MostSeen { + pub fn new(item: T) -> Self { + Self { + current_best: item, + current_count: 1, + others: HashMap::new() + } + } + pub fn best(&self) -> &T { + &self.current_best + } +} + +impl MostSeen { + pub fn insert(&mut self, item: &T) -> ChangeResult { + if &self.current_best == item { + // Item already the best one; bump count. + self.current_count += 1; + return ChangeResult::NoChange; + } + + // Item not the best; increment count in map + let item_count = self.others.entry(item.clone()).or_default(); + *item_count += 1; + + // Is item now the best? + if *item_count > self.current_count { + let (item, count) = self.others + .remove_entry(item) + .expect("item added above"); + self.current_best = item; + self.current_count = count; + + ChangeResult::NewMostSeenItem + } else { + ChangeResult::NoChange + } + } + pub fn remove(&mut self, item: &T) -> ChangeResult { + if &self.current_best == item { + // Item already the best one; reduce count + self.current_count -= 1; + + // Is there a new best? + let other_best = self.others + .iter() + .max_by_key(|f| f.1); + + let (other_item, &other_count) = match other_best { + Some(item) => item, + None => { return ChangeResult::NoChange } + }; + + if other_count > self.current_count { + // Clone item to unborrow self.others so that we can remove + // the item from it. We could pre-emptively remove and reinsert + // instead, but most of the time there is no change, so I'm + // aiming to keep that path cheaper. + let other_item = other_item.clone(); + let (other_item, other_count) = self.others + .remove_entry(&other_item) + .expect("item returned above, so def exists"); + + self.current_best = other_item; + self.current_count = other_count; + + return ChangeResult::NewMostSeenItem; + } else { + return ChangeResult::NoChange; + } + } + + // Item is in the map; not the best anyway. decrement count. + if let Some(count) = self.others.get_mut(item) { + *count += 1; + } + ChangeResult::NoChange + } +} + +/// Record the result of adding/removing an entry +#[derive(Clone,Copy)] +pub enum ChangeResult { + /// The best item has remained the same. + NoChange, + /// There is a new best item now. + NewMostSeenItem +} + +impl ChangeResult { + pub fn has_changed(self) -> bool { + match self { + ChangeResult::NewMostSeenItem => true, + ChangeResult::NoChange => false + } + } +} \ No newline at end of file diff --git a/backend/shard/src/aggregator.rs b/backend/shard/src/aggregator.rs index a6e5f1a..3c89a7e 100644 --- a/backend/shard/src/aggregator.rs +++ b/backend/shard/src/aggregator.rs @@ -202,7 +202,7 @@ impl Aggregator { let _ = tx_to_telemetry_core.send(FromShardAggregator::RemoveNode { local_id }).await; } }, - ToAggregator::FromTelemetryCore(FromTelemetryCore::Mute { local_id }) => { + ToAggregator::FromTelemetryCore(FromTelemetryCore::Mute { local_id, reason: _ }) => { // Ignore incoming messages if we're not connected to the backend: if !connected_to_telemetry_core { continue } diff --git a/backend/telemetry/Cargo.toml b/backend/telemetry/Cargo.toml index bfe352a..ace4369 100644 --- a/backend/telemetry/Cargo.toml +++ b/backend/telemetry/Cargo.toml @@ -14,7 +14,10 @@ hex = "0.4.3" http = "0.2.4" log = "0.4.14" once_cell = "1.8.0" +parking_lot = "0.11.1" primitive-types = { version = "0.9.0", features = ["serde"] } +reqwest = { version = "0.11.4", features = ["json"] } +rustc-hash = "1.1.0" serde = { version = "1.0.126", features = ["derive"] } serde_json = "1.0.64" simple_logger = "1.11.0" diff --git a/backend/telemetry/src/aggregator.rs b/backend/telemetry/src/aggregator.rs deleted file mode 100644 index 8b1e227..0000000 --- a/backend/telemetry/src/aggregator.rs +++ /dev/null @@ -1,381 +0,0 @@ -use common::{ - internal_messages::{GlobalId, LocalId}, - node, - util::now -}; -use bimap::BiMap; -use std::{str::FromStr, sync::Arc}; -use std::sync::atomic::AtomicU64; -use futures::channel::{ mpsc, oneshot }; -use futures::{ Sink, SinkExt, StreamExt }; -use tokio::net::TcpStream; -use tokio_util::compat::{ TokioAsyncReadCompatExt }; -use std::collections::{ HashMap, HashSet }; -use crate::state::State; -use crate::feed_message::{ self, FeedMessageSerializer }; - -/// A unique Id is assigned per websocket connection (or more accurately, -/// per feed socket and per shard socket). This can be combined with the -/// [`LocalId`] of messages to give us a global ID. -type ConnId = u64; - -/// Incoming messages come via subscriptions, and end up looking like this. -#[derive(Debug)] -enum ToAggregator { - FromShardWebsocket(ConnId, FromShardWebsocket), - FromFeedWebsocket(ConnId, FromFeedWebsocket), -} - -/// An incoming shard connection can send these messages to the aggregator. -#[derive(Debug)] -pub enum FromShardWebsocket { - /// When the socket is opened, it'll send this first - /// so that we have a way to communicate back to it. - Initialize { - channel: mpsc::Sender, - }, - /// Tell the aggregator about a new node. - Add { - local_id: LocalId, - ip: Option, - node: common::types::NodeDetails, - genesis_hash: common::types::BlockHash - }, - /// Update/pass through details about a node. - Update { - local_id: LocalId, - payload: node::Payload - }, - /// Tell the aggregator that a node has been removed when it disconnects. - Remove { - local_id: LocalId, - }, - /// The shard is disconnected. - Disconnected -} - -/// The aggregator can these messages back to a shard connection. -#[derive(Debug)] -pub enum ToShardWebsocket { - /// Mute messages to the core by passing the shard-local ID of them. - Mute { - local_id: LocalId - } -} - -/// An incoming feed connection can send these messages to the aggregator. -#[derive(Debug)] -pub enum FromFeedWebsocket { - /// When the socket is opened, it'll send this first - /// so that we have a way to communicate back to it. - /// Unbounded so that slow feeds don't block aggregato - /// progress. - Initialize { - channel: mpsc::UnboundedSender, - }, - /// The feed can subscribe to a chain to receive - /// messages relating to it. - Subscribe { - chain: Box - }, - /// The feed wants finality info for the chain, too. - SendFinality, - /// The feed doesn't want any more finality info for the chain. - NoMoreFinality, - /// An explicit ping message. - Ping { - chain: Box - }, - /// The feed is disconnected. - Disconnected -} - -// The frontend sends text based commands; parse them into these messages: -impl FromStr for FromFeedWebsocket { - type Err = anyhow::Error; - fn from_str(s: &str) -> Result { - let (cmd, chain) = match s.find(':') { - Some(idx) => (&s[..idx], s[idx+1..].into()), - None => return Err(anyhow::anyhow!("Expecting format `CMD:CHAIN_NAME`")) - }; - match cmd { - "ping" => Ok(FromFeedWebsocket::Ping { chain }), - "subscribe" => Ok(FromFeedWebsocket::Subscribe { chain }), - "send-finality" => Ok(FromFeedWebsocket::SendFinality), - "no-more-finality" => Ok(FromFeedWebsocket::NoMoreFinality), - _ => return Err(anyhow::anyhow!("Command {} not recognised", cmd)) - } - } -} - -/// The aggregator can these messages back to a feed connection. -#[derive(Debug)] -pub enum ToFeedWebsocket { - Bytes(Vec) -} - -#[derive(Clone)] -pub struct Aggregator(Arc); - -struct AggregatorInternal { - /// Shards that connect are each assigned a unique connection ID. - /// This helps us know who to send messages back to (especially in - /// conjunction with the [`LocalId`] that messages will come with). - shard_conn_id: AtomicU64, - /// Feeds that connect have their own unique connection ID, too. - feed_conn_id: AtomicU64, - /// Send messages in to the aggregator from the outside via this. This is - /// stored here so that anybody holding an `Aggregator` handle can - /// make use of it. - tx_to_aggregator: mpsc::Sender -} - -impl Aggregator { - /// Spawn a new Aggregator. This connects to the telemetry backend - pub async fn spawn(denylist: Vec) -> anyhow::Result { - let (tx_to_aggregator, rx_from_external) = mpsc::channel(10); - - // Handle any incoming messages in our handler loop: - tokio::spawn(Aggregator::handle_messages(rx_from_external, denylist)); - - // Return a handle to our aggregator: - Ok(Aggregator(Arc::new(AggregatorInternal { - shard_conn_id: AtomicU64::new(1), - feed_conn_id: AtomicU64::new(1), - tx_to_aggregator, - }))) - } - - // This is spawned into a separate task and handles any messages coming - // in to the aggregator. If nobody is tolding the tx side of the channel - // any more, this task will gracefully end. - async fn handle_messages(mut rx_from_external: mpsc::Receiver, denylist: Vec) { - - let mut node_state = State::new(denylist); - - // Maintain mappings from the shard connection ID and local ID of messages to a global ID - // that uniquely identifies nodes in our node state. - let mut global_ids: BiMap = BiMap::new(); - - // Keep track of channels to communicate with feeds and shards: - let mut feed_channels = HashMap::new(); - let mut shard_channels = HashMap::new(); - - // What chains have our feeds subscribed to (one at a time at the mo)? - // Both of these need to be kept in sync (should move to own struct eventually). - let mut feed_conn_id_to_chain: HashMap> = HashMap::new(); - let mut chain_to_feed_conn_ids: HashMap, HashSet> = HashMap::new(); - - // Which feeds want finality info too? - let mut feed_conn_id_finality: HashSet = HashSet::new(); - - // Now, loop and receive messages to handle. - while let Some(msg) = rx_from_external.next().await { - match msg { - // FROM FEED - ToAggregator::FromFeedWebsocket(feed_conn_id, FromFeedWebsocket::Initialize { mut channel }) => { - feed_channels.insert(feed_conn_id, channel.clone()); - - // Tell the new feed subscription some basic things to get it going: - let mut feed_serializer = FeedMessageSerializer::new(); - feed_serializer.push(feed_message::Version(31)); - for chain in node_state.iter_chains() { - feed_serializer.push(feed_message::AddedChain( - chain.label(), - chain.node_count() - )); - } - - // Send this to the channel that subscribed: - if let Some(bytes) = feed_serializer.into_finalized() { - let _ = channel.send(ToFeedWebsocket::Bytes(bytes)).await; - } - }, - ToAggregator::FromFeedWebsocket(feed_conn_id, FromFeedWebsocket::Ping { chain }) => { - let feed_channel = match feed_channels.get_mut(&feed_conn_id) { - Some(chan) => chan, - None => continue - }; - - // Pong! - let mut feed_serializer = FeedMessageSerializer::new(); - feed_serializer.push(feed_message::Pong(&chain)); - if let Some(bytes) = feed_serializer.into_finalized() { - let _ = feed_channel.send(ToFeedWebsocket::Bytes(bytes)).await; - } - }, - ToAggregator::FromFeedWebsocket(feed_conn_id, FromFeedWebsocket::Subscribe { chain }) => { - let feed_channel = match feed_channels.get_mut(&feed_conn_id) { - Some(chan) => chan, - None => continue - }; - - // Unsubscribe from previous chain if subscribed to one: - let old_chain_label = feed_conn_id_to_chain.remove(&feed_conn_id); - if let Some(old_chain_label) = &old_chain_label { - if let Some(map) = chain_to_feed_conn_ids.get_mut(old_chain_label) { - map.remove(&feed_conn_id); - } - } - - // Untoggle request for finality feeds: - feed_conn_id_finality.remove(&feed_conn_id); - - // Get the chain we're subscribing to, ignoring the rest if it doesn't exist. - let chain = match node_state.get_chain_by_label(&chain) { - Some(chain) => chain, - None => continue - }; - - // Send messages to the feed about the new chain: - let mut feed_serializer = FeedMessageSerializer::new(); - if let Some(old_chain_label) = old_chain_label { - feed_serializer.push(feed_message::UnsubscribedFrom(&old_chain_label)); - } - feed_serializer.push(feed_message::SubscribedTo(chain.label())); - feed_serializer.push(feed_message::TimeSync(now())); - feed_serializer.push(feed_message::BestBlock ( - chain.best_block().height, - chain.timestamp(), - chain.average_block_time() - )); - feed_serializer.push(feed_message::BestFinalized ( - chain.finalized_block().height, - chain.finalized_block().hash - )); - for (idx, (gid, node)) in node_state.get_nodes_in_chain(chain).enumerate() { - // Send subscription confirmation and chain head before doing all the nodes, - // and continue sending batches of 32 nodes a time over the wire subsequently - if idx % 32 == 0 { - if let Some(bytes) = feed_serializer.finalize() { - let _ = feed_channel.send(ToFeedWebsocket::Bytes(bytes)).await; - } - } - feed_serializer.push(feed_message::AddedNode(gid, node)); - feed_serializer.push(feed_message::FinalizedBlock( - gid, - node.finalized().height, - node.finalized().hash, - )); - if node.stale() { - feed_serializer.push(feed_message::StaleNode(gid)); - } - } - if let Some(bytes) = feed_serializer.into_finalized() { - let _ = feed_channel.send(ToFeedWebsocket::Bytes(bytes)).await; - } - - // Actually make a note of the new chain subsciption: - feed_conn_id_to_chain.insert(feed_conn_id, chain.label().into()); - chain_to_feed_conn_ids.entry(chain.label().into()).or_default().insert(feed_conn_id); - }, - ToAggregator::FromFeedWebsocket(feed_conn_id, FromFeedWebsocket::SendFinality) => { - feed_conn_id_finality.insert(feed_conn_id); - }, - ToAggregator::FromFeedWebsocket(feed_conn_id, FromFeedWebsocket::NoMoreFinality) => { - feed_conn_id_finality.remove(&feed_conn_id); - }, - ToAggregator::FromFeedWebsocket(feed_conn_id, FromFeedWebsocket::Disconnected) => { - // The feed has disconnected; clean up references to it: - if let Some(chain) = feed_conn_id_to_chain.remove(&feed_conn_id) { - chain_to_feed_conn_ids.remove(&chain); - } - feed_channels.remove(&feed_conn_id); - feed_conn_id_finality.remove(&feed_conn_id); - }, - - // FROM SHARD - ToAggregator::FromShardWebsocket(shard_conn_id, FromShardWebsocket::Initialize { channel }) => { - shard_channels.insert(shard_conn_id, channel); - }, - ToAggregator::FromShardWebsocket(shard_conn_id, FromShardWebsocket::Add { local_id, ip, node, genesis_hash }) => { - // Get globalId from add_node and store that against shard/local_id. - - // TODO: node_state.add_node. Every feed should know about node count changes. - }, - ToAggregator::FromShardWebsocket(shard_conn_id, FromShardWebsocket::Remove { local_id }) => { - if let Some(id) = global_ids.remove_by_right(&(shard_conn_id, local_id)) { - // TODO: node_state.remove_node, Every feed should know about node count changes. - } - }, - ToAggregator::FromShardWebsocket(shard_conn_id, FromShardWebsocket::Update { local_id, payload }) => { - // TODO: Fill this all in... - let global_node_id = match global_ids.get_by_right(&(shard_conn_id, local_id)) { - Some(id) => id, - None => continue - }; - - if let Some(block) = payload.best_block() { - - } - - match payload { - node::Payload::SystemInterval(system_interval) => { - - }, - node::Payload::AfgAuthoritySet(_) => { - - }, - node::Payload::AfgFinalized(_) => { - - }, - node::Payload::AfgReceivedPrecommit(_) => { - - }, - node::Payload::AfgReceivedPrevote(_) => { - - }, - // This message should have been handled before the payload made it this far: - node::Payload::SystemConnected(_) => { - unreachable!("SystemConnected message seen in Telemetry Core, but should have been handled in shard"); - }, - // The following messages aren't handled at the moment. List them explicitly so - // that we have to make an explicit choice for any new messages: - node::Payload::BlockImport(_) | - node::Payload::NotifyFinalized(_) | - node::Payload::AfgReceivedCommit(_) | - node::Payload::TxPoolImport | - node::Payload::AfgFinalizedBlocksUpTo | - node::Payload::AuraPreSealedBlock | - node::Payload::PreparedBlockForProposing => {}, - } - - // TODO: node_state.update_node, then handle returned diffs - }, - ToAggregator::FromShardWebsocket(shard_conn_id, FromShardWebsocket::Disconnected) => { - // The shard has disconnected; remove the shard channel, but also - // remove any nodes associated with the shard, firing the relevant feed messages. - } - } - } - } - - /// Return a sink that a shard can send messages into to be handled by the aggregator. - pub fn subscribe_shard(&self) -> impl Sink + Unpin { - // Assign a unique aggregator-local ID to each connection that subscribes, and pass - // that along with every message to the aggregator loop: - let shard_conn_id: ConnId = self.0.shard_conn_id.fetch_add(1, std::sync::atomic::Ordering::Relaxed); - let tx_to_aggregator = self.0.tx_to_aggregator.clone(); - - // Calling `send` on this Sink requires Unpin. There may be a nicer way than this, - // but pinning by boxing is the easy solution for now: - Box::pin(tx_to_aggregator.with(move |msg| async move { - Ok(ToAggregator::FromShardWebsocket(shard_conn_id, msg)) - })) - } - - /// Return a sink that a feed can send messages into to be handled by the aggregator. - pub fn subscribe_feed(&self) -> impl Sink + Unpin { - // Assign a unique aggregator-local ID to each connection that subscribes, and pass - // that along with every message to the aggregator loop: - let feed_conn_id: ConnId = self.0.feed_conn_id.fetch_add(1, std::sync::atomic::Ordering::Relaxed); - let tx_to_aggregator = self.0.tx_to_aggregator.clone(); - - // Calling `send` on this Sink requires Unpin. There may be a nicer way than this, - // but pinning by boxing is the easy solution for now: - Box::pin(tx_to_aggregator.with(move |msg| async move { - Ok(ToAggregator::FromFeedWebsocket(feed_conn_id, msg)) - })) - } - -} \ No newline at end of file diff --git a/backend/telemetry/src/aggregator/aggregator.rs b/backend/telemetry/src/aggregator/aggregator.rs new file mode 100644 index 0000000..90e3b1f --- /dev/null +++ b/backend/telemetry/src/aggregator/aggregator.rs @@ -0,0 +1,83 @@ +use std::sync::Arc; +use std::sync::atomic::AtomicU64; +use futures::channel::mpsc; +use futures::{ Sink, SinkExt }; +use super::inner_loop; + +/// A unique Id is assigned per websocket connection (or more accurately, +/// per feed socket and per shard socket). This can be combined with the +/// [`LocalId`] of messages to give us a global ID. +type ConnId = u64; + +#[derive(Clone)] +pub struct Aggregator(Arc); + +struct AggregatorInternal { + /// Shards that connect are each assigned a unique connection ID. + /// This helps us know who to send messages back to (especially in + /// conjunction with the [`LocalId`] that messages will come with). + shard_conn_id: AtomicU64, + /// Feeds that connect have their own unique connection ID, too. + feed_conn_id: AtomicU64, + /// Send messages in to the aggregator from the outside via this. This is + /// stored here so that anybody holding an `Aggregator` handle can + /// make use of it. + tx_to_aggregator: mpsc::Sender +} + +impl Aggregator { + /// Spawn a new Aggregator. This connects to the telemetry backend + pub async fn spawn(denylist: Vec) -> anyhow::Result { + let (tx_to_aggregator, rx_from_external) = mpsc::channel(10); + + // Handle any incoming messages in our handler loop: + tokio::spawn(Aggregator::handle_messages(rx_from_external, tx_to_aggregator.clone(), denylist)); + + // Return a handle to our aggregator: + Ok(Aggregator(Arc::new(AggregatorInternal { + shard_conn_id: AtomicU64::new(1), + feed_conn_id: AtomicU64::new(1), + tx_to_aggregator, + }))) + } + + // This is spawned into a separate task and handles any messages coming + // in to the aggregator. If nobody is tolding the tx side of the channel + // any more, this task will gracefully end. + async fn handle_messages( + rx_from_external: mpsc::Receiver, + tx_to_aggregator: mpsc::Sender, + denylist: Vec + ) { + inner_loop::InnerLoop::new(rx_from_external, tx_to_aggregator, denylist).handle().await; + } + + /// Return a sink that a shard can send messages into to be handled by the aggregator. + pub fn subscribe_shard(&self) -> impl Sink + Unpin { + // Assign a unique aggregator-local ID to each connection that subscribes, and pass + // that along with every message to the aggregator loop: + let shard_conn_id: ConnId = self.0.shard_conn_id.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + let tx_to_aggregator = self.0.tx_to_aggregator.clone(); + + // Calling `send` on this Sink requires Unpin. There may be a nicer way than this, + // but pinning by boxing is the easy solution for now: + Box::pin(tx_to_aggregator.with(move |msg| async move { + Ok(inner_loop::ToAggregator::FromShardWebsocket(shard_conn_id, msg)) + })) + } + + /// Return a sink that a feed can send messages into to be handled by the aggregator. + pub fn subscribe_feed(&self) -> impl Sink + Unpin { + // Assign a unique aggregator-local ID to each connection that subscribes, and pass + // that along with every message to the aggregator loop: + let feed_conn_id: ConnId = self.0.feed_conn_id.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + let tx_to_aggregator = self.0.tx_to_aggregator.clone(); + + // Calling `send` on this Sink requires Unpin. There may be a nicer way than this, + // but pinning by boxing is the easy solution for now: + Box::pin(tx_to_aggregator.with(move |msg| async move { + Ok(inner_loop::ToAggregator::FromFeedWebsocket(feed_conn_id, msg)) + })) + } + +} \ No newline at end of file diff --git a/backend/telemetry/src/aggregator/find_location.rs b/backend/telemetry/src/aggregator/find_location.rs new file mode 100644 index 0000000..4aa2834 --- /dev/null +++ b/backend/telemetry/src/aggregator/find_location.rs @@ -0,0 +1,206 @@ +use std::net::Ipv4Addr; +use std::sync::Arc; + +use parking_lot::RwLock; +use rustc_hash::FxHashMap; +use serde::Deserialize; +use futures::{Sink, SinkExt, StreamExt}; +use futures::channel::mpsc; + +use common::types::NodeLocation; +use tokio::sync::Semaphore; + +/// The returned location is optional; it may be None if not found. +pub type Location = Option>; + +/// This is responsible for taking an IP address and attempting +/// to find a geographical location from this +pub fn find_location(response_chan: R) -> mpsc::UnboundedSender<(Id, Ipv4Addr)> +where + R: Sink<(Id, Option>)> + Unpin + Send + Clone + 'static, + Id: Clone + Send + 'static +{ + let (tx, mut rx) = mpsc::unbounded(); + + // cache entries + let mut cache: FxHashMap>> = FxHashMap::default(); + + // Default entry for localhost + cache.insert( + Ipv4Addr::new(127, 0, 0, 1), + Some(Arc::new(NodeLocation { + latitude: 52.516_6667, + longitude: 13.4, + city: "Berlin".into(), + })), + ); + + // Create a locator with our cache. This is used to obtain locations. + let locator = Locator::new(cache); + + // Spawn a loop to handle location requests + tokio::spawn(async move { + + // Allow 4 requests at a time. acquiring a token will block while the + // number of concurrent location requests is more than this. + let semaphore = Arc::new(Semaphore::new(4)); + + loop { + while let Some((id, ip_address)) = rx.next().await { + + let permit = semaphore.clone().acquire_owned().await.unwrap(); + let mut response_chan = response_chan.clone(); + let locator = locator.clone(); + + // Once we have acquired our permit, spawn a task to avoid + // blocking this loop so that we can handle concurrent requests. + tokio::spawn(async move { + match locator.locate(ip_address).await { + Ok(loc) => { + let _ = response_chan.send((id,loc)).await; + }, + Err(e) => { + log::debug!("GET error for ip location: {:?}", e); + } + }; + + // ensure permit is moved into task by dropping it explicitly: + drop(permit); + }); + } + } + }); + + tx +} + +/// This struct can be used to make location requests, given +/// an IPV4 address. +#[derive(Clone)] +struct Locator { + client: reqwest::Client, + cache: Arc>>>>, +} + +impl Locator { + pub fn new(cache: FxHashMap>>) -> Self { + let client = reqwest::Client::new(); + + Locator { + client, + cache: Arc::new(RwLock::new(cache)) + } + } + + pub async fn locate(&self, ip: Ipv4Addr) -> Result>, reqwest::Error> { + // Return location quickly if it's cached: + let cached_loc = { + let cache_reader = self.cache.read(); + cache_reader.get(&ip).map(|o| o.clone()) + }; + if let Some(loc) = cached_loc { + return Ok(loc); + } + + // Look it up via the location services if not cached: + let location = self.iplocate_ipapi_co(ip).await?; + let location = match location { + Some(location) => Ok(Some(location)), + None => self.iplocate_ipinfo_io(ip).await, + }?; + + self.cache.write().insert(ip, location.clone()); + Ok(location) + } + + async fn iplocate_ipapi_co(&self, ip: Ipv4Addr) -> Result>, reqwest::Error> { + let location = self + .query(&format!("https://ipapi.co/{}/json", ip)) + .await? + .map(Arc::new); + + Ok(location) + } + + async fn iplocate_ipinfo_io(&self, ip: Ipv4Addr) -> Result>, reqwest::Error> { + let location = self + .query(&format!("https://ipinfo.io/{}/json", ip)) + .await? + .and_then(|loc: IPApiLocate| loc.into_node_location().map(Arc::new)); + + Ok(location) + } + + async fn query(&self, url: &str) -> Result, reqwest::Error> + where for<'de> T: Deserialize<'de> + { + match self.client.get(url).send().await?.json::().await { + Ok(result) => Ok(Some(result)), + Err(err) => { + log::debug!("JSON error for ip location: {:?}", err); + Ok(None) + } + } + } +} + +/// This is the format returned from ipinfo.co, so we do +/// a little conversion to get it into the shape we want. +#[derive(Deserialize)] +struct IPApiLocate { + city: Box, + loc: Box, +} + +impl IPApiLocate { + fn into_node_location(self) -> Option { + let IPApiLocate { city, loc } = self; + + let mut loc = loc.split(',').map(|n| n.parse()); + + let latitude = loc.next()?.ok()?; + let longitude = loc.next()?.ok()?; + + // Guarantee that the iterator has been exhausted + if loc.next().is_some() { + return None; + } + + Some(NodeLocation { + latitude, + longitude, + city, + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn ipapi_locate_to_node_location() { + let ipapi = IPApiLocate { + loc: "12.5,56.25".into(), + city: "Foobar".into(), + }; + + let location = ipapi.into_node_location().unwrap(); + + assert_eq!(location.latitude, 12.5); + assert_eq!(location.longitude, 56.25); + assert_eq!(&*location.city, "Foobar"); + } + + #[test] + fn ipapi_locate_to_node_location_too_many() { + let ipapi = IPApiLocate { + loc: "12.5,56.25,1.0".into(), + city: "Foobar".into(), + }; + + let location = ipapi.into_node_location(); + + assert!(location.is_none()); + } +} \ No newline at end of file diff --git a/backend/telemetry/src/aggregator/inner_loop.rs b/backend/telemetry/src/aggregator/inner_loop.rs new file mode 100644 index 0000000..cc77daf --- /dev/null +++ b/backend/telemetry/src/aggregator/inner_loop.rs @@ -0,0 +1,423 @@ +use common::{ + internal_messages::{ + self, + LocalId, + MuteReason + }, + node, + util::now +}; +use bimap::BiMap; +use std::{iter::FromIterator, net::Ipv4Addr, str::FromStr}; +use futures::channel::{ mpsc }; +use futures::{ future, SinkExt, StreamExt }; +use std::collections::{ HashMap, HashSet }; +use crate::state::{ self, State, NodeId }; +use crate::feed_message::{ self, FeedMessageSerializer }; +use super::find_location::{ self, find_location }; + +/// A unique Id is assigned per websocket connection (or more accurately, +/// per feed socket and per shard socket). This can be combined with the +/// [`LocalId`] of messages to give us a global ID. +type ConnId = u64; + +/// Incoming messages come via subscriptions, and end up looking like this. +#[derive(Clone,Debug)] +pub enum ToAggregator { + FromShardWebsocket(ConnId, FromShardWebsocket), + FromFeedWebsocket(ConnId, FromFeedWebsocket), + FromFindLocation(NodeId, find_location::Location) +} + +/// An incoming shard connection can send these messages to the aggregator. +#[derive(Clone,Debug)] +pub enum FromShardWebsocket { + /// When the socket is opened, it'll send this first + /// so that we have a way to communicate back to it. + Initialize { + channel: mpsc::Sender, + }, + /// Tell the aggregator about a new node. + Add { + local_id: LocalId, + ip: Option, + node: common::types::NodeDetails, + genesis_hash: common::types::BlockHash + }, + /// Update/pass through details about a node. + Update { + local_id: LocalId, + payload: node::Payload + }, + /// Tell the aggregator that a node has been removed when it disconnects. + Remove { + local_id: LocalId, + }, + /// The shard is disconnected. + Disconnected +} + +/// The aggregator can these messages back to a shard connection. +#[derive(Debug)] +pub enum ToShardWebsocket { + /// Mute messages to the core by passing the shard-local ID of them. + Mute { + local_id: LocalId, + reason: internal_messages::MuteReason + } +} + +/// An incoming feed connection can send these messages to the aggregator. +#[derive(Clone,Debug)] +pub enum FromFeedWebsocket { + /// When the socket is opened, it'll send this first + /// so that we have a way to communicate back to it. + /// Unbounded so that slow feeds don't block aggregato + /// progress. + Initialize { + channel: mpsc::UnboundedSender, + }, + /// The feed can subscribe to a chain to receive + /// messages relating to it. + Subscribe { + chain: Box + }, + /// The feed wants finality info for the chain, too. + SendFinality, + /// The feed doesn't want any more finality info for the chain. + NoMoreFinality, + /// An explicit ping message. + Ping { + chain: Box + }, + /// The feed is disconnected. + Disconnected +} + +// The frontend sends text based commands; parse them into these messages: +impl FromStr for FromFeedWebsocket { + type Err = anyhow::Error; + fn from_str(s: &str) -> Result { + let (cmd, chain) = match s.find(':') { + Some(idx) => (&s[..idx], s[idx+1..].into()), + None => return Err(anyhow::anyhow!("Expecting format `CMD:CHAIN_NAME`")) + }; + match cmd { + "ping" => Ok(FromFeedWebsocket::Ping { chain }), + "subscribe" => Ok(FromFeedWebsocket::Subscribe { chain }), + "send-finality" => Ok(FromFeedWebsocket::SendFinality), + "no-more-finality" => Ok(FromFeedWebsocket::NoMoreFinality), + _ => return Err(anyhow::anyhow!("Command {} not recognised", cmd)) + } + } +} + +/// The aggregator can these messages back to a feed connection. +#[derive(Clone,Debug)] +pub enum ToFeedWebsocket { + Bytes(Vec) +} + +/// Instances of this are responsible for handling incoming and +/// outgoing messages in the main aggregator loop. +pub struct InnerLoop { + /// Messages from the outside world come into this: + rx_from_external: mpsc::Receiver, + + /// The state of our chains and nodes lives here: + node_state: State, + /// We maintain a mapping between NodeId and ConnId+LocalId, so that we know + /// which messages are about which nodes. + node_ids: BiMap, + + /// Keep track of how to send messages out to feeds. + feed_channels: HashMap>, + /// Keep track of how to send messages out to shards. + shard_channels: HashMap>, + + /// Which chain is a feed subscribed to? + feed_conn_id_to_chain: HashMap>, + /// Which feeds are subscribed to a given chain (needs to stay in sync with above)? + chain_to_feed_conn_ids: HashMap, HashSet>, + + /// These feeds want finality info, too. + feed_conn_id_finality: HashSet, + + /// Send messages here to make location requests, which are sent back into the loop. + tx_to_locator: mpsc::UnboundedSender<(NodeId, Ipv4Addr)> +} + +impl InnerLoop { + /// Create a new inner loop handler with the various state it needs. + pub fn new( + rx_from_external: mpsc::Receiver, + tx_to_aggregator: mpsc::Sender, + denylist: Vec + ) -> Self { + + let tx_to_locator = find_location(tx_to_aggregator.with(|(node_id, msg)| { + future::ok::<_,mpsc::SendError>(ToAggregator::FromFindLocation(node_id, msg)) + })); + + InnerLoop { + rx_from_external, + node_state: State::new(denylist), + node_ids: BiMap::new(), + feed_channels: HashMap::new(), + shard_channels: HashMap::new(), + feed_conn_id_to_chain: HashMap::new(), + chain_to_feed_conn_ids: HashMap::new(), + feed_conn_id_finality: HashSet::new(), + tx_to_locator + } + } + + /// Start handling and responding to incoming messages. + pub async fn handle(mut self) { + while let Some(msg) = self.rx_from_external.next().await { + match msg { + ToAggregator::FromFeedWebsocket(feed_conn_id, msg) => { + self.handle_from_feed(feed_conn_id, msg).await + }, + ToAggregator::FromShardWebsocket(shard_conn_id, msg) => { + self.handle_from_shard(shard_conn_id, msg).await + }, + ToAggregator::FromFindLocation(node_id, location) => { + self.handle_from_find_location(node_id, location).await + } + } + } + } + + async fn handle_from_find_location(&mut self, node_id: NodeId, location: find_location::Location) { + // TODO: Update node location here + } + + /// Handle messages coming from shards. + async fn handle_from_shard(&mut self, shard_conn_id: ConnId, msg: FromShardWebsocket) { + match msg { + FromShardWebsocket::Initialize { channel } => { + self.shard_channels.insert(shard_conn_id, channel); + }, + FromShardWebsocket::Add { local_id, ip, node, genesis_hash } => { + match self.node_state.add_node(genesis_hash, node) { + state::AddNodeResult::ChainOnDenyList => { + if let Some(shard_conn) = self.shard_channels.get_mut(&shard_conn_id) { + let _ = shard_conn.send(ToShardWebsocket::Mute { + local_id, + reason: MuteReason::ChainNotAllowed + }).await; + } + }, + state::AddNodeResult::ChainOverQuota => { + if let Some(shard_conn) = self.shard_channels.get_mut(&shard_conn_id) { + let _ = shard_conn.send(ToShardWebsocket::Mute { + local_id, + reason: MuteReason::Overquota + }).await; + } + }, + state::AddNodeResult::NodeAddedToChain(details) => { + let node_id = details.id; + // Note the ID so that we know what node other messages are referring to: + self.node_ids.insert(node_id, (shard_conn_id, local_id)); + + let mut feed_serializer = FeedMessageSerializer::new(); + feed_serializer.push(feed_message::AddedNode(node_id, details.node)); + let chain_label = details.chain.label().to_owned(); + + if let Some(bytes) = feed_serializer.into_finalized() { + self.broadcast_to_chain_feeds( + &chain_label, + ToFeedWebsocket::Bytes(bytes) + ).await + } + + // TODO: The node has been added. use it's IP to find a location. + }, + } + }, + FromShardWebsocket::Remove { local_id } => { + if let Some(node_id) = self.node_ids.remove_by_right(&(shard_conn_id, local_id)) { + // TODO: node_state.remove_node, Every feed should know about node count changes. + } + }, + FromShardWebsocket::Update { local_id, payload } => { + // TODO: Fill this all in... + let node_id = match self.node_ids.get_by_right(&(shard_conn_id, local_id)) { + Some(id) => id, + None => return + }; + + if let Some(block) = payload.best_block() { + + } + + match payload { + node::Payload::SystemInterval(system_interval) => { + + }, + node::Payload::AfgAuthoritySet(_) => { + + }, + node::Payload::AfgFinalized(_) => { + + }, + node::Payload::AfgReceivedPrecommit(_) => { + + }, + node::Payload::AfgReceivedPrevote(_) => { + + }, + // This message should have been handled before the payload made it this far: + node::Payload::SystemConnected(_) => { + unreachable!("SystemConnected message seen in Telemetry Core, but should have been handled in shard"); + }, + // The following messages aren't handled at the moment. List them explicitly so + // that we have to make an explicit choice for any new messages: + node::Payload::BlockImport(_) | + node::Payload::NotifyFinalized(_) | + node::Payload::AfgReceivedCommit(_) | + node::Payload::TxPoolImport | + node::Payload::AfgFinalizedBlocksUpTo | + node::Payload::AuraPreSealedBlock | + node::Payload::PreparedBlockForProposing => {}, + } + + // TODO: node_state.update_node, then handle returned diffs + }, + FromShardWebsocket::Disconnected => { + // The shard has disconnected; remove the shard channel, but also + // remove any nodes associated with the shard, firing the relevant feed messages. + } + } + } + + /// Handle messages coming from feeds. + async fn handle_from_feed(&mut self, feed_conn_id: ConnId, msg: FromFeedWebsocket) { + match msg { + FromFeedWebsocket::Initialize { mut channel } => { + self.feed_channels.insert(feed_conn_id, channel.clone()); + + // Tell the new feed subscription some basic things to get it going: + let mut feed_serializer = FeedMessageSerializer::new(); + feed_serializer.push(feed_message::Version(31)); + for chain in self.node_state.iter_chains() { + feed_serializer.push(feed_message::AddedChain( + chain.label(), + chain.node_count() + )); + } + + // Send this to the channel that subscribed: + if let Some(bytes) = feed_serializer.into_finalized() { + let _ = channel.send(ToFeedWebsocket::Bytes(bytes)).await; + } + }, + FromFeedWebsocket::Ping { chain } => { + let feed_channel = match self.feed_channels.get_mut(&feed_conn_id) { + Some(chan) => chan, + None => return + }; + + // Pong! + let mut feed_serializer = FeedMessageSerializer::new(); + feed_serializer.push(feed_message::Pong(&chain)); + if let Some(bytes) = feed_serializer.into_finalized() { + let _ = feed_channel.send(ToFeedWebsocket::Bytes(bytes)).await; + } + }, + FromFeedWebsocket::Subscribe { chain } => { + let feed_channel = match self.feed_channels.get_mut(&feed_conn_id) { + Some(chan) => chan, + None => return + }; + + // Unsubscribe from previous chain if subscribed to one: + let old_chain_label = self.feed_conn_id_to_chain.remove(&feed_conn_id); + if let Some(old_chain_label) = &old_chain_label { + if let Some(map) = self.chain_to_feed_conn_ids.get_mut(old_chain_label) { + map.remove(&feed_conn_id); + } + } + + // Untoggle request for finality feeds: + self.feed_conn_id_finality.remove(&feed_conn_id); + + // Get the chain we're subscribing to, ignoring the rest if it doesn't exist. + let chain = match self.node_state.get_chain_by_label(&chain) { + Some(chain) => chain, + None => return + }; + + // Send messages to the feed about the new chain: + let mut feed_serializer = FeedMessageSerializer::new(); + if let Some(old_chain_label) = old_chain_label { + feed_serializer.push(feed_message::UnsubscribedFrom(&old_chain_label)); + } + feed_serializer.push(feed_message::SubscribedTo(chain.label())); + feed_serializer.push(feed_message::TimeSync(now())); + feed_serializer.push(feed_message::BestBlock ( + chain.best_block().height, + chain.timestamp(), + chain.average_block_time() + )); + feed_serializer.push(feed_message::BestFinalized ( + chain.finalized_block().height, + chain.finalized_block().hash + )); + for (idx, (gid, node)) in chain.nodes().enumerate() { + // Send subscription confirmation and chain head before doing all the nodes, + // and continue sending batches of 32 nodes a time over the wire subsequently + if idx % 32 == 0 { + if let Some(bytes) = feed_serializer.finalize() { + let _ = feed_channel.send(ToFeedWebsocket::Bytes(bytes)).await; + } + } + feed_serializer.push(feed_message::AddedNode(gid, node)); + feed_serializer.push(feed_message::FinalizedBlock( + gid, + node.finalized().height, + node.finalized().hash, + )); + if node.stale() { + feed_serializer.push(feed_message::StaleNode(gid)); + } + } + if let Some(bytes) = feed_serializer.into_finalized() { + let _ = feed_channel.send(ToFeedWebsocket::Bytes(bytes)).await; + } + + // Actually make a note of the new chain subsciption: + self.feed_conn_id_to_chain.insert(feed_conn_id, chain.label().into()); + self.chain_to_feed_conn_ids.entry(chain.label().into()).or_default().insert(feed_conn_id); + }, + FromFeedWebsocket::SendFinality => { + self.feed_conn_id_finality.insert(feed_conn_id); + }, + FromFeedWebsocket::NoMoreFinality => { + self.feed_conn_id_finality.remove(&feed_conn_id); + }, + FromFeedWebsocket::Disconnected => { + // The feed has disconnected; clean up references to it: + if let Some(chain) = self.feed_conn_id_to_chain.remove(&feed_conn_id) { + self.chain_to_feed_conn_ids.remove(&chain); + } + self.feed_channels.remove(&feed_conn_id); + self.feed_conn_id_finality.remove(&feed_conn_id); + }, + } + } + + /// Send a message to all chain feeds. + async fn broadcast_to_chain_feeds(&mut self, chain: &str, message: ToFeedWebsocket) { + if let Some(feeds) = self.chain_to_feed_conn_ids.get(chain) { + for &feed_id in feeds { + // How much faster would it be if we processed these in parallel? + if let Some(chan) = self.feed_channels.get_mut(&feed_id) { + chan.send(message.clone()).await; + } + } + } + } +} \ No newline at end of file diff --git a/backend/telemetry/src/aggregator/mod.rs b/backend/telemetry/src/aggregator/mod.rs new file mode 100644 index 0000000..30622d3 --- /dev/null +++ b/backend/telemetry/src/aggregator/mod.rs @@ -0,0 +1,8 @@ +mod aggregator; +mod inner_loop; +mod find_location; + +// Expose the various message types that can be worked with externally: +pub use inner_loop::{ FromFeedWebsocket, FromShardWebsocket, ToFeedWebsocket, ToShardWebsocket }; + +pub use aggregator::*; \ No newline at end of file diff --git a/backend/telemetry/src/main.rs b/backend/telemetry/src/main.rs index 125b634..434f3b1 100644 --- a/backend/telemetry/src/main.rs +++ b/backend/telemetry/src/main.rs @@ -151,8 +151,8 @@ async fn handle_shard_websocket_connection(mut websocket: ws::WebSocket, mut }; let internal_msg = match msg { - ToShardWebsocket::Mute { local_id } => { - internal_messages::FromTelemetryCore::Mute { local_id } + ToShardWebsocket::Mute { local_id, reason } => { + internal_messages::FromTelemetryCore::Mute { local_id, reason } } }; diff --git a/backend/telemetry/src/state/chain.rs b/backend/telemetry/src/state/chain.rs index ce6fcc8..b4fb611 100644 --- a/backend/telemetry/src/state/chain.rs +++ b/backend/telemetry/src/state/chain.rs @@ -1,23 +1,25 @@ use std::sync::Arc; use std::collections::{ HashSet, HashMap }; use common::types::{ BlockHash }; -use common::internal_messages::{ GlobalId }; -use super::node::Node; -use common::types::{Block, NodeDetails, NodeId, NodeLocation, Timestamp}; +use common::types::{Block, NodeDetails, NodeLocation, Timestamp}; use common::util::{now, DenseMap, NumStats}; +use common::most_seen::{ MostSeen, self }; use common::node::Payload; use std::iter::IntoIterator; +use once_cell::sync::Lazy; + +use super::node::Node; +use super::NodeId; pub type ChainId = usize; -pub type Label = Arc; +pub type Label = Box; pub struct Chain { - /// Label of this chain, along with count of nodes that use this label - label: (Label, usize), - /// Chain genesis hash - genesis_hash: BlockHash, + /// Labels that nodes use for this chain. We keep track of + /// the most commonly used label as nodes are added/removed. + labels: MostSeen