make it obvious that unbounded channels don't need to await, and tidy up some bits

This commit is contained in:
James Wilson
2021-07-12 10:01:23 +01:00
parent 4046edc47d
commit f2f122285e
5 changed files with 30 additions and 34 deletions
+1 -1
View File
@@ -21,7 +21,7 @@ jobs:
- name: Run tests
working-directory: ./backend
run: cargo test --verbose
run: cargo test --verbose --features e2e
- name: Build, release and call telemetry executable
working-directory: ./backend
+1
View File
@@ -5,6 +5,7 @@ This folder contains the rust crates and documentation specific to the telemetry
- [telemetry_core](./telemetry_core): The Telemetry Core. This aggregates data received from shards and allows UI feeds to connect and receive this information.
- [telemetry_shard](./telemetry_shard): A Shard. It's expected that multiple of these will run. Nodes will connect to Shard instances and send JSON telemetry to them, and Shard instances will each connect to the Telemetry Core and relay on relevant data to it.
- [common](./common): common code shared between the telemetry shard and core
- [test_utils](./test_utils): Test utilities, primarily focused around making it easy to run end-to-end tests.
- [docs](./docs): Material supporting the documentation lives here
# Architecture
+4
View File
@@ -5,6 +5,10 @@ authors = ["Parity Technologies Ltd. <admin@parity.io>"]
edition = "2018"
license = "GPL-3.0"
[features]
# Enable this feature when running tests to also run the e2e tests:
e2e = []
[dependencies]
anyhow = "1.0.41"
bimap = "0.6.1"
@@ -203,8 +203,7 @@ impl InnerLoop {
self.finalize_and_broadcast_to_chain_feeds(
&chain_genesis_hash,
feed_message_serializer,
)
.await;
);
}
}
}
@@ -265,8 +264,7 @@ impl InnerLoop {
self.finalize_and_broadcast_to_chain_feeds(
&genesis_hash,
feed_messages_for_chain,
)
.await;
);
// Tell everybody about the new node count and potential rename:
let mut feed_messages_for_all = FeedMessageSerializer::new();
@@ -276,13 +274,12 @@ impl InnerLoop {
}
feed_messages_for_all
.push(feed_message::AddedChain(&new_chain_label, chain_node_count));
self.finalize_and_broadcast_to_all_feeds(feed_messages_for_all)
.await;
self.finalize_and_broadcast_to_all_feeds(feed_messages_for_all);
// Ask for the grographical location of the node.
// Currently we only geographically locate IPV4 addresses so ignore IPV6.
if let Some(IpAddr::V4(ip_v4)) = ip {
let _ = self.tx_to_locator.send((node_id, ip_v4)).await;
let _ = self.tx_to_locator.unbounded_send((node_id, ip_v4));
}
}
}
@@ -325,14 +322,12 @@ impl InnerLoop {
self.finalize_and_broadcast_to_chain_finality_feeds(
&genesis_hash,
feed_message_serializer,
)
.await;
);
} else {
self.finalize_and_broadcast_to_chain_feeds(
&genesis_hash,
feed_message_serializer,
)
.await;
);
}
}
}
@@ -346,8 +341,7 @@ impl InnerLoop {
.collect();
// ... and remove them:
self.remove_nodes_and_broadcast_result(node_ids_to_remove)
.await;
self.remove_nodes_and_broadcast_result(node_ids_to_remove).await;
}
}
}
@@ -506,11 +500,9 @@ impl InnerLoop {
&mut feed_messages_for_all,
);
}
self.finalize_and_broadcast_to_chain_feeds(&chain_label, feed_messages_for_chain)
.await;
self.finalize_and_broadcast_to_chain_feeds(&chain_label, feed_messages_for_chain);
}
self.finalize_and_broadcast_to_all_feeds(feed_messages_for_all)
.await;
self.finalize_and_broadcast_to_all_feeds(feed_messages_for_all);
}
/// Remove a single node by its ID, pushing any messages we'd want to send
@@ -555,19 +547,18 @@ impl InnerLoop {
}
/// Finalize a [`FeedMessageSerializer`] and broadcast the result to feeds for the chain.
async fn finalize_and_broadcast_to_chain_feeds(
fn finalize_and_broadcast_to_chain_feeds(
&mut self,
genesis_hash: &BlockHash,
serializer: FeedMessageSerializer,
) {
if let Some(bytes) = serializer.into_finalized() {
self.broadcast_to_chain_feeds(genesis_hash, ToFeedWebsocket::Bytes(bytes))
.await;
self.broadcast_to_chain_feeds(genesis_hash, ToFeedWebsocket::Bytes(bytes));
}
}
/// Send a message to all chain feeds.
async fn broadcast_to_chain_feeds(
fn broadcast_to_chain_feeds(
&mut self,
genesis_hash: &BlockHash,
message: ToFeedWebsocket,
@@ -575,41 +566,39 @@ impl InnerLoop {
if let Some(feeds) = self.chain_to_feed_conn_ids.get(genesis_hash) {
for &feed_id in feeds {
if let Some(chan) = self.feed_channels.get_mut(&feed_id) {
let _ = chan.send(message.clone()).await;
let _ = chan.unbounded_send(message.clone());
}
}
}
}
/// Finalize a [`FeedMessageSerializer`] and broadcast the result to all feeds
async fn finalize_and_broadcast_to_all_feeds(&mut self, serializer: FeedMessageSerializer) {
fn finalize_and_broadcast_to_all_feeds(&mut self, serializer: FeedMessageSerializer) {
if let Some(bytes) = serializer.into_finalized() {
self.broadcast_to_all_feeds(ToFeedWebsocket::Bytes(bytes))
.await;
self.broadcast_to_all_feeds(ToFeedWebsocket::Bytes(bytes));
}
}
/// Send a message to everybody.
async fn broadcast_to_all_feeds(&mut self, message: ToFeedWebsocket) {
fn broadcast_to_all_feeds(&mut self, message: ToFeedWebsocket) {
for chan in self.feed_channels.values_mut() {
let _ = chan.send(message.clone()).await;
let _ = chan.unbounded_send(message.clone());
}
}
/// Finalize a [`FeedMessageSerializer`] and broadcast the result to chain finality feeds
async fn finalize_and_broadcast_to_chain_finality_feeds(
fn finalize_and_broadcast_to_chain_finality_feeds(
&mut self,
genesis_hash: &BlockHash,
serializer: FeedMessageSerializer,
) {
if let Some(bytes) = serializer.into_finalized() {
self.broadcast_to_chain_finality_feeds(genesis_hash, ToFeedWebsocket::Bytes(bytes))
.await;
self.broadcast_to_chain_finality_feeds(genesis_hash, ToFeedWebsocket::Bytes(bytes));
}
}
/// Send a message to all chain finality feeds.
async fn broadcast_to_chain_finality_feeds(
fn broadcast_to_chain_finality_feeds(
&mut self,
genesis_hash: &BlockHash,
message: ToFeedWebsocket,
@@ -619,7 +608,7 @@ impl InnerLoop {
// are also subscribed to receive finality updates.
for &feed_id in feeds.union(&self.feed_conn_id_finality) {
if let Some(chan) = self.feed_channels.get_mut(&feed_id) {
let _ = chan.send(message.clone()).await;
let _ = chan.unbounded_send(message.clone());
}
}
}
+3 -1
View File
@@ -1,3 +1,5 @@
#![cfg(feature = "e2e")]
use test_utils::{feed_message_de::FeedMessage, server::Server};
// use serde_json::json;
@@ -24,4 +26,4 @@ async fn can_ping_feed() {
// Tidy up:
server.shutdown().await;
}
}