Remove broken "Consensus" tab from UI and related code (#434)

* Remove consensus-tab related stuff, and unused messages, from the telemetry backend

* also remove AfgAuthoritySet feed message, and handle same from node

* Blat everything consensus related that I can find in the UI

* cargo fmt

* README: had -> has

Co-authored-by: David <dvdplm@gmail.com>

Co-authored-by: David <dvdplm@gmail.com>
This commit is contained in:
James Wilson
2021-11-25 12:34:36 +00:00
committed by GitHub
parent 73a4cf29b9
commit 17432d712f
26 changed files with 57 additions and 2021 deletions
@@ -25,7 +25,7 @@ use common::{
node_types::BlockHash,
time, MultiMapUnique,
};
use std::collections::{HashMap, HashSet};
use std::collections::HashMap;
use std::sync::{
atomic::{AtomicU64, Ordering},
Arc,
@@ -95,10 +95,6 @@ pub enum FromFeedWebsocket {
/// The feed can subscribe to a chain to receive
/// messages relating to it.
Subscribe { chain: BlockHash },
/// The feed wants finality info for the chain, too.
SendFinality,
/// The feed doesn't want any more finality info for the chain.
NoMoreFinality,
/// An explicit ping message.
Ping { value: Box<str> },
/// The feed is disconnected.
@@ -114,8 +110,6 @@ pub struct Metrics {
pub chains_subscribed_to: usize,
/// Number of subscribed feeds.
pub subscribed_feeds: usize,
/// Number of subscribed feeds that also asked for finality information.
pub subscribed_finality_feeds: usize,
/// How many messages are currently queued up in internal channels
/// waiting to be sent out to feeds.
pub total_messages_to_feeds: usize,
@@ -148,8 +142,6 @@ impl FromStr for FromFeedWebsocket {
"subscribe" => Ok(FromFeedWebsocket::Subscribe {
chain: value.parse()?,
}),
"send-finality" => Ok(FromFeedWebsocket::SendFinality),
"no-more-finality" => Ok(FromFeedWebsocket::NoMoreFinality),
_ => return Err(anyhow::anyhow!("Command {} not recognised", cmd)),
}
}
@@ -178,9 +170,6 @@ pub struct InnerLoop {
/// Which feeds are subscribed to a given chain?
chain_to_feed_conn_ids: MultiMapUnique<BlockHash, ConnId>,
/// These feeds want finality info, too.
feed_conn_id_finality: HashSet<ConnId>,
/// Send messages here to make geographical location requests.
tx_to_locator: flume::Sender<(NodeId, Ipv4Addr)>,
@@ -203,7 +192,6 @@ impl InnerLoop {
feed_channels: HashMap::new(),
shard_channels: HashMap::new(),
chain_to_feed_conn_ids: MultiMapUnique::new(),
feed_conn_id_finality: HashSet::new(),
tx_to_locator,
max_queue_len,
}
@@ -282,7 +270,6 @@ impl InnerLoop {
let connected_nodes = self.node_ids.len();
let subscribed_feeds = self.chain_to_feed_conn_ids.num_values();
let chains_subscribed_to = self.chain_to_feed_conn_ids.num_keys();
let subscribed_finality_feeds = self.feed_conn_id_finality.len();
let connected_shards = self.shard_channels.len();
let connected_feeds = self.feed_channels.len();
let total_messages_to_feeds: usize = self.feed_channels.values().map(|c| c.len()).sum();
@@ -292,7 +279,6 @@ impl InnerLoop {
timestamp_unix_ms,
chains_subscribed_to,
subscribed_feeds,
subscribed_finality_feeds,
total_messages_to_feeds,
current_messages_to_aggregator,
total_messages_to_aggregator,
@@ -429,23 +415,15 @@ impl InnerLoop {
};
let mut feed_message_serializer = FeedMessageSerializer::new();
let broadcast_finality =
self.node_state
.update_node(node_id, payload, &mut feed_message_serializer);
self.node_state
.update_node(node_id, payload, &mut feed_message_serializer);
if let Some(chain) = self.node_state.get_chain_by_node_id(node_id) {
let genesis_hash = chain.genesis_hash();
if broadcast_finality {
self.finalize_and_broadcast_to_chain_finality_feeds(
&genesis_hash,
feed_message_serializer,
);
} else {
self.finalize_and_broadcast_to_chain_feeds(
&genesis_hash,
feed_message_serializer,
);
}
self.finalize_and_broadcast_to_chain_feeds(
&genesis_hash,
feed_message_serializer,
);
}
}
FromShardWebsocket::Disconnected => {
@@ -509,9 +487,6 @@ impl InnerLoop {
// Unsubscribe from previous chain if subscribed to one:
let old_genesis_hash = self.chain_to_feed_conn_ids.remove_value(&feed_conn_id);
// Untoggle request for finality feeds:
self.feed_conn_id_finality.remove(&feed_conn_id);
// Get old chain if there was one:
let node_state = &self.node_state;
let old_chain =
@@ -583,17 +558,10 @@ impl InnerLoop {
self.chain_to_feed_conn_ids
.insert(new_genesis_hash, feed_conn_id);
}
FromFeedWebsocket::SendFinality => {
self.feed_conn_id_finality.insert(feed_conn_id);
}
FromFeedWebsocket::NoMoreFinality => {
self.feed_conn_id_finality.remove(&feed_conn_id);
}
FromFeedWebsocket::Disconnected => {
// The feed has disconnected; clean up references to it:
self.chain_to_feed_conn_ids.remove_value(&feed_conn_id);
self.feed_channels.remove(&feed_conn_id);
self.feed_conn_id_finality.remove(&feed_conn_id);
}
}
}
@@ -706,32 +674,4 @@ impl InnerLoop {
let _ = chan.send(message.clone());
}
}
/// Finalize a [`FeedMessageSerializer`] and broadcast the result to chain finality feeds
fn finalize_and_broadcast_to_chain_finality_feeds(
&mut self,
genesis_hash: &BlockHash,
serializer: FeedMessageSerializer,
) {
if let Some(bytes) = serializer.into_finalized() {
self.broadcast_to_chain_finality_feeds(genesis_hash, ToFeedWebsocket::Bytes(bytes));
}
}
/// Send a message to all chain finality feeds.
fn broadcast_to_chain_finality_feeds(
&mut self,
genesis_hash: &BlockHash,
message: ToFeedWebsocket,
) {
if let Some(feeds) = self.chain_to_feed_conn_ids.get_values(genesis_hash) {
// Get all feeds for the chain, but only broadcast to those feeds that
// are also subscribed to receive finality updates.
for &feed_id in feeds.union(&self.feed_conn_id_finality) {
if let Some(chan) = self.feed_channels.get_mut(&feed_id) {
let _ = chan.send(message.clone());
}
}
}
}
}
+2 -33
View File
@@ -25,7 +25,6 @@ use common::node_types::{
};
use serde_json::to_writer;
type Address = Box<str>;
type FeedNodeId = usize;
pub trait FeedMessage {
@@ -119,10 +118,8 @@ actions! {
13: SubscribedTo,
14: UnsubscribedFrom,
15: Pong<'_>,
16: AfgFinalized,
17: AfgReceivedPrevote,
18: AfgReceivedPrecommit,
19: AfgAuthoritySet,
// Note; some now-unused messages were removed between IDs 15 and 20.
// We maintain existing IDs for backward compatibility.
20: StaleNode,
21: NodeIOUpdate<'_>,
}
@@ -177,34 +174,6 @@ pub struct UnsubscribedFrom(pub BlockHash);
#[derive(Serialize)]
pub struct Pong<'a>(pub &'a str);
#[derive(Serialize)]
pub struct AfgFinalized(pub Address, pub BlockNumber, pub BlockHash);
#[derive(Serialize)]
pub struct AfgReceivedPrevote(
pub Address,
pub BlockNumber,
pub BlockHash,
pub Option<Address>,
);
#[derive(Serialize)]
pub struct AfgReceivedPrecommit(
pub Address,
pub BlockNumber,
pub BlockHash,
pub Option<Address>,
);
#[derive(Serialize)]
pub struct AfgAuthoritySet(
pub Address,
pub Address,
pub Address,
pub BlockNumber,
pub BlockHash,
);
#[derive(Serialize)]
pub struct StaleNode(pub FeedNodeId);
-5
View File
@@ -539,11 +539,6 @@ async fn return_prometheus_metrics(aggregator: AggregatorSet) -> Response<hyper:
"telemetry_core_subscribed_feeds{{aggregator=\"{}\"}} {} {}\n",
idx, m.subscribed_feeds, m.timestamp_unix_ms
);
let _ = write!(
&mut s,
"telemetry_core_subscribed_finality_feeds{{aggregator=\"{}\"}} {} {}\n",
idx, m.subscribed_finality_feeds, m.timestamp_unix_ms
);
let _ = write!(
&mut s,
"telemetry_core_total_messages_to_feeds{{aggregator=\"{}\"}} {} {}\n",
+9 -51
View File
@@ -15,8 +15,8 @@
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use common::node_message::Payload;
use common::node_types::BlockHash;
use common::node_types::{Block, Timestamp};
use common::node_types::{BlockHash, BlockNumber};
use common::{id_type, time, DenseMap, MostSeen, NumStats};
use once_cell::sync::Lazy;
use std::collections::HashSet;
@@ -149,13 +149,12 @@ impl Chain {
}
/// Attempt to update the best block seen in this chain.
/// Returns a boolean which denotes whether the output is for finalization feeds (true) or not (false).
pub fn update_node(
&mut self,
nid: ChainNodeId,
payload: Payload,
feed: &mut FeedMessageSerializer,
) -> bool {
) {
if let Some(block) = payload.best_block() {
self.handle_block(block, nid, feed);
}
@@ -163,65 +162,26 @@ impl Chain {
if let Some(node) = self.nodes.get_mut(nid) {
match payload {
Payload::SystemInterval(ref interval) => {
// Send a feed message if any of the relevant node details change:
if node.update_hardware(interval) {
feed.push(feed_message::Hardware(nid.into(), node.hardware()));
}
if let Some(stats) = node.update_stats(interval) {
feed.push(feed_message::NodeStatsUpdate(nid.into(), stats));
}
if let Some(io) = node.update_io(interval) {
feed.push(feed_message::NodeIOUpdate(nid.into(), io));
}
}
Payload::AfgAuthoritySet(authority) => {
node.set_validator_address(authority.authority_id.clone());
return false;
}
Payload::AfgFinalized(finalized) => {
if let Ok(finalized_number) = finalized.finalized_number.parse::<BlockNumber>()
{
if let Some(addr) = node.details().validator.clone() {
feed.push(feed_message::AfgFinalized(
addr,
finalized_number,
finalized.finalized_hash,
));
}
// If our node validator address (and thus details) change, send an
// updated "add node" feed message:
if node.set_validator_address(authority.authority_id.clone()) {
feed.push(feed_message::AddedNode(nid.into(), &node));
}
return true;
return;
}
Payload::AfgReceivedPrecommit(precommit) => {
if let Ok(finalized_number) = precommit.target_number.parse::<BlockNumber>() {
if let Some(addr) = node.details().validator.clone() {
let voter = precommit.voter.clone();
feed.push(feed_message::AfgReceivedPrecommit(
addr,
finalized_number,
precommit.target_hash,
voter,
));
}
}
return true;
}
Payload::AfgReceivedPrevote(prevote) => {
if let Ok(finalized_number) = prevote.target_number.parse::<BlockNumber>() {
if let Some(addr) = node.details().validator.clone() {
let voter = prevote.voter.clone();
feed.push(feed_message::AfgReceivedPrevote(
addr,
finalized_number,
prevote.target_hash,
voter,
));
}
}
return true;
}
Payload::AfgReceivedCommit(_) => {}
_ => (),
_ => {}
}
if let Some(block) = payload.finalized_block() {
@@ -242,8 +202,6 @@ impl Chain {
}
}
}
false
}
fn handle_block(&mut self, block: &Block, nid: ChainNodeId, feed: &mut FeedMessageSerializer) {
+7 -2
View File
@@ -213,8 +213,13 @@ impl Node {
self.stale
}
pub fn set_validator_address(&mut self, addr: Box<str>) {
self.details.validator = Some(addr);
pub fn set_validator_address(&mut self, addr: Box<str>) -> bool {
if self.details.validator.as_ref() == Some(&addr) {
false
} else {
self.details.validator = Some(addr);
true
}
}
pub fn startup_time(&self) -> Option<Timestamp> {
+2 -3
View File
@@ -213,18 +213,17 @@ impl State {
}
/// Attempt to update the best block seen, given a node and block.
/// Returns a boolean which denotes whether the output is for finalization feeds (true) or not (false).
pub fn update_node(
&mut self,
NodeId(chain_id, chain_node_id): NodeId,
payload: Payload,
feed: &mut FeedMessageSerializer,
) -> bool {
) {
let chain = match self.chains.get_mut(chain_id) {
Some(chain) => chain,
None => {
log::error!("Cannot find chain for node with ID {:?}", chain_id);
return false;
return;
}
};