diff --git a/README.md b/README.md index a9c01f5..4c8d8b6 100644 --- a/README.md +++ b/README.md @@ -129,7 +129,7 @@ If you'd like to get things runing manually using Docker, you can do the followi substrate-telemetry-frontend ``` - **NOTE:** Here we used `SUBSTRATE_TELEMETRY_URL=ws://localhost:8000/feed`. This will work if you test with everything running locally on your machine but NOT if your backend runs on a remote server. Keep in mind that the frontend docker image is serving a static site running your browser. The `SUBSTRATE_TELEMETRY_URL` is the WebSocket url that your browser will use to reach the backend. Say your backend runs on a remore server at `192.168.0.100`, you will need to set the IP/url accordingly in `SUBSTRATE_TELEMETRY_URL`. + **NOTE:** Here we used `SUBSTRATE_TELEMETRY_URL=ws://localhost:8000/feed`. This will work if you test with everything running locally on your machine but NOT if your backend runs on a remote server. Keep in mind that the frontend docker image is serving a static site running your browser. The `SUBSTRATE_TELEMETRY_URL` is the WebSocket url that your browser will use to reach the backend. Say your backend runs on a remote server at `foo.example.com`, you will need to set the IP/url accordingly in `SUBSTRATE_TELEMETRY_URL` (in this case, to `ws://foo.example.com/feed`). With these running, you'll be able to navigate to [http://localhost:3000](http://localhost:3000) to view the UI. If you'd like to connect a node and have it send telemetry to your running shard, you can run the following: diff --git a/backend/common/src/byte_size.rs b/backend/common/src/byte_size.rs index 0f6922d..f9b1d9b 100644 --- a/backend/common/src/byte_size.rs +++ b/backend/common/src/byte_size.rs @@ -23,7 +23,8 @@ impl ByteSize { pub fn new(bytes: usize) -> ByteSize { ByteSize(bytes) } - pub fn into_bytes(self) -> usize { + /// Return the number of bytes stored within. + pub fn num_bytes(self) -> usize { self.0 } } @@ -101,7 +102,7 @@ mod test { for (s, expected) in cases { let b: ByteSize = s.parse().unwrap(); - assert_eq!(b.into_bytes(), expected); + assert_eq!(b.num_bytes(), expected); } } } diff --git a/backend/common/src/either_sink.rs b/backend/common/src/either_sink.rs index 6e911af..e823fb6 100644 --- a/backend/common/src/either_sink.rs +++ b/backend/common/src/either_sink.rs @@ -11,7 +11,7 @@ pin_project! { /// A simple enum that delegates implementation to one of /// the two possible sinks contained within. -impl EitherSink { +impl EitherSink { pub fn a(val: A) -> Self { EitherSink::A { inner: val } } @@ -20,38 +20,47 @@ impl EitherSink { } } -impl Sink for EitherSink +impl Sink for EitherSink where A: Sink, - B: Sink + B: Sink, { type Error = Error; - fn poll_ready(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll> { + fn poll_ready( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { match self.project() { - EitherSinkInner::A{ inner } => inner.poll_ready(cx), - EitherSinkInner::B{ inner } => inner.poll_ready(cx) + EitherSinkInner::A { inner } => inner.poll_ready(cx), + EitherSinkInner::B { inner } => inner.poll_ready(cx), } } fn start_send(self: std::pin::Pin<&mut Self>, item: Item) -> Result<(), Self::Error> { match self.project() { - EitherSinkInner::A{ inner } => inner.start_send(item), - EitherSinkInner::B{ inner } => inner.start_send(item) + EitherSinkInner::A { inner } => inner.start_send(item), + EitherSinkInner::B { inner } => inner.start_send(item), } } - fn poll_flush(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll> { + fn poll_flush( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { match self.project() { - EitherSinkInner::A{ inner } => inner.poll_flush(cx), - EitherSinkInner::B{ inner } => inner.poll_flush(cx) + EitherSinkInner::A { inner } => inner.poll_flush(cx), + EitherSinkInner::B { inner } => inner.poll_flush(cx), } } - fn poll_close(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll> { + fn poll_close( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { match self.project() { - EitherSinkInner::A{ inner } => inner.poll_close(cx), - EitherSinkInner::B{ inner } => inner.poll_close(cx) + EitherSinkInner::A { inner } => inner.poll_close(cx), + EitherSinkInner::B { inner } => inner.poll_close(cx), } } } diff --git a/backend/common/src/http_utils.rs b/backend/common/src/http_utils.rs index e53dd23..4f4ecfd 100644 --- a/backend/common/src/http_utils.rs +++ b/backend/common/src/http_utils.rs @@ -153,7 +153,7 @@ fn header_contains_value( pub fn trim(x: &[u8]) -> &[u8] { let from = match x.iter().position(|x| !x.is_ascii_whitespace()) { Some(i) => i, - None => return &x[0..0], + None => return &[], }; let to = x.iter().rposition(|x| !x.is_ascii_whitespace()).unwrap(); &x[from..=to] diff --git a/backend/common/src/lib.rs b/backend/common/src/lib.rs index cd7013b..85e14fa 100644 --- a/backend/common/src/lib.rs +++ b/backend/common/src/lib.rs @@ -27,15 +27,15 @@ pub mod ws_client; mod assign_id; mod dense_map; +mod either_sink; mod mean_list; mod most_seen; mod num_stats; -mod either_sink; // Export a bunch of common bits at the top level for ease of import: pub use assign_id::AssignId; pub use dense_map::DenseMap; +pub use either_sink::EitherSink; pub use mean_list::MeanList; pub use most_seen::MostSeen; pub use num_stats::NumStats; -pub use either_sink::EitherSink; diff --git a/backend/common/src/most_seen.rs b/backend/common/src/most_seen.rs index bc38d8b..05e5987 100644 --- a/backend/common/src/most_seen.rs +++ b/backend/common/src/most_seen.rs @@ -28,6 +28,8 @@ pub struct MostSeen { impl Default for MostSeen { fn default() -> Self { + // This sets the "most seen item" to the default value for the type, + // and notes that nobody has actually seen it yet (current_count is 0). Self { current_best: T::default(), current_count: 0, @@ -38,6 +40,9 @@ impl Default for MostSeen { impl MostSeen { pub fn new(item: T) -> Self { + // This starts us off with an item that we've seen. This item is set as + // the "most seen item" and the current_count is set to 1, as we've seen it + // once by virtue of providing it here. Self { current_best: item, current_count: 1, diff --git a/backend/common/src/node_types.rs b/backend/common/src/node_types.rs index bcfebb5..d06131f 100644 --- a/backend/common/src/node_types.rs +++ b/backend/common/src/node_types.rs @@ -26,7 +26,7 @@ pub type BlockNumber = u64; pub type Timestamp = u64; pub use primitive_types::H256 as BlockHash; -/// +/// Basic node details. #[derive(Serialize, Deserialize, Debug, Clone)] pub struct NodeDetails { pub chain: Box, @@ -38,13 +38,22 @@ pub struct NodeDetails { pub startup_time: Option>, } -/// +/// A couple of node statistics. #[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] pub struct NodeStats { pub peers: u64, pub txcount: u64, } +// # A note about serialization/deserialization of types in this file: +// +// Some of the types here are sent to UI feeds. In an effort to keep the +// amount of bytes sent to a minimum, we have written custom serializers +// for those types. +// +// For testing purposes, it's useful to be able to deserialize from some +// of these types so that we can test message feed things, so custom +// deserializers exist to undo the work of the custom serializers. impl Serialize for NodeStats { fn serialize(&self, serializer: S) -> Result where @@ -67,7 +76,7 @@ impl<'de> Deserialize<'de> for NodeStats { } } -/// +/// Node IO details. #[derive(Default)] pub struct NodeIO { pub used_state_cache_size: MeanList, @@ -85,7 +94,7 @@ impl Serialize for NodeIO { } } -/// +/// Concise block details #[derive(Deserialize, Serialize, Debug, Clone, Copy, PartialEq)] pub struct Block { pub hash: BlockHash, @@ -101,7 +110,7 @@ impl Block { } } -/// +/// Node hardware details. #[derive(Default)] pub struct NodeHardware { /// Upload uses means @@ -126,7 +135,7 @@ impl Serialize for NodeHardware { } } -/// +/// Node location details #[derive(Debug, Clone, PartialEq)] pub struct NodeLocation { pub latitude: f32, @@ -161,7 +170,7 @@ impl<'de> Deserialize<'de> for NodeLocation { } } -/// +/// Verbose block details #[derive(Debug, Clone, Copy, PartialEq)] pub struct BlockDetails { pub block: Block, diff --git a/backend/common/src/ready_chunks_all.rs b/backend/common/src/ready_chunks_all.rs index ef2d274..974c3ec 100644 --- a/backend/common/src/ready_chunks_all.rs +++ b/backend/common/src/ready_chunks_all.rs @@ -72,7 +72,7 @@ impl Stream for ReadyChunksAll { return if this.items.is_empty() { Poll::Pending } else { - Poll::Ready(Some(mem::replace(this.items, Vec::new()))) + Poll::Ready(Some(mem::take(this.items))) } } @@ -87,7 +87,7 @@ impl Stream for ReadyChunksAll { let last = if this.items.is_empty() { None } else { - let full_buf = mem::replace(this.items, Vec::new()); + let full_buf = mem::take(this.items); Some(full_buf) }; diff --git a/backend/common/src/ws_client/connect.rs b/backend/common/src/ws_client/connect.rs index f101c47..0ec6256 100644 --- a/backend/common/src/ws_client/connect.rs +++ b/backend/common/src/ws_client/connect.rs @@ -36,8 +36,8 @@ pub type RawReceiver = /// A websocket connection. From this, we can either expose the raw connection /// or expose a cancel-safe interface to it. pub struct Connection { - tx: soketto::connection::Sender>, - rx: soketto::connection::Receiver>, + tx: RawSender, + rx: RawReceiver, } impl Connection { diff --git a/backend/telemetry_core/benches/subscribe.rs b/backend/telemetry_core/benches/subscribe.rs index 9b3c7de..390efa3 100644 --- a/backend/telemetry_core/benches/subscribe.rs +++ b/backend/telemetry_core/benches/subscribe.rs @@ -78,7 +78,7 @@ pub fn benchmark_subscribe_speed(c: &mut Criterion) { // Give those messages a chance to be handled. This, of course, // assumes that those messages _can_ be handled this quickly. If not, - // we'll start to skew benchmark results with the "time taklen to add node". + // we'll start to skew benchmark results with the "time taken to add node". tokio::time::sleep(Duration::from_millis(250)).await; // Start a bunch of feeds: diff --git a/backend/telemetry_core/src/aggregator/aggregator_set.rs b/backend/telemetry_core/src/aggregator/aggregator_set.rs index f3b916c..8ebfa25 100644 --- a/backend/telemetry_core/src/aggregator/aggregator_set.rs +++ b/backend/telemetry_core/src/aggregator/aggregator_set.rs @@ -1,10 +1,10 @@ use super::aggregator::Aggregator; use super::inner_loop; +use common::EitherSink; use futures::{Sink, SinkExt, StreamExt}; use inner_loop::FromShardWebsocket; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; -use common::EitherSink; #[derive(Clone)] pub struct AggregatorSet(Arc); @@ -42,7 +42,7 @@ impl AggregatorSet { // if we don't actually need it. if self.0.aggregators.len() == 1 { let sub = self.0.aggregators[0].subscribe_shard(); - return EitherSink::a(sub) + return EitherSink::a(sub); } let mut conns: Vec<_> = self diff --git a/backend/telemetry_core/src/aggregator/inner_loop.rs b/backend/telemetry_core/src/aggregator/inner_loop.rs index 813c740..6cfec73 100644 --- a/backend/telemetry_core/src/aggregator/inner_loop.rs +++ b/backend/telemetry_core/src/aggregator/inner_loop.rs @@ -438,7 +438,7 @@ impl InnerLoop { // (which is helpful for the UI as it tries to maintain a sorted list of nodes). The chunk // size is the max number of node info we fit into 1 message; smaller messages allow the UI // to react a little faster and not have to wait for a larger update to come in. A chunk size - // of 64 means each mesage is ~32k. + // of 64 means each message is ~32k. use rayon::prelude::*; let all_feed_messages: Vec<_> = new_chain .nodes_slice() diff --git a/backend/telemetry_core/src/find_location.rs b/backend/telemetry_core/src/find_location.rs index 00a737a..680a123 100644 --- a/backend/telemetry_core/src/find_location.rs +++ b/backend/telemetry_core/src/find_location.rs @@ -110,7 +110,7 @@ impl Locator { // Return location quickly if it's cached: let cached_loc = { let cache_reader = self.cache.read(); - cache_reader.get(&ip).map(|o| o.clone()) + cache_reader.get(&ip).cloned() }; if let Some(loc) = cached_loc { return Ok(loc); diff --git a/backend/telemetry_core/src/main.rs b/backend/telemetry_core/src/main.rs index b02b3f3..e7f1414 100644 --- a/backend/telemetry_core/src/main.rs +++ b/backend/telemetry_core/src/main.rs @@ -213,7 +213,7 @@ where // connection anyway. let msg_info = tokio::select! { msg_info = ws_recv.receive_data(&mut bytes) => msg_info, - _ = &mut recv_closer_rx => { break } + _ = &mut recv_closer_rx => break }; // Handle the socket closing, or errors receiving the message. diff --git a/backend/telemetry_shard/src/main.rs b/backend/telemetry_shard/src/main.rs index 722d173..4e62d76 100644 --- a/backend/telemetry_shard/src/main.rs +++ b/backend/telemetry_shard/src/main.rs @@ -191,7 +191,7 @@ where { // Limit the number of bytes based on a rolling total and the incoming bytes per second // that has been configured via the CLI opts. - let bytes_per_second = bytes_per_second.into_bytes(); + let bytes_per_second = bytes_per_second.num_bytes(); let mut rolling_total_bytes = RollingTotalBuilder::new() .granularity(Duration::from_secs(1)) .window_size_multiple(10)