Passing a callback isn't worth the extra code; just pass a feed thing

This commit is contained in:
James Wilson
2021-06-30 14:20:58 +01:00
parent 770739c7c8
commit 06bd660599
3 changed files with 42 additions and 140 deletions
+2 -48
View File
@@ -13,7 +13,7 @@ use std::{net::{IpAddr, Ipv4Addr}, str::FromStr};
use futures::channel::{ mpsc };
use futures::{ SinkExt, StreamExt };
use std::collections::{ HashMap, HashSet };
use crate::state::{ self, State, NodeId, OnUpdateNode };
use crate::state::{ self, State, NodeId };
use crate::feed_message::{ self, FeedMessageSerializer };
use crate::find_location;
@@ -304,54 +304,8 @@ impl InnerLoop {
async fn handle_from_shard_update(&mut self, node_id: NodeId, payload: node::Payload) {
let mut feed_message_serializer = FeedMessageSerializer::new();
let mut broadcast_finality = false;
self.node_state.update_node(node_id, payload, |msg| {
match msg {
OnUpdateNode::StaleNode(to_feed) => {
feed_message_serializer.push(to_feed);
}
OnUpdateNode::BestBlock(to_feed) => {
feed_message_serializer.push(to_feed);
}
OnUpdateNode::BestFinalized(to_feed) => {
feed_message_serializer.push(to_feed);
}
OnUpdateNode::ImportedBlock(to_feed) => {
feed_message_serializer.push(to_feed);
}
OnUpdateNode::Hardware(to_feed) => {
feed_message_serializer.push(to_feed);
}
OnUpdateNode::NodeStatsUpdate(to_feed) => {
feed_message_serializer.push(to_feed);
}
OnUpdateNode::NodeIOUpdate(to_feed) => {
feed_message_serializer.push(to_feed);
}
OnUpdateNode::AfgFinalized(to_feed) => {
feed_message_serializer.push(to_feed);
// All messages sent in an update leading to this message are only
// broadcast to feeds subscribed to chain finality info
broadcast_finality = true;
}
OnUpdateNode::AfgReceivedPrecommit(to_feed) => {
feed_message_serializer.push(to_feed);
// All messages sent in an update leading to this message are only
// broadcast to feeds subscribed to chain finality info
broadcast_finality = true;
}
OnUpdateNode::AfgReceivedPrevote(to_feed) => {
feed_message_serializer.push(to_feed);
// All messages sent in an update leading to this message are only
// broadcast to feeds subscribed to chain finality info
broadcast_finality = true;
}
OnUpdateNode::FinalizedBlock(to_feed) => {
feed_message_serializer.push(to_feed);
}
}
});
let broadcast_finality = self.node_state.update_node(node_id, payload, &mut feed_message_serializer);
if let Some(chain) = self.node_state.get_chain_by_node_id(node_id) {
let genesis_hash = *chain.genesis_hash();
+34 -83
View File
@@ -6,7 +6,7 @@ use common::most_seen::MostSeen;
use common::node::Payload;
use once_cell::sync::Lazy;
use crate::feed_message;
use crate::feed_message::{self, FeedMessageSerializer};
use super::node::Node;
use super::NodeId;
@@ -46,54 +46,6 @@ pub struct RemoveNodeResult {
pub chain_renamed: bool
}
/// When we update a node, we subscribe and receive various messages
/// about the update that take this form. The expectation is that we'll
/// push and broadcast these messages to feeds. This allows the caller to
/// retain control over exactly how/when that will happen.
pub enum OnUpdateNode<'a> {
StaleNode(feed_message::StaleNode),
BestBlock(feed_message::BestBlock),
BestFinalized(feed_message::BestFinalized),
ImportedBlock(feed_message::ImportedBlock<'a>),
Hardware(feed_message::Hardware<'a>),
NodeStatsUpdate(feed_message::NodeStatsUpdate<'a>),
NodeIOUpdate(feed_message::NodeIOUpdate<'a>),
AfgFinalized(feed_message::AfgFinalized),
AfgReceivedPrecommit(feed_message::AfgReceivedPrecommit),
AfgReceivedPrevote(feed_message::AfgReceivedPrevote),
FinalizedBlock(feed_message::FinalizedBlock)
}
macro_rules! into_on_update {
($name:ident) => {
impl <'a> From<feed_message::$name> for OnUpdateNode<'a> {
fn from(val: feed_message::$name) -> Self {
OnUpdateNode::$name(val)
}
}
}
}
macro_rules! into_on_update_lt {
($name:ident) => {
impl <'a> From<feed_message::$name<'a>> for OnUpdateNode<'a> {
fn from(val: feed_message::$name<'a>) -> Self {
OnUpdateNode::$name(val)
}
}
}
}
into_on_update!(StaleNode);
into_on_update!(BestBlock);
into_on_update!(BestFinalized);
into_on_update_lt!(ImportedBlock);
into_on_update_lt!(Hardware);
into_on_update_lt!(NodeStatsUpdate);
into_on_update_lt!(NodeIOUpdate);
into_on_update!(AfgFinalized);
into_on_update!(AfgReceivedPrecommit);
into_on_update!(AfgReceivedPrevote);
into_on_update!(FinalizedBlock);
/// Labels of chains we consider "first party". These chains allow any
/// number of nodes to connect.
static FIRST_PARTY_NETWORKS: Lazy<HashSet<&'static str>> = Lazy::new(|| {
@@ -157,44 +109,44 @@ impl Chain {
}
/// Attempt to update the best block seen in this chain.
pub fn update_node<OnUpdate>(&mut self, all_nodes: &mut DenseMap<Node>, nid: NodeId, payload: Payload, mut on_update: OnUpdate)
where OnUpdate: FnMut(OnUpdateNode)
{
/// Returns a boolean which denotes whether the output is for finalization feeds (true) or not (false).
pub fn update_node(&mut self, all_nodes: &mut DenseMap<Node>, nid: NodeId, payload: Payload, feed: &mut FeedMessageSerializer) -> bool {
if let Some(block) = payload.best_block() {
self.handle_block(all_nodes, block, nid, &mut on_update);
self.handle_block(all_nodes, block, nid, feed);
}
if let Some(node) = all_nodes.get_mut(nid) {
match payload {
Payload::SystemInterval(ref interval) => {
if node.update_hardware(interval) {
on_update(feed_message::Hardware(nid, node.hardware()).into());
feed.push(feed_message::Hardware(nid, node.hardware()));
}
if let Some(stats) = node.update_stats(interval) {
on_update(feed_message::NodeStatsUpdate(nid, stats).into());
feed.push(feed_message::NodeStatsUpdate(nid, stats));
}
if let Some(io) = node.update_io(interval) {
on_update(feed_message::NodeIOUpdate(nid, io).into());
feed.push(feed_message::NodeIOUpdate(nid, io));
}
}
Payload::AfgAuthoritySet(authority) => {
node.set_validator_address(authority.authority_id.clone());
return;
return false;
}
Payload::AfgFinalized(finalized) => {
if let Ok(finalized_number) = finalized.finalized_number.parse::<BlockNumber>()
{
if let Some(addr) = node.details().validator.clone() {
on_update(feed_message::AfgFinalized(
feed.push(feed_message::AfgFinalized(
addr,
finalized_number,
finalized.finalized_hash,
).into());
));
}
}
return;
return true;
}
Payload::AfgReceivedPrecommit(precommit) => {
if let Ok(finalized_number) =
@@ -202,15 +154,15 @@ impl Chain {
{
if let Some(addr) = node.details().validator.clone() {
let voter = precommit.voter.clone();
on_update(feed_message::AfgReceivedPrecommit(
feed.push(feed_message::AfgReceivedPrecommit(
addr,
finalized_number,
precommit.target_hash,
voter,
).into());
));
}
}
return;
return true;
}
Payload::AfgReceivedPrevote(prevote) => {
if let Ok(finalized_number) =
@@ -218,15 +170,15 @@ impl Chain {
{
if let Some(addr) = node.details().validator.clone() {
let voter = prevote.voter.clone();
on_update(feed_message::AfgReceivedPrevote(
feed.push(feed_message::AfgReceivedPrevote(
addr,
finalized_number,
prevote.target_hash,
voter,
).into());
));
}
}
return;
return true;
}
Payload::AfgReceivedCommit(_) => {}
_ => (),
@@ -234,29 +186,29 @@ impl Chain {
if let Some(block) = payload.finalized_block() {
if let Some(finalized) = node.update_finalized(block) {
on_update(feed_message::FinalizedBlock(
feed.push(feed_message::FinalizedBlock(
nid,
finalized.height,
finalized.hash,
).into());
));
if finalized.height > self.finalized.height {
self.finalized = *finalized;
on_update(feed_message::BestFinalized(finalized.height, finalized.hash).into());
feed.push(feed_message::BestFinalized(finalized.height, finalized.hash));
}
}
}
}
false
}
fn handle_block<OnUpdate>(&mut self, all_nodes: &mut DenseMap<Node>, block: &Block, nid: NodeId, mut on_update: OnUpdate)
where OnUpdate: FnMut(OnUpdateNode)
{
fn handle_block(&mut self, all_nodes: &mut DenseMap<Node>, block: &Block, nid: NodeId, feed: &mut FeedMessageSerializer) {
let mut propagation_time = None;
let now = now();
let nodes_len = self.node_ids.len();
self.update_stale_nodes(all_nodes, now, &mut on_update);
self.update_stale_nodes(all_nodes, now, feed);
let node = match all_nodes.get_mut(nid) {
Some(node) => node,
@@ -278,11 +230,11 @@ impl Chain {
self.average_block_time = Some(self.block_times.average());
}
self.timestamp = Some(now);
on_update(feed_message::BestBlock(
feed.push(feed_message::BestBlock(
self.best.height,
now,
self.average_block_time,
).into());
));
propagation_time = Some(0);
} else if block.height == self.best.height {
if let Some(timestamp) = self.timestamp {
@@ -291,16 +243,15 @@ impl Chain {
}
if let Some(details) = node.update_details(now, propagation_time) {
on_update(feed_message::ImportedBlock(nid, details).into());
feed.push(feed_message::ImportedBlock(nid, details));
}
}
}
/// Check if the chain is stale (has not received a new best block in a while).
/// If so, find a new best block, ignoring any stale nodes and marking them as such.
fn update_stale_nodes<OnUpdate>(&mut self, all_nodes: &mut DenseMap<Node>, now: u64, mut on_update: OnUpdate)
where OnUpdate: FnMut(OnUpdateNode)
{
fn update_stale_nodes(&mut self, all_nodes: &mut DenseMap<Node>, now: u64, feed: &mut FeedMessageSerializer) {
let threshold = now - STALE_TIMEOUT;
let timestamp = match self.timestamp {
Some(ts) => ts,
@@ -331,7 +282,7 @@ impl Chain {
finalized = *node.finalized();
}
} else {
on_update(feed_message::StaleNode(nid).into());
feed.push(feed_message::StaleNode(nid));
}
}
@@ -341,12 +292,12 @@ impl Chain {
self.block_times.reset();
self.timestamp = timestamp;
on_update(feed_message::BestBlock(
feed.push(feed_message::BestBlock(
self.best.height,
timestamp.unwrap_or(now),
None,
).into());
on_update(feed_message::BestFinalized(finalized.height, finalized.hash).into());
));
feed.push(feed_message::BestFinalized(finalized.height, finalized.hash));
}
}
+6 -9
View File
@@ -5,6 +5,7 @@ use common::types::{Block, NodeDetails, Timestamp};
use common::util::{DenseMap};
use common::node::Payload;
use std::iter::IntoIterator;
use crate::feed_message::FeedMessageSerializer;
use crate::find_location;
use super::chain::{ self, Chain };
@@ -52,9 +53,6 @@ pub struct NodeAddedToChain<'a> {
pub has_chain_label_changed: bool
}
/// During a node update, we get given various messages about the update
pub type OnUpdateNode<'a> = chain::OnUpdateNode<'a>;
/// if removing a node is successful, we get this information back.
pub struct RemovedNode {
/// How many nodes remain on the chain (0 if the chain was removed)
@@ -217,20 +215,19 @@ impl State {
}
/// Attempt to update the best block seen, given a node and block.
pub fn update_node<OnUpdate>(&mut self, node_id: NodeId, payload: Payload, on_update: OnUpdate)
where OnUpdate: FnMut(OnUpdateNode)
{
/// Returns a boolean which denotes whether the output is for finalization feeds (true) or not (false).
pub fn update_node(&mut self, node_id: NodeId, payload: Payload, feed: &mut FeedMessageSerializer) -> bool {
let chain_id = match self.chains_by_node.get(&node_id) {
Some(chain_id) => *chain_id,
None => { log::error!("Cannot find chain_id for node with ID {}", node_id); return }
None => { log::error!("Cannot find chain_id for node with ID {}", node_id); return false }
};
let chain = match self.chains.get_mut(chain_id) {
Some(chain) => chain,
None => { log::error!("Cannot find chain for node with ID {}", node_id); return }
None => { log::error!("Cannot find chain for node with ID {}", node_id); return false }
};
chain.update_node(&mut self.nodes, node_id, payload, on_update);
chain.update_node(&mut self.nodes, node_id, payload, feed)
}
/// Update the location for a node. Return `false` if the node was not found.